Merge pull request #921 from erikwilson/bump-containerd-shim-runc-v2

Bump containerd shim runc v2
This commit is contained in:
Erik Wilson 2019-10-18 16:14:01 -07:00 committed by GitHub
commit 94941a091e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 157 additions and 90 deletions

View File

@ -15,49 +15,52 @@ type ContainerdConfig struct {
const ContainerdConfigTemplate = `
[plugins.opt]
path = "{{ .NodeConfig.Containerd.Opt }}"
path = "{{ .NodeConfig.Containerd.Opt }}"
[plugins.cri]
stream_server_address = "127.0.0.1"
stream_server_port = "10010"
stream_server_address = "127.0.0.1"
stream_server_port = "10010"
{{- if .IsRunningInUserNS }}
disable_cgroup = true
disable_apparmor = true
restrict_oom_score_adj = true
disable_cgroup = true
disable_apparmor = true
restrict_oom_score_adj = true
{{ end -}}
{{- if .NodeConfig.AgentConfig.PauseImage }}
sandbox_image = "{{ .NodeConfig.AgentConfig.PauseImage }}"
sandbox_image = "{{ .NodeConfig.AgentConfig.PauseImage }}"
{{ end -}}
{{- if not .NodeConfig.NoFlannel }}
[plugins.cri.cni]
bin_dir = "{{ .NodeConfig.AgentConfig.CNIBinDir }}"
conf_dir = "{{ .NodeConfig.AgentConfig.CNIConfDir }}"
[plugins.cri.cni]
bin_dir = "{{ .NodeConfig.AgentConfig.CNIBinDir }}"
conf_dir = "{{ .NodeConfig.AgentConfig.CNIConfDir }}"
{{ end -}}
[plugins.cri.containerd.runtimes.runc]
runtime_type = "io.containerd.runc.v2"
{{ if .PrivateRegistryConfig }}
{{ if .PrivateRegistryConfig.Mirrors }}
[plugins.cri.registry.mirrors]{{end}}
{{range $k, $v := .PrivateRegistryConfig.Mirrors }}
[plugins.cri.registry.mirrors."{{$k}}"]
endpoint = [{{range $i, $j := $v.Endpoints}}{{if $i}}, {{end}}{{printf "%q" .}}{{end}}]
endpoint = [{{range $i, $j := $v.Endpoints}}{{if $i}}, {{end}}{{printf "%q" .}}{{end}}]
{{end}}
{{range $k, $v := .PrivateRegistryConfig.Configs }}
{{ if $v.Auth }}
[plugins.cri.registry.configs."{{$k}}".auth]
{{ if $v.Auth.Username }}username = "{{ $v.Auth.Username }}"{{end}}
{{ if $v.Auth.Password }}password = "{{ $v.Auth.Password }}"{{end}}
{{ if $v.Auth.Auth }}auth = "{{ $v.Auth.Auth }}"{{end}}
{{ if $v.Auth.IdentityToken }}identity_token = "{{ $v.Auth.IdentityToken }}"{{end}}
{{ if $v.Auth.Username }}username = "{{ $v.Auth.Username }}"{{end}}
{{ if $v.Auth.Password }}password = "{{ $v.Auth.Password }}"{{end}}
{{ if $v.Auth.Auth }}auth = "{{ $v.Auth.Auth }}"{{end}}
{{ if $v.Auth.IdentityToken }}identity_token = "{{ $v.Auth.IdentityToken }}"{{end}}
{{end}}
{{ if $v.TLS }}
[plugins.cri.registry.configs."{{$k}}".tls]
{{ if $v.TLS.CAFile }}ca_file = "{{ $v.TLS.CAFile }}"{{end}}
{{ if $v.TLS.CertFile }}cert_file = "{{ $v.TLS.CertFile }}"{{end}}
{{ if $v.TLS.KeyFile }}key_file = "{{ $v.TLS.KeyFile }}"{{end}}
{{ if $v.TLS.CAFile }}ca_file = "{{ $v.TLS.CAFile }}"{{end}}
{{ if $v.TLS.CertFile }}cert_file = "{{ $v.TLS.CertFile }}"{{end}}
{{ if $v.TLS.KeyFile }}key_file = "{{ $v.TLS.KeyFile }}"{{end}}
{{end}}
{{end}}
{{end}}

View File

@ -48,6 +48,7 @@ rm -f \
bin/runc \
bin/containerd-shim \
bin/containerd-shim-runc-v1 \
bin/containerd-shim-runc-v2 \
bin/k3s-server \
bin/kubectl \
bin/crictl \
@ -92,6 +93,6 @@ echo Building containerd-shim
make -C ./vendor/github.com/containerd/containerd bin/containerd-shim
cp -f ./vendor/github.com/containerd/containerd/bin/containerd-shim ./bin/containerd-shim
echo Building containerd-shim-runc-v1
make -C ./vendor/github.com/containerd/containerd bin/containerd-shim-runc-v1
cp -f ./vendor/github.com/containerd/containerd/bin/containerd-shim-runc-v1 ./bin/containerd-shim-runc-v1
echo Building containerd-shim-runc-v2
make -C ./vendor/github.com/containerd/containerd bin/containerd-shim-runc-v2
cp -f ./vendor/github.com/containerd/containerd/bin/containerd-shim-runc-v2 ./bin/containerd-shim-runc-v2

View File

@ -4,7 +4,7 @@ package main
import (
_ "github.com/containerd/containerd/cmd/containerd-shim"
_ "github.com/containerd/containerd/cmd/containerd-shim-runc-v1"
_ "github.com/containerd/containerd/cmd/containerd-shim-runc-v2"
_ "github.com/coreos/go-systemd/activation"
_ "github.com/go-bindata/go-bindata"
_ "github.com/go-bindata/go-bindata/go-bindata"

View File

@ -19,10 +19,10 @@
package main
import (
v1 "github.com/containerd/containerd/runtime/v2/runc/v1"
v2 "github.com/containerd/containerd/runtime/v2/runc/v2"
"github.com/containerd/containerd/runtime/v2/shim"
)
func main() {
shim.Run("io.containerd.runc.v1", v1.New)
shim.Run("io.containerd.runc.v2", v2.New)
}

View File

@ -16,7 +16,7 @@
limitations under the License.
*/
package v1
package v2
import (
"context"
@ -25,6 +25,7 @@ import (
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
@ -60,6 +61,18 @@ var (
empty = &ptypes.Empty{}
)
// group labels specifies how the shim groups services.
// currently supports a runc.v2 specific .group label and the
// standard k8s pod label. Order matters in this list
var groupLabels = []string{
"io.containerd.runc.v2.group",
"io.kubernetes.cri.sandbox-id",
}
type spec struct {
Annotations map[string]string `json:"annotations,omitempty"`
}
// New returns a new shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
ep, err := oom.New(publisher)
@ -68,12 +81,13 @@ func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func
}
go ep.Run(ctx)
s := &service{
id: id,
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
cancel: shutdown,
id: id,
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
cancel: shutdown,
containers: make(map[string]*runc.Container),
}
go s.processExits()
runcC.Monitor = reaper.Default
@ -96,8 +110,10 @@ type service struct {
ec chan runcC.Exit
ep *oom.Epoller
id string
container *runc.Container
// id only used in cleanup case
id string
containers map[string]*runc.Container
cancel func()
}
@ -122,24 +138,54 @@ func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, co
}
cmd := exec.Command(self, args...)
cmd.Dir = cwd
cmd.Env = append(os.Environ(), "GOMAXPROCS=2")
cmd.Env = append(os.Environ(), "GOMAXPROCS=4")
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
}
return cmd, nil
}
func readSpec() (*spec, error) {
f, err := os.Open("config.json")
if err != nil {
return nil, err
}
defer f.Close()
var s spec
if err := json.NewDecoder(f).Decode(&s); err != nil {
return nil, err
}
return &s, nil
}
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) {
cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress, containerdTTRPCAddress)
if err != nil {
return "", err
}
address, err := shim.SocketAddress(ctx, id)
grouping := id
spec, err := readSpec()
if err != nil {
return "", err
}
for _, group := range groupLabels {
if groupID, ok := spec.Annotations[group]; ok {
grouping = groupID
break
}
}
address, err := shim.SocketAddress(ctx, grouping)
if err != nil {
return "", err
}
socket, err := shim.NewSocket(address)
if err != nil {
if strings.Contains(err.Error(), "address already in use") {
if err := shim.WriteAddress("address", address); err != nil {
return "", err
}
return address, nil
}
return "", err
}
defer socket.Close()
@ -161,9 +207,6 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container
}()
// make sure to wait after start
go cmd.Wait()
if err := shim.WritePidFile("shim.pid", cmd.Process.Pid); err != nil {
return "", err
}
if err := shim.WriteAddress("address", address); err != nil {
return "", err
}
@ -199,14 +242,16 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container
}
func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
path, err := os.Getwd()
cwd, err := os.Getwd()
if err != nil {
return nil, err
}
path := filepath.Join(filepath.Dir(cwd), s.id)
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
runtime, err := runc.ReadRuntime(path)
if err != nil {
return nil, err
@ -236,7 +281,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
return nil, err
}
s.container = container
s.containers[r.ID] = container
s.send(&eventstypes.TaskCreate{
ContainerID: r.ID,
@ -259,7 +304,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
container, err := s.getContainer()
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
@ -295,7 +340,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
// Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
container, err := s.getContainer()
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
@ -305,7 +350,11 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
}
// if we deleted our init task, close the platform and send the task delete event
if r.ExecID == "" {
if s.platform != nil {
s.mu.Lock()
delete(s.containers, r.ID)
hasContainers := len(s.containers) > 0
s.mu.Unlock()
if s.platform != nil && !hasContainers {
s.platform.Close()
}
s.send(&eventstypes.TaskDelete{
@ -324,7 +373,7 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
container, err := s.getContainer()
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
@ -339,7 +388,7 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
}
s.send(&eventstypes.TaskExecAdded{
ContainerID: s.container.ID,
ContainerID: container.ID,
ExecID: process.ID(),
})
return empty, nil
@ -347,7 +396,7 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
// ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
container, err := s.getContainer()
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
@ -359,7 +408,11 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*
// State returns runtime state information for a process
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
p, err := s.getProcess(r.ExecID)
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
p, err := container.Process(r.ExecID)
if err != nil {
return nil, err
}
@ -383,7 +436,7 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
sio := p.Stdio()
return &taskAPI.StateResponse{
ID: p.ID(),
Bundle: s.container.Bundle,
Bundle: container.Bundle,
Pid: uint32(p.Pid()),
Status: status,
Stdin: sio.Stdin,
@ -397,7 +450,7 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
container, err := s.getContainer()
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
@ -412,7 +465,7 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
container, err := s.getContainer()
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
@ -427,7 +480,7 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
container, err := s.getContainer()
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
@ -439,7 +492,7 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Emp
// Pids returns all pids inside the container
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
container, err := s.getContainer()
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
@ -474,7 +527,7 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi
// CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
container, err := s.getContainer()
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
@ -486,7 +539,7 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp
// Checkpoint the container
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
container, err := s.getContainer()
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
@ -498,7 +551,7 @@ func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskReque
// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
container, err := s.getContainer()
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
@ -510,7 +563,7 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*pt
// Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
container, err := s.getContainer()
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
@ -529,8 +582,8 @@ func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.Wa
// Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
var pid int
if s.container != nil {
pid = s.container.Pid()
if container, err := s.getContainer(r.ID); err == nil {
pid = container.Pid()
}
return &taskAPI.ConnectResponse{
ShimPid: uint32(os.Getpid()),
@ -539,13 +592,23 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
}
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
s.mu.Lock()
// return out if the shim is still servicing containers
if len(s.containers) > 0 {
s.mu.Unlock()
return empty, nil
}
s.cancel()
close(s.events)
return empty, nil
}
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
cg := s.container.Cgroup()
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
cg := container.Cgroup()
if cg == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
}
@ -579,27 +642,34 @@ func (s *service) sendL(evt interface{}) {
}
func (s *service) checkProcesses(e runcC.Exit) {
container, err := s.getContainer()
if err != nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
shouldKillAll, err := shouldKillAllOnExit(container.Bundle)
if err != nil {
log.G(s.context).WithError(err).Error("failed to check shouldKillAll")
}
for _, container := range s.containers {
if !container.HasPid(e.Pid) {
continue
}
for _, p := range container.All() {
if p.Pid() == e.Pid {
if shouldKillAll {
if ip, ok := p.(*process.Init); ok {
// Ensure all children are killed
for _, p := range container.All() {
if p.Pid() != e.Pid {
continue
}
if ip, ok := p.(*process.Init); ok {
shouldKillAll, err := shouldKillAllOnExit(container.Bundle)
if err != nil {
log.G(s.context).WithError(err).Error("failed to check shouldKillAll")
}
// Ensure all children are killed
if shouldKillAll {
if err := ip.KillAll(s.context); err != nil {
logrus.WithError(err).WithField("id", ip.ID()).
Error("failed to kill init's children")
}
}
}
p.SetExited(e.Status)
s.sendL(&eventstypes.TaskExit{
ContainerID: container.ID,
@ -610,6 +680,7 @@ func (s *service) checkProcesses(e runcC.Exit) {
})
return
}
return
}
}
@ -623,7 +694,7 @@ func shouldKillAllOnExit(bundlePath string) (bool, error) {
if bundleSpec.Linux != nil {
for _, ns := range bundleSpec.Linux.Namespaces {
if ns.Type == specs.PIDNamespace {
if ns.Type == specs.PIDNamespace && ns.Path == "" {
return false, nil
}
}
@ -633,7 +704,11 @@ func shouldKillAllOnExit(bundlePath string) (bool, error) {
}
func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
p, err := s.container.Process("")
container, err := s.getContainer(id)
if err != nil {
return nil, err
}
p, err := container.Process("")
if err != nil {
return nil, errdefs.ToGRPC(err)
}
@ -662,9 +737,9 @@ func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
publisher.Close()
}
func (s *service) getContainer() (*runc.Container, error) {
func (s *service) getContainer(id string) (*runc.Container, error) {
s.mu.Lock()
container := s.container
container := s.containers[id]
s.mu.Unlock()
if container == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
@ -672,18 +747,6 @@ func (s *service) getContainer() (*runc.Container, error) {
return container, nil
}
func (s *service) getProcess(execID string) (process.Process, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
p, err := container.Process(execID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return p, nil
}
// initialize a single epoll fd to manage our consoles. `initPlatform` should
// only be called once.
func (s *service) initPlatform() error {

4
vendor/modules.txt vendored
View File

@ -172,7 +172,7 @@ github.com/containerd/containerd/archive
github.com/containerd/containerd/archive/compression
github.com/containerd/containerd/cio
github.com/containerd/containerd/cmd/containerd-shim
github.com/containerd/containerd/cmd/containerd-shim-runc-v1
github.com/containerd/containerd/cmd/containerd-shim-runc-v2
github.com/containerd/containerd/cmd/containerd/command
github.com/containerd/containerd/cmd/ctr/app
github.com/containerd/containerd/cmd/ctr/commands
@ -251,7 +251,7 @@ github.com/containerd/containerd/runtime/v1/shim/v1
github.com/containerd/containerd/runtime/v2
github.com/containerd/containerd/runtime/v2/runc
github.com/containerd/containerd/runtime/v2/runc/options
github.com/containerd/containerd/runtime/v2/runc/v1
github.com/containerd/containerd/runtime/v2/runc/v2
github.com/containerd/containerd/runtime/v2/shim
github.com/containerd/containerd/runtime/v2/task
github.com/containerd/containerd/services