Wait until server is ready before configuring kube-proxy (#3716)

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
This commit is contained in:
Brad Davidson 2021-07-27 14:56:05 -07:00 committed by GitHub
parent 21c8a33647
commit 90445bd581
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 114 additions and 27 deletions

View File

@ -38,22 +38,46 @@ const (
DefaultPodManifestPath = "pod-manifests" DefaultPodManifestPath = "pod-manifests"
) )
// Get returns a pointer to a completed Node configuration struct,
// containing a merging of the local CLI configuration with settings from the server.
// A call to this will bock until agent configuration is successfully returned by the
// server.
func Get(ctx context.Context, agent cmds.Agent, proxy proxy.Proxy) *config.Node { func Get(ctx context.Context, agent cmds.Agent, proxy proxy.Proxy) *config.Node {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
RETRY:
for { for {
agentConfig, err := get(ctx, &agent, proxy) agentConfig, err := get(ctx, &agent, proxy)
if err != nil { if err != nil {
logrus.Errorf("Failed to configure agent: %v", err) logrus.Infof("Failed to retrieve agent configuration: %v", err)
select { for range ticker.C {
case <-time.After(5 * time.Second): continue RETRY
continue
case <-ctx.Done():
logrus.Fatalf("Interrupted")
} }
} }
return agentConfig return agentConfig
} }
} }
// KubeProxyDisabled returns a bool indicating whether or not kube-proxy has been disabled in the
// server configuration. The server may not have a complete view of cluster configuration until
// after all startup hooks have completed, so a call to this will block until after the server's
// readyz endpoint returns OK.
func KubeProxyDisabled(ctx context.Context, node *config.Node, proxy proxy.Proxy) bool {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
RETRY:
for {
disabled, err := getKubeProxyDisabled(ctx, node, proxy)
if err != nil {
logrus.Infof("Failed to retrieve kube-proxy configuration: %v", err)
for range ticker.C {
continue RETRY
}
}
return disabled
}
}
type HTTPRequester func(u string, client *http.Client, username, password string) ([]byte, error) type HTTPRequester func(u string, client *http.Client, username, password string) ([]byte, error)
func Request(path string, info *clientaccess.Info, requester HTTPRequester) ([]byte, error) { func Request(path string, info *clientaccess.Info, requester HTTPRequester) ([]byte, error) {
@ -419,6 +443,7 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
ContainerRuntimeEndpoint: envInfo.ContainerRuntimeEndpoint, ContainerRuntimeEndpoint: envInfo.ContainerRuntimeEndpoint,
FlannelBackend: controlConfig.FlannelBackend, FlannelBackend: controlConfig.FlannelBackend,
ServerHTTPSPort: controlConfig.HTTPSPort, ServerHTTPSPort: controlConfig.HTTPSPort,
Token: info.String(),
} }
nodeConfig.FlannelIface = flannelIface nodeConfig.FlannelIface = flannelIface
nodeConfig.Images = filepath.Join(envInfo.DataDir, "agent", "images") nodeConfig.Images = filepath.Join(envInfo.DataDir, "agent", "images")
@ -440,7 +465,6 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
nodeConfig.AgentConfig.Snapshotter = envInfo.Snapshotter nodeConfig.AgentConfig.Snapshotter = envInfo.Snapshotter
nodeConfig.AgentConfig.IPSECPSK = controlConfig.IPSECPSK nodeConfig.AgentConfig.IPSECPSK = controlConfig.IPSECPSK
nodeConfig.AgentConfig.StrongSwanDir = filepath.Join(envInfo.DataDir, "agent", "strongswan") nodeConfig.AgentConfig.StrongSwanDir = filepath.Join(envInfo.DataDir, "agent", "strongswan")
nodeConfig.CACerts = info.CACerts
nodeConfig.Containerd.Config = filepath.Join(envInfo.DataDir, "agent", "etc", "containerd", "config.toml") nodeConfig.Containerd.Config = filepath.Join(envInfo.DataDir, "agent", "etc", "containerd", "config.toml")
nodeConfig.Containerd.Root = filepath.Join(envInfo.DataDir, "agent", "containerd") nodeConfig.Containerd.Root = filepath.Join(envInfo.DataDir, "agent", "containerd")
if !nodeConfig.Docker && nodeConfig.ContainerRuntimeEndpoint == "" { if !nodeConfig.Docker && nodeConfig.ContainerRuntimeEndpoint == "" {
@ -571,7 +595,6 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
nodeConfig.AgentConfig.PrivateRegistry = envInfo.PrivateRegistry nodeConfig.AgentConfig.PrivateRegistry = envInfo.PrivateRegistry
nodeConfig.AgentConfig.DisableCCM = controlConfig.DisableCCM nodeConfig.AgentConfig.DisableCCM = controlConfig.DisableCCM
nodeConfig.AgentConfig.DisableNPC = controlConfig.DisableNPC nodeConfig.AgentConfig.DisableNPC = controlConfig.DisableNPC
nodeConfig.AgentConfig.DisableKubeProxy = controlConfig.DisableKubeProxy
nodeConfig.AgentConfig.Rootless = envInfo.Rootless nodeConfig.AgentConfig.Rootless = envInfo.Rootless
nodeConfig.AgentConfig.PodManifests = filepath.Join(envInfo.DataDir, "agent", DefaultPodManifestPath) nodeConfig.AgentConfig.PodManifests = filepath.Join(envInfo.DataDir, "agent", DefaultPodManifestPath)
nodeConfig.AgentConfig.ProtectKernelDefaults = envInfo.ProtectKernelDefaults nodeConfig.AgentConfig.ProtectKernelDefaults = envInfo.ProtectKernelDefaults
@ -583,6 +606,32 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
return nodeConfig, nil return nodeConfig, nil
} }
// getKubeProxyDisabled attempts to return the DisableKubeProxy setting from the server configuration data.
// It first checks the server readyz endpoint, to ensure that the configuration has stabilized before use.
func getKubeProxyDisabled(ctx context.Context, node *config.Node, proxy proxy.Proxy) (bool, error) {
info, err := clientaccess.ParseAndValidateToken(proxy.SupervisorURL(), node.Token)
if err != nil {
return false, err
}
// 500 error indicates that the health check has failed; other errors (for example 401 Unauthorized)
// indicate that the server is down-level and doesn't support readyz, so we should just use whatever
// the server has for us.
if err := getReadyz(info); err != nil && strings.HasSuffix(err.Error(), "500 Internal Server Error") {
return false, err
}
controlConfig, err := getConfig(info)
if err != nil {
return false, errors.Wrap(err, "failed to retrieve configuration from server")
}
return controlConfig.DisableKubeProxy, nil
}
// getConfig returns server configuration data. Note that this may be mutated during system startup; anything that needs
// to ensure stable system state should check the readyz endpoint first. This is required because RKE2 starts up the
// kubelet early, before the apiserver is available.
func getConfig(info *clientaccess.Info) (*config.Control, error) { func getConfig(info *clientaccess.Info) (*config.Control, error) {
data, err := info.Get("/v1-" + version.Program + "/config") data, err := info.Get("/v1-" + version.Program + "/config")
if err != nil { if err != nil {
@ -593,6 +642,12 @@ func getConfig(info *clientaccess.Info) (*config.Control, error) {
return controlControl, json.Unmarshal(data, controlControl) return controlControl, json.Unmarshal(data, controlControl)
} }
// getReadyz returns nil if the server is ready, or an error if not.
func getReadyz(info *clientaccess.Info) error {
_, err := info.Get("/v1-" + version.Program + "/readyz")
return err
}
// validateNetworkConfig ensures that the network configuration values provided by the server make sense. // validateNetworkConfig ensures that the network configuration values provided by the server make sense.
func validateNetworkConfig(nodeConfig *config.Node) error { func validateNetworkConfig(nodeConfig *config.Node) error {
// Old versions of the server do not send enough information to correctly start the NPC. Users // Old versions of the server do not send enough information to correctly start the NPC. Users

View File

@ -327,14 +327,22 @@ func updateAddressAnnotations(agentConfig *daemonconfig.Agent, nodeAnnotations m
// start the agent before the tunnel is setup to allow kubelet to start first and start the pods // start the agent before the tunnel is setup to allow kubelet to start first and start the pods
func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) error { func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) error {
var agentRan bool var agentRan bool
// IsAPIServerLBEnabled is used as a shortcut for detecting RKE2, where the kubelet needs to
// be run earlier in order to manage static pods. This should probably instead query a
// flag on the executor or something.
if cfg.ETCDAgent { if cfg.ETCDAgent {
// only in rke2 run the agent before the tunnel setup and check for that later in the function // ETCDAgent is only set to true on servers that are started with --disable-apiserver.
// In this case, we may be running without an apiserver available in the cluster, and need
// to wait for one to register and post it's address into APIAddressCh so that we can update
// the LB proxy with its address.
if proxy.IsAPIServerLBEnabled() { if proxy.IsAPIServerLBEnabled() {
if err := agent.Agent(&nodeConfig.AgentConfig); err != nil { // On RKE2, the agent needs to be started early to run the etcd static pod.
if err := agent.Agent(ctx, nodeConfig, proxy); err != nil {
return err return err
} }
agentRan = true agentRan = true
} }
select { select {
case address := <-cfg.APIAddressCh: case address := <-cfg.APIAddressCh:
cfg.ServerURL = address cfg.ServerURL = address
@ -347,7 +355,9 @@ func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node,
return ctx.Err() return ctx.Err()
} }
} else if cfg.ClusterReset && proxy.IsAPIServerLBEnabled() { } else if cfg.ClusterReset && proxy.IsAPIServerLBEnabled() {
if err := agent.Agent(&nodeConfig.AgentConfig); err != nil { // If we're doing a cluster-reset on RKE2, the kubelet needs to be started early to clean
// up static pods.
if err := agent.Agent(ctx, nodeConfig, proxy); err != nil {
return err return err
} }
agentRan = true agentRan = true
@ -357,7 +367,7 @@ func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node,
return err return err
} }
if !agentRan { if !agentRan {
return agent.Agent(&nodeConfig.AgentConfig) return agent.Agent(ctx, nodeConfig, proxy)
} }
return nil return nil
} }

View File

@ -1,11 +1,14 @@
package agent package agent
import ( import (
"context"
"math/rand" "math/rand"
"os" "os"
"time" "time"
"github.com/rancher/k3s/pkg/daemons/config" "github.com/rancher/k3s/pkg/agent/config"
"github.com/rancher/k3s/pkg/agent/proxy"
daemonconfig "github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/daemons/executor" "github.com/rancher/k3s/pkg/daemons/executor"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"k8s.io/component-base/logs" "k8s.io/component-base/logs"
@ -18,34 +21,38 @@ const (
windowsPrefix = "npipe://" windowsPrefix = "npipe://"
) )
func Agent(config *config.Agent) error { func Agent(ctx context.Context, nodeConfig *daemonconfig.Node, proxy proxy.Proxy) error {
rand.Seed(time.Now().UTC().UnixNano()) rand.Seed(time.Now().UTC().UnixNano())
logs.InitLogs() logs.InitLogs()
defer logs.FlushLogs() defer logs.FlushLogs()
if err := startKubelet(config); err != nil { if err := startKubelet(&nodeConfig.AgentConfig); err != nil {
return err return err
} }
if !config.DisableKubeProxy { go func() {
return startKubeProxy(config) if !config.KubeProxyDisabled(ctx, nodeConfig, proxy) {
} if err := startKubeProxy(&nodeConfig.AgentConfig); err != nil {
logrus.Fatalf("Failed to start kube-proxy: %v", err)
}
}
}()
return nil return nil
} }
func startKubeProxy(cfg *config.Agent) error { func startKubeProxy(cfg *daemonconfig.Agent) error {
argsMap := kubeProxyArgs(cfg) argsMap := kubeProxyArgs(cfg)
args := config.GetArgsList(argsMap, cfg.ExtraKubeProxyArgs) args := daemonconfig.GetArgsList(argsMap, cfg.ExtraKubeProxyArgs)
logrus.Infof("Running kube-proxy %s", config.ArgString(args)) logrus.Infof("Running kube-proxy %s", daemonconfig.ArgString(args))
return executor.KubeProxy(args) return executor.KubeProxy(args)
} }
func startKubelet(cfg *config.Agent) error { func startKubelet(cfg *daemonconfig.Agent) error {
argsMap := kubeletArgs(cfg) argsMap := kubeletArgs(cfg)
args := config.GetArgsList(argsMap, cfg.ExtraKubeletArgs) args := daemonconfig.GetArgsList(argsMap, cfg.ExtraKubeletArgs)
logrus.Infof("Running kubelet %s", config.ArgString(args)) logrus.Infof("Running kubelet %s", daemonconfig.ArgString(args))
return executor.Kubelet(args) return executor.Kubelet(args)
} }
@ -59,7 +66,7 @@ func addFeatureGate(current, new string) string {
// ImageCredProvAvailable checks to see if the kubelet image credential provider bin dir and config // ImageCredProvAvailable checks to see if the kubelet image credential provider bin dir and config
// files exist and are of the correct types. This is exported so that it may be used by downstream projects. // files exist and are of the correct types. This is exported so that it may be used by downstream projects.
func ImageCredProvAvailable(cfg *config.Agent) bool { func ImageCredProvAvailable(cfg *daemonconfig.Agent) bool {
if info, err := os.Stat(cfg.ImageCredProvBinDir); err != nil || !info.IsDir() { if info, err := os.Stat(cfg.ImageCredProvBinDir); err != nil || !info.IsDir() {
logrus.Debugf("Kubelet image credential provider bin directory check failed: %v", err) logrus.Debugf("Kubelet image credential provider bin directory check failed: %v", err)
return false return false

View File

@ -36,7 +36,7 @@ type Node struct {
Containerd Containerd Containerd Containerd
Images string Images string
AgentConfig Agent AgentConfig Agent
CACerts []byte Token string
Certificate *tls.Certificate Certificate *tls.Certificate
ServerHTTPSPort int ServerHTTPSPort int
} }
@ -96,7 +96,6 @@ type Agent struct {
AirgapExtraRegistry []string AirgapExtraRegistry []string
DisableCCM bool DisableCCM bool
DisableNPC bool DisableNPC bool
DisableKubeProxy bool
Rootless bool Rootless bool
ProtectKernelDefaults bool ProtectKernelDefaults bool
} }

View File

@ -46,6 +46,7 @@ func router(ctx context.Context, config *Config) http.Handler {
authed.Path(prefix + "/client-ca.crt").Handler(fileHandler(serverConfig.Runtime.ClientCA)) authed.Path(prefix + "/client-ca.crt").Handler(fileHandler(serverConfig.Runtime.ClientCA))
authed.Path(prefix + "/server-ca.crt").Handler(fileHandler(serverConfig.Runtime.ServerCA)) authed.Path(prefix + "/server-ca.crt").Handler(fileHandler(serverConfig.Runtime.ServerCA))
authed.Path(prefix + "/config").Handler(configHandler(serverConfig)) authed.Path(prefix + "/config").Handler(configHandler(serverConfig))
authed.Path(prefix + "/readyz").Handler(readyzHandler(serverConfig))
nodeAuthed := mux.NewRouter() nodeAuthed := mux.NewRouter()
nodeAuthed.Use(authMiddleware(serverConfig, "system:nodes")) nodeAuthed.Use(authMiddleware(serverConfig, "system:nodes"))
@ -266,6 +267,21 @@ func configHandler(server *config.Control) http.Handler {
}) })
} }
func readyzHandler(server *config.Control) http.Handler {
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
code := http.StatusOK
data := []byte("ok")
if server.Runtime.Core == nil {
code = http.StatusInternalServerError
data = []byte("runtime core not ready")
}
resp.WriteHeader(code)
resp.Header().Set("Content-Type", "text/plain")
resp.Header().Set("Content-length", strconv.Itoa(len(data)))
resp.Write(data)
})
}
func ping() http.Handler { func ping() http.Handler {
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
data := []byte("pong") data := []byte("pong")