Pin images instead of locking layers with lease

Layer leases never did what we wanted anyways, and this is the new approved interface for ensuring that images do not get GCd

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
This commit is contained in:
Brad Davidson 2023-12-08 19:51:28 +00:00 committed by Brad Davidson
parent df5e983fc8
commit 5c99bdd9bd
1 changed files with 146 additions and 57 deletions

View File

@ -14,9 +14,9 @@ import (
"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/cri/constants"
"github.com/containerd/containerd/pkg/cri/labels"
"github.com/containerd/containerd/reference/docker"
"github.com/k3s-io/k3s/pkg/agent/cri"
util2 "github.com/k3s-io/k3s/pkg/agent/util"
@ -27,10 +27,17 @@ import (
"github.com/rancher/wharfie/pkg/tarfile"
"github.com/rancher/wrangler/pkg/merr"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)
var (
// In addition to using the CRI pinned label, we add our own label to indicate that
// the image was pinned by the import process, so that we can clear the pin on subsequent startups.
// ref: https://github.com/containerd/containerd/blob/release/1.7/pkg/cri/labels/labels.go
k3sPinnedImageLabelKey = "io.cattle." + version.Program + ".pinned"
k3sPinnedImageLabelValue = "pinned"
)
// Run configures and starts containerd as a child process. Once it is up, images are preloaded
// or pulled from files found in the agent images directory.
func Run(ctx context.Context, cfg *config.Node) error {
@ -134,38 +141,29 @@ func preloadImages(ctx context.Context, cfg *config.Node) error {
}
defer client.Close()
// Image pulls must be done using the CRI client, not the containerd client.
// Repository mirrors and rewrites are handled by the CRI service; if you pull directly
// using the containerd image service it will ignore the configured settings.
criConn, err := cri.Connection(ctx, cfg.Containerd.Address)
if err != nil {
return err
}
defer criConn.Close()
imageClient := runtimeapi.NewImageServiceClient(criConn)
// Ensure that our images are imported into the correct namespace
ctx = namespaces.WithNamespace(ctx, constants.K8sContainerdNamespace)
// At startup all leases from k3s are cleared
ls := client.LeasesService()
existingLeases, err := ls.List(ctx)
if err != nil {
return err
// At startup all leases from k3s are cleared; we no longer use leases to lock content
if err := clearLeases(ctx, client); err != nil {
return errors.Wrap(err, "failed to clear leases")
}
for _, lease := range existingLeases {
if lease.ID == version.Program {
logrus.Debugf("Deleting existing lease: %v", lease)
ls.Delete(ctx, lease)
}
// Clear the pinned labels on all images previously pinned by k3s
if err := clearLabels(ctx, client); err != nil {
return errors.Wrap(err, "failed to clear pinned labels")
}
// Any images found on import are given a lease that never expires
lease, err := ls.Create(ctx, leases.WithID(version.Program))
if err != nil {
return err
}
// Ensure that our images are locked by the lease
ctx = leases.WithLease(ctx, lease.ID)
for _, fileInfo := range fileInfos {
if fileInfo.IsDir() {
continue
@ -174,7 +172,7 @@ func preloadImages(ctx context.Context, cfg *config.Node) error {
start := time.Now()
filePath := filepath.Join(cfg.Images, fileInfo.Name())
if err := preloadFile(ctx, cfg, client, criConn, filePath); err != nil {
if err := preloadFile(ctx, cfg, client, imageClient, filePath); err != nil {
logrus.Errorf("Error encountered while importing %s: %v", filePath, err)
continue
}
@ -186,7 +184,8 @@ func preloadImages(ctx context.Context, cfg *config.Node) error {
// preloadFile handles loading images from a single tarball or pre-pull image list.
// This is in its own function so that we can ensure that the various readers are properly closed, as some
// decompressing readers need to be explicitly closed and others do not.
func preloadFile(ctx context.Context, cfg *config.Node, client *containerd.Client, criConn *grpc.ClientConn, filePath string) error {
func preloadFile(ctx context.Context, cfg *config.Node, client *containerd.Client, imageClient runtimeapi.ImageServiceClient, filePath string) error {
var images []images.Image
if util2.HasSuffixI(filePath, ".txt") {
file, err := os.Open(filePath)
if err != nil {
@ -194,28 +193,102 @@ func preloadFile(ctx context.Context, cfg *config.Node, client *containerd.Clien
}
defer file.Close()
logrus.Infof("Pulling images from %s", filePath)
return prePullImages(ctx, criConn, file)
images, err = prePullImages(ctx, client, imageClient, file)
if err != nil {
return errors.Wrap(err, "failed to pull images from "+filePath)
}
} else {
opener, err := tarfile.GetOpener(filePath)
if err != nil {
return err
}
imageReader, err := opener()
if err != nil {
return err
}
defer imageReader.Close()
logrus.Infof("Importing images from %s", filePath)
images, err = client.Import(ctx, imageReader, containerd.WithAllPlatforms(true))
if err != nil {
return errors.Wrap(err, "failed to import images from "+filePath)
}
}
opener, err := tarfile.GetOpener(filePath)
if err := labelImages(ctx, client, images); err != nil {
return errors.Wrap(err, "failed to add pinned label to images")
}
if err := retagImages(ctx, client, images, cfg.AgentConfig.AirgapExtraRegistry); err != nil {
return errors.Wrap(err, "failed to retag images")
}
for _, image := range images {
logrus.Infof("Imported %s", image.Name)
}
return nil
}
// clearLeases deletes any leases left by previous versions of k3s.
// We no longer use leases to lock content; they only locked the
// blobs, not the actual images.
func clearLeases(ctx context.Context, client *containerd.Client) error {
ls := client.LeasesService()
existingLeases, err := ls.List(ctx)
if err != nil {
return err
}
for _, lease := range existingLeases {
if lease.ID == version.Program {
logrus.Debugf("Deleting existing lease: %v", lease)
ls.Delete(ctx, lease)
}
}
return nil
}
imageReader, err := opener()
// clearLabels removes the pinned labels on all images in the image store that were previously pinned by k3s
func clearLabels(ctx context.Context, client *containerd.Client) error {
var errs []error
imageService := client.ImageService()
images, err := imageService.List(ctx, fmt.Sprintf("labels.%q==%s", k3sPinnedImageLabelKey, k3sPinnedImageLabelValue))
if err != nil {
return err
}
defer imageReader.Close()
logrus.Infof("Importing images from %s", filePath)
images, err := client.Import(ctx, imageReader, containerd.WithAllPlatforms(true))
if err != nil {
return err
for _, image := range images {
delete(image.Labels, k3sPinnedImageLabelKey)
delete(image.Labels, labels.PinnedImageLabelKey)
if _, err := imageService.Update(ctx, image, "labels"); err != nil {
errs = append(errs, errors.Wrap(err, "failed to delete labels from image "+image.Name))
}
}
return merr.NewErrors(errs...)
}
return retagImages(ctx, client, images, cfg.AgentConfig.AirgapExtraRegistry)
// labelImages adds labels to the listed images, indicating that they
// are pinned by k3s and should not be pruned.
func labelImages(ctx context.Context, client *containerd.Client, images []images.Image) error {
var errs []error
imageService := client.ImageService()
for i, image := range images {
if image.Labels[k3sPinnedImageLabelKey] == k3sPinnedImageLabelValue &&
image.Labels[labels.PinnedImageLabelKey] == labels.PinnedImageLabelValue {
continue
}
if image.Labels == nil {
image.Labels = map[string]string{}
}
image.Labels[k3sPinnedImageLabelKey] = k3sPinnedImageLabelValue
image.Labels[labels.PinnedImageLabelKey] = labels.PinnedImageLabelValue
updatedImage, err := imageService.Update(ctx, image, "labels")
if err != nil {
errs = append(errs, errors.Wrap(err, "failed to add labels to image "+image.Name))
} else {
images[i] = updatedImage
}
}
return merr.NewErrors(errs...)
}
// retagImages retags all listed images as having been pulled from the given remote registries.
@ -227,24 +300,27 @@ func retagImages(ctx context.Context, client *containerd.Client, images []images
for _, image := range images {
name, err := parseNamedTagged(image.Name)
if err != nil {
errs = append(errs, errors.Wrap(err, "failed to parse image name"))
errs = append(errs, errors.Wrap(err, "failed to parse tags for image "+image.Name))
continue
}
logrus.Infof("Imported %s", image.Name)
for _, registry := range registries {
image.Name = fmt.Sprintf("%s/%s:%s", registry, docker.Path(name), name.Tag())
newName := fmt.Sprintf("%s/%s:%s", registry, docker.Path(name), name.Tag())
if newName == image.Name {
continue
}
image.Name = newName
if _, err = imageService.Create(ctx, image); err != nil {
if errdefs.IsAlreadyExists(err) {
if err = imageService.Delete(ctx, image.Name); err != nil {
errs = append(errs, errors.Wrap(err, "failed to delete existing image"))
errs = append(errs, errors.Wrap(err, "failed to delete existing image "+image.Name))
continue
}
if _, err = imageService.Create(ctx, image); err != nil {
errs = append(errs, errors.Wrap(err, "failed to tag after deleting existing image"))
errs = append(errs, errors.Wrap(err, "failed to tag after deleting existing image "+image.Name))
continue
}
} else {
errs = append(errs, errors.Wrap(err, "failed to tag image"))
errs = append(errs, errors.Wrap(err, "failed to tag image "+image.Name))
continue
}
}
@ -269,30 +345,43 @@ func parseNamedTagged(name string) (docker.NamedTagged, error) {
}
// prePullImages asks containerd to pull images in a given list, so that they
// are ready when the containers attempt to start later.
func prePullImages(ctx context.Context, conn *grpc.ClientConn, images io.Reader) error {
imageClient := runtimeapi.NewImageServiceClient(conn)
scanner := bufio.NewScanner(images)
// are ready when the containers attempt to start later. If the image already exists,
// or is successfully pulled, information about the image is retrieved from the image store.
// NOTE: Pulls MUST be done via CRI API, not containerd API, in order to use mirrors and rewrites.
func prePullImages(ctx context.Context, client *containerd.Client, imageClient runtimeapi.ImageServiceClient, imageList io.Reader) ([]images.Image, error) {
errs := []error{}
images := []images.Image{}
imageService := client.ImageService()
scanner := bufio.NewScanner(imageList)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
resp, err := imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{
name := strings.TrimSpace(scanner.Text())
if _, err := imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{
Image: &runtimeapi.ImageSpec{
Image: line,
Image: name,
},
})
if err == nil && resp.Image != nil {
}); err == nil {
logrus.Infof("Image %s has already been pulled", name)
if image, err := imageService.Get(ctx, name); err != nil {
errs = append(errs, err)
} else {
images = append(images, image)
}
continue
}
logrus.Infof("Pulling image %s...", line)
_, err = imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{
logrus.Infof("Pulling image %s", name)
if _, err := imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{
Image: &runtimeapi.ImageSpec{
Image: line,
Image: name,
},
})
if err != nil {
logrus.Errorf("Failed to pull %s: %v", line, err)
}); err != nil {
errs = append(errs, err)
} else {
if image, err := imageService.Get(ctx, name); err != nil {
errs = append(errs, err)
} else {
images = append(images, image)
}
}
}
return nil
return images, merr.NewErrors(errs...)
}