k3s/pkg/agent/containerd/containerd.go

166 lines
3.9 KiB
Go
Raw Normal View History

2019-01-01 08:23:01 +00:00
package containerd
import (
"context"
"fmt"
2019-03-02 15:56:27 +00:00
"github.com/containerd/containerd"
"github.com/containerd/containerd/namespaces"
"io"
2019-03-02 15:56:27 +00:00
"io/ioutil"
2019-01-01 08:23:01 +00:00
"os"
"os/exec"
2019-03-02 15:56:27 +00:00
"path/filepath"
2019-01-01 08:23:01 +00:00
"strings"
"syscall"
"time"
"github.com/natefinch/lumberjack"
2019-01-09 16:54:15 +00:00
util2 "github.com/rancher/k3s/pkg/agent/util"
"github.com/rancher/k3s/pkg/daemons/config"
2019-01-01 08:23:01 +00:00
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/util"
)
const (
maxMsgSize = 1024 * 1024 * 16
2019-01-09 16:54:15 +00:00
configToml = `
[plugins.opt]
path = "%OPT%"
[plugins.cri]
2019-01-01 08:23:01 +00:00
stream_server_address = "%NODE%"
stream_server_port = "10010"
2019-01-09 16:54:15 +00:00
`
configCNIToml = `
2019-01-01 08:23:01 +00:00
[plugins.cri.cni]
2019-01-09 16:54:15 +00:00
bin_dir = "%CNIBIN%"
conf_dir = "%CNICFG%"
2019-01-01 08:23:01 +00:00
`
)
2019-01-09 16:54:15 +00:00
func Run(ctx context.Context, cfg *config.Node) error {
2019-01-01 08:23:01 +00:00
args := []string{
"containerd",
2019-01-09 16:54:15 +00:00
"-c", cfg.Containerd.Config,
"-a", cfg.Containerd.Address,
"--state", cfg.Containerd.State,
"--root", cfg.Containerd.Root,
}
template := configToml
if !cfg.NoFlannel {
template += configCNIToml
2019-01-01 08:23:01 +00:00
}
2019-01-09 16:54:15 +00:00
template = strings.Replace(template, "%OPT%", cfg.Containerd.Opt, -1)
template = strings.Replace(template, "%CNIBIN%", cfg.AgentConfig.CNIBinDir, -1)
template = strings.Replace(template, "%CNICFG%", cfg.AgentConfig.CNIConfDir, -1)
template = strings.Replace(template, "%NODE%", cfg.AgentConfig.NodeName, -1)
if err := util2.WriteFile(cfg.Containerd.Config, template); err != nil {
2019-01-01 08:23:01 +00:00
return err
}
if os.Getenv("CONTAINERD_LOG_LEVEL") != "" {
args = append(args, "-l", "CONTAINERD_LOG_LEVEL")
}
stdOut := io.Writer(os.Stdout)
stdErr := io.Writer(os.Stderr)
if cfg.Containerd.Log != "" {
logrus.Infof("Logging containerd to %s", cfg.Containerd.Log)
stdOut = &lumberjack.Logger{
Filename: cfg.Containerd.Log,
MaxSize: 50,
MaxBackups: 3,
MaxAge: 28,
Compress: true,
}
stdErr = stdOut
2019-01-01 08:23:01 +00:00
}
go func() {
2019-01-09 16:54:15 +00:00
logrus.Infof("Running containerd %s", config.ArgString(args[1:]))
2019-01-01 08:23:01 +00:00
cmd := exec.Command(args[0], args[1:]...)
cmd.Stdout = stdOut
cmd.Stderr = stdErr
2019-01-01 08:23:01 +00:00
cmd.SysProcAttr = &syscall.SysProcAttr{
Pdeathsig: syscall.SIGKILL,
}
if err := cmd.Run(); err != nil {
fmt.Fprintf(os.Stderr, "containerd: %s\n", err)
}
os.Exit(1)
}()
for {
2019-01-09 16:54:15 +00:00
addr, dailer, err := util.GetAddressAndDialer("unix://" + cfg.Containerd.Address)
2019-01-01 08:23:01 +00:00
if err != nil {
time.Sleep(1 * time.Second)
continue
}
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(3*time.Second), grpc.WithDialer(dailer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
if err != nil {
time.Sleep(1 * time.Second)
continue
}
c := runtimeapi.NewRuntimeServiceClient(conn)
_, err = c.Version(ctx, &runtimeapi.VersionRequest{
Version: "0.1.0",
})
if err == nil {
conn.Close()
break
}
conn.Close()
2019-01-09 16:54:15 +00:00
logrus.Infof("Waiting for containerd startup: %v", err)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
}
2019-01-01 08:23:01 +00:00
}
2019-03-04 20:45:30 +00:00
fileInfo, err := os.Stat(cfg.Images)
2019-03-02 15:56:27 +00:00
if err != nil {
2019-03-04 20:45:30 +00:00
logrus.Infof("Cannot find images in %s: %v", cfg.Images, err)
2019-03-02 15:56:27 +00:00
} else {
if fileInfo.IsDir() {
2019-03-04 20:45:30 +00:00
fileInfos, err := ioutil.ReadDir(cfg.Images)
2019-03-02 15:56:27 +00:00
if err != nil {
2019-03-04 20:45:30 +00:00
logrus.Infof("Cannot read images in %s: %v", cfg.Images, err)
2019-03-02 15:56:27 +00:00
}
client, err := containerd.New(cfg.Containerd.Address)
if err != nil {
return err
}
defer client.Close()
ctxContainerD := namespaces.WithNamespace(context.Background(), "k8s.io")
for _, fileInfo := range fileInfos {
if !fileInfo.IsDir() {
2019-03-04 20:45:30 +00:00
filePath := filepath.Join(cfg.Images, fileInfo.Name())
file, err := os.Open(filePath)
2019-03-02 15:56:27 +00:00
if err != nil {
logrus.Errorf("Unable to read %s: %v", filePath, err)
continue
}
logrus.Debugf("Import %s", filePath)
2019-03-04 20:45:30 +00:00
_, err = client.Import(ctxContainerD, file)
2019-03-02 15:56:27 +00:00
if err != nil {
logrus.Errorf("Unable to import %s: %v", filePath, err)
}
}
}
}
}
2019-01-01 08:23:01 +00:00
return nil
}