Merge pull request #1696 from ibuildthecloud/executor

Encapsulate execution logic
This commit is contained in:
Darren Shepherd 2020-05-05 16:01:31 -07:00 committed by GitHub
commit b86256bf65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 153 additions and 57 deletions

View File

@ -2,7 +2,6 @@ package agent
import (
"bufio"
"context"
"math/rand"
"os"
"path/filepath"
@ -11,11 +10,10 @@ import (
"github.com/opencontainers/runc/libcontainer/system"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/daemons/executor"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/component-base/logs"
proxy "k8s.io/kubernetes/cmd/kube-proxy/app"
kubelet "k8s.io/kubernetes/cmd/kubelet/app"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
_ "k8s.io/component-base/metrics/prometheus/restclient" // for client metric registration
@ -28,7 +26,9 @@ func Agent(config *config.Agent) error {
logs.InitLogs()
defer logs.FlushLogs()
startKubelet(config)
if err := startKubelet(config); err != nil {
return err
}
if !config.DisableKubeProxy {
return startKubeProxy(config)
@ -49,18 +49,11 @@ func startKubeProxy(cfg *config.Agent) error {
}
args := config.GetArgsList(argsMap, cfg.ExtraKubeProxyArgs)
command := proxy.NewProxyCommand()
command.SetArgs(args)
go func() {
logrus.Infof("Running kube-proxy %s", config.ArgString(args))
logrus.Fatalf("kube-proxy exited: %v", command.Execute())
}()
return nil
logrus.Infof("Running kube-proxy %s", config.ArgString(args))
return executor.KubeProxy(args)
}
func startKubelet(cfg *config.Agent) {
func startKubelet(cfg *config.Agent) error {
argsMap := map[string]string{
"healthz-bind-address": "127.0.0.1",
"read-only-port": "0",
@ -163,13 +156,9 @@ func startKubelet(cfg *config.Agent) {
}
args := config.GetArgsList(argsMap, cfg.ExtraKubeletArgs)
command := kubelet.NewKubeletCommand(context.Background().Done())
command.SetArgs(args)
logrus.Infof("Running kubelet %s", config.ArgString(args))
go func() {
logrus.Infof("Running kubelet %s", config.ArgString(args))
logrus.Fatalf("kubelet exited: %v", command.Execute())
}()
return executor.Kubelet(args)
}
func addFeatureGate(current, new string) string {

View File

@ -19,33 +19,31 @@ import (
"text/template"
"time"
// registering k3s cloud provider
_ "github.com/rancher/k3s/pkg/cloudprovider"
"github.com/pkg/errors"
certutil "github.com/rancher/dynamiclistener/cert"
"github.com/rancher/k3s/pkg/clientaccess"
"github.com/rancher/k3s/pkg/cluster"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/daemons/executor"
"github.com/rancher/k3s/pkg/passwd"
"github.com/rancher/k3s/pkg/token"
"github.com/rancher/wrangler-api/pkg/generated/controllers/rbac"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiserverconfigv1 "k8s.io/apiserver/pkg/apis/config/v1"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
_ "k8s.io/component-base/metrics/prometheus/restclient" // for client metric registration
ccmapp "k8s.io/kubernetes/cmd/cloud-controller-manager/app"
app2 "k8s.io/kubernetes/cmd/controller-manager/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app"
sapp "k8s.io/kubernetes/cmd/kube-scheduler/app"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/proxy/util"
// registering k3s cloud provider
_ "github.com/rancher/k3s/pkg/cloudprovider"
// for client metric registration
_ "k8s.io/component-base/metrics/prometheus/restclient"
)
var (
@ -106,10 +104,14 @@ func Server(ctx context.Context, cfg *config.Control) error {
runtime.Authenticator = auth
if !cfg.NoScheduler {
scheduler(cfg, runtime)
if err := scheduler(cfg, runtime); err != nil {
return err
}
}
controllerManager(cfg, runtime)
if err := controllerManager(cfg, runtime); err != nil {
return err
}
if !cfg.DisableCCM {
cloudControllerManager(ctx, cfg, runtime)
@ -118,7 +120,7 @@ func Server(ctx context.Context, cfg *config.Control) error {
return nil
}
func controllerManager(cfg *config.Control, runtime *config.ControlRuntime) {
func controllerManager(cfg *config.Control, runtime *config.ControlRuntime) error {
argsMap := map[string]string{
"kubeconfig": runtime.KubeConfigController,
"service-account-private-key-file": runtime.ServiceKey,
@ -137,17 +139,12 @@ func controllerManager(cfg *config.Control, runtime *config.ControlRuntime) {
}
args := config.GetArgsList(argsMap, cfg.ExtraControllerArgs)
logrus.Infof("Running kube-controller-manager %s", config.ArgString(args))
command := cmapp.NewControllerManagerCommand()
command.SetArgs(args)
go func() {
logrus.Infof("Running kube-controller-manager %s", config.ArgString(args))
logrus.Fatalf("controller-manager exited: %v", command.Execute())
}()
return executor.ControllerManager(args)
}
func scheduler(cfg *config.Control, runtime *config.ControlRuntime) {
func scheduler(cfg *config.Control, runtime *config.ControlRuntime) error {
argsMap := map[string]string{
"kubeconfig": runtime.KubeConfigScheduler,
"port": "10251",
@ -159,13 +156,8 @@ func scheduler(cfg *config.Control, runtime *config.ControlRuntime) {
}
args := config.GetArgsList(argsMap, cfg.ExtraSchedulerAPIArgs)
command := sapp.NewSchedulerCommand()
command.SetArgs(args)
go func() {
logrus.Infof("Running kube-scheduler %s", config.ArgString(args))
logrus.Fatalf("scheduler exited: %v", command.Execute())
}()
logrus.Infof("Running kube-scheduler %s", config.ArgString(args))
return executor.Scheduler(args)
}
func apiServer(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) (authenticator.Request, http.Handler, error) {
@ -212,17 +204,8 @@ func apiServer(ctx context.Context, cfg *config.Control, runtime *config.Control
}
args := config.GetArgsList(argsMap, cfg.ExtraAPIArgs)
command := app.NewAPIServerCommand(ctx.Done())
command.SetArgs(args)
go func() {
logrus.Infof("Running kube-apiserver %s", config.ArgString(args))
logrus.Fatalf("apiserver exited: %v", command.Execute())
}()
startupConfig := <-app.StartupConfig
return startupConfig.Authenticator, startupConfig.Handler, nil
logrus.Infof("Running kube-apiserver %s", config.ArgString(args))
return executor.APIServer(ctx, args)
}
func defaults(config *config.Control) {

View File

@ -0,0 +1,80 @@
// +build !no_embedded_executor
package executor
import (
"context"
"net/http"
"k8s.io/apiserver/pkg/authentication/authenticator"
proxy "k8s.io/kubernetes/cmd/kube-proxy/app"
kubelet "k8s.io/kubernetes/cmd/kubelet/app"
"github.com/sirupsen/logrus"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app"
sapp "k8s.io/kubernetes/cmd/kube-scheduler/app"
)
func init() {
executor = Embedded{}
}
type Embedded struct{}
func (Embedded) Kubelet(args []string) error {
command := kubelet.NewKubeletCommand(context.Background().Done())
command.SetArgs(args)
go func() {
logrus.Fatalf("kubelet exited: %v", command.Execute())
}()
return nil
}
func (Embedded) KubeProxy(args []string) error {
command := proxy.NewProxyCommand()
command.SetArgs(args)
go func() {
logrus.Fatalf("kube-proxy exited: %v", command.Execute())
}()
return nil
}
func (Embedded) APIServer(ctx context.Context, args []string) (authenticator.Request, http.Handler, error) {
command := app.NewAPIServerCommand(ctx.Done())
command.SetArgs(args)
go func() {
logrus.Fatalf("apiserver exited: %v", command.Execute())
}()
startupConfig := <-app.StartupConfig
return startupConfig.Authenticator, startupConfig.Handler, nil
}
func (Embedded) Scheduler(args []string) error {
command := sapp.NewSchedulerCommand()
command.SetArgs(args)
go func() {
logrus.Fatalf("scheduler exited: %v", command.Execute())
}()
return nil
}
func (Embedded) ControllerManager(args []string) error {
command := cmapp.NewControllerManagerCommand()
command.SetArgs(args)
go func() {
logrus.Fatalf("controller-manager exited: %v", command.Execute())
}()
return nil
}

View File

@ -0,0 +1,44 @@
package executor
import (
"context"
"net/http"
"k8s.io/apiserver/pkg/authentication/authenticator"
)
type Executor interface {
Kubelet(args []string) error
KubeProxy(args []string) error
APIServer(ctx context.Context, args []string) (authenticator.Request, http.Handler, error)
Scheduler(args []string) error
ControllerManager(args []string) error
}
var (
executor Executor
)
func Set(driver Executor) {
executor = driver
}
func Kubelet(args []string) error {
return executor.Kubelet(args)
}
func KubeProxy(args []string) error {
return executor.KubeProxy(args)
}
func APIServer(ctx context.Context, args []string) (authenticator.Request, http.Handler, error) {
return executor.APIServer(ctx, args)
}
func Scheduler(args []string) error {
return executor.Scheduler(args)
}
func ControllerManager(args []string) error {
return executor.ControllerManager(args)
}