mirror of
https://github.com/k3s-io/k3s.git
synced 2024-06-07 19:41:36 +00:00
Move cloud-controller-manager into an embedded executor (#3525)
* Move cloud-controller-manager into an embedded executor * Import K3s cloud provider and clean up imports Signed-off-by: Chris Kim <oats87g@gmail.com>
This commit is contained in:
parent
f3d0a857d2
commit
04398a2582
@ -20,23 +20,16 @@ import (
|
||||
"github.com/rancher/k3s/pkg/version"
|
||||
"github.com/rancher/wrangler-api/pkg/generated/controllers/rbac"
|
||||
"github.com/sirupsen/logrus"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/authentication/authenticator"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
ccm "k8s.io/cloud-provider"
|
||||
cloudprovider "k8s.io/cloud-provider"
|
||||
ccmapp "k8s.io/cloud-provider/app"
|
||||
cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
|
||||
ccmopt "k8s.io/cloud-provider/options"
|
||||
cliflag "k8s.io/component-base/cli/flag"
|
||||
app2 "k8s.io/controller-manager/app"
|
||||
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
|
||||
"k8s.io/kubernetes/pkg/proxy/util"
|
||||
|
||||
// registering k3s cloud provider
|
||||
_ "github.com/rancher/k3s/pkg/cloudprovider"
|
||||
// for client metric registration
|
||||
_ "k8s.io/component-base/metrics/prometheus/restclient"
|
||||
)
|
||||
@ -90,7 +83,9 @@ func Server(ctx context.Context, cfg *config.Control) error {
|
||||
}
|
||||
|
||||
if !cfg.DisableCCM {
|
||||
cloudControllerManager(ctx, cfg, runtime)
|
||||
if err := cloudControllerManager(ctx, cfg, runtime); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -347,89 +342,79 @@ func setupStorageBackend(argsMap map[string]string, cfg *config.Control) {
|
||||
}
|
||||
}
|
||||
|
||||
func cloudControllerManager(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) {
|
||||
// Reference: https://github.com/kubernetes/kubernetes/pull/93764
|
||||
// The above-linked change made the in-tree cloud-controller-manager command much more flexible
|
||||
// but much more complicated to wrap. It now validates some of the configuration very early on, before
|
||||
// the CLI args are parsed, so some of the configuration needs to be hardcoded instead of set via flags.
|
||||
// Reference: https://github.com/kubernetes/kubernetes/pull/98210
|
||||
// The above-linked change further clarifies the intent of the example cloud-controller-manager.
|
||||
|
||||
func cloudControllerManager(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) error {
|
||||
argsMap := map[string]string{
|
||||
"profiling": "false",
|
||||
"profiling": "false",
|
||||
"allocate-node-cidrs": "true", // ccmOptions.KubeCloudShared.AllocateNodeCIDRs = true
|
||||
"cloud-provider": version.Program, // ccmOptions.KubeCloudShared.CloudProvider.Name = version.Program
|
||||
"cluster-cidr": util2.JoinIPNets(cfg.ClusterIPRanges), // ccmOptions.KubeCloudShared.ClusterCIDR = util2.JoinIPNets(cfg.ClusterIPRanges)
|
||||
"configure-cloud-routes": "false", // ccmOptions.KubeCloudShared.ConfigureCloudRoutes = false
|
||||
"kubeconfig": runtime.KubeConfigCloudController, // ccmOptions.Kubeconfig = runtime.KubeConfigCloudController
|
||||
"node-status-update-frequency": "1m0s", // ccmOptions.NodeStatusUpdateFrequency = metav1.Duration{Duration: 1 * time.Minute}
|
||||
"bind-address": "127.0.0.1", // ccmOptions.SecureServing.BindAddress = localhostIP
|
||||
"port": "0", // ccmOptions.SecureServing.BindPort = 0
|
||||
}
|
||||
if cfg.NoLeaderElect {
|
||||
argsMap["leader-elect"] = "false"
|
||||
}
|
||||
args := config.GetArgsList(argsMap, cfg.ExtraCloudControllerArgs)
|
||||
|
||||
ccmOptions, err := ccmopt.NewCloudControllerManagerOptions()
|
||||
if err != nil {
|
||||
logrus.Fatalf("Unable to initialize cloudcontroller options: %v", err)
|
||||
}
|
||||
logrus.Infof("Running cloud-controller-manager %s", config.ArgString(args))
|
||||
|
||||
ccmOptions.KubeCloudShared.AllocateNodeCIDRs = true
|
||||
ccmOptions.KubeCloudShared.CloudProvider.Name = version.Program
|
||||
ccmOptions.KubeCloudShared.ClusterCIDR = util2.JoinIPNets(cfg.ClusterIPRanges)
|
||||
ccmOptions.KubeCloudShared.ConfigureCloudRoutes = false
|
||||
ccmOptions.Kubeconfig = runtime.KubeConfigCloudController
|
||||
ccmOptions.NodeStatusUpdateFrequency = metav1.Duration{Duration: 1 * time.Minute}
|
||||
ccmOptions.SecureServing.BindAddress = localhostIP
|
||||
ccmOptions.SecureServing.BindPort = 0
|
||||
|
||||
if cfg.NoLeaderElect {
|
||||
ccmOptions.Generic.LeaderElection.LeaderElect = false
|
||||
}
|
||||
|
||||
controllerInitializers := ccmapp.DefaultInitFuncConstructors
|
||||
delete(controllerInitializers, "service")
|
||||
delete(controllerInitializers, "route")
|
||||
|
||||
cloudInitializer := func(config *cloudcontrollerconfig.CompletedConfig) cloudprovider.Interface {
|
||||
cloud, err := ccm.InitCloudProvider(version.Program, runtime.KubeConfigCloudController)
|
||||
if err != nil {
|
||||
logrus.Fatalf("Cloud provider could not be initialized: %v", err)
|
||||
}
|
||||
if cloud == nil {
|
||||
logrus.Fatalf("Cloud provider is nil")
|
||||
}
|
||||
|
||||
cloud.Initialize(config.ClientBuilder, make(chan struct{}))
|
||||
if informerUserCloud, ok := cloud.(ccm.InformerUser); ok {
|
||||
informerUserCloud.SetInformers(config.SharedInformers)
|
||||
}
|
||||
|
||||
return cloud
|
||||
}
|
||||
|
||||
command := ccmapp.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, controllerInitializers, cliflag.NamedFlagSets{}, wait.NeverStop)
|
||||
command.SetArgs(args)
|
||||
ccmRBACReady := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
defer close(ccmRBACReady)
|
||||
|
||||
apiReadyLoop:
|
||||
for {
|
||||
// check for the cloud controller rbac binding
|
||||
if err := checkForCloudControllerPrivileges(runtime); err != nil {
|
||||
logrus.Infof("Waiting for cloudcontroller rbac role to be created")
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logrus.Fatalf("cloud-controller-manager context canceled: %v", ctx.Err())
|
||||
case <-time.After(5 * time.Second):
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-runtime.APIServerReady:
|
||||
break apiReadyLoop
|
||||
case <-time.After(30 * time.Second):
|
||||
logrus.Infof("Waiting for API server to become available")
|
||||
}
|
||||
}
|
||||
|
||||
logrus.Infof("Waiting for cloud-controller-manager privileges to become available")
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case err := <-promise(func() error { return checkForCloudControllerPrivileges(runtime, 5*time.Second) }):
|
||||
if err != nil {
|
||||
logrus.Infof("Waiting for cloud-controller-manager privileges to become available")
|
||||
continue
|
||||
}
|
||||
return
|
||||
}
|
||||
break
|
||||
}
|
||||
logrus.Infof("Running cloud-controller-manager with args %v", config.ArgString(args))
|
||||
logrus.Fatalf("cloud-controller-manager exited: %v", command.ExecuteContext(ctx))
|
||||
}()
|
||||
|
||||
return executor.CloudControllerManager(ccmRBACReady, args)
|
||||
}
|
||||
|
||||
func checkForCloudControllerPrivileges(runtime *config.ControlRuntime) error {
|
||||
func checkForCloudControllerPrivileges(runtime *config.ControlRuntime, timeout time.Duration) error {
|
||||
restConfig, err := clientcmd.BuildConfigFromFlags("", runtime.KubeConfigAdmin)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
crb := rbac.NewFactoryFromConfigOrDie(restConfig).Rbac().V1().ClusterRoleBinding()
|
||||
_, err = crb.Get(version.Program+"-cloud-controller-manager", metav1.GetOptions{})
|
||||
err = wait.PollImmediate(time.Second, timeout, func() (bool, error) {
|
||||
crb := rbac.NewFactoryFromConfigOrDie(restConfig).Rbac().V1().ClusterRoleBinding()
|
||||
_, err = crb.Get(version.Program+"-cloud-controller-manager", metav1.GetOptions{})
|
||||
if err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
logrus.Errorf("error encountered waitng for cloud-controller-manager privileges: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -6,17 +6,26 @@ import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"k8s.io/apiserver/pkg/authentication/authenticator"
|
||||
|
||||
proxy "k8s.io/kubernetes/cmd/kube-proxy/app"
|
||||
kubelet "k8s.io/kubernetes/cmd/kubelet/app"
|
||||
|
||||
"github.com/rancher/k3s/pkg/cli/cmds"
|
||||
daemonconfig "github.com/rancher/k3s/pkg/daemons/config"
|
||||
"github.com/rancher/k3s/pkg/version"
|
||||
"github.com/sirupsen/logrus"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/authentication/authenticator"
|
||||
ccm "k8s.io/cloud-provider"
|
||||
cloudprovider "k8s.io/cloud-provider"
|
||||
ccmapp "k8s.io/cloud-provider/app"
|
||||
cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
|
||||
ccmopt "k8s.io/cloud-provider/options"
|
||||
cliflag "k8s.io/component-base/cli/flag"
|
||||
"k8s.io/kubernetes/cmd/kube-apiserver/app"
|
||||
cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app"
|
||||
proxy "k8s.io/kubernetes/cmd/kube-proxy/app"
|
||||
sapp "k8s.io/kubernetes/cmd/kube-scheduler/app"
|
||||
kubelet "k8s.io/kubernetes/cmd/kubelet/app"
|
||||
|
||||
// registering k3s cloud provider
|
||||
_ "github.com/rancher/k3s/pkg/cloudprovider"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -87,3 +96,41 @@ func (Embedded) ControllerManager(apiReady <-chan struct{}, args []string) error
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (Embedded) CloudControllerManager(ccmRBACReady <-chan struct{}, args []string) error {
|
||||
ccmOptions, err := ccmopt.NewCloudControllerManagerOptions()
|
||||
if err != nil {
|
||||
logrus.Fatalf("unable to initialize command options: %v", err)
|
||||
}
|
||||
|
||||
cloudInitializer := func(config *cloudcontrollerconfig.CompletedConfig) cloudprovider.Interface {
|
||||
cloud, err := ccm.InitCloudProvider(version.Program, "")
|
||||
if err != nil {
|
||||
logrus.Fatalf("Cloud provider could not be initialized: %v", err)
|
||||
}
|
||||
if cloud == nil {
|
||||
logrus.Fatalf("Cloud provider is nil")
|
||||
}
|
||||
|
||||
cloud.Initialize(config.ClientBuilder, make(chan struct{}))
|
||||
if informerUserCloud, ok := cloud.(ccm.InformerUser); ok {
|
||||
informerUserCloud.SetInformers(config.SharedInformers)
|
||||
}
|
||||
|
||||
return cloud
|
||||
}
|
||||
|
||||
controllerInitializers := ccmapp.DefaultInitFuncConstructors
|
||||
delete(controllerInitializers, "service")
|
||||
delete(controllerInitializers, "route")
|
||||
|
||||
command := ccmapp.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, controllerInitializers, cliflag.NamedFlagSets{}, wait.NeverStop)
|
||||
command.SetArgs(args)
|
||||
|
||||
go func() {
|
||||
<-ccmRBACReady
|
||||
logrus.Fatalf("cloud-controller-manager exited: %v", command.Execute())
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -27,6 +27,7 @@ type Executor interface {
|
||||
ControllerManager(apiReady <-chan struct{}, args []string) error
|
||||
CurrentETCDOptions() (InitialOptions, error)
|
||||
ETCD(args ETCDConfig) error
|
||||
CloudControllerManager(ccmRBACReady <-chan struct{}, args []string) error
|
||||
}
|
||||
|
||||
type ETCDConfig struct {
|
||||
@ -115,3 +116,7 @@ func CurrentETCDOptions() (InitialOptions, error) {
|
||||
func ETCD(args ETCDConfig) error {
|
||||
return executor.ETCD(args)
|
||||
}
|
||||
|
||||
func CloudControllerManager(ccmRBACReady <-chan struct{}, args []string) error {
|
||||
return executor.CloudControllerManager(ccmRBACReady, args)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user