Encapsulate execution logic

This moves all the calls to cobra root commands to one package
so that we can change the behavior of running components as embedded
or external.
This commit is contained in:
Darren Shepherd 2020-04-27 10:09:58 -07:00
parent 1d05e99769
commit afd6f6d7e7
4 changed files with 153 additions and 57 deletions

View File

@ -2,7 +2,6 @@ package agent
import (
"bufio"
"context"
"math/rand"
"os"
"path/filepath"
@ -11,11 +10,10 @@ import (
"github.com/opencontainers/runc/libcontainer/system"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/daemons/executor"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/component-base/logs"
proxy "k8s.io/kubernetes/cmd/kube-proxy/app"
kubelet "k8s.io/kubernetes/cmd/kubelet/app"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
_ "k8s.io/component-base/metrics/prometheus/restclient" // for client metric registration
@ -28,7 +26,9 @@ func Agent(config *config.Agent) error {
logs.InitLogs()
defer logs.FlushLogs()
startKubelet(config)
if err := startKubelet(config); err != nil {
return err
}
if !config.DisableKubeProxy {
return startKubeProxy(config)
@ -49,18 +49,11 @@ func startKubeProxy(cfg *config.Agent) error {
}
args := config.GetArgsList(argsMap, cfg.ExtraKubeProxyArgs)
command := proxy.NewProxyCommand()
command.SetArgs(args)
go func() {
logrus.Infof("Running kube-proxy %s", config.ArgString(args))
logrus.Fatalf("kube-proxy exited: %v", command.Execute())
}()
return nil
logrus.Infof("Running kube-proxy %s", config.ArgString(args))
return executor.KubeProxy(args)
}
func startKubelet(cfg *config.Agent) {
func startKubelet(cfg *config.Agent) error {
argsMap := map[string]string{
"healthz-bind-address": "127.0.0.1",
"read-only-port": "0",
@ -163,13 +156,9 @@ func startKubelet(cfg *config.Agent) {
}
args := config.GetArgsList(argsMap, cfg.ExtraKubeletArgs)
command := kubelet.NewKubeletCommand(context.Background().Done())
command.SetArgs(args)
logrus.Infof("Running kubelet %s", config.ArgString(args))
go func() {
logrus.Infof("Running kubelet %s", config.ArgString(args))
logrus.Fatalf("kubelet exited: %v", command.Execute())
}()
return executor.Kubelet(args)
}
func addFeatureGate(current, new string) string {

View File

@ -19,33 +19,31 @@ import (
"text/template"
"time"
// registering k3s cloud provider
_ "github.com/rancher/k3s/pkg/cloudprovider"
"github.com/pkg/errors"
certutil "github.com/rancher/dynamiclistener/cert"
"github.com/rancher/k3s/pkg/clientaccess"
"github.com/rancher/k3s/pkg/cluster"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/daemons/executor"
"github.com/rancher/k3s/pkg/passwd"
"github.com/rancher/k3s/pkg/token"
"github.com/rancher/wrangler-api/pkg/generated/controllers/rbac"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiserverconfigv1 "k8s.io/apiserver/pkg/apis/config/v1"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
_ "k8s.io/component-base/metrics/prometheus/restclient" // for client metric registration
ccmapp "k8s.io/kubernetes/cmd/cloud-controller-manager/app"
app2 "k8s.io/kubernetes/cmd/controller-manager/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app"
sapp "k8s.io/kubernetes/cmd/kube-scheduler/app"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/proxy/util"
// registering k3s cloud provider
_ "github.com/rancher/k3s/pkg/cloudprovider"
// for client metric registration
_ "k8s.io/component-base/metrics/prometheus/restclient"
)
var (
@ -106,10 +104,14 @@ func Server(ctx context.Context, cfg *config.Control) error {
runtime.Authenticator = auth
if !cfg.NoScheduler {
scheduler(cfg, runtime)
if err := scheduler(cfg, runtime); err != nil {
return err
}
}
controllerManager(cfg, runtime)
if err := controllerManager(cfg, runtime); err != nil {
return err
}
if !cfg.DisableCCM {
cloudControllerManager(ctx, cfg, runtime)
@ -118,7 +120,7 @@ func Server(ctx context.Context, cfg *config.Control) error {
return nil
}
func controllerManager(cfg *config.Control, runtime *config.ControlRuntime) {
func controllerManager(cfg *config.Control, runtime *config.ControlRuntime) error {
argsMap := map[string]string{
"kubeconfig": runtime.KubeConfigController,
"service-account-private-key-file": runtime.ServiceKey,
@ -137,17 +139,12 @@ func controllerManager(cfg *config.Control, runtime *config.ControlRuntime) {
}
args := config.GetArgsList(argsMap, cfg.ExtraControllerArgs)
logrus.Infof("Running kube-controller-manager %s", config.ArgString(args))
command := cmapp.NewControllerManagerCommand()
command.SetArgs(args)
go func() {
logrus.Infof("Running kube-controller-manager %s", config.ArgString(args))
logrus.Fatalf("controller-manager exited: %v", command.Execute())
}()
return executor.ControllerManager(args)
}
func scheduler(cfg *config.Control, runtime *config.ControlRuntime) {
func scheduler(cfg *config.Control, runtime *config.ControlRuntime) error {
argsMap := map[string]string{
"kubeconfig": runtime.KubeConfigScheduler,
"port": "10251",
@ -159,13 +156,8 @@ func scheduler(cfg *config.Control, runtime *config.ControlRuntime) {
}
args := config.GetArgsList(argsMap, cfg.ExtraSchedulerAPIArgs)
command := sapp.NewSchedulerCommand()
command.SetArgs(args)
go func() {
logrus.Infof("Running kube-scheduler %s", config.ArgString(args))
logrus.Fatalf("scheduler exited: %v", command.Execute())
}()
logrus.Infof("Running kube-scheduler %s", config.ArgString(args))
return executor.Scheduler(args)
}
func apiServer(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) (authenticator.Request, http.Handler, error) {
@ -212,17 +204,8 @@ func apiServer(ctx context.Context, cfg *config.Control, runtime *config.Control
}
args := config.GetArgsList(argsMap, cfg.ExtraAPIArgs)
command := app.NewAPIServerCommand(ctx.Done())
command.SetArgs(args)
go func() {
logrus.Infof("Running kube-apiserver %s", config.ArgString(args))
logrus.Fatalf("apiserver exited: %v", command.Execute())
}()
startupConfig := <-app.StartupConfig
return startupConfig.Authenticator, startupConfig.Handler, nil
logrus.Infof("Running kube-apiserver %s", config.ArgString(args))
return executor.APIServer(ctx, args)
}
func defaults(config *config.Control) {

View File

@ -0,0 +1,80 @@
// +build !no_embedded_executor
package executor
import (
"context"
"net/http"
"k8s.io/apiserver/pkg/authentication/authenticator"
proxy "k8s.io/kubernetes/cmd/kube-proxy/app"
kubelet "k8s.io/kubernetes/cmd/kubelet/app"
"github.com/sirupsen/logrus"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app"
sapp "k8s.io/kubernetes/cmd/kube-scheduler/app"
)
func init() {
executor = Embedded{}
}
type Embedded struct{}
func (Embedded) Kubelet(args []string) error {
command := kubelet.NewKubeletCommand(context.Background().Done())
command.SetArgs(args)
go func() {
logrus.Fatalf("kubelet exited: %v", command.Execute())
}()
return nil
}
func (Embedded) KubeProxy(args []string) error {
command := proxy.NewProxyCommand()
command.SetArgs(args)
go func() {
logrus.Fatalf("kube-proxy exited: %v", command.Execute())
}()
return nil
}
func (Embedded) APIServer(ctx context.Context, args []string) (authenticator.Request, http.Handler, error) {
command := app.NewAPIServerCommand(ctx.Done())
command.SetArgs(args)
go func() {
logrus.Fatalf("apiserver exited: %v", command.Execute())
}()
startupConfig := <-app.StartupConfig
return startupConfig.Authenticator, startupConfig.Handler, nil
}
func (Embedded) Scheduler(args []string) error {
command := sapp.NewSchedulerCommand()
command.SetArgs(args)
go func() {
logrus.Fatalf("scheduler exited: %v", command.Execute())
}()
return nil
}
func (Embedded) ControllerManager(args []string) error {
command := cmapp.NewControllerManagerCommand()
command.SetArgs(args)
go func() {
logrus.Fatalf("controller-manager exited: %v", command.Execute())
}()
return nil
}

View File

@ -0,0 +1,44 @@
package executor
import (
"context"
"net/http"
"k8s.io/apiserver/pkg/authentication/authenticator"
)
type Executor interface {
Kubelet(args []string) error
KubeProxy(args []string) error
APIServer(ctx context.Context, args []string) (authenticator.Request, http.Handler, error)
Scheduler(args []string) error
ControllerManager(args []string) error
}
var (
executor Executor
)
func Set(driver Executor) {
executor = driver
}
func Kubelet(args []string) error {
return executor.Kubelet(args)
}
func KubeProxy(args []string) error {
return executor.KubeProxy(args)
}
func APIServer(ctx context.Context, args []string) (authenticator.Request, http.Handler, error) {
return executor.APIServer(ctx, args)
}
func Scheduler(args []string) error {
return executor.Scheduler(args)
}
func ControllerManager(args []string) error {
return executor.ControllerManager(args)
}