k3s/pkg/agent/containerd/containerd.go
Brian Downs ba70c41cce
Initial Logging Output Update (#2246)
This attempts to update logging statements to make them consistent
throughout the code base. It also adds additional context to messages
where possible, simplifies messages, and updates level where necessary.
2020-09-21 09:56:03 -07:00

296 lines
7.4 KiB
Go

package containerd
import (
"bufio"
"compress/bzip2"
"compress/gzip"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/namespaces"
"github.com/natefinch/lumberjack"
"github.com/opencontainers/runc/libcontainer/system"
"github.com/pierrec/lz4"
"github.com/pkg/errors"
"github.com/rancher/k3s/pkg/agent/templates"
util2 "github.com/rancher/k3s/pkg/agent/util"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/version"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
yaml "gopkg.in/yaml.v2"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/util"
)
// maxMsgSize is the largest gRPC message, in bytes (16 MiB), that CRI
// connections opened by this package will accept when receiving a response.
const maxMsgSize = 16 * 1024 * 1024
// Run writes the containerd configuration, launches the containerd binary as
// a child process from a background goroutine, and then blocks until the CRI
// endpoint at cfg.Containerd.Address answers a version request. Once it does,
// images found under cfg.Images are preloaded.
//
// The current process exits with status 1 as soon as the containerd child
// terminates, whether or not it reported an error. If ctx is cancelled while
// waiting for containerd to come up, ctx.Err() is returned.
func Run(ctx context.Context, cfg *config.Node) error {
	args := []string{
		"containerd",
		"-c", cfg.Containerd.Config,
		"-a", cfg.Containerd.Address,
		"--state", cfg.Containerd.State,
		"--root", cfg.Containerd.Root,
	}

	if err := setupContainerdConfig(ctx, cfg); err != nil {
		return err
	}

	// An optional log level is forwarded verbatim to containerd.
	if level := os.Getenv("CONTAINERD_LOG_LEVEL"); level != "" {
		args = append(args, "-l", level)
	}

	// By default the child inherits our stdout/stderr; when a log file is
	// configured both streams go to a single size-rotated file instead.
	var stdOut, stdErr io.Writer = os.Stdout, os.Stderr
	if logPath := cfg.Containerd.Log; logPath != "" {
		logrus.Infof("Logging containerd to %s", logPath)
		rotated := &lumberjack.Logger{
			Filename:   logPath,
			MaxSize:    50,
			MaxBackups: 3,
			MaxAge:     28,
			Compress:   true,
		}
		stdOut, stdErr = rotated, rotated
	}

	go func() {
		logrus.Infof("Running containerd %s", config.ArgString(args[1:]))
		cmd := exec.Command(args[0], args[1:]...)
		cmd.Stdout = stdOut
		cmd.Stderr = stdErr

		// elide NOTIFY_SOCKET to prevent spurious notifications to systemd
		env := os.Environ()
		for idx, kv := range env {
			if strings.HasPrefix(kv, "NOTIFY_SOCKET=") {
				env = append(env[:idx], env[idx+1:]...)
				break
			}
		}
		cmd.Env = env

		addDeathSig(cmd)
		if err := cmd.Run(); err != nil {
			fmt.Fprintf(os.Stderr, "containerd: %s\n", err)
		}
		// containerd is not expected to exit; take the whole process down.
		os.Exit(1)
	}()

	// Poll the CRI socket once per second until it responds. The failure of
	// the very first attempt is not logged.
	for attempt := 0; ; attempt++ {
		conn, err := criConnection(ctx, cfg.Containerd.Address)
		if err == nil {
			conn.Close()
			break
		}
		if attempt > 0 {
			logrus.Infof("Waiting for containerd startup: %v", err)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second):
		}
	}

	logrus.Info("Containerd is now running")
	return preloadImages(ctx, cfg)
}
// criConnection opens a gRPC connection to the CRI endpoint on the unix socket
// at address and confirms it is usable by issuing a runtime Version request
// with ctx.
//
// On success the open connection is returned and the caller is responsible
// for closing it. If the version request fails the connection is closed here
// and the error is returned.
func criConnection(ctx context.Context, address string) (*grpc.ClientConn, error) {
	addr, dialer, err := util.GetAddressAndDialer("unix://" + address)
	if err != nil {
		return nil, err
	}

	dialOpts := []grpc.DialOption{
		grpc.WithInsecure(),
		grpc.WithTimeout(3 * time.Second),
		grpc.WithContextDialer(dialer),
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
	}
	conn, err := grpc.Dial(addr, dialOpts...)
	if err != nil {
		return nil, err
	}

	// Probe the runtime service so callers only get a connection that
	// actually answers requests.
	versionReq := &runtimeapi.VersionRequest{Version: "0.1.0"}
	if _, err := runtimeapi.NewRuntimeServiceClient(conn).Version(ctx, versionReq); err != nil {
		conn.Close()
		return nil, err
	}
	return conn, nil
}
// preloadImages loads every regular file found directly inside the cfg.Images
// directory into containerd. Files ending in ".txt" are treated as lists of
// image references to pull through the CRI; every other file is imported as
// an image archive, decompressed first when its name ends in ".tar.bz2",
// ".tar.lz4" or ".tar.gz".
//
// Preloading is best effort: a missing or unreadable images directory and
// failures on individual files are logged (or silently skipped when the
// directory does not exist or is not a directory) and nil is returned. An
// error is returned only when the containerd client or the CRI connection
// cannot be established.
func preloadImages(ctx context.Context, cfg *config.Node) error {
	fileInfo, err := os.Stat(cfg.Images)
	if os.IsNotExist(err) {
		return nil
	} else if err != nil {
		logrus.Errorf("Unable to find images in %s: %v", cfg.Images, err)
		return nil
	}

	if !fileInfo.IsDir() {
		return nil
	}

	fileInfos, err := ioutil.ReadDir(cfg.Images)
	if err != nil {
		logrus.Errorf("Unable to read images in %s: %v", cfg.Images, err)
		return nil
	}

	client, err := containerd.New(cfg.Containerd.Address)
	if err != nil {
		return err
	}
	defer client.Close()

	criConn, err := criConnection(ctx, cfg.Containerd.Address)
	if err != nil {
		return err
	}
	defer criConn.Close()

	for _, fileInfo := range fileInfos {
		if fileInfo.IsDir() {
			continue
		}
		importImageFile(ctx, client, criConn, filepath.Join(cfg.Images, fileInfo.Name()))
	}
	return nil
}

// importImageFile handles a single file from the images directory: a ".txt"
// file is passed to prePullImages, anything else is imported into the
// "k8s.io" containerd namespace as an image archive. Failures are logged and
// not returned, so one bad file does not stop the remaining ones.
func importImageFile(ctx context.Context, client *containerd.Client, criConn *grpc.ClientConn, filePath string) {
	file, err := os.Open(filePath)
	if err != nil {
		logrus.Errorf("Unable to read %s: %v", filePath, err)
		return
	}
	// The decompressing readers below do not close the underlying file, so
	// it is closed here on every return path.
	defer file.Close()

	name := filepath.Base(filePath)
	if strings.HasSuffix(name, ".txt") {
		prePullImages(ctx, criConn, file)
		return
	}

	logrus.Debugf("Import %s", filePath)

	var imageReader io.Reader = file
	switch {
	case strings.HasSuffix(name, ".tar.bz2"):
		imageReader = bzip2.NewReader(file)
	case strings.HasSuffix(name, ".tar.lz4"):
		imageReader = lz4.NewReader(file)
	case strings.HasSuffix(name, ".tar.gz"):
		gz, err := gzip.NewReader(file)
		if err != nil {
			logrus.Errorf("Unable to import %s: %v", filePath, err)
			return
		}
		imageReader = gz
	}

	// Derive the namespaced context from ctx (not context.Background) so
	// that an in-flight import is abandoned when the caller cancels.
	importCtx := namespaces.WithNamespace(ctx, "k8s.io")
	if _, err := client.Import(importCtx, imageReader, containerd.WithAllPlatforms(true)); err != nil {
		logrus.Errorf("Unable to import %s: %v", filePath, err)
	}
}
// prePullImages reads image references from images, one per line, and pulls
// through the CRI image service on conn every image that is not already
// present. Surrounding whitespace is trimmed from each line and blank lines
// are skipped.
//
// The function is best effort and returns nothing: a failed pull, or a
// failure while reading the list itself, is logged and processing continues
// where possible.
func prePullImages(ctx context.Context, conn *grpc.ClientConn, images io.Reader) {
	imageClient := runtimeapi.NewImageServiceClient(conn)
	scanner := bufio.NewScanner(images)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" {
			// An empty reference can never be pulled; skipping it avoids a
			// pointless CRI round trip and a misleading error in the log.
			continue
		}

		spec := &runtimeapi.ImageSpec{Image: line}

		// A status lookup that succeeds and reports an image means it is
		// already present; any error falls through to a pull attempt.
		resp, err := imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{Image: spec})
		if err == nil && resp.Image != nil {
			continue
		}

		logrus.Infof("Pulling image %s...", line)
		if _, err := imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{Image: spec}); err != nil {
			logrus.Errorf("Failed to pull %s: %v", line, err)
		}
	}
	// Scan returns false on both EOF and read errors; surface the latter so
	// a truncated image list is not mistaken for a complete one.
	if err := scanner.Err(); err != nil {
		logrus.Errorf("Failed to read image list: %v", err)
	}
}
// setupContainerdConfig renders the containerd configuration file and writes
// it to cfg.Containerd.Config.
//
// The template is read from cfg.Containerd.Template when that file exists;
// otherwise the built-in templates.ContainerdConfigTemplate is used. The
// template is rendered with the node configuration, the user-namespace flag,
// and the private registry settings (which may be nil when no registry file
// exists). SELinux status is checked first, and a warning is logged when the
// host state and cfg.SELinux disagree.
func setupContainerdConfig(ctx context.Context, cfg *config.Node) error {
	registries, err := getPrivateRegistries(ctx, cfg)
	if err != nil {
		return err
	}

	templateData := templates.ContainerdConfig{
		NodeConfig:            cfg,
		IsRunningInUserNS:     system.RunningInUserNS(),
		PrivateRegistryConfig: registries,
	}

	hostEnabled, procConfigured, err := selinuxStatus()
	if err != nil {
		return errors.Wrap(err, "failed to detect selinux")
	}
	// At most one of these warnings is emitted; the first match wins.
	if !cfg.SELinux && hostEnabled {
		logrus.Warn("SELinux is enabled on this host, but " + version.Program + " has not been started with --selinux - containerd SELinux support is disabled")
	} else if cfg.SELinux && !procConfigured {
		logrus.Warnf("SELinux is enabled for "+version.Program+" but process is not running in context '%s', "+version.Program+"-selinux policy may need to be applied", SELinuxContextType)
	}

	// Prefer a user-supplied template; fall back to the built-in one only
	// when the file is absent. Any other read error is fatal.
	var tmplText string
	rawTemplate, err := ioutil.ReadFile(cfg.Containerd.Template)
	switch {
	case err == nil:
		logrus.Infof("Using containerd template at %s", cfg.Containerd.Template)
		tmplText = string(rawTemplate)
	case os.IsNotExist(err):
		tmplText = templates.ContainerdConfigTemplate
	default:
		return err
	}

	rendered, err := templates.ParseTemplateFromConfig(tmplText, templateData)
	if err != nil {
		return err
	}
	return util2.WriteFile(cfg.Containerd.Config, rendered)
}
// getPrivateRegistries loads the private registry configuration from the YAML
// file at cfg.AgentConfig.PrivateRegistry.
//
// It returns (nil, nil) when the file does not exist, the decoded registry
// configuration when it can be read and parsed, and an error for any other
// read failure or for invalid YAML. ctx is currently unused.
func getPrivateRegistries(ctx context.Context, cfg *config.Node) (*templates.Registry, error) {
	registryPath := cfg.AgentConfig.PrivateRegistry

	raw, err := ioutil.ReadFile(registryPath)
	switch {
	case os.IsNotExist(err):
		// No registry file is a normal configuration, not an error.
		return nil, nil
	case err != nil:
		return nil, err
	}

	logrus.Infof("Using registry config file at %s", registryPath)

	registry := &templates.Registry{}
	if err := yaml.Unmarshal(raw, &registry); err != nil {
		return nil, err
	}
	return registry, nil
}