diff --git a/pkg/agent/containerd/containerd.go b/pkg/agent/containerd/containerd.go index d7ccbbd01a..512f0dde7d 100644 --- a/pkg/agent/containerd/containerd.go +++ b/pkg/agent/containerd/containerd.go @@ -1,6 +1,7 @@ package containerd import ( + "bufio" "context" "fmt" "io" @@ -8,6 +9,7 @@ import ( "os" "os/exec" "path/filepath" + "strings" "time" "github.com/containerd/containerd" @@ -73,30 +75,18 @@ func Run(ctx context.Context, cfg *config.Node) error { os.Exit(1) }() + first := true for { - addr, dialer, err := util.GetAddressAndDialer("unix://" + cfg.Containerd.Address) - if err != nil { - time.Sleep(1 * time.Second) - continue - } - - conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(3*time.Second), grpc.WithContextDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize))) - if err != nil { - time.Sleep(1 * time.Second) - continue - } - - c := runtimeapi.NewRuntimeServiceClient(conn) - - _, err = c.Version(ctx, &runtimeapi.VersionRequest{ - Version: "0.1.0", - }) + conn, err := criConnection(ctx, cfg.Containerd.Address) if err == nil { conn.Close() break } - conn.Close() - logrus.Infof("Waiting for containerd startup: %v", err) + if first { + first = false + } else { + logrus.Infof("Waiting for containerd startup: %v", err) + } select { case <-ctx.Done(): return ctx.Err() @@ -104,10 +94,33 @@ func Run(ctx context.Context, cfg *config.Node) error { } } - return preloadImages(cfg) + return preloadImages(ctx, cfg) } -func preloadImages(cfg *config.Node) error { +func criConnection(ctx context.Context, address string) (*grpc.ClientConn, error) { + addr, dialer, err := util.GetAddressAndDialer("unix://" + address) + if err != nil { + return nil, err + } + + conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(3*time.Second), grpc.WithContextDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize))) + if err != nil { + return nil, err + } + + c := runtimeapi.NewRuntimeServiceClient(conn) + _, err = c.Version(ctx, &runtimeapi.VersionRequest{ + Version: "0.1.0", + }) + if err != nil { + conn.Close() + return nil, err + } + + return conn, nil +} + +func preloadImages(ctx context.Context, cfg *config.Node) error { fileInfo, err := os.Stat(cfg.Images) if os.IsNotExist(err) { return nil @@ -132,6 +145,12 @@ func preloadImages(cfg *config.Node) error { } defer client.Close() + criConn, err := criConnection(ctx, cfg.Containerd.Address) + if err != nil { + return err + } + defer criConn.Close() + ctxContainerD := namespaces.WithNamespace(context.Background(), "k8s.io") for _, fileInfo := range fileInfos { @@ -147,8 +166,15 @@ func preloadImages(cfg *config.Node) error { continue } + if strings.HasSuffix(fileInfo.Name(), ".txt") { + prePullImages(ctx, criConn, file) + file.Close() + continue + } + logrus.Debugf("Import %s", filePath) _, err = client.Import(ctxContainerD, file) + file.Close() if err != nil { logrus.Errorf("Unable to import %s: %v", filePath, err) } @@ -156,6 +182,32 @@ func preloadImages(cfg *config.Node) error { return nil } +func prePullImages(ctx context.Context, conn *grpc.ClientConn, images io.Reader) { + imageClient := runtimeapi.NewImageServiceClient(conn) + scanner := bufio.NewScanner(images) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + resp, err := imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{ + Image: &runtimeapi.ImageSpec{ + Image: line, + }, + }) + if err == nil && resp.Image != nil { + continue + } + + logrus.Infof("Pulling image %s...", line) + _, err = imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{ + Image: &runtimeapi.ImageSpec{ + Image: line, + }, + }) + if err != nil { + logrus.Errorf("Failed to pull %s: %v", line, err) + } + } +} + func setupContainerdConfig(ctx context.Context, cfg *config.Node) error { privRegistries, err := getPrivateRegistries(ctx, cfg) if err != nil {