From 90445bd5812ae97707da192e2c42d7b65836ac88 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Tue, 27 Jul 2021 14:56:05 -0700 Subject: [PATCH] Wait until server is ready before configuring kube-proxy (#3716) Signed-off-by: Brad Davidson --- pkg/agent/config/config.go | 71 ++++++++++++++++++++++++++++++++----- pkg/agent/run.go | 18 +++++++--- pkg/daemons/agent/agent.go | 33 ++++++++++------- pkg/daemons/config/types.go | 3 +- pkg/server/router.go | 16 +++++++++ 5 files changed, 114 insertions(+), 27 deletions(-) diff --git a/pkg/agent/config/config.go b/pkg/agent/config/config.go index 347e70e891..077c494d70 100644 --- a/pkg/agent/config/config.go +++ b/pkg/agent/config/config.go @@ -38,22 +38,46 @@ const ( DefaultPodManifestPath = "pod-manifests" ) +// Get returns a pointer to a completed Node configuration struct, +// containing a merging of the local CLI configuration with settings from the server. +// A call to this will block until agent configuration is successfully returned by the +// server. func Get(ctx context.Context, agent cmds.Agent, proxy proxy.Proxy) *config.Node { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() +RETRY: for { agentConfig, err := get(ctx, &agent, proxy) if err != nil { - logrus.Errorf("Failed to configure agent: %v", err) - select { - case <-time.After(5 * time.Second): - continue - case <-ctx.Done(): - logrus.Fatalf("Interrupted") + logrus.Infof("Failed to retrieve agent configuration: %v", err) + for range ticker.C { + continue RETRY } } return agentConfig } } +// KubeProxyDisabled returns a bool indicating whether or not kube-proxy has been disabled in the +// server configuration. The server may not have a complete view of cluster configuration until +// after all startup hooks have completed, so a call to this will block until after the server's +// readyz endpoint returns OK. 
+func KubeProxyDisabled(ctx context.Context, node *config.Node, proxy proxy.Proxy) bool { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() +RETRY: + for { + disabled, err := getKubeProxyDisabled(ctx, node, proxy) + if err != nil { + logrus.Infof("Failed to retrieve kube-proxy configuration: %v", err) + for range ticker.C { + continue RETRY + } + } + return disabled + } +} + type HTTPRequester func(u string, client *http.Client, username, password string) ([]byte, error) func Request(path string, info *clientaccess.Info, requester HTTPRequester) ([]byte, error) { @@ -419,6 +443,7 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N ContainerRuntimeEndpoint: envInfo.ContainerRuntimeEndpoint, FlannelBackend: controlConfig.FlannelBackend, ServerHTTPSPort: controlConfig.HTTPSPort, + Token: info.String(), } nodeConfig.FlannelIface = flannelIface nodeConfig.Images = filepath.Join(envInfo.DataDir, "agent", "images") @@ -440,7 +465,6 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N nodeConfig.AgentConfig.Snapshotter = envInfo.Snapshotter nodeConfig.AgentConfig.IPSECPSK = controlConfig.IPSECPSK nodeConfig.AgentConfig.StrongSwanDir = filepath.Join(envInfo.DataDir, "agent", "strongswan") - nodeConfig.CACerts = info.CACerts nodeConfig.Containerd.Config = filepath.Join(envInfo.DataDir, "agent", "etc", "containerd", "config.toml") nodeConfig.Containerd.Root = filepath.Join(envInfo.DataDir, "agent", "containerd") if !nodeConfig.Docker && nodeConfig.ContainerRuntimeEndpoint == "" { @@ -571,7 +595,6 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N nodeConfig.AgentConfig.PrivateRegistry = envInfo.PrivateRegistry nodeConfig.AgentConfig.DisableCCM = controlConfig.DisableCCM nodeConfig.AgentConfig.DisableNPC = controlConfig.DisableNPC - nodeConfig.AgentConfig.DisableKubeProxy = controlConfig.DisableKubeProxy nodeConfig.AgentConfig.Rootless = envInfo.Rootless 
nodeConfig.AgentConfig.PodManifests = filepath.Join(envInfo.DataDir, "agent", DefaultPodManifestPath) nodeConfig.AgentConfig.ProtectKernelDefaults = envInfo.ProtectKernelDefaults @@ -583,6 +606,32 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N return nodeConfig, nil } +// getKubeProxyDisabled attempts to return the DisableKubeProxy setting from the server configuration data. +// It first checks the server readyz endpoint, to ensure that the configuration has stabilized before use. +func getKubeProxyDisabled(ctx context.Context, node *config.Node, proxy proxy.Proxy) (bool, error) { + info, err := clientaccess.ParseAndValidateToken(proxy.SupervisorURL(), node.Token) + if err != nil { + return false, err + } + + // 500 error indicates that the health check has failed; other errors (for example 401 Unauthorized) + // indicate that the server is down-level and doesn't support readyz, so we should just use whatever + // the server has for us. + if err := getReadyz(info); err != nil && strings.HasSuffix(err.Error(), "500 Internal Server Error") { + return false, err + } + + controlConfig, err := getConfig(info) + if err != nil { + return false, errors.Wrap(err, "failed to retrieve configuration from server") + } + + return controlConfig.DisableKubeProxy, nil +} + +// getConfig returns server configuration data. Note that this may be mutated during system startup; anything that needs +// to ensure stable system state should check the readyz endpoint first. This is required because RKE2 starts up the +// kubelet early, before the apiserver is available. func getConfig(info *clientaccess.Info) (*config.Control, error) { data, err := info.Get("/v1-" + version.Program + "/config") if err != nil { @@ -593,6 +642,12 @@ func getConfig(info *clientaccess.Info) (*config.Control, error) { return controlControl, json.Unmarshal(data, controlControl) } +// getReadyz returns nil if the server is ready, or an error if not. 
+func getReadyz(info *clientaccess.Info) error { + _, err := info.Get("/v1-" + version.Program + "/readyz") + return err +} + // validateNetworkConfig ensures that the network configuration values provided by the server make sense. func validateNetworkConfig(nodeConfig *config.Node) error { // Old versions of the server do not send enough information to correctly start the NPC. Users diff --git a/pkg/agent/run.go b/pkg/agent/run.go index ac3df5db01..f16a1cc30d 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -327,14 +327,22 @@ func updateAddressAnnotations(agentConfig *daemonconfig.Agent, nodeAnnotations m // start the agent before the tunnel is setup to allow kubelet to start first and start the pods func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) error { var agentRan bool + // IsAPIServerLBEnabled is used as a shortcut for detecting RKE2, where the kubelet needs to + // be run earlier in order to manage static pods. This should probably instead query a + // flag on the executor or something. if cfg.ETCDAgent { - // only in rke2 run the agent before the tunnel setup and check for that later in the function + // ETCDAgent is only set to true on servers that are started with --disable-apiserver. + // In this case, we may be running without an apiserver available in the cluster, and need + // to wait for one to register and post its address into APIAddressCh so that we can update + // the LB proxy with its address. if proxy.IsAPIServerLBEnabled() { + // On RKE2, the agent needs to be started early to run the etcd static pod. 
+ if err := agent.Agent(ctx, nodeConfig, proxy); err != nil { return err } agentRan = true } + select { case address := <-cfg.APIAddressCh: cfg.ServerURL = address @@ -347,7 +355,9 @@ func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node, return ctx.Err() } } else if cfg.ClusterReset && proxy.IsAPIServerLBEnabled() { - if err := agent.Agent(&nodeConfig.AgentConfig); err != nil { + // If we're doing a cluster-reset on RKE2, the kubelet needs to be started early to clean + // up static pods. + if err := agent.Agent(ctx, nodeConfig, proxy); err != nil { return err } agentRan = true @@ -357,7 +367,7 @@ func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node, return err } if !agentRan { - return agent.Agent(&nodeConfig.AgentConfig) + return agent.Agent(ctx, nodeConfig, proxy) } return nil } diff --git a/pkg/daemons/agent/agent.go b/pkg/daemons/agent/agent.go index a03f4d1520..dc8097cfbd 100644 --- a/pkg/daemons/agent/agent.go +++ b/pkg/daemons/agent/agent.go @@ -1,11 +1,14 @@ package agent import ( + "context" "math/rand" "os" "time" - "github.com/rancher/k3s/pkg/daemons/config" + "github.com/rancher/k3s/pkg/agent/config" + "github.com/rancher/k3s/pkg/agent/proxy" + daemonconfig "github.com/rancher/k3s/pkg/daemons/config" "github.com/rancher/k3s/pkg/daemons/executor" "github.com/sirupsen/logrus" "k8s.io/component-base/logs" @@ -18,34 +21,38 @@ const ( windowsPrefix = "npipe://" ) -func Agent(config *config.Agent) error { +func Agent(ctx context.Context, nodeConfig *daemonconfig.Node, proxy proxy.Proxy) error { rand.Seed(time.Now().UTC().UnixNano()) logs.InitLogs() defer logs.FlushLogs() - if err := startKubelet(config); err != nil { + if err := startKubelet(&nodeConfig.AgentConfig); err != nil { return err } - if !config.DisableKubeProxy { - return startKubeProxy(config) - } + go func() { + if !config.KubeProxyDisabled(ctx, nodeConfig, proxy) { + if err := startKubeProxy(&nodeConfig.AgentConfig); err != nil { + 
logrus.Fatalf("Failed to start kube-proxy: %v", err) + } + } + }() return nil } -func startKubeProxy(cfg *config.Agent) error { +func startKubeProxy(cfg *daemonconfig.Agent) error { argsMap := kubeProxyArgs(cfg) - args := config.GetArgsList(argsMap, cfg.ExtraKubeProxyArgs) - logrus.Infof("Running kube-proxy %s", config.ArgString(args)) + args := daemonconfig.GetArgsList(argsMap, cfg.ExtraKubeProxyArgs) + logrus.Infof("Running kube-proxy %s", daemonconfig.ArgString(args)) return executor.KubeProxy(args) } -func startKubelet(cfg *config.Agent) error { +func startKubelet(cfg *daemonconfig.Agent) error { argsMap := kubeletArgs(cfg) - args := config.GetArgsList(argsMap, cfg.ExtraKubeletArgs) - logrus.Infof("Running kubelet %s", config.ArgString(args)) + args := daemonconfig.GetArgsList(argsMap, cfg.ExtraKubeletArgs) + logrus.Infof("Running kubelet %s", daemonconfig.ArgString(args)) return executor.Kubelet(args) } @@ -59,7 +66,7 @@ func addFeatureGate(current, new string) string { // ImageCredProvAvailable checks to see if the kubelet image credential provider bin dir and config // files exist and are of the correct types. This is exported so that it may be used by downstream projects. 
-func ImageCredProvAvailable(cfg *config.Agent) bool { +func ImageCredProvAvailable(cfg *daemonconfig.Agent) bool { if info, err := os.Stat(cfg.ImageCredProvBinDir); err != nil || !info.IsDir() { logrus.Debugf("Kubelet image credential provider bin directory check failed: %v", err) return false diff --git a/pkg/daemons/config/types.go b/pkg/daemons/config/types.go index dfd882a168..c4c1cc5ebe 100644 --- a/pkg/daemons/config/types.go +++ b/pkg/daemons/config/types.go @@ -36,7 +36,7 @@ type Node struct { Containerd Containerd Images string AgentConfig Agent - CACerts []byte + Token string Certificate *tls.Certificate ServerHTTPSPort int } @@ -96,7 +96,6 @@ type Agent struct { AirgapExtraRegistry []string DisableCCM bool DisableNPC bool - DisableKubeProxy bool Rootless bool ProtectKernelDefaults bool } diff --git a/pkg/server/router.go b/pkg/server/router.go index 33195691db..6f2d636031 100644 --- a/pkg/server/router.go +++ b/pkg/server/router.go @@ -46,6 +46,7 @@ func router(ctx context.Context, config *Config) http.Handler { authed.Path(prefix + "/client-ca.crt").Handler(fileHandler(serverConfig.Runtime.ClientCA)) authed.Path(prefix + "/server-ca.crt").Handler(fileHandler(serverConfig.Runtime.ServerCA)) authed.Path(prefix + "/config").Handler(configHandler(serverConfig)) + authed.Path(prefix + "/readyz").Handler(readyzHandler(serverConfig)) nodeAuthed := mux.NewRouter() nodeAuthed.Use(authMiddleware(serverConfig, "system:nodes")) @@ -266,6 +267,21 @@ func configHandler(server *config.Control) http.Handler { }) } +func readyzHandler(server *config.Control) http.Handler { + return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { + code := http.StatusOK + data := []byte("ok") + if server.Runtime.Core == nil { + code = http.StatusInternalServerError + data = []byte("runtime core not ready") + } + resp.WriteHeader(code) + resp.Header().Set("Content-Type", "text/plain") + resp.Header().Set("Content-length", strconv.Itoa(len(data))) + resp.Write(data) + 
}) +} + func ping() http.Handler { return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { data := []byte("pong")