2019-01-01 08:23:01 +00:00
package containerd
import (
2020-05-05 21:41:21 +00:00
"bufio"
2020-08-28 19:27:01 +00:00
"compress/bzip2"
"compress/gzip"
2019-01-01 08:23:01 +00:00
"context"
"fmt"
2019-02-08 04:12:49 +00:00
"io"
2019-03-02 15:56:27 +00:00
"io/ioutil"
2019-01-01 08:23:01 +00:00
"os"
"os/exec"
2019-03-02 15:56:27 +00:00
"path/filepath"
2020-05-05 21:41:21 +00:00
"strings"
2019-01-01 08:23:01 +00:00
"time"
2019-03-07 00:34:05 +00:00
"github.com/containerd/containerd"
2021-02-12 07:37:58 +00:00
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
2019-03-07 00:34:05 +00:00
"github.com/containerd/containerd/namespaces"
2021-02-12 07:37:58 +00:00
"github.com/containerd/containerd/reference/docker"
"github.com/klauspost/compress/zstd"
2019-02-08 04:12:49 +00:00
"github.com/natefinch/lumberjack"
2019-03-08 22:47:44 +00:00
"github.com/opencontainers/runc/libcontainer/system"
2020-08-28 19:27:01 +00:00
"github.com/pierrec/lz4"
2020-02-24 20:13:59 +00:00
"github.com/pkg/errors"
2019-04-19 21:08:05 +00:00
"github.com/rancher/k3s/pkg/agent/templates"
2019-01-09 16:54:15 +00:00
util2 "github.com/rancher/k3s/pkg/agent/util"
"github.com/rancher/k3s/pkg/daemons/config"
2021-02-16 23:15:16 +00:00
"github.com/rancher/k3s/pkg/untar"
2020-05-05 22:09:04 +00:00
"github.com/rancher/k3s/pkg/version"
2021-02-12 07:37:58 +00:00
"github.com/rancher/wrangler/pkg/merr"
2019-01-01 08:23:01 +00:00
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
2019-10-07 23:04:58 +00:00
yaml "gopkg.in/yaml.v2"
2019-08-27 04:35:51 +00:00
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
2019-01-01 08:23:01 +00:00
"k8s.io/kubernetes/pkg/kubelet/util"
)
const (
// maxMsgSize is the maximum size, in bytes (16 MiB), of a gRPC message that
// clients created by criConnection will accept from the server.
maxMsgSize = 1024 * 1024 * 16
)
2021-02-12 07:37:58 +00:00
// Run configures and starts containerd as a child process. Once it is up, images are preloaded
// or pulled from files found in the agent images directory.
2019-01-09 16:54:15 +00:00
func Run ( ctx context . Context , cfg * config . Node ) error {
2019-01-01 08:23:01 +00:00
args := [ ] string {
"containerd" ,
2019-01-09 16:54:15 +00:00
"-c" , cfg . Containerd . Config ,
"-a" , cfg . Containerd . Address ,
"--state" , cfg . Containerd . State ,
"--root" , cfg . Containerd . Root ,
}
2019-04-19 21:08:05 +00:00
if err := setupContainerdConfig ( ctx , cfg ) ; err != nil {
2019-01-01 08:23:01 +00:00
return err
}
2019-02-08 04:12:49 +00:00
if os . Getenv ( "CONTAINERD_LOG_LEVEL" ) != "" {
2019-03-07 18:20:44 +00:00
args = append ( args , "-l" , os . Getenv ( "CONTAINERD_LOG_LEVEL" ) )
2019-02-08 04:12:49 +00:00
}
stdOut := io . Writer ( os . Stdout )
stdErr := io . Writer ( os . Stderr )
if cfg . Containerd . Log != "" {
logrus . Infof ( "Logging containerd to %s" , cfg . Containerd . Log )
stdOut = & lumberjack . Logger {
Filename : cfg . Containerd . Log ,
MaxSize : 50 ,
MaxBackups : 3 ,
MaxAge : 28 ,
Compress : true ,
}
stdErr = stdOut
2019-01-01 08:23:01 +00:00
}
go func ( ) {
2019-01-09 16:54:15 +00:00
logrus . Infof ( "Running containerd %s" , config . ArgString ( args [ 1 : ] ) )
2019-01-01 08:23:01 +00:00
cmd := exec . Command ( args [ 0 ] , args [ 1 : ] ... )
2019-02-08 04:12:49 +00:00
cmd . Stdout = stdOut
cmd . Stderr = stdErr
2020-07-24 21:23:56 +00:00
cmd . Env = os . Environ ( )
// elide NOTIFY_SOCKET to prevent spurious notifications to systemd
for i := range cmd . Env {
if strings . HasPrefix ( cmd . Env [ i ] , "NOTIFY_SOCKET=" ) {
cmd . Env = append ( cmd . Env [ : i ] , cmd . Env [ i + 1 : ] ... )
break
}
}
2020-02-22 18:39:33 +00:00
addDeathSig ( cmd )
2019-01-01 08:23:01 +00:00
if err := cmd . Run ( ) ; err != nil {
fmt . Fprintf ( os . Stderr , "containerd: %s\n" , err )
}
os . Exit ( 1 )
} ( )
2020-05-05 21:41:21 +00:00
first := true
2019-01-01 08:23:01 +00:00
for {
2020-05-05 21:41:21 +00:00
conn , err := criConnection ( ctx , cfg . Containerd . Address )
2019-01-01 08:23:01 +00:00
if err == nil {
conn . Close ( )
break
}
2020-05-05 21:41:21 +00:00
if first {
first = false
} else {
logrus . Infof ( "Waiting for containerd startup: %v" , err )
}
2019-01-09 16:54:15 +00:00
select {
case <- ctx . Done ( ) :
return ctx . Err ( )
case <- time . After ( time . Second ) :
}
2019-01-01 08:23:01 +00:00
}
2020-09-21 16:56:03 +00:00
logrus . Info ( "Containerd is now running" )
2019-01-01 08:23:01 +00:00
2020-05-05 21:41:21 +00:00
return preloadImages ( ctx , cfg )
2019-03-07 00:34:05 +00:00
}
2021-02-12 07:37:58 +00:00
// criConnection connects to a CRI socket at the given path.
2020-05-05 21:41:21 +00:00
func criConnection ( ctx context . Context , address string ) ( * grpc . ClientConn , error ) {
addr , dialer , err := util . GetAddressAndDialer ( "unix://" + address )
if err != nil {
return nil , err
}
conn , err := grpc . Dial ( addr , grpc . WithInsecure ( ) , grpc . WithTimeout ( 3 * time . Second ) , grpc . WithContextDialer ( dialer ) , grpc . WithDefaultCallOptions ( grpc . MaxCallRecvMsgSize ( maxMsgSize ) ) )
if err != nil {
return nil , err
}
c := runtimeapi . NewRuntimeServiceClient ( conn )
_ , err = c . Version ( ctx , & runtimeapi . VersionRequest {
Version : "0.1.0" ,
} )
if err != nil {
conn . Close ( )
return nil , err
}
return conn , nil
}
2021-02-12 07:37:58 +00:00
// preloadImages reads the contents of the agent images directory, and attempts to
// import into containerd any files found there. Supported compressed types are decompressed, and
// any .txt files are processed as a list of images that should be pre-pulled from remote registries.
// If configured, imported images are retagged as being pulled from additional registries.
2020-05-05 21:41:21 +00:00
func preloadImages ( ctx context . Context , cfg * config . Node ) error {
2019-03-04 20:45:30 +00:00
fileInfo , err := os . Stat ( cfg . Images )
2019-03-07 16:47:37 +00:00
if os . IsNotExist ( err ) {
return nil
} else if err != nil {
2019-03-07 00:34:05 +00:00
logrus . Errorf ( "Unable to find images in %s: %v" , cfg . Images , err )
return nil
}
if ! fileInfo . IsDir ( ) {
return nil
}
fileInfos , err := ioutil . ReadDir ( cfg . Images )
if err != nil {
logrus . Errorf ( "Unable to read images in %s: %v" , cfg . Images , err )
return nil
}
client , err := containerd . New ( cfg . Containerd . Address )
if err != nil {
return err
}
defer client . Close ( )
2020-05-05 21:41:21 +00:00
criConn , err := criConnection ( ctx , cfg . Containerd . Address )
if err != nil {
return err
}
defer criConn . Close ( )
2021-02-12 07:37:58 +00:00
// Ensure that nothing else can modify the image store while we're importing,
// and that our images are imported into the k8s.io namespace
ctx , done , err := client . WithLease ( namespaces . WithNamespace ( ctx , "k8s.io" ) )
if err != nil {
return err
}
defer done ( ctx )
2019-03-07 00:34:05 +00:00
for _ , fileInfo := range fileInfos {
2019-03-02 15:56:27 +00:00
if fileInfo . IsDir ( ) {
2019-03-07 00:34:05 +00:00
continue
2019-03-02 15:56:27 +00:00
}
2021-02-12 07:37:58 +00:00
start := time . Now ( )
2019-03-07 00:34:05 +00:00
filePath := filepath . Join ( cfg . Images , fileInfo . Name ( ) )
2021-02-12 07:37:58 +00:00
if err := preloadFile ( ctx , cfg , client , criConn , filePath ) ; err != nil {
logrus . Errorf ( "Error encountered while importing %s: %v" , filePath , err )
2019-03-07 00:34:05 +00:00
continue
}
2021-02-12 07:37:58 +00:00
logrus . Debugf ( "Imported images from %s in %s" , filePath , time . Since ( start ) )
}
return nil
}
2019-03-07 00:34:05 +00:00
2021-02-12 07:37:58 +00:00
// preloadFile handles loading images from a single tarball or pre-pull image list.
// This is in its own function so that we can ensure that the various readers are properly closed, as some
// decompressing readers need to be explicitly closed and others do not.
func preloadFile ( ctx context . Context , cfg * config . Node , client * containerd . Client , criConn * grpc . ClientConn , filePath string ) error {
file , err := os . Open ( filePath )
if err != nil {
return err
}
defer file . Close ( )
2020-05-05 21:41:21 +00:00
2021-02-12 07:37:58 +00:00
var imageReader io . Reader
switch {
case util2 . HasSuffixI ( filePath , ".txt" ) :
return prePullImages ( ctx , criConn , file )
case util2 . HasSuffixI ( filePath , ".tar" ) :
2020-08-28 19:27:01 +00:00
imageReader = file
2021-02-12 07:37:58 +00:00
case util2 . HasSuffixI ( filePath , ".tar.lz4" ) :
imageReader = lz4 . NewReader ( file )
case util2 . HasSuffixI ( filePath , ".tar.bz2" , ".tbz" ) :
imageReader = bzip2 . NewReader ( file )
case util2 . HasSuffixI ( filePath , ".tar.gz" , ".tgz" ) :
zr , err := gzip . NewReader ( file )
if err != nil {
return err
2020-08-28 19:27:01 +00:00
}
2021-02-12 07:37:58 +00:00
defer zr . Close ( )
imageReader = zr
case util2 . HasSuffixI ( filePath , "tar.zst" , ".tzst" ) :
2021-02-16 23:15:16 +00:00
zr , err := zstd . NewReader ( file , zstd . WithDecoderMaxMemory ( untar . MaxDecoderMemory ) )
2021-02-12 07:37:58 +00:00
if err != nil {
return err
2020-08-28 19:27:01 +00:00
}
2021-02-12 07:37:58 +00:00
defer zr . Close ( )
imageReader = zr
default :
return errors . New ( "unhandled file type" )
}
logrus . Infof ( "Importing images from %s" , filePath )
images , err := client . Import ( ctx , imageReader , containerd . WithAllPlatforms ( true ) )
if err != nil {
return err
}
return retagImages ( ctx , client , images , cfg . AgentConfig . AirgapExtraRegistry )
}
// retagImages retags all listed images as having been pulled from the given remote registries.
// If duplicate images exist, they are overwritten. This is most useful when using a private registry
// for all images, as can be configured by the RKE2/Rancher system-default-registry setting.
func retagImages ( ctx context . Context , client * containerd . Client , images [ ] images . Image , registries [ ] string ) error {
var errs [ ] error
imageService := client . ImageService ( )
for _ , image := range images {
name , err := parseNamedTagged ( image . Name )
2019-03-07 00:34:05 +00:00
if err != nil {
2021-02-12 07:37:58 +00:00
errs = append ( errs , errors . Wrap ( err , "failed to parse image name" ) )
continue
}
logrus . Infof ( "Imported %s" , image . Name )
for _ , registry := range registries {
image . Name = fmt . Sprintf ( "%s/%s:%s" , registry , docker . Path ( name ) , name . Tag ( ) )
if _ , err = imageService . Create ( ctx , image ) ; err != nil {
if errdefs . IsAlreadyExists ( err ) {
if err = imageService . Delete ( ctx , image . Name ) ; err != nil {
errs = append ( errs , errors . Wrap ( err , "failed to delete existing image" ) )
continue
}
if _ , err = imageService . Create ( ctx , image ) ; err != nil {
errs = append ( errs , errors . Wrap ( err , "failed to tag after deleting existing image" ) )
continue
}
} else {
errs = append ( errs , errors . Wrap ( err , "failed to tag image" ) )
continue
}
}
logrus . Infof ( "Tagged %s" , image . Name )
2019-03-07 00:34:05 +00:00
}
}
2021-02-12 07:37:58 +00:00
return merr . NewErrors ( errs ... )
2019-01-01 08:23:01 +00:00
}
2019-04-19 21:08:05 +00:00
2021-02-12 07:37:58 +00:00
// parseNamedTagged normalizes the given image name and returns it as a
// reference that exposes its tag. An error is returned if the name cannot be
// parsed, or if the parsed reference does not implement docker.NamedTagged.
func parseNamedTagged(name string) (docker.NamedTagged, error) {
	named, err := docker.ParseNormalizedNamed(name)
	if err != nil {
		return nil, err
	}
	if tagged, isTagged := named.(docker.NamedTagged); isTagged {
		return tagged, nil
	}
	return nil, fmt.Errorf("can't cast %T to NamedTagged", named)
}
// prePullImages asks containerd to pull images in a given list, so that they
// are ready when the containers attempt to start later.
func prePullImages ( ctx context . Context , conn * grpc . ClientConn , images io . Reader ) error {
2020-05-05 21:41:21 +00:00
imageClient := runtimeapi . NewImageServiceClient ( conn )
scanner := bufio . NewScanner ( images )
for scanner . Scan ( ) {
line := strings . TrimSpace ( scanner . Text ( ) )
resp , err := imageClient . ImageStatus ( ctx , & runtimeapi . ImageStatusRequest {
Image : & runtimeapi . ImageSpec {
Image : line ,
} ,
} )
if err == nil && resp . Image != nil {
continue
}
logrus . Infof ( "Pulling image %s..." , line )
_ , err = imageClient . PullImage ( ctx , & runtimeapi . PullImageRequest {
Image : & runtimeapi . ImageSpec {
Image : line ,
} ,
} )
if err != nil {
logrus . Errorf ( "Failed to pull %s: %v" , line , err )
}
}
2021-02-12 07:37:58 +00:00
return nil
2020-05-05 21:41:21 +00:00
}
2021-02-12 07:37:58 +00:00
// setupContainerdConfig generates the containerd.toml, using a template combined with various
// runtime configurations and registry mirror settings provided by the administrator.
2019-04-19 21:08:05 +00:00
func setupContainerdConfig ( ctx context . Context , cfg * config . Node ) error {
2019-10-07 23:04:58 +00:00
privRegistries , err := getPrivateRegistries ( ctx , cfg )
if err != nil {
return err
}
2019-04-19 21:08:05 +00:00
var containerdTemplate string
containerdConfig := templates . ContainerdConfig {
2019-10-07 23:04:58 +00:00
NodeConfig : cfg ,
IsRunningInUserNS : system . RunningInUserNS ( ) ,
PrivateRegistryConfig : privRegistries ,
2019-04-19 21:08:05 +00:00
}
2020-02-28 17:10:55 +00:00
selEnabled , selConfigured , err := selinuxStatus ( )
2020-02-24 20:13:59 +00:00
if err != nil {
return errors . Wrap ( err , "failed to detect selinux" )
}
2020-08-11 23:17:32 +00:00
switch {
case ! cfg . SELinux && selEnabled :
logrus . Warn ( "SELinux is enabled on this host, but " + version . Program + " has not been started with --selinux - containerd SELinux support is disabled" )
case cfg . SELinux && ! selConfigured :
2020-05-05 22:09:04 +00:00
logrus . Warnf ( "SELinux is enabled for " + version . Program + " but process is not running in context '%s', " + version . Program + "-selinux policy may need to be applied" , SELinuxContextType )
2020-02-28 17:10:55 +00:00
}
2020-02-24 20:13:59 +00:00
2019-04-19 21:08:05 +00:00
containerdTemplateBytes , err := ioutil . ReadFile ( cfg . Containerd . Template )
if err == nil {
logrus . Infof ( "Using containerd template at %s" , cfg . Containerd . Template )
containerdTemplate = string ( containerdTemplateBytes )
} else if os . IsNotExist ( err ) {
containerdTemplate = templates . ContainerdConfigTemplate
} else {
return err
}
parsedTemplate , err := templates . ParseTemplateFromConfig ( containerdTemplate , containerdConfig )
if err != nil {
return err
}
return util2 . WriteFile ( cfg . Containerd . Config , parsedTemplate )
}
2019-10-07 23:04:58 +00:00
2021-02-12 07:37:58 +00:00
// getPrivateRegistries loads the registry mirror configuration from registries.yaml
2019-10-07 23:04:58 +00:00
func getPrivateRegistries ( ctx context . Context , cfg * config . Node ) ( * templates . Registry , error ) {
privRegistries := & templates . Registry { }
privRegistryFile , err := ioutil . ReadFile ( cfg . AgentConfig . PrivateRegistry )
if err != nil {
if os . IsNotExist ( err ) {
return nil , nil
}
return nil , err
}
logrus . Infof ( "Using registry config file at %s" , cfg . AgentConfig . PrivateRegistry )
2019-11-28 11:26:45 +00:00
if err := yaml . Unmarshal ( privRegistryFile , & privRegistries ) ; err != nil {
2019-10-07 23:04:58 +00:00
return nil , err
}
return privRegistries , nil
}