-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
containerd.go
298 lines (264 loc) · 8.77 KB
/
containerd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
package containerd
import (
"bufio"
"context"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/cri/constants"
"github.com/containerd/containerd/reference/docker"
"github.com/k3s-io/k3s/pkg/agent/cri"
util2 "github.com/k3s-io/k3s/pkg/agent/util"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/version"
"github.com/natefinch/lumberjack"
"github.com/pkg/errors"
"github.com/rancher/wharfie/pkg/tarfile"
"github.com/rancher/wrangler/pkg/merr"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)
// Run configures containerd, launches it as a child process in a background goroutine, and waits
// for the containerd service at cfg.Containerd.Address to become available. Once it is available,
// images found in the agent images directory are imported or pulled via preloadImages.
//
// The current process exits with status 1 as soon as the containerd child process returns,
// whether or not it reported an error.
func Run(ctx context.Context, cfg *config.Node) error {
	if err := setupContainerdConfig(ctx, cfg); err != nil {
		return err
	}

	args := getContainerdArgs(cfg)

	// By default containerd shares this process's stdout/stderr.
	var stdOut io.Writer = os.Stdout
	var stdErr io.Writer = os.Stderr

	if logPath := cfg.Containerd.Log; logPath != "" {
		logrus.Infof("Logging containerd to %s", logPath)
		// Rotating log file; per the lumberjack documentation MaxSize is in megabytes
		// and MaxAge is in days.
		rotated := &lumberjack.Logger{
			Filename:   logPath,
			MaxSize:    50,
			MaxBackups: 3,
			MaxAge:     28,
			Compress:   true,
		}

		if cfg.Containerd.Debug {
			// With debug enabled, output goes to the log file in addition to
			// stdout/stderr, even though a log path is set.
			stdOut = io.MultiWriter(stdOut, rotated)
			stdErr = io.MultiWriter(stdErr, rotated)
		} else {
			stdOut, stdErr = rotated, rotated
		}
	}

	go func() {
		// Both slices must stay non-nil: a nil cmd.Env would make exec fall back to
		// inheriting this process's full environment.
		passthrough := []string{}
		stripped := []string{}

		for _, entry := range os.Environ() {
			kv := strings.SplitN(entry, "=", 2)
			key := kv[0]

			if key == "NOTIFY_SOCKET" {
				// Dropped so that containerd does not send spurious notifications to systemd.
				continue
			}
			if key == "CONTAINERD_LOG_LEVEL" {
				// Translated into the --log-level flag instead of being passed as a variable.
				args = append(args, "--log-level", kv[1])
				continue
			}
			if strings.HasPrefix(key, "CONTAINERD_") {
				// The CONTAINERD_ prefix is removed before the variable is handed to the
				// child, so e.g. CONTAINERD_https_proxy=http://proxy.example.com:8080
				// sets https_proxy for containerd only.
				kv[0] = strings.TrimPrefix(key, "CONTAINERD_")
				stripped = append(stripped, strings.Join(kv, "="))
				continue
			}
			// Everything else is passed through untouched.
			passthrough = append(passthrough, entry)
		}

		logrus.Infof("Running containerd %s", config.ArgString(args[1:]))
		cmd := exec.CommandContext(ctx, args[0], args[1:]...)
		cmd.Stdout, cmd.Stderr = stdOut, stdErr
		// Un-prefixed CONTAINERD_ variables come last in the child environment.
		cmd.Env = append(passthrough, stripped...)
		addDeathSig(cmd)

		if err := cmd.Run(); err != nil {
			fmt.Fprintf(os.Stderr, "containerd: %s\n", err)
		}
		// containerd is not expected to stop; take the whole process down with it.
		os.Exit(1)
	}()

	if err := cri.WaitForService(ctx, cfg.Containerd.Address, "containerd"); err != nil {
		return err
	}

	return preloadImages(ctx, cfg)
}
// preloadImages reads the contents of the agent images directory (cfg.Images), and attempts to
// import into containerd any files found there. Supported compressed types are decompressed, and
// any .txt files are processed as a list of images that should be pre-pulled from remote registries.
// If configured, imported images are retagged as being pulled from additional registries.
//
// A missing, unreadable, or non-directory images path is not treated as an error; the function
// logs where appropriate and returns nil. Failures for an individual file are logged and do not
// stop processing of the remaining files. An error is returned only when the containerd client,
// the CRI connection, or the lease used to hold the images cannot be set up.
func preloadImages(ctx context.Context, cfg *config.Node) error {
	fileInfo, err := os.Stat(cfg.Images)
	if os.IsNotExist(err) {
		return nil
	}
	if err != nil {
		logrus.Errorf("Unable to find images in %s: %v", cfg.Images, err)
		return nil
	}

	if !fileInfo.IsDir() {
		return nil
	}

	fileInfos, err := os.ReadDir(cfg.Images)
	if err != nil {
		logrus.Errorf("Unable to read images in %s: %v", cfg.Images, err)
		return nil
	}

	client, err := Client(cfg.Containerd.Address)
	if err != nil {
		return err
	}
	defer client.Close()

	criConn, err := cri.Connection(ctx, cfg.Containerd.Address)
	if err != nil {
		return err
	}
	defer criConn.Close()

	// Ensure that our images are imported into the correct namespace
	ctx = namespaces.WithNamespace(ctx, constants.K8sContainerdNamespace)

	// At startup all leases previously created under our program name are cleared
	ls := client.LeasesService()
	existingLeases, err := ls.List(ctx)
	if err != nil {
		return err
	}

	for _, lease := range existingLeases {
		if lease.ID != version.Program {
			continue
		}
		logrus.Debugf("Deleting existing lease: %v", lease)
		// Deletion stays best-effort, but a failure is surfaced in the log: if the stale
		// lease survives, the Create call below is the one that will report the problem.
		if err := ls.Delete(ctx, lease); err != nil {
			logrus.Warnf("Failed to delete existing lease %s: %v", lease.ID, err)
		}
	}

	// Any images found on import are given a lease that never expires
	lease, err := ls.Create(ctx, leases.WithID(version.Program))
	if err != nil {
		return err
	}

	// Ensure that our images are locked by the lease
	ctx = leases.WithLease(ctx, lease.ID)

	for _, fileInfo := range fileInfos {
		if fileInfo.IsDir() {
			continue
		}

		start := time.Now()
		filePath := filepath.Join(cfg.Images, fileInfo.Name())

		if err := preloadFile(ctx, cfg, client, criConn, filePath); err != nil {
			logrus.Errorf("Error encountered while importing %s: %v", filePath, err)
			continue
		}
		logrus.Infof("Imported images from %s in %s", filePath, time.Since(start))
	}
	return nil
}
// preloadFile processes one entry from the agent images directory. A file whose name ends in
// ".txt" (as matched by util2.HasSuffixI) is treated as a list of images to pull through the CRI
// connection; any other file is opened via tarfile.GetOpener and imported into containerd, after
// which the imported images are retagged for each registry in cfg.AgentConfig.AirgapExtraRegistry.
//
// Keeping this in a dedicated function lets the deferred Close calls run once per file rather
// than piling up until the whole directory has been processed.
func preloadFile(ctx context.Context, cfg *config.Node, client *containerd.Client, criConn *grpc.ClientConn, filePath string) error {
	isImageList := util2.HasSuffixI(filePath, ".txt")
	if isImageList {
		list, openErr := os.Open(filePath)
		if openErr != nil {
			return openErr
		}
		defer list.Close()

		logrus.Infof("Pulling images from %s", filePath)
		return prePullImages(ctx, criConn, list)
	}

	// Anything that is not an image list is handled as an image archive.
	openArchive, err := tarfile.GetOpener(filePath)
	if err != nil {
		return err
	}

	archive, err := openArchive()
	if err != nil {
		return err
	}
	defer archive.Close()

	logrus.Infof("Importing images from %s", filePath)
	imported, err := client.Import(ctx, archive, containerd.WithAllPlatforms(true))
	if err != nil {
		return err
	}

	extraRegistries := cfg.AgentConfig.AirgapExtraRegistry
	return retagImages(ctx, client, imported, extraRegistries)
}
// retagImages creates, for every image in images and every entry in registries, an additional
// image record named "<registry>/<path>:<tag>" that points at the same content. When a record
// with that name already exists it is deleted and recreated. This is most useful when using a
// private registry for all images, as can be configured by the RKE2/Rancher
// system-default-registry setting.
//
// Processing never stops early: every failure is collected, and the combined result is returned
// through merr.NewErrors once all images and registries have been attempted.
func retagImages(ctx context.Context, client *containerd.Client, images []images.Image, registries []string) error {
	var errs []error
	store := client.ImageService()

	for _, image := range images {
		ref, parseErr := parseNamedTagged(image.Name)
		if parseErr != nil {
			errs = append(errs, errors.Wrap(parseErr, "failed to parse image name"))
			continue
		}
		logrus.Infof("Imported %s", image.Name)

		for _, registry := range registries {
			// Work on a copy so that only the name differs from the imported record.
			retag := image
			retag.Name = fmt.Sprintf("%s/%s:%s", registry, docker.Path(ref), ref.Tag())

			_, createErr := store.Create(ctx, retag)
			switch {
			case createErr == nil:
				// Created on the first attempt; nothing else to do.
			case errdefs.IsAlreadyExists(createErr):
				// Replace the existing record: delete it, then create it again.
				if err := store.Delete(ctx, retag.Name); err != nil {
					errs = append(errs, errors.Wrap(err, "failed to delete existing image"))
					continue
				}
				if _, err := store.Create(ctx, retag); err != nil {
					errs = append(errs, errors.Wrap(err, "failed to tag after deleting existing image"))
					continue
				}
			default:
				errs = append(errs, errors.Wrap(createErr, "failed to tag image"))
				continue
			}

			logrus.Infof("Tagged %s", retag.Name)
		}
	}

	return merr.NewErrors(errs...)
}
// parseNamedTagged runs name through docker.ParseNormalizedNamed and returns the result as a
// docker.NamedTagged so that callers can read its tag. An error is returned if parsing fails or
// if the parsed reference does not implement docker.NamedTagged.
func parseNamedTagged(name string) (docker.NamedTagged, error) {
	parsed, err := docker.ParseNormalizedNamed(name)
	if err != nil {
		return nil, err
	}

	if withTag, isTagged := parsed.(docker.NamedTagged); isTagged {
		return withTag, nil
	}
	return nil, fmt.Errorf("can't cast %T to NamedTagged", parsed)
}
// prePullImages asks containerd to pull images in a given list, so that they
// are ready when the containers attempt to start later.
//
// images is read line by line; each line is trimmed of surrounding whitespace and treated as one
// image reference. Blank lines are skipped. An image for which ImageStatus already reports an
// image is not pulled again. A failed pull is logged and does not stop processing of the
// remaining lines.
//
// The returned error is non-nil only when reading from images fails (including a line too long
// for the scanner's buffer); individual pull failures are never returned.
func prePullImages(ctx context.Context, conn *grpc.ClientConn, images io.Reader) error {
	imageClient := runtimeapi.NewImageServiceClient(conn)
	scanner := bufio.NewScanner(images)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" {
			// Blank lines (e.g. a trailing newline or spacing between entries) are not
			// image references; sending them to the runtime only produces a bogus failure.
			continue
		}

		resp, err := imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{
			Image: &runtimeapi.ImageSpec{
				Image: line,
			},
		})
		if err == nil && resp.Image != nil {
			// Already present; nothing to pull.
			continue
		}

		logrus.Infof("Pulling image %s...", line)
		_, err = imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{
			Image: &runtimeapi.ImageSpec{
				Image: line,
			},
		})
		if err != nil {
			logrus.Errorf("Failed to pull %s: %v", line, err)
		}
	}

	// Scan returns false on both EOF and error; report the latter instead of silently
	// treating a truncated read as a complete list.
	return scanner.Err()
}