k3s/pkg/agent/containerd/containerd.go

272 lines
6.6 KiB
Go
Raw Normal View History

2019-01-01 08:23:01 +00:00
package containerd
import (
"bufio"
2019-01-01 08:23:01 +00:00
"context"
"fmt"
"io"
2019-03-02 15:56:27 +00:00
"io/ioutil"
2019-01-01 08:23:01 +00:00
"os"
"os/exec"
2019-03-02 15:56:27 +00:00
"path/filepath"
"strings"
2019-01-01 08:23:01 +00:00
"time"
2019-03-07 00:34:05 +00:00
"github.com/containerd/containerd"
"github.com/containerd/containerd/namespaces"
"github.com/natefinch/lumberjack"
2019-03-08 22:47:44 +00:00
"github.com/opencontainers/runc/libcontainer/system"
2020-02-24 20:13:59 +00:00
"github.com/pkg/errors"
2019-04-19 21:08:05 +00:00
"github.com/rancher/k3s/pkg/agent/templates"
2019-01-09 16:54:15 +00:00
util2 "github.com/rancher/k3s/pkg/agent/util"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/version"
2019-01-01 08:23:01 +00:00
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
2019-10-07 23:04:58 +00:00
yaml "gopkg.in/yaml.v2"
2019-08-27 04:35:51 +00:00
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
2019-01-01 08:23:01 +00:00
"k8s.io/kubernetes/pkg/kubelet/util"
)
const (
maxMsgSize = 1024 * 1024 * 16
)
2019-01-09 16:54:15 +00:00
func Run(ctx context.Context, cfg *config.Node) error {
2019-01-01 08:23:01 +00:00
args := []string{
"containerd",
2019-01-09 16:54:15 +00:00
"-c", cfg.Containerd.Config,
"-a", cfg.Containerd.Address,
"--state", cfg.Containerd.State,
"--root", cfg.Containerd.Root,
}
2019-04-19 21:08:05 +00:00
if err := setupContainerdConfig(ctx, cfg); err != nil {
2019-01-01 08:23:01 +00:00
return err
}
if os.Getenv("CONTAINERD_LOG_LEVEL") != "" {
2019-03-07 18:20:44 +00:00
args = append(args, "-l", os.Getenv("CONTAINERD_LOG_LEVEL"))
}
stdOut := io.Writer(os.Stdout)
stdErr := io.Writer(os.Stderr)
if cfg.Containerd.Log != "" {
logrus.Infof("Logging containerd to %s", cfg.Containerd.Log)
stdOut = &lumberjack.Logger{
Filename: cfg.Containerd.Log,
MaxSize: 50,
MaxBackups: 3,
MaxAge: 28,
Compress: true,
}
stdErr = stdOut
2019-01-01 08:23:01 +00:00
}
go func() {
2019-01-09 16:54:15 +00:00
logrus.Infof("Running containerd %s", config.ArgString(args[1:]))
2019-01-01 08:23:01 +00:00
cmd := exec.Command(args[0], args[1:]...)
cmd.Stdout = stdOut
cmd.Stderr = stdErr
addDeathSig(cmd)
2019-01-01 08:23:01 +00:00
if err := cmd.Run(); err != nil {
fmt.Fprintf(os.Stderr, "containerd: %s\n", err)
}
os.Exit(1)
}()
first := true
2019-01-01 08:23:01 +00:00
for {
conn, err := criConnection(ctx, cfg.Containerd.Address)
2019-01-01 08:23:01 +00:00
if err == nil {
conn.Close()
break
}
if first {
first = false
} else {
logrus.Infof("Waiting for containerd startup: %v", err)
}
2019-01-09 16:54:15 +00:00
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
}
2019-01-01 08:23:01 +00:00
}
return preloadImages(ctx, cfg)
2019-03-07 00:34:05 +00:00
}
func criConnection(ctx context.Context, address string) (*grpc.ClientConn, error) {
addr, dialer, err := util.GetAddressAndDialer("unix://" + address)
if err != nil {
return nil, err
}
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(3*time.Second), grpc.WithContextDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
if err != nil {
return nil, err
}
c := runtimeapi.NewRuntimeServiceClient(conn)
_, err = c.Version(ctx, &runtimeapi.VersionRequest{
Version: "0.1.0",
})
if err != nil {
conn.Close()
return nil, err
}
return conn, nil
}
func preloadImages(ctx context.Context, cfg *config.Node) error {
2019-03-04 20:45:30 +00:00
fileInfo, err := os.Stat(cfg.Images)
2019-03-07 16:47:37 +00:00
if os.IsNotExist(err) {
return nil
} else if err != nil {
2019-03-07 00:34:05 +00:00
logrus.Errorf("Unable to find images in %s: %v", cfg.Images, err)
return nil
}
if !fileInfo.IsDir() {
return nil
}
fileInfos, err := ioutil.ReadDir(cfg.Images)
if err != nil {
logrus.Errorf("Unable to read images in %s: %v", cfg.Images, err)
return nil
}
client, err := containerd.New(cfg.Containerd.Address)
if err != nil {
return err
}
defer client.Close()
criConn, err := criConnection(ctx, cfg.Containerd.Address)
if err != nil {
return err
}
defer criConn.Close()
2019-03-07 00:34:05 +00:00
ctxContainerD := namespaces.WithNamespace(context.Background(), "k8s.io")
for _, fileInfo := range fileInfos {
2019-03-02 15:56:27 +00:00
if fileInfo.IsDir() {
2019-03-07 00:34:05 +00:00
continue
2019-03-02 15:56:27 +00:00
}
2019-03-07 00:34:05 +00:00
filePath := filepath.Join(cfg.Images, fileInfo.Name())
file, err := os.Open(filePath)
if err != nil {
logrus.Errorf("Unable to read %s: %v", filePath, err)
continue
}
if strings.HasSuffix(fileInfo.Name(), ".txt") {
prePullImages(ctx, criConn, file)
file.Close()
continue
}
2019-03-07 00:34:05 +00:00
logrus.Debugf("Import %s", filePath)
_, err = client.Import(ctxContainerD, file)
file.Close()
2019-03-07 00:34:05 +00:00
if err != nil {
logrus.Errorf("Unable to import %s: %v", filePath, err)
}
}
2019-01-01 08:23:01 +00:00
return nil
}
2019-04-19 21:08:05 +00:00
func prePullImages(ctx context.Context, conn *grpc.ClientConn, images io.Reader) {
imageClient := runtimeapi.NewImageServiceClient(conn)
scanner := bufio.NewScanner(images)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
resp, err := imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{
Image: &runtimeapi.ImageSpec{
Image: line,
},
})
if err == nil && resp.Image != nil {
continue
}
logrus.Infof("Pulling image %s...", line)
_, err = imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{
Image: &runtimeapi.ImageSpec{
Image: line,
},
})
if err != nil {
logrus.Errorf("Failed to pull %s: %v", line, err)
}
}
}
2019-04-19 21:08:05 +00:00
func setupContainerdConfig(ctx context.Context, cfg *config.Node) error {
2019-10-07 23:04:58 +00:00
privRegistries, err := getPrivateRegistries(ctx, cfg)
if err != nil {
return err
}
2019-04-19 21:08:05 +00:00
var containerdTemplate string
containerdConfig := templates.ContainerdConfig{
2019-10-07 23:04:58 +00:00
NodeConfig: cfg,
IsRunningInUserNS: system.RunningInUserNS(),
PrivateRegistryConfig: privRegistries,
2019-04-19 21:08:05 +00:00
}
selEnabled, selConfigured, err := selinuxStatus()
2020-02-24 20:13:59 +00:00
if err != nil {
return errors.Wrap(err, "failed to detect selinux")
}
if cfg.DisableSELinux {
containerdConfig.SELinuxEnabled = false
if selEnabled {
logrus.Warn("SELinux is enabled for system but has been disabled for containerd by override")
}
} else {
containerdConfig.SELinuxEnabled = selEnabled
}
if containerdConfig.SELinuxEnabled && !selConfigured {
logrus.Warnf("SELinux is enabled for "+version.Program+" but process is not running in context '%s', "+version.Program+"-selinux policy may need to be applied", SELinuxContextType)
}
2020-02-24 20:13:59 +00:00
2019-04-19 21:08:05 +00:00
containerdTemplateBytes, err := ioutil.ReadFile(cfg.Containerd.Template)
if err == nil {
logrus.Infof("Using containerd template at %s", cfg.Containerd.Template)
containerdTemplate = string(containerdTemplateBytes)
} else if os.IsNotExist(err) {
containerdTemplate = templates.ContainerdConfigTemplate
} else {
return err
}
parsedTemplate, err := templates.ParseTemplateFromConfig(containerdTemplate, containerdConfig)
if err != nil {
return err
}
return util2.WriteFile(cfg.Containerd.Config, parsedTemplate)
}
2019-10-07 23:04:58 +00:00
func getPrivateRegistries(ctx context.Context, cfg *config.Node) (*templates.Registry, error) {
privRegistries := &templates.Registry{}
privRegistryFile, err := ioutil.ReadFile(cfg.AgentConfig.PrivateRegistry)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}
logrus.Infof("Using registry config file at %s", cfg.AgentConfig.PrivateRegistry)
2019-11-28 11:26:45 +00:00
if err := yaml.Unmarshal(privRegistryFile, &privRegistries); err != nil {
2019-10-07 23:04:58 +00:00
return nil, err
}
return privRegistries, nil
}