2019-01-01 08:23:01 +00:00
package containerd
import (
2020-05-05 21:41:21 +00:00
"bufio"
2020-08-28 19:27:01 +00:00
"compress/bzip2"
"compress/gzip"
2019-01-01 08:23:01 +00:00
"context"
"fmt"
2019-02-08 04:12:49 +00:00
"io"
2019-03-02 15:56:27 +00:00
"io/ioutil"
2019-01-01 08:23:01 +00:00
"os"
"os/exec"
2019-03-02 15:56:27 +00:00
"path/filepath"
2020-05-05 21:41:21 +00:00
"strings"
2019-01-01 08:23:01 +00:00
"time"
2019-03-07 00:34:05 +00:00
"github.com/containerd/containerd"
"github.com/containerd/containerd/namespaces"
2019-02-08 04:12:49 +00:00
"github.com/natefinch/lumberjack"
2019-03-08 22:47:44 +00:00
"github.com/opencontainers/runc/libcontainer/system"
2020-08-28 19:27:01 +00:00
"github.com/pierrec/lz4"
2020-02-24 20:13:59 +00:00
"github.com/pkg/errors"
2019-04-19 21:08:05 +00:00
"github.com/rancher/k3s/pkg/agent/templates"
2019-01-09 16:54:15 +00:00
util2 "github.com/rancher/k3s/pkg/agent/util"
"github.com/rancher/k3s/pkg/daemons/config"
2020-05-05 22:09:04 +00:00
"github.com/rancher/k3s/pkg/version"
2019-01-01 08:23:01 +00:00
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
2019-10-07 23:04:58 +00:00
yaml "gopkg.in/yaml.v2"
2019-08-27 04:35:51 +00:00
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
2019-01-01 08:23:01 +00:00
"k8s.io/kubernetes/pkg/kubelet/util"
)
// maxMsgSize is the largest gRPC message, in bytes (16 MiB), that a CRI
// client connection created by this package will accept in a response.
const maxMsgSize = 16 << 20
2019-01-09 16:54:15 +00:00
func Run ( ctx context . Context , cfg * config . Node ) error {
2019-01-01 08:23:01 +00:00
args := [ ] string {
"containerd" ,
2019-01-09 16:54:15 +00:00
"-c" , cfg . Containerd . Config ,
"-a" , cfg . Containerd . Address ,
"--state" , cfg . Containerd . State ,
"--root" , cfg . Containerd . Root ,
}
2019-04-19 21:08:05 +00:00
if err := setupContainerdConfig ( ctx , cfg ) ; err != nil {
2019-01-01 08:23:01 +00:00
return err
}
2019-02-08 04:12:49 +00:00
if os . Getenv ( "CONTAINERD_LOG_LEVEL" ) != "" {
2019-03-07 18:20:44 +00:00
args = append ( args , "-l" , os . Getenv ( "CONTAINERD_LOG_LEVEL" ) )
2019-02-08 04:12:49 +00:00
}
stdOut := io . Writer ( os . Stdout )
stdErr := io . Writer ( os . Stderr )
if cfg . Containerd . Log != "" {
logrus . Infof ( "Logging containerd to %s" , cfg . Containerd . Log )
stdOut = & lumberjack . Logger {
Filename : cfg . Containerd . Log ,
MaxSize : 50 ,
MaxBackups : 3 ,
MaxAge : 28 ,
Compress : true ,
}
stdErr = stdOut
2019-01-01 08:23:01 +00:00
}
go func ( ) {
2019-01-09 16:54:15 +00:00
logrus . Infof ( "Running containerd %s" , config . ArgString ( args [ 1 : ] ) )
2019-01-01 08:23:01 +00:00
cmd := exec . Command ( args [ 0 ] , args [ 1 : ] ... )
2019-02-08 04:12:49 +00:00
cmd . Stdout = stdOut
cmd . Stderr = stdErr
2020-07-24 21:23:56 +00:00
cmd . Env = os . Environ ( )
// elide NOTIFY_SOCKET to prevent spurious notifications to systemd
for i := range cmd . Env {
if strings . HasPrefix ( cmd . Env [ i ] , "NOTIFY_SOCKET=" ) {
cmd . Env = append ( cmd . Env [ : i ] , cmd . Env [ i + 1 : ] ... )
break
}
}
2020-02-22 18:39:33 +00:00
addDeathSig ( cmd )
2019-01-01 08:23:01 +00:00
if err := cmd . Run ( ) ; err != nil {
fmt . Fprintf ( os . Stderr , "containerd: %s\n" , err )
}
os . Exit ( 1 )
} ( )
2020-05-05 21:41:21 +00:00
first := true
2019-01-01 08:23:01 +00:00
for {
2020-05-05 21:41:21 +00:00
conn , err := criConnection ( ctx , cfg . Containerd . Address )
2019-01-01 08:23:01 +00:00
if err == nil {
conn . Close ( )
break
}
2020-05-05 21:41:21 +00:00
if first {
first = false
} else {
logrus . Infof ( "Waiting for containerd startup: %v" , err )
}
2019-01-09 16:54:15 +00:00
select {
case <- ctx . Done ( ) :
return ctx . Err ( )
case <- time . After ( time . Second ) :
}
2019-01-01 08:23:01 +00:00
}
2020-05-05 21:41:21 +00:00
return preloadImages ( ctx , cfg )
2019-03-07 00:34:05 +00:00
}
2020-05-05 21:41:21 +00:00
// criConnection opens a gRPC client connection to the CRI endpoint served on
// the unix socket at address and confirms it is usable by issuing a runtime
// Version request with ctx.
//
// On success the caller owns the returned connection and must close it. If
// the Version request fails the connection is closed here and the error is
// returned.
func criConnection(ctx context.Context, address string) (*grpc.ClientConn, error) {
	target, dialFn, err := util.GetAddressAndDialer("unix://" + address)
	if err != nil {
		return nil, err
	}

	dialOpts := []grpc.DialOption{
		grpc.WithInsecure(),
		grpc.WithTimeout(3 * time.Second),
		grpc.WithContextDialer(dialFn),
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
	}
	conn, err := grpc.Dial(target, dialOpts...)
	if err != nil {
		return nil, err
	}

	// Probe the runtime service so callers only ever get a working connection.
	probe := &runtimeapi.VersionRequest{Version: "0.1.0"}
	if _, err := runtimeapi.NewRuntimeServiceClient(conn).Version(ctx, probe); err != nil {
		conn.Close()
		return nil, err
	}
	return conn, nil
}
func preloadImages ( ctx context . Context , cfg * config . Node ) error {
2019-03-04 20:45:30 +00:00
fileInfo , err := os . Stat ( cfg . Images )
2019-03-07 16:47:37 +00:00
if os . IsNotExist ( err ) {
return nil
} else if err != nil {
2019-03-07 00:34:05 +00:00
logrus . Errorf ( "Unable to find images in %s: %v" , cfg . Images , err )
return nil
}
if ! fileInfo . IsDir ( ) {
return nil
}
fileInfos , err := ioutil . ReadDir ( cfg . Images )
if err != nil {
logrus . Errorf ( "Unable to read images in %s: %v" , cfg . Images , err )
return nil
}
client , err := containerd . New ( cfg . Containerd . Address )
if err != nil {
return err
}
defer client . Close ( )
2020-05-05 21:41:21 +00:00
criConn , err := criConnection ( ctx , cfg . Containerd . Address )
if err != nil {
return err
}
defer criConn . Close ( )
2019-03-07 00:34:05 +00:00
ctxContainerD := namespaces . WithNamespace ( context . Background ( ) , "k8s.io" )
for _ , fileInfo := range fileInfos {
2019-03-02 15:56:27 +00:00
if fileInfo . IsDir ( ) {
2019-03-07 00:34:05 +00:00
continue
2019-03-02 15:56:27 +00:00
}
2019-03-07 00:34:05 +00:00
filePath := filepath . Join ( cfg . Images , fileInfo . Name ( ) )
file , err := os . Open ( filePath )
if err != nil {
logrus . Errorf ( "Unable to read %s: %v" , filePath , err )
continue
}
2020-05-05 21:41:21 +00:00
if strings . HasSuffix ( fileInfo . Name ( ) , ".txt" ) {
prePullImages ( ctx , criConn , file )
file . Close ( )
continue
}
2019-03-07 00:34:05 +00:00
logrus . Debugf ( "Import %s" , filePath )
2020-08-28 19:27:01 +00:00
var imageReader io . Reader
imageReader = file
if strings . HasSuffix ( fileInfo . Name ( ) , ".tar.bz2" ) {
imageReader = bzip2 . NewReader ( file )
}
if strings . HasSuffix ( fileInfo . Name ( ) , ".tar.lz4" ) {
imageReader = lz4 . NewReader ( file )
}
if strings . HasSuffix ( fileInfo . Name ( ) , ".tar.gz" ) {
// WARNING: gzip reader close does not close the underlying image
imageReader , err = gzip . NewReader ( file )
if err != nil {
logrus . Errorf ( "Unable to import %s: %v" , filePath , err )
file . Close ( )
continue
}
}
2020-09-03 00:09:16 +00:00
_ , err = client . Import ( ctxContainerD , imageReader , containerd . WithAllPlatforms ( true ) )
2020-05-05 21:41:21 +00:00
file . Close ( )
2019-03-07 00:34:05 +00:00
if err != nil {
logrus . Errorf ( "Unable to import %s: %v" , filePath , err )
}
}
2019-01-01 08:23:01 +00:00
return nil
}
2019-04-19 21:08:05 +00:00
2020-05-05 21:41:21 +00:00
// prePullImages reads image references from images, one per line, and pulls
// through the CRI image service on conn every image that is not already
// present.
//
// Surrounding whitespace is trimmed from each line and blank lines are
// skipped. Pull failures and errors reading the list are logged; nothing is
// returned to the caller.
func prePullImages(ctx context.Context, conn *grpc.ClientConn, images io.Reader) {
	imageClient := runtimeapi.NewImageServiceClient(conn)
	scanner := bufio.NewScanner(images)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" {
			// An empty reference can never be pulled; don't bother the runtime.
			continue
		}

		resp, err := imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{
			Image: &runtimeapi.ImageSpec{
				Image: line,
			},
		})
		if err == nil && resp.Image != nil {
			// Image is already present.
			continue
		}

		logrus.Infof("Pulling image %s...", line)
		_, err = imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{
			Image: &runtimeapi.ImageSpec{
				Image: line,
			},
		})
		if err != nil {
			logrus.Errorf("Failed to pull %s: %v", line, err)
		}
	}
	// Scan returns false on both EOF and read errors; surface the latter.
	if err := scanner.Err(); err != nil {
		logrus.Errorf("Failed to read image list: %v", err)
	}
}
2019-04-19 21:08:05 +00:00
func setupContainerdConfig ( ctx context . Context , cfg * config . Node ) error {
2019-10-07 23:04:58 +00:00
privRegistries , err := getPrivateRegistries ( ctx , cfg )
if err != nil {
return err
}
2019-04-19 21:08:05 +00:00
var containerdTemplate string
containerdConfig := templates . ContainerdConfig {
2019-10-07 23:04:58 +00:00
NodeConfig : cfg ,
IsRunningInUserNS : system . RunningInUserNS ( ) ,
PrivateRegistryConfig : privRegistries ,
2019-04-19 21:08:05 +00:00
}
2020-02-28 17:10:55 +00:00
selEnabled , selConfigured , err := selinuxStatus ( )
2020-02-24 20:13:59 +00:00
if err != nil {
return errors . Wrap ( err , "failed to detect selinux" )
}
2020-08-11 23:17:32 +00:00
switch {
case ! cfg . SELinux && selEnabled :
logrus . Warn ( "SELinux is enabled on this host, but " + version . Program + " has not been started with --selinux - containerd SELinux support is disabled" )
case cfg . SELinux && ! selConfigured :
2020-05-05 22:09:04 +00:00
logrus . Warnf ( "SELinux is enabled for " + version . Program + " but process is not running in context '%s', " + version . Program + "-selinux policy may need to be applied" , SELinuxContextType )
2020-02-28 17:10:55 +00:00
}
2020-02-24 20:13:59 +00:00
2019-04-19 21:08:05 +00:00
containerdTemplateBytes , err := ioutil . ReadFile ( cfg . Containerd . Template )
if err == nil {
logrus . Infof ( "Using containerd template at %s" , cfg . Containerd . Template )
containerdTemplate = string ( containerdTemplateBytes )
} else if os . IsNotExist ( err ) {
containerdTemplate = templates . ContainerdConfigTemplate
} else {
return err
}
parsedTemplate , err := templates . ParseTemplateFromConfig ( containerdTemplate , containerdConfig )
if err != nil {
return err
}
return util2 . WriteFile ( cfg . Containerd . Config , parsedTemplate )
}
2019-10-07 23:04:58 +00:00
func getPrivateRegistries ( ctx context . Context , cfg * config . Node ) ( * templates . Registry , error ) {
privRegistries := & templates . Registry { }
privRegistryFile , err := ioutil . ReadFile ( cfg . AgentConfig . PrivateRegistry )
if err != nil {
if os . IsNotExist ( err ) {
return nil , nil
}
return nil , err
}
logrus . Infof ( "Using registry config file at %s" , cfg . AgentConfig . PrivateRegistry )
2019-11-28 11:26:45 +00:00
if err := yaml . Unmarshal ( privRegistryFile , & privRegistries ) ; err != nil {
2019-10-07 23:04:58 +00:00
return nil , err
}
return privRegistries , nil
}