Wait for cri-dockerd socket (#6812)

* Wait for cri-dockerd socket
* Consolidate cri utility functions

Signed-off-by: Derek Nola <derek.nola@suse.com>
This commit is contained in:
Derek Nola 2023-01-27 13:16:59 -08:00 committed by GitHub
parent 1c6fde9a52
commit 0d4caf4e24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 119 additions and 87 deletions

View File

@ -6,7 +6,6 @@ package containerd
import (
"context"
"os"
"time"
"github.com/containerd/containerd"
overlayutils "github.com/containerd/containerd/snapshots/overlay/overlayutils"
@ -23,8 +22,6 @@ import (
"github.com/rancher/wharfie/pkg/registries"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
"google.golang.org/grpc"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubernetes/pkg/kubelet/util"
)
@ -102,30 +99,6 @@ func setupContainerdConfig(ctx context.Context, cfg *config.Node) error {
return util2.WriteFile(cfg.Containerd.Config, parsedTemplate)
}
// criConnection connects to a CRI socket at the given path.
func CriConnection(ctx context.Context, address string) (*grpc.ClientConn, error) {
addr, dialer, err := util.GetAddressAndDialer(socketPrefix + address)
if err != nil {
return nil, err
}
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(3*time.Second), grpc.WithContextDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
if err != nil {
return nil, err
}
c := runtimeapi.NewRuntimeServiceClient(conn)
_, err = c.Version(ctx, &runtimeapi.VersionRequest{
Version: "0.1.0",
})
if err != nil {
conn.Close()
return nil, err
}
return conn, nil
}
func Client(address string) (*containerd.Client, error) {
addr, _, err := util.GetAddressAndDialer(socketPrefix + address)
if err != nil {

View File

@ -6,7 +6,6 @@ package containerd
import (
"context"
"os"
"time"
"github.com/containerd/containerd"
"github.com/k3s-io/k3s/pkg/agent/templates"
@ -16,8 +15,6 @@ import (
"github.com/pkg/errors"
"github.com/rancher/wharfie/pkg/registries"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubernetes/pkg/kubelet/util"
)
@ -68,30 +65,6 @@ func setupContainerdConfig(ctx context.Context, cfg *config.Node) error {
return util2.WriteFile(cfg.Containerd.Config, parsedTemplate)
}
// criConnection connects to a CRI socket at the given path.
func CriConnection(ctx context.Context, address string) (*grpc.ClientConn, error) {
addr, dialer, err := util.GetAddressAndDialer(address)
if err != nil {
return nil, err
}
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(3*time.Second), grpc.WithContextDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
if err != nil {
return nil, err
}
c := runtimeapi.NewRuntimeServiceClient(conn)
_, err = c.Version(ctx, &runtimeapi.VersionRequest{
Version: "0.1.0",
})
if err != nil {
conn.Close()
return nil, err
}
return conn, nil
}
func Client(address string) (*containerd.Client, error) {
addr, _, err := util.GetAddressAndDialer(address)
if err != nil {

View File

@ -18,6 +18,7 @@ import (
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/cri/constants"
"github.com/containerd/containerd/reference/docker"
"github.com/k3s-io/k3s/pkg/agent/cri"
util2 "github.com/k3s-io/k3s/pkg/agent/util"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/version"
@ -30,10 +31,6 @@ import (
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)
const (
maxMsgSize = 1024 * 1024 * 16
)
// Run configures and starts containerd as a child process. Once it is up, images are preloaded
// or pulled from files found in the agent images directory.
func Run(ctx context.Context, cfg *config.Node) error {
@ -101,39 +98,13 @@ func Run(ctx context.Context, cfg *config.Node) error {
os.Exit(1)
}()
if err := WaitForContainerd(ctx, cfg.Containerd.Address); err != nil {
if err := cri.WaitForService(ctx, cfg.Containerd.Address, "containerd"); err != nil {
return err
}
return preloadImages(ctx, cfg)
}
// WaitForContainerd blocks in a retry loop until the Containerd CRI service
// is functional at the provided socket address. It will return only on success,
// or when the context is cancelled.
func WaitForContainerd(ctx context.Context, address string) error {
first := true
for {
conn, err := CriConnection(ctx, address)
if err == nil {
conn.Close()
break
}
if first {
first = false
} else {
logrus.Infof("Waiting for containerd startup: %v", err)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
}
}
logrus.Info("Containerd is now running")
return nil
}
// preloadImages reads the contents of the agent images directory, and attempts to
// import into containerd any files found there. Supported compressed types are decompressed, and
// any .txt files are processed as a list of images that should be pre-pulled from remote registries.
@ -163,7 +134,7 @@ func preloadImages(ctx context.Context, cfg *config.Node) error {
}
defer client.Close()
criConn, err := CriConnection(ctx, cfg.Containerd.Address)
criConn, err := cri.Connection(ctx, cfg.Containerd.Address)
if err != nil {
return err
}

36
pkg/agent/cri/cri.go Normal file
View File

@ -0,0 +1,36 @@
package cri
import (
"context"
"time"
"github.com/sirupsen/logrus"
)
const maxMsgSize = 1024 * 1024 * 16
// WaitForService blocks in a retry loop until the CRI service
// is functional at the provided socket address. It will return only on success,
// or when the context is cancelled.
func WaitForService(ctx context.Context, address string, service string) error {
first := true
for {
conn, err := Connection(ctx, address)
if err == nil {
conn.Close()
break
}
if first {
first = false
} else {
logrus.Infof("Waiting for %s startup: %v", service, err)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
}
}
logrus.Infof("%s is now running", service)
return nil
}

View File

@ -0,0 +1,39 @@
//go:build linux
// +build linux
package cri
import (
"context"
"time"
"google.golang.org/grpc"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
k8sutil "k8s.io/kubernetes/pkg/kubelet/util"
)
const socketPrefix = "unix://"
// Connection connects to a CRI socket at the given path.
func Connection(ctx context.Context, address string) (*grpc.ClientConn, error) {
addr, dialer, err := k8sutil.GetAddressAndDialer(socketPrefix + address)
if err != nil {
return nil, err
}
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(3*time.Second), grpc.WithContextDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
if err != nil {
return nil, err
}
c := runtimeapi.NewRuntimeServiceClient(conn)
_, err = c.Version(ctx, &runtimeapi.VersionRequest{
Version: "0.1.0",
})
if err != nil {
conn.Close()
return nil, err
}
return conn, nil
}

View File

@ -0,0 +1,37 @@
//go:build windows
// +build windows
package cri
import (
"context"
"time"
"google.golang.org/grpc"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubernetes/pkg/kubelet/util"
)
// Connection connects to a CRI socket at the given path.
func Connection(ctx context.Context, address string) (*grpc.ClientConn, error) {
addr, dialer, err := util.GetAddressAndDialer(address)
if err != nil {
return nil, err
}
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(3*time.Second), grpc.WithContextDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
if err != nil {
return nil, err
}
c := runtimeapi.NewRuntimeServiceClient(conn)
_, err = c.Version(ctx, &runtimeapi.VersionRequest{
Version: "0.1.0",
})
if err != nil {
conn.Close()
return nil, err
}
return conn, nil
}

View File

@ -11,9 +11,12 @@ import (
"github.com/Mirantis/cri-dockerd/cmd"
"github.com/Mirantis/cri-dockerd/cmd/version"
"github.com/k3s-io/k3s/pkg/agent/cri"
"github.com/k3s-io/k3s/pkg/cgroups"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/sirupsen/logrus"
utilsnet "k8s.io/utils/net"
)
@ -37,7 +40,7 @@ func Run(ctx context.Context, cfg *config.Node) error {
logrus.Fatalf("cri-dockerd exited: %v", command.ExecuteContext(ctx))
}()
return nil
return cri.WaitForService(ctx, cfg.CRIDockerd.Address, "cri-dockerd")
}
func getDockerCRIArgs(cfg *config.Node) []string {