From 3d146d2f1b438e4387baeab480189bab1bfbfca1 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Wed, 8 Feb 2023 00:37:10 +0000 Subject: [PATCH] Allow for multiple sets of leader-elected controllers Addresses an issue where etcd controllers did not run on etcd-only nodes Signed-off-by: Brad Davidson --- pkg/cli/cert/cert.go | 4 +-- pkg/cli/etcdsnapshot/etcd_snapshot.go | 2 +- pkg/cli/server/server.go | 2 +- pkg/daemons/config/types.go | 24 +++++++++----- pkg/etcd/apiaddresses_controller.go | 4 --- pkg/etcd/etcd.go | 47 ++++++++++++++++----------- pkg/etcd/etcd_test.go | 2 +- pkg/etcd/member_controller.go | 4 --- pkg/etcd/metadata_controller.go | 38 ++++++++++++++++------ pkg/server/cert.go | 9 ++--- pkg/server/server.go | 47 +++++++++++++++------------ tests/unit.go | 2 +- 12 files changed, 110 insertions(+), 75 deletions(-) diff --git a/pkg/cli/cert/cert.go b/pkg/cli/cert/cert.go index b5e2d8a88d..30a73f7028 100644 --- a/pkg/cli/cert/cert.go +++ b/pkg/cli/cert/cert.go @@ -70,7 +70,7 @@ func commandSetup(app *cli.Context, cfg *cmds.Server, sc *server.Config) (string } sc.ControlConfig.Token = cfg.Token - sc.ControlConfig.Runtime = &config.ControlRuntime{} + sc.ControlConfig.Runtime = config.NewRuntime(nil) return dataDir, nil } @@ -292,7 +292,7 @@ func rotateCA(app *cli.Context, cfg *cmds.Server, sync *cmds.CertRotateCA) error // Set up dummy server config for reading new bootstrap data from disk. tmpServer := &config.Control{ - Runtime: &config.ControlRuntime{}, + Runtime: config.NewRuntime(nil), DataDir: filepath.Dir(sync.CACertPath), } deps.CreateRuntimeCertFiles(tmpServer) diff --git a/pkg/cli/etcdsnapshot/etcd_snapshot.go b/pkg/cli/etcdsnapshot/etcd_snapshot.go index a900fa5e8a..b61be1515b 100644 --- a/pkg/cli/etcdsnapshot/etcd_snapshot.go +++ b/pkg/cli/etcdsnapshot/etcd_snapshot.go @@ -60,7 +60,7 @@ func commandSetup(app *cli.Context, cfg *cmds.Server, sc *server.Config) error { sc.ControlConfig.EtcdS3Folder = cfg.EtcdS3Folder sc.ControlConfig.EtcdS3Insecure = cfg.EtcdS3Insecure sc.ControlConfig.EtcdS3Timeout = cfg.EtcdS3Timeout - sc.ControlConfig.Runtime = &config.ControlRuntime{} + sc.ControlConfig.Runtime = config.NewRuntime(nil) sc.ControlConfig.Runtime.ETCDServerCA = filepath.Join(dataDir, "tls", "etcd", "server-ca.crt") sc.ControlConfig.Runtime.ClientETCDCert = filepath.Join(dataDir, "tls", "etcd", "client.crt") sc.ControlConfig.Runtime.ClientETCDKey = filepath.Join(dataDir, "tls", "etcd", "client.key") diff --git a/pkg/cli/server/server.go b/pkg/cli/server/server.go index c9f5ca2e06..a7eba96639 100644 --- a/pkg/cli/server/server.go +++ b/pkg/cli/server/server.go @@ -95,7 +95,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont serverConfig := server.Config{} serverConfig.DisableAgent = cfg.DisableAgent - serverConfig.ControlConfig.Runtime = &config.ControlRuntime{AgentReady: agentReady} + serverConfig.ControlConfig.Runtime = config.NewRuntime(agentReady) serverConfig.ControlConfig.Token = cfg.Token serverConfig.ControlConfig.AgentToken = cfg.AgentToken serverConfig.ControlConfig.JoinURL = cfg.ServerURL diff --git a/pkg/daemons/config/types.go b/pkg/daemons/config/types.go index 3b577bce1d..79a5a023c6 100644 --- a/pkg/daemons/config/types.go +++ b/pkg/daemons/config/types.go @@ -1,7 +1,6 @@ package config import ( - "context" "crypto/tls" "fmt" "net" @@ -14,6 +13,7 @@ import ( "github.com/k3s-io/k3s/pkg/util" "github.com/k3s-io/kine/pkg/endpoint" "github.com/rancher/wrangler/pkg/generated/controllers/core" + "github.com/rancher/wrangler/pkg/leader" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apiserver/pkg/authentication/authenticator" utilsnet "k8s.io/utils/net" @@ -276,13 +276,13 @@ type ControlRuntimeBootstrap struct { type ControlRuntime struct { ControlRuntimeBootstrap - HTTPBootstrap bool - APIServerReady <-chan struct{} - AgentReady <-chan struct{} - ETCDReady <-chan struct{} - StartupHooksWg *sync.WaitGroup - ClusterControllerStart func(ctx context.Context) error - LeaderElectedClusterControllerStart func(ctx context.Context) error + HTTPBootstrap bool + APIServerReady <-chan struct{} + AgentReady <-chan struct{} + ETCDReady <-chan struct{} + StartupHooksWg *sync.WaitGroup + ClusterControllerStarts map[string]leader.Callback + LeaderElectedClusterControllerStarts map[string]leader.Callback ClientKubeAPICert string ClientKubeAPIKey string @@ -339,6 +339,14 @@ type ControlRuntime struct { EtcdConfig endpoint.ETCDConfig } +func NewRuntime(agentReady <-chan struct{}) *ControlRuntime { + return &ControlRuntime{ + AgentReady: agentReady, + ClusterControllerStarts: map[string]leader.Callback{}, + LeaderElectedClusterControllerStarts: map[string]leader.Callback{}, + } +} + type ArgString []string func (a ArgString) String() string { diff --git a/pkg/etcd/apiaddresses_controller.go b/pkg/etcd/apiaddresses_controller.go index 50483192fd..916f2b2148 100644 --- a/pkg/etcd/apiaddresses_controller.go +++ b/pkg/etcd/apiaddresses_controller.go @@ -17,10 +17,6 @@ import ( ) func registerEndpointsHandlers(ctx context.Context, etcd *ETCD) { - if etcd.config.DisableAPIServer { - return - } - endpoints := etcd.config.Runtime.Core.Core().V1().Endpoints() fieldSelector := fields.Set{metav1.ObjectNameField: "kubernetes"}.String() lw := &cache.ListWatch{ diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 5efdb6cb54..8e30b74b0f 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -549,30 +549,39 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt e.config.Datastore.BackendTLSConfig.CertFile = e.config.Runtime.ClientETCDCert e.config.Datastore.BackendTLSConfig.KeyFile = e.config.Runtime.ClientETCDKey - tombstoneFile := filepath.Join(DBDir(e.config), "tombstone") - if _, err := os.Stat(tombstoneFile); err == nil { - logrus.Infof("tombstone file has been detected, removing data dir to rejoin the cluster") - if _, err := backupDirWithRetention(DBDir(e.config), maxBackupRetention); err != nil { - return nil, err + e.config.Runtime.ClusterControllerStarts["etcd-node-metadata"] = func(ctx context.Context) { + registerMetadataHandlers(ctx, e) + } + + // The apiserver endpoint controller needs to run on a node with a local apiserver, + // in order to successfully seed etcd with the endpoint list. + if !e.config.DisableAPIServer { + e.config.Runtime.LeaderElectedClusterControllerStarts["etcd-apiserver-endpoints"] = func(ctx context.Context) { + registerEndpointsHandlers(ctx, e) } } - if err := e.setName(false); err != nil { - return nil, err + // The etcd member-removal controllers should only run on an etcd node. Tombstone file checking + // is also unnecessary if we're not running etcd. + if !e.config.DisableETCD { + tombstoneFile := filepath.Join(DBDir(e.config), "tombstone") + if _, err := os.Stat(tombstoneFile); err == nil { + logrus.Infof("tombstone file has been detected, removing data dir to rejoin the cluster") + if _, err := backupDirWithRetention(DBDir(e.config), maxBackupRetention); err != nil { + return nil, err + } + } + + if err := e.setName(false); err != nil { + return nil, err + } + + e.config.Runtime.LeaderElectedClusterControllerStarts["etcd-member-removal"] = func(ctx context.Context) { + registerMemberHandlers(ctx, e) + } } - e.config.Runtime.ClusterControllerStart = func(ctx context.Context) error { - registerMetadataHandlers(ctx, e) - return nil - } - - e.config.Runtime.LeaderElectedClusterControllerStart = func(ctx context.Context) error { - registerMemberHandlers(ctx, e) - registerEndpointsHandlers(ctx, e) - return nil - } - - return e.handler(handler), err + return e.handler(handler), nil } // setName sets a unique name for this cluster member. The first time this is called, diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index 0e7094e012..f71ad1e47b 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -37,7 +37,7 @@ func generateTestConfig() *config.Control { ServiceIPRange: testutil.ServiceIPNet(), } return &config.Control{ - Runtime: &config.ControlRuntime{AgentReady: agentReady}, + Runtime: config.NewRuntime(agentReady), HTTPSPort: 6443, SupervisorPort: 6443, AdvertisePort: 6443, diff --git a/pkg/etcd/member_controller.go b/pkg/etcd/member_controller.go index c26df5c1dc..96c24aa6dd 100644 --- a/pkg/etcd/member_controller.go +++ b/pkg/etcd/member_controller.go @@ -12,10 +12,6 @@ import ( ) func registerMemberHandlers(ctx context.Context, etcd *ETCD) { - if etcd.config.DisableETCD { - return - } - nodes := etcd.config.Runtime.Core.Core().V1().Node() e := &etcdMemberHandler{ etcd: etcd, diff --git a/pkg/etcd/metadata_controller.go b/pkg/etcd/metadata_controller.go index 915f5814da..8481775c4b 100644 --- a/pkg/etcd/metadata_controller.go +++ b/pkg/etcd/metadata_controller.go @@ -11,10 +11,6 @@ import ( ) func registerMetadataHandlers(ctx context.Context, etcd *ETCD) { - if etcd.config.DisableETCD { - return - } - nodes := etcd.config.Runtime.Core.Core().V1().Node() h := &metadataHandler{ etcd: etcd, @@ -22,7 +18,7 @@ func registerMetadataHandlers(ctx context.Context, etcd *ETCD) { ctx: ctx, } - logrus.Infof("Starting managed etcd node label controller") + logrus.Infof("Starting managed etcd node metadata controller") nodes.OnChange(ctx, "managed-etcd-metadata-controller", h.sync) } @@ -39,7 +35,7 @@ func (m *metadataHandler) sync(key string, node *v1.Node) (*v1.Node, error) { nodeName := os.Getenv("NODE_NAME") if nodeName == "" { - logrus.Debug("waiting for node name to be assigned for managed etcd node label controller") + logrus.Debug("waiting for node name to be assigned for managed etcd node metadata controller") m.nodeController.EnqueueAfter(key, 5*time.Second) return node, nil } @@ -52,10 +48,31 @@ func (m *metadataHandler) sync(key string, node *v1.Node) (*v1.Node, error) { } func (m *metadataHandler) handleSelf(node *v1.Node) (*v1.Node, error) { + if m.etcd.config.DisableETCD { + if node.Annotations[NodeNameAnnotation] == "" && + node.Annotations[NodeAddressAnnotation] == "" && + node.Labels[EtcdRoleLabel] == "" { + return node, nil + } + + node = node.DeepCopy() + if node.Annotations == nil { + node.Annotations = map[string]string{} + } + if node.Labels == nil { + node.Labels = map[string]string{} + } + + delete(node.Annotations, NodeNameAnnotation) + delete(node.Annotations, NodeAddressAnnotation) + delete(node.Labels, EtcdRoleLabel) + + return m.nodeController.Update(node) + } + if node.Annotations[NodeNameAnnotation] == m.etcd.name && node.Annotations[NodeAddressAnnotation] == m.etcd.address && - node.Labels[EtcdRoleLabel] == "true" && - node.Labels[ControlPlaneLabel] == "true" { + node.Labels[EtcdRoleLabel] == "true" { return node, nil } @@ -63,12 +80,13 @@ func (m *metadataHandler) handleSelf(node *v1.Node) (*v1.Node, error) { if node.Annotations == nil { node.Annotations = map[string]string{} } + if node.Labels == nil { + node.Labels = map[string]string{} + } node.Annotations[NodeNameAnnotation] = m.etcd.name node.Annotations[NodeAddressAnnotation] = m.etcd.address node.Labels[EtcdRoleLabel] = "true" - node.Labels[MasterLabel] = "true" - node.Labels[ControlPlaneLabel] = "true" return m.nodeController.Update(node) } diff --git a/pkg/server/cert.go b/pkg/server/cert.go index 5ef4520bb1..e90e2c6c4e 100644 --- a/pkg/server/cert.go +++ b/pkg/server/cert.go @@ -54,11 +54,12 @@ func caCertReplace(server *config.Control, buf io.ReadCloser, force bool) error } defer os.RemoveAll(tmpdir) + runtime := config.NewRuntime(nil) + runtime.EtcdConfig = server.Runtime.EtcdConfig + runtime.ServerToken = server.Runtime.ServerToken + tmpServer := &config.Control{ - Runtime: &config.ControlRuntime{ - EtcdConfig: server.Runtime.EtcdConfig, - ServerToken: server.Runtime.ServerToken, - }, + Runtime: runtime, Token: server.Token, DataDir: tmpdir, } diff --git a/pkg/server/server.go b/pkg/server/server.go index a9ea872ca9..72d10c83e2 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -6,6 +6,7 @@ import ( "os" "path" "path/filepath" + "runtime/debug" "strconv" "strings" "sync" @@ -123,15 +124,13 @@ func runControllers(ctx context.Context, config *Config) error { } controlConfig.Runtime.Core = sc.Core - if controlConfig.Runtime.ClusterControllerStart != nil { - if err := controlConfig.Runtime.ClusterControllerStart(ctx); err != nil { - return errors.Wrap(err, "failed to start cluster controllers") - } + for name, cb := range controlConfig.Runtime.ClusterControllerStarts { + go runOrDie(ctx, name, cb) } for _, controller := range config.Controllers { if err := controller(ctx, sc); err != nil { - return errors.Wrapf(err, "failed to start custom controller %s", util.GetFunctionName(controller)) + return errors.Wrapf(err, "failed to start %s controller", util.GetFunctionName(controller)) } } @@ -139,18 +138,16 @@ func runControllers(ctx context.Context, config *Config) error { return errors.Wrap(err, "failed to start wranger controllers") } - start := func(ctx context.Context) { + controlConfig.Runtime.LeaderElectedClusterControllerStarts[version.Program] = func(ctx context.Context) { + if controlConfig.DisableAPIServer { + return + } if err := coreControllers(ctx, sc, config); err != nil { panic(err) } - if controlConfig.Runtime.LeaderElectedClusterControllerStart != nil { - if err := controlConfig.Runtime.LeaderElectedClusterControllerStart(ctx); err != nil { - panic(errors.Wrap(err, "failed to start leader elected cluster controllers")) - } - } for _, controller := range config.LeaderControllers { if err := controller(ctx, sc); err != nil { - panic(errors.Wrap(err, "leader controller")) + panic(errors.Wrapf(err, "failed to start %s leader controller", util.GetFunctionName(controller))) } } if err := sc.Start(ctx); err != nil { @@ -163,20 +160,30 @@ func runControllers(ctx context.Context, config *Config) error { go setClusterDNSConfig(ctx, config, sc.Core.Core().V1().ConfigMap()) if controlConfig.NoLeaderElect { - go func() { - start(ctx) - <-ctx.Done() - if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) { - logrus.Fatalf("controllers exited: %v", err) - } - }() + for name, cb := range controlConfig.Runtime.LeaderElectedClusterControllerStarts { + go runOrDie(ctx, name, cb) + } } else { - go leader.RunOrDie(ctx, "", version.Program, sc.K8s, start) + for name, cb := range controlConfig.Runtime.LeaderElectedClusterControllerStarts { + go leader.RunOrDie(ctx, "", name, sc.K8s, cb) + } } return nil } +// runOrDie is similar to leader.RunOrDie, except that it runs the callback +// immediately, without performing leader election. +func runOrDie(ctx context.Context, name string, cb leader.Callback) { + defer func() { + if err := recover(); err != nil { + logrus.WithField("stack", debug.Stack()).Fatalf("%s controller panic: %v", name, err) + } + }() + cb(ctx) + <-ctx.Done() +} + func coreControllers(ctx context.Context, sc *Context, config *Config) error { if err := node.Register(ctx, !config.ControlConfig.Skips["coredns"], diff --git a/tests/unit.go b/tests/unit.go index 6e1ba5e5a2..ee76af0385 100644 --- a/tests/unit.go +++ b/tests/unit.go @@ -43,7 +43,7 @@ func CleanupDataDir(cnf *config.Control) { // GenerateRuntime creates a temporary data dir and configures // config.ControlRuntime with all the appropriate certificate keys. func GenerateRuntime(cnf *config.Control) error { - cnf.Runtime = &config.ControlRuntime{} + cnf.Runtime = config.NewRuntime(nil) if err := GenerateDataDir(cnf); err != nil { return err }