Allow for multiple sets of leader-elected controllers

Addresses an issue where etcd controllers did not run on etcd-only nodes

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
This commit is contained in:
Brad Davidson 2023-02-08 00:37:10 +00:00 committed by Brad Davidson
parent 0d416d797d
commit 3d146d2f1b
12 changed files with 110 additions and 75 deletions

View File

@ -70,7 +70,7 @@ func commandSetup(app *cli.Context, cfg *cmds.Server, sc *server.Config) (string
}
sc.ControlConfig.Token = cfg.Token
sc.ControlConfig.Runtime = &config.ControlRuntime{}
sc.ControlConfig.Runtime = config.NewRuntime(nil)
return dataDir, nil
}
@ -292,7 +292,7 @@ func rotateCA(app *cli.Context, cfg *cmds.Server, sync *cmds.CertRotateCA) error
// Set up dummy server config for reading new bootstrap data from disk.
tmpServer := &config.Control{
Runtime: &config.ControlRuntime{},
Runtime: config.NewRuntime(nil),
DataDir: filepath.Dir(sync.CACertPath),
}
deps.CreateRuntimeCertFiles(tmpServer)

View File

@ -60,7 +60,7 @@ func commandSetup(app *cli.Context, cfg *cmds.Server, sc *server.Config) error {
sc.ControlConfig.EtcdS3Folder = cfg.EtcdS3Folder
sc.ControlConfig.EtcdS3Insecure = cfg.EtcdS3Insecure
sc.ControlConfig.EtcdS3Timeout = cfg.EtcdS3Timeout
sc.ControlConfig.Runtime = &config.ControlRuntime{}
sc.ControlConfig.Runtime = config.NewRuntime(nil)
sc.ControlConfig.Runtime.ETCDServerCA = filepath.Join(dataDir, "tls", "etcd", "server-ca.crt")
sc.ControlConfig.Runtime.ClientETCDCert = filepath.Join(dataDir, "tls", "etcd", "client.crt")
sc.ControlConfig.Runtime.ClientETCDKey = filepath.Join(dataDir, "tls", "etcd", "client.key")

View File

@ -95,7 +95,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
serverConfig := server.Config{}
serverConfig.DisableAgent = cfg.DisableAgent
serverConfig.ControlConfig.Runtime = &config.ControlRuntime{AgentReady: agentReady}
serverConfig.ControlConfig.Runtime = config.NewRuntime(agentReady)
serverConfig.ControlConfig.Token = cfg.Token
serverConfig.ControlConfig.AgentToken = cfg.AgentToken
serverConfig.ControlConfig.JoinURL = cfg.ServerURL

View File

@ -1,7 +1,6 @@
package config
import (
"context"
"crypto/tls"
"fmt"
"net"
@ -14,6 +13,7 @@ import (
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/kine/pkg/endpoint"
"github.com/rancher/wrangler/pkg/generated/controllers/core"
"github.com/rancher/wrangler/pkg/leader"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/authentication/authenticator"
utilsnet "k8s.io/utils/net"
@ -276,13 +276,13 @@ type ControlRuntimeBootstrap struct {
type ControlRuntime struct {
ControlRuntimeBootstrap
HTTPBootstrap bool
APIServerReady <-chan struct{}
AgentReady <-chan struct{}
ETCDReady <-chan struct{}
StartupHooksWg *sync.WaitGroup
ClusterControllerStart func(ctx context.Context) error
LeaderElectedClusterControllerStart func(ctx context.Context) error
HTTPBootstrap bool
APIServerReady <-chan struct{}
AgentReady <-chan struct{}
ETCDReady <-chan struct{}
StartupHooksWg *sync.WaitGroup
ClusterControllerStarts map[string]leader.Callback
LeaderElectedClusterControllerStarts map[string]leader.Callback
ClientKubeAPICert string
ClientKubeAPIKey string
@ -339,6 +339,14 @@ type ControlRuntime struct {
EtcdConfig endpoint.ETCDConfig
}
func NewRuntime(agentReady <-chan struct{}) *ControlRuntime {
return &ControlRuntime{
AgentReady: agentReady,
ClusterControllerStarts: map[string]leader.Callback{},
LeaderElectedClusterControllerStarts: map[string]leader.Callback{},
}
}
type ArgString []string
func (a ArgString) String() string {

View File

@ -17,10 +17,6 @@ import (
)
func registerEndpointsHandlers(ctx context.Context, etcd *ETCD) {
if etcd.config.DisableAPIServer {
return
}
endpoints := etcd.config.Runtime.Core.Core().V1().Endpoints()
fieldSelector := fields.Set{metav1.ObjectNameField: "kubernetes"}.String()
lw := &cache.ListWatch{

View File

@ -549,30 +549,39 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt
e.config.Datastore.BackendTLSConfig.CertFile = e.config.Runtime.ClientETCDCert
e.config.Datastore.BackendTLSConfig.KeyFile = e.config.Runtime.ClientETCDKey
tombstoneFile := filepath.Join(DBDir(e.config), "tombstone")
if _, err := os.Stat(tombstoneFile); err == nil {
logrus.Infof("tombstone file has been detected, removing data dir to rejoin the cluster")
if _, err := backupDirWithRetention(DBDir(e.config), maxBackupRetention); err != nil {
return nil, err
e.config.Runtime.ClusterControllerStarts["etcd-node-metadata"] = func(ctx context.Context) {
registerMetadataHandlers(ctx, e)
}
// The apiserver endpoint controller needs to run on a node with a local apiserver,
// in order to successfully seed etcd with the endpoint list.
if !e.config.DisableAPIServer {
e.config.Runtime.LeaderElectedClusterControllerStarts["etcd-apiserver-endpoints"] = func(ctx context.Context) {
registerEndpointsHandlers(ctx, e)
}
}
if err := e.setName(false); err != nil {
return nil, err
// The etcd member-removal controllers should only run on an etcd node. Tombstone file checking
// is also unnecessary if we're not running etcd.
if !e.config.DisableETCD {
tombstoneFile := filepath.Join(DBDir(e.config), "tombstone")
if _, err := os.Stat(tombstoneFile); err == nil {
logrus.Infof("tombstone file has been detected, removing data dir to rejoin the cluster")
if _, err := backupDirWithRetention(DBDir(e.config), maxBackupRetention); err != nil {
return nil, err
}
}
if err := e.setName(false); err != nil {
return nil, err
}
e.config.Runtime.LeaderElectedClusterControllerStarts["etcd-member-removal"] = func(ctx context.Context) {
registerMemberHandlers(ctx, e)
}
}
e.config.Runtime.ClusterControllerStart = func(ctx context.Context) error {
registerMetadataHandlers(ctx, e)
return nil
}
e.config.Runtime.LeaderElectedClusterControllerStart = func(ctx context.Context) error {
registerMemberHandlers(ctx, e)
registerEndpointsHandlers(ctx, e)
return nil
}
return e.handler(handler), err
return e.handler(handler), nil
}
// setName sets a unique name for this cluster member. The first time this is called,

View File

@ -37,7 +37,7 @@ func generateTestConfig() *config.Control {
ServiceIPRange: testutil.ServiceIPNet(),
}
return &config.Control{
Runtime: &config.ControlRuntime{AgentReady: agentReady},
Runtime: config.NewRuntime(agentReady),
HTTPSPort: 6443,
SupervisorPort: 6443,
AdvertisePort: 6443,

View File

@ -12,10 +12,6 @@ import (
)
func registerMemberHandlers(ctx context.Context, etcd *ETCD) {
if etcd.config.DisableETCD {
return
}
nodes := etcd.config.Runtime.Core.Core().V1().Node()
e := &etcdMemberHandler{
etcd: etcd,

View File

@ -11,10 +11,6 @@ import (
)
func registerMetadataHandlers(ctx context.Context, etcd *ETCD) {
if etcd.config.DisableETCD {
return
}
nodes := etcd.config.Runtime.Core.Core().V1().Node()
h := &metadataHandler{
etcd: etcd,
@ -22,7 +18,7 @@ func registerMetadataHandlers(ctx context.Context, etcd *ETCD) {
ctx: ctx,
}
logrus.Infof("Starting managed etcd node label controller")
logrus.Infof("Starting managed etcd node metadata controller")
nodes.OnChange(ctx, "managed-etcd-metadata-controller", h.sync)
}
@ -39,7 +35,7 @@ func (m *metadataHandler) sync(key string, node *v1.Node) (*v1.Node, error) {
nodeName := os.Getenv("NODE_NAME")
if nodeName == "" {
logrus.Debug("waiting for node name to be assigned for managed etcd node label controller")
logrus.Debug("waiting for node name to be assigned for managed etcd node metadata controller")
m.nodeController.EnqueueAfter(key, 5*time.Second)
return node, nil
}
@ -52,10 +48,31 @@ func (m *metadataHandler) sync(key string, node *v1.Node) (*v1.Node, error) {
}
func (m *metadataHandler) handleSelf(node *v1.Node) (*v1.Node, error) {
if m.etcd.config.DisableETCD {
if node.Annotations[NodeNameAnnotation] == "" &&
node.Annotations[NodeAddressAnnotation] == "" &&
node.Labels[EtcdRoleLabel] == "" {
return node, nil
}
node = node.DeepCopy()
if node.Annotations == nil {
node.Annotations = map[string]string{}
}
if node.Labels == nil {
node.Labels = map[string]string{}
}
delete(node.Annotations, NodeNameAnnotation)
delete(node.Annotations, NodeAddressAnnotation)
delete(node.Labels, EtcdRoleLabel)
return m.nodeController.Update(node)
}
if node.Annotations[NodeNameAnnotation] == m.etcd.name &&
node.Annotations[NodeAddressAnnotation] == m.etcd.address &&
node.Labels[EtcdRoleLabel] == "true" &&
node.Labels[ControlPlaneLabel] == "true" {
node.Labels[EtcdRoleLabel] == "true" {
return node, nil
}
@ -63,12 +80,13 @@ func (m *metadataHandler) handleSelf(node *v1.Node) (*v1.Node, error) {
if node.Annotations == nil {
node.Annotations = map[string]string{}
}
if node.Labels == nil {
node.Labels = map[string]string{}
}
node.Annotations[NodeNameAnnotation] = m.etcd.name
node.Annotations[NodeAddressAnnotation] = m.etcd.address
node.Labels[EtcdRoleLabel] = "true"
node.Labels[MasterLabel] = "true"
node.Labels[ControlPlaneLabel] = "true"
return m.nodeController.Update(node)
}

View File

@ -54,11 +54,12 @@ func caCertReplace(server *config.Control, buf io.ReadCloser, force bool) error
}
defer os.RemoveAll(tmpdir)
runtime := config.NewRuntime(nil)
runtime.EtcdConfig = server.Runtime.EtcdConfig
runtime.ServerToken = server.Runtime.ServerToken
tmpServer := &config.Control{
Runtime: &config.ControlRuntime{
EtcdConfig: server.Runtime.EtcdConfig,
ServerToken: server.Runtime.ServerToken,
},
Runtime: runtime,
Token: server.Token,
DataDir: tmpdir,
}

View File

@ -6,6 +6,7 @@ import (
"os"
"path"
"path/filepath"
"runtime/debug"
"strconv"
"strings"
"sync"
@ -123,15 +124,13 @@ func runControllers(ctx context.Context, config *Config) error {
}
controlConfig.Runtime.Core = sc.Core
if controlConfig.Runtime.ClusterControllerStart != nil {
if err := controlConfig.Runtime.ClusterControllerStart(ctx); err != nil {
return errors.Wrap(err, "failed to start cluster controllers")
}
for name, cb := range controlConfig.Runtime.ClusterControllerStarts {
go runOrDie(ctx, name, cb)
}
for _, controller := range config.Controllers {
if err := controller(ctx, sc); err != nil {
return errors.Wrapf(err, "failed to start custom controller %s", util.GetFunctionName(controller))
return errors.Wrapf(err, "failed to start %s controller", util.GetFunctionName(controller))
}
}
@ -139,18 +138,16 @@ func runControllers(ctx context.Context, config *Config) error {
return errors.Wrap(err, "failed to start wranger controllers")
}
start := func(ctx context.Context) {
controlConfig.Runtime.LeaderElectedClusterControllerStarts[version.Program] = func(ctx context.Context) {
if controlConfig.DisableAPIServer {
return
}
if err := coreControllers(ctx, sc, config); err != nil {
panic(err)
}
if controlConfig.Runtime.LeaderElectedClusterControllerStart != nil {
if err := controlConfig.Runtime.LeaderElectedClusterControllerStart(ctx); err != nil {
panic(errors.Wrap(err, "failed to start leader elected cluster controllers"))
}
}
for _, controller := range config.LeaderControllers {
if err := controller(ctx, sc); err != nil {
panic(errors.Wrap(err, "leader controller"))
panic(errors.Wrapf(err, "failed to start %s leader controller", util.GetFunctionName(controller)))
}
}
if err := sc.Start(ctx); err != nil {
@ -163,20 +160,30 @@ func runControllers(ctx context.Context, config *Config) error {
go setClusterDNSConfig(ctx, config, sc.Core.Core().V1().ConfigMap())
if controlConfig.NoLeaderElect {
go func() {
start(ctx)
<-ctx.Done()
if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) {
logrus.Fatalf("controllers exited: %v", err)
}
}()
for name, cb := range controlConfig.Runtime.LeaderElectedClusterControllerStarts {
go runOrDie(ctx, name, cb)
}
} else {
go leader.RunOrDie(ctx, "", version.Program, sc.K8s, start)
for name, cb := range controlConfig.Runtime.LeaderElectedClusterControllerStarts {
go leader.RunOrDie(ctx, "", name, sc.K8s, cb)
}
}
return nil
}
// runOrDie is similar to leader.RunOrDie, except that it runs the callback
// immediately, without performing leader election.
func runOrDie(ctx context.Context, name string, cb leader.Callback) {
defer func() {
if err := recover(); err != nil {
logrus.WithField("stack", debug.Stack()).Fatalf("%s controller panic: %v", name, err)
}
}()
cb(ctx)
<-ctx.Done()
}
func coreControllers(ctx context.Context, sc *Context, config *Config) error {
if err := node.Register(ctx,
!config.ControlConfig.Skips["coredns"],

View File

@ -43,7 +43,7 @@ func CleanupDataDir(cnf *config.Control) {
// GenerateRuntime creates a temporary data dir and configures
// config.ControlRuntime with all the appropriate certificate keys.
func GenerateRuntime(cnf *config.Control) error {
cnf.Runtime = &config.ControlRuntime{}
cnf.Runtime = config.NewRuntime(nil)
if err := GenerateDataDir(cnf); err != nil {
return err
}