diff --git a/pkg/daemons/agent/agent.go b/pkg/daemons/agent/agent.go index 2e7811ee31..143e3d2c8c 100644 --- a/pkg/daemons/agent/agent.go +++ b/pkg/daemons/agent/agent.go @@ -2,7 +2,6 @@ package agent import ( "bufio" - "context" "math/rand" "os" "path/filepath" @@ -11,11 +10,10 @@ import ( "github.com/opencontainers/runc/libcontainer/system" "github.com/rancher/k3s/pkg/daemons/config" + "github.com/rancher/k3s/pkg/daemons/executor" "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/util/net" "k8s.io/component-base/logs" - proxy "k8s.io/kubernetes/cmd/kube-proxy/app" - kubelet "k8s.io/kubernetes/cmd/kubelet/app" "k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes" _ "k8s.io/component-base/metrics/prometheus/restclient" // for client metric registration @@ -28,7 +26,9 @@ func Agent(config *config.Agent) error { logs.InitLogs() defer logs.FlushLogs() - startKubelet(config) + if err := startKubelet(config); err != nil { + return err + } if !config.DisableKubeProxy { return startKubeProxy(config) @@ -49,18 +49,11 @@ func startKubeProxy(cfg *config.Agent) error { } args := config.GetArgsList(argsMap, cfg.ExtraKubeProxyArgs) - command := proxy.NewProxyCommand() - command.SetArgs(args) - - go func() { - logrus.Infof("Running kube-proxy %s", config.ArgString(args)) - logrus.Fatalf("kube-proxy exited: %v", command.Execute()) - }() - - return nil + logrus.Infof("Running kube-proxy %s", config.ArgString(args)) + return executor.KubeProxy(args) } -func startKubelet(cfg *config.Agent) { +func startKubelet(cfg *config.Agent) error { argsMap := map[string]string{ "healthz-bind-address": "127.0.0.1", "read-only-port": "0", @@ -163,13 +156,9 @@ func startKubelet(cfg *config.Agent) { } args := config.GetArgsList(argsMap, cfg.ExtraKubeletArgs) - command := kubelet.NewKubeletCommand(context.Background().Done()) - command.SetArgs(args) + logrus.Infof("Running kubelet %s", config.ArgString(args)) - go func() { - logrus.Infof("Running kubelet %s", config.ArgString(args)) - logrus.Fatalf("kubelet exited: %v", command.Execute()) - }() + return executor.Kubelet(args) } func addFeatureGate(current, new string) string { diff --git a/pkg/daemons/control/server.go b/pkg/daemons/control/server.go index acfb110497..bd1b743f88 100644 --- a/pkg/daemons/control/server.go +++ b/pkg/daemons/control/server.go @@ -19,33 +19,31 @@ import ( "text/template" "time" - // registering k3s cloud provider - _ "github.com/rancher/k3s/pkg/cloudprovider" - "github.com/pkg/errors" certutil "github.com/rancher/dynamiclistener/cert" "github.com/rancher/k3s/pkg/clientaccess" "github.com/rancher/k3s/pkg/cluster" "github.com/rancher/k3s/pkg/daemons/config" + "github.com/rancher/k3s/pkg/daemons/executor" "github.com/rancher/k3s/pkg/passwd" "github.com/rancher/k3s/pkg/token" "github.com/rancher/wrangler-api/pkg/generated/controllers/rbac" "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - apiserverconfigv1 "k8s.io/apiserver/pkg/apis/config/v1" "k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" - _ "k8s.io/component-base/metrics/prometheus/restclient" // for client metric registration ccmapp "k8s.io/kubernetes/cmd/cloud-controller-manager/app" app2 "k8s.io/kubernetes/cmd/controller-manager/app" - "k8s.io/kubernetes/cmd/kube-apiserver/app" - cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app" - sapp "k8s.io/kubernetes/cmd/kube-scheduler/app" "k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes" "k8s.io/kubernetes/pkg/master" "k8s.io/kubernetes/pkg/proxy/util" + + // registering k3s cloud provider + _ "github.com/rancher/k3s/pkg/cloudprovider" + // for client metric registration + _ "k8s.io/component-base/metrics/prometheus/restclient" ) var ( @@ -106,10 +104,14 @@ func Server(ctx context.Context, cfg *config.Control) error { runtime.Authenticator = auth if !cfg.NoScheduler { - scheduler(cfg, runtime) + if err := scheduler(cfg, runtime); err != nil { + return err + } } - controllerManager(cfg, runtime) + if err := controllerManager(cfg, runtime); err != nil { + return err + } if !cfg.DisableCCM { cloudControllerManager(ctx, cfg, runtime) @@ -118,7 +120,7 @@ func Server(ctx context.Context, cfg *config.Control) error { return nil } -func controllerManager(cfg *config.Control, runtime *config.ControlRuntime) { +func controllerManager(cfg *config.Control, runtime *config.ControlRuntime) error { argsMap := map[string]string{ "kubeconfig": runtime.KubeConfigController, "service-account-private-key-file": runtime.ServiceKey, @@ -137,17 +139,12 @@ func controllerManager(cfg *config.Control, runtime *config.ControlRuntime) { } args := config.GetArgsList(argsMap, cfg.ExtraControllerArgs) + logrus.Infof("Running kube-controller-manager %s", config.ArgString(args)) - command := cmapp.NewControllerManagerCommand() - command.SetArgs(args) - - go func() { - logrus.Infof("Running kube-controller-manager %s", config.ArgString(args)) - logrus.Fatalf("controller-manager exited: %v", command.Execute()) - }() + return executor.ControllerManager(args) } -func scheduler(cfg *config.Control, runtime *config.ControlRuntime) { +func scheduler(cfg *config.Control, runtime *config.ControlRuntime) error { argsMap := map[string]string{ "kubeconfig": runtime.KubeConfigScheduler, "port": "10251", @@ -159,13 +156,8 @@ func scheduler(cfg *config.Control, runtime *config.ControlRuntime) { } args := config.GetArgsList(argsMap, cfg.ExtraSchedulerAPIArgs) - command := sapp.NewSchedulerCommand() - command.SetArgs(args) - - go func() { - logrus.Infof("Running kube-scheduler %s", config.ArgString(args)) - logrus.Fatalf("scheduler exited: %v", command.Execute()) - }() + logrus.Infof("Running kube-scheduler %s", config.ArgString(args)) + return executor.Scheduler(args) } func apiServer(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) (authenticator.Request, http.Handler, error) { @@ -212,17 +204,8 @@ func apiServer(ctx context.Context, cfg *config.Control, runtime *config.Control } args := config.GetArgsList(argsMap, cfg.ExtraAPIArgs) - command := app.NewAPIServerCommand(ctx.Done()) - command.SetArgs(args) - - go func() { - logrus.Infof("Running kube-apiserver %s", config.ArgString(args)) - logrus.Fatalf("apiserver exited: %v", command.Execute()) - }() - - startupConfig := <-app.StartupConfig - - return startupConfig.Authenticator, startupConfig.Handler, nil + logrus.Infof("Running kube-apiserver %s", config.ArgString(args)) + return executor.APIServer(ctx, args) } func defaults(config *config.Control) { diff --git a/pkg/daemons/executor/embed.go b/pkg/daemons/executor/embed.go new file mode 100644 index 0000000000..d918a54ae8 --- /dev/null +++ b/pkg/daemons/executor/embed.go @@ -0,0 +1,80 @@ +// +build !no_embedded_executor + +package executor + +import ( + "context" + "net/http" + + "k8s.io/apiserver/pkg/authentication/authenticator" + + proxy "k8s.io/kubernetes/cmd/kube-proxy/app" + kubelet "k8s.io/kubernetes/cmd/kubelet/app" + + "github.com/sirupsen/logrus" + "k8s.io/kubernetes/cmd/kube-apiserver/app" + cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app" + sapp "k8s.io/kubernetes/cmd/kube-scheduler/app" +) + +func init() { + executor = Embedded{} +} + +type Embedded struct{} + +func (Embedded) Kubelet(args []string) error { + command := kubelet.NewKubeletCommand(context.Background().Done()) + command.SetArgs(args) + + go func() { + logrus.Fatalf("kubelet exited: %v", command.Execute()) + }() + + return nil +} + +func (Embedded) KubeProxy(args []string) error { + command := proxy.NewProxyCommand() + command.SetArgs(args) + + go func() { + logrus.Fatalf("kube-proxy exited: %v", command.Execute()) + }() + + return nil +} + +func (Embedded) APIServer(ctx context.Context, args []string) (authenticator.Request, http.Handler, error) { + command := app.NewAPIServerCommand(ctx.Done()) + command.SetArgs(args) + + go func() { + logrus.Fatalf("apiserver exited: %v", command.Execute()) + }() + + startupConfig := <-app.StartupConfig + return startupConfig.Authenticator, startupConfig.Handler, nil +} + +func (Embedded) Scheduler(args []string) error { + command := sapp.NewSchedulerCommand() + command.SetArgs(args) + + go func() { + logrus.Fatalf("scheduler exited: %v", command.Execute()) + }() + + return nil +} + +func (Embedded) ControllerManager(args []string) error { + command := cmapp.NewControllerManagerCommand() + command.SetArgs(args) + + go func() { + logrus.Fatalf("controller-manager exited: %v", command.Execute()) + }() + + return nil +} diff --git a/pkg/daemons/executor/executor.go b/pkg/daemons/executor/executor.go new file mode 100644 index 0000000000..f7fcc2f2ed --- /dev/null +++ b/pkg/daemons/executor/executor.go @@ -0,0 +1,44 @@ +package executor + +import ( + "context" + "net/http" + + "k8s.io/apiserver/pkg/authentication/authenticator" +) + +type Executor interface { + Kubelet(args []string) error + KubeProxy(args []string) error + APIServer(ctx context.Context, args []string) (authenticator.Request, http.Handler, error) + Scheduler(args []string) error + ControllerManager(args []string) error +} + +var ( + executor Executor +) + +func Set(driver Executor) { + executor = driver +} + +func Kubelet(args []string) error { + return executor.Kubelet(args) +} + +func KubeProxy(args []string) error { + return executor.KubeProxy(args) +} + +func APIServer(ctx context.Context, args []string) (authenticator.Request, http.Handler, error) { + return executor.APIServer(ctx, args) +} + +func Scheduler(args []string) error { + return executor.Scheduler(args) +} + +func ControllerManager(args []string) error { + return executor.ControllerManager(args) +}