diff --git a/pkg/agent/loadbalancer/loadbalancer.go b/pkg/agent/loadbalancer/loadbalancer.go index e072bb1b58..f3d10815a7 100644 --- a/pkg/agent/loadbalancer/loadbalancer.go +++ b/pkg/agent/loadbalancer/loadbalancer.go @@ -19,16 +19,17 @@ type LoadBalancer struct { dialer *net.Dialer proxy *tcpproxy.Proxy - configFile string - localAddress string - localServerURL string - originalServerAddress string - ServerURL string - ServerAddresses []string - randomServers []string - currentServerAddress string - nextServerIndex int - Listener net.Listener + serviceName string + configFile string + localAddress string + localServerURL string + defaultServerAddress string + ServerURL string + ServerAddresses []string + randomServers []string + currentServerAddress string + nextServerIndex int + Listener net.Listener } const RandomPort = 0 @@ -55,26 +56,27 @@ func New(ctx context.Context, dataDir, serviceName, serverURL string, lbServerPo } localAddress := listener.Addr().String() - originalServerAddress, localServerURL, err := parseURL(serverURL, localAddress) + defaultServerAddress, localServerURL, err := parseURL(serverURL, localAddress) if err != nil { return nil, err } if serverURL == localServerURL { - logrus.Debugf("Initial server URL for load balancer points at local server URL - starting with empty original server address") - originalServerAddress = "" + logrus.Debugf("Initial server URL for load balancer %s points at local server URL - starting with empty default server address", serviceName) + defaultServerAddress = "" } lb := &LoadBalancer{ - dialer: &net.Dialer{}, - configFile: filepath.Join(dataDir, "etc", serviceName+".json"), - localAddress: localAddress, - localServerURL: localServerURL, - originalServerAddress: originalServerAddress, - ServerURL: serverURL, + serviceName: serviceName, + dialer: &net.Dialer{}, + configFile: filepath.Join(dataDir, "etc", serviceName+".json"), + localAddress: localAddress, + localServerURL: localServerURL, + defaultServerAddress: defaultServerAddress, + ServerURL: serverURL, } - lb.setServers([]string{lb.originalServerAddress}) + lb.setServers([]string{lb.defaultServerAddress}) lb.proxy = &tcpproxy.Proxy{ ListenFunc: func(string, string) (net.Listener, error) { @@ -93,11 +95,16 @@ func New(ctx context.Context, dataDir, serviceName, serverURL string, lbServerPo if err := lb.proxy.Start(); err != nil { return nil, err } - logrus.Infof("Running load balancer %s -> %v", lb.localAddress, lb.randomServers) + logrus.Infof("Running load balancer %s %s -> %v", serviceName, lb.localAddress, lb.randomServers) return lb, nil } +func (lb *LoadBalancer) SetDefault(serverAddress string) { + logrus.Infof("Updating load balancer %s default server address -> %s", lb.serviceName, serverAddress) + lb.defaultServerAddress = serverAddress +} + func (lb *LoadBalancer) Update(serverAddresses []string) { if lb == nil { return @@ -105,10 +112,10 @@ func (lb *LoadBalancer) Update(serverAddresses []string) { if !lb.setServers(serverAddresses) { return } - logrus.Infof("Updating load balancer server addresses -> %v", lb.randomServers) + logrus.Infof("Updating load balancer %s server addresses -> %v", lb.serviceName, lb.randomServers) if err := lb.writeConfig(); err != nil { - logrus.Warnf("Error updating load balancer config: %s", err) + logrus.Warnf("Error updating load balancer %s config: %s", lb.serviceName, err) } } @@ -128,14 +135,14 @@ func (lb *LoadBalancer) dialContext(ctx context.Context, network, address string if err == nil { return conn, nil } - logrus.Debugf("Dial error from load balancer: %s", err) + logrus.Debugf("Dial error from load balancer %s: %s", lb.serviceName, err) newServer, err := lb.nextServer(targetServer) if err != nil { return nil, err } if targetServer != newServer { - logrus.Debugf("Dial server in load balancer failed over to %s", newServer) + logrus.Debugf("Dial server in load balancer %s failed over to %s", lb.serviceName, newServer) } if ctx.Err() != nil { return nil, ctx.Err() @@ -156,7 +163,7 @@ func onDialError(src net.Conn, dstDialErr error) { src.Close() } -// ResetLoadBalancer will delete the local state file for the load balacner on disk +// ResetLoadBalancer will delete the local state file for the load balancer on disk func ResetLoadBalancer(dataDir, serviceName string) error { stateFile := filepath.Join(dataDir, "etc", serviceName+".json") if err := os.Remove(stateFile); err != nil { diff --git a/pkg/agent/loadbalancer/servers.go b/pkg/agent/loadbalancer/servers.go index cb0393e733..e0de81034b 100644 --- a/pkg/agent/loadbalancer/servers.go +++ b/pkg/agent/loadbalancer/servers.go @@ -7,7 +7,7 @@ import ( ) func (lb *LoadBalancer) setServers(serverAddresses []string) bool { - serverAddresses, hasOriginalServer := sortServers(serverAddresses, lb.originalServerAddress) + serverAddresses, hasOriginalServer := sortServers(serverAddresses, lb.defaultServerAddress) if len(serverAddresses) == 0 { return false } @@ -25,7 +25,7 @@ func (lb *LoadBalancer) setServers(serverAddresses []string) bool { lb.randomServers[i], lb.randomServers[j] = lb.randomServers[j], lb.randomServers[i] }) if !hasOriginalServer { - lb.randomServers = append(lb.randomServers, lb.originalServerAddress) + lb.randomServers = append(lb.randomServers, lb.defaultServerAddress) } lb.currentServerAddress = lb.randomServers[0] lb.nextServerIndex = 1 diff --git a/pkg/agent/proxy/apiproxy.go b/pkg/agent/proxy/apiproxy.go index 2d0503fcb4..99babef776 100644 --- a/pkg/agent/proxy/apiproxy.go +++ b/pkg/agent/proxy/apiproxy.go @@ -15,6 +15,7 @@ import ( type Proxy interface { Update(addresses []string) SetAPIServerPort(ctx context.Context, port int) error + SetSupervisorDefault(address string) SupervisorURL() string SupervisorAddresses() []string APIServerURL() string @@ -135,6 +136,28 @@ func (p *proxy) SetAPIServerPort(ctx context.Context, port int) error { return nil } +// SetSupervisorDefault updates the default (fallback) address for the connection to the +// supervisor. This is most useful on k3s nodes without apiservers, where the local +// supervisor must be used to bootstrap the agent config, but then switched over to +// another node running an apiserver once one is available. +func (p *proxy) SetSupervisorDefault(address string) { + host, port, err := sysnet.SplitHostPort(address) + if err != nil { + logrus.Errorf("Failed to parse address %s, dropping: %v", address, err) + return + } + if p.apiServerEnabled { + port = p.supervisorPort + address = sysnet.JoinHostPort(host, port) + } + p.fallbackSupervisorAddress = address + if p.supervisorLB == nil { + p.supervisorURL = "https://" + address + } else { + p.supervisorLB.SetDefault(address) + } +} + func (p *proxy) SupervisorURL() string { return p.supervisorURL } diff --git a/pkg/agent/run.go b/pkg/agent/run.go index cec764237f..72c772ab8c 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -3,9 +3,10 @@ package agent import ( "context" "fmt" - "net/url" + "net" "os" "path/filepath" + "strconv" "strings" "time" @@ -350,17 +351,8 @@ func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node, } agentRan = true } - - select { - case address := <-cfg.APIAddressCh: - cfg.ServerURL = address - u, err := url.Parse(cfg.ServerURL) - if err != nil { - logrus.Warn(err) - } - proxy.Update([]string{fmt.Sprintf("%s:%d", u.Hostname(), nodeConfig.ServerHTTPSPort)}) - case <-ctx.Done(): - return ctx.Err() + if err := waitForAPIServerAddresses(ctx, nodeConfig, cfg, proxy); err != nil { + return err } } else if cfg.ClusterReset && proxy.IsAPIServerLBEnabled() { // If we're doing a cluster-reset on RKE2, the kubelet needs to be started early to clean @@ -379,3 +371,26 @@ func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node, } return nil } + +func waitForAPIServerAddresses(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) error { + for { + select { + case <-time.After(5 * time.Second): + logrus.Info("Waiting for apiserver addresses") + case addresses := <-cfg.APIAddressCh: + for i, a := range addresses { + host, _, err := net.SplitHostPort(a) + if err == nil { + addresses[i] = net.JoinHostPort(host, strconv.Itoa(nodeConfig.ServerHTTPSPort)) + if i == 0 { + proxy.SetSupervisorDefault(addresses[i]) + } + } + } + proxy.Update(addresses) + return nil + case <-ctx.Done(): + return ctx.Err() + } + } +} diff --git a/pkg/cli/cmds/agent.go b/pkg/cli/cmds/agent.go index 29649ce8b1..859c20ed71 100644 --- a/pkg/cli/cmds/agent.go +++ b/pkg/cli/cmds/agent.go @@ -14,7 +14,7 @@ type Agent struct { TokenFile string ClusterSecret string ServerURL string - APIAddressCh chan string + APIAddressCh chan []string DisableLoadBalancer bool DisableServiceLB bool ETCDAgent bool diff --git a/pkg/cli/server/server.go b/pkg/cli/server/server.go index b05b2823f1..747401dc91 100644 --- a/pkg/cli/server/server.go +++ b/pkg/cli/server/server.go @@ -394,13 +394,6 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont serverConfig.ControlConfig.DisableScheduler = true serverConfig.ControlConfig.DisableCCM = true - // only close the agentReady channel in case of k3s restoration, because k3s does not start - // the agent until server returns successfully, unlike rke2's agent which starts in parallel - // with the server - if serverConfig.ControlConfig.SupervisorPort == serverConfig.ControlConfig.HTTPSPort { - close(agentReady) - } - dataDir, err := datadir.LocalHome(cfg.DataDir, false) if err != nil { return err @@ -484,10 +477,12 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont } if serverConfig.ControlConfig.DisableAPIServer { + if cfg.ServerURL != "" { + agentConfig.ServerURL = cfg.ServerURL + } // initialize the apiAddress Channel for receiving the api address from etcd - agentConfig.APIAddressCh = make(chan string, 1) - setAPIAddressChannel(ctx, &serverConfig, &agentConfig) - defer close(agentConfig.APIAddressCh) + agentConfig.APIAddressCh = make(chan []string) + go getAPIAddressFromEtcd(ctx, serverConfig, agentConfig) } return agent.Run(ctx, agentConfig) } @@ -533,29 +528,19 @@ func getArgValueFromList(searchArg string, argList []string) string { return value } -// setAPIAddressChannel will try to get the api address key from etcd and when it succeed it will -// set the APIAddressCh channel with its value, the function works for both k3s and rke2 in case -// of k3s we block returning back to the agent.Run until we get the api address, however in rke2 -// the code will not block operation and will run the operation in a goroutine -func setAPIAddressChannel(ctx context.Context, serverConfig *server.Config, agentConfig *cmds.Agent) { - // start a goroutine to check for the server ip if set from etcd in case of rke2 - if serverConfig.ControlConfig.HTTPSPort != serverConfig.ControlConfig.SupervisorPort { - go getAPIAddressFromEtcd(ctx, serverConfig, agentConfig) - return - } - getAPIAddressFromEtcd(ctx, serverConfig, agentConfig) -} - -func getAPIAddressFromEtcd(ctx context.Context, serverConfig *server.Config, agentConfig *cmds.Agent) { - t := time.NewTicker(5 * time.Second) - defer t.Stop() - for range t.C { - serverAddress, err := etcd.GetAPIServerURLFromETCD(ctx, &serverConfig.ControlConfig) - if err == nil { - agentConfig.ServerURL = "https://" + serverAddress - agentConfig.APIAddressCh <- agentConfig.ServerURL +func getAPIAddressFromEtcd(ctx context.Context, serverConfig server.Config, agentConfig cmds.Agent) { + defer close(agentConfig.APIAddressCh) + for { + toCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + serverAddresses, err := etcd.GetAPIServerURLsFromETCD(toCtx, &serverConfig.ControlConfig) + if err == nil && len(serverAddresses) > 0 { + agentConfig.APIAddressCh <- serverAddresses break } - logrus.Warn(err) + if !errors.Is(err, etcd.ErrAddressNotSet) { + logrus.Warnf("Failed to get apiserver address from etcd: %v", err) + } + <-toCtx.Done() } } diff --git a/pkg/cluster/bootstrap.go b/pkg/cluster/bootstrap.go index 3197645007..4314b242e6 100644 --- a/pkg/cluster/bootstrap.go +++ b/pkg/cluster/bootstrap.go @@ -356,6 +356,7 @@ func (c *Cluster) ReconcileBootstrapData(ctx context.Context, buf io.ReadSeeker, buf.Seek(0, 0) } + logrus.Debugf("One or more certificate directories do not exist; writing data to disk from datastore") return bootstrap.WriteToDiskFromStorage(files, crb) } diff --git a/pkg/daemons/control/server.go b/pkg/daemons/control/server.go index fc9d4eb920..3275080639 100644 --- a/pkg/daemons/control/server.go +++ b/pkg/daemons/control/server.go @@ -55,10 +55,12 @@ func Server(ctx context.Context, cfg *config.Control) error { if err := apiServer(ctx, cfg); err != nil { return err } + } - if err := waitForAPIServerInBackground(ctx, cfg.Runtime); err != nil { - return err - } + // Wait for an apiserver to become available before starting additional controllers, + // even if we're not running an apiserver locally. + if err := waitForAPIServerInBackground(ctx, cfg.Runtime); err != nil { + return err } if !cfg.DisableScheduler { diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index f97a963d96..e18bf3c651 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -76,6 +76,8 @@ var ( NodeNameAnnotation = "etcd." + version.Program + ".cattle.io/node-name" NodeAddressAnnotation = "etcd." + version.Program + ".cattle.io/node-address" + + ErrAddressNotSet = errors.New("apiserver addresses not yet set") ) type NodeControllerGetter func() controllerv1.NodeController @@ -283,7 +285,7 @@ func (e *ETCD) Reset(ctx context.Context, rebootstrap func() error) error { // Start starts the datastore func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) error { - existingCluster, err := e.IsInitialized(ctx, e.config) + isInitialized, err := e.IsInitialized(ctx, e.config) if err != nil { return errors.Wrapf(err, "configuration validation failed") } @@ -295,7 +297,7 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e go e.manageLearners(ctx) - if existingCluster { + if isInitialized { //check etcd dir permission etcdDir := DBDir(e.config) info, err := os.Stat(etcdDir) @@ -319,9 +321,18 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e } go func() { - <-e.config.Runtime.AgentReady - if err := e.join(ctx, clientAccessInfo); err != nil { - logrus.Fatalf("ETCD join failed: %v", err) + for { + select { + case <-time.After(30 * time.Second): + logrus.Infof("Waiting for agent to become ready before joining ETCD cluster") + case <-e.config.Runtime.AgentReady: + if err := e.join(ctx, clientAccessInfo); err != nil { + logrus.Fatalf("ETCD join failed: %v", err) + } + return + case <-ctx.Done(): + return + } } }() @@ -1786,27 +1797,27 @@ func backupDirWithRetention(dir string, maxBackupRetention int) (string, error) return backupDir, nil } -// GetAPIServerURLFromETCD will try to fetch the version.Program/apiaddresses key from etcd -// when it succeed it will parse the first address in the list and return back an address -func GetAPIServerURLFromETCD(ctx context.Context, cfg *config.Control) (string, error) { +// GetAPIServerURLsFromETCD will try to fetch the version.Program/apiaddresses key from etcd +func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]string, error) { cl, err := GetClient(ctx, cfg.Runtime, endpoint) if err != nil { - return "", err + return nil, err } etcdResp, err := cl.KV.Get(ctx, AddressKey) if err != nil { - return "", err + return nil, err } - if etcdResp.Count < 1 { - return "", fmt.Errorf("servers addresses are not yet set") + if etcdResp.Count == 0 || len(etcdResp.Kvs[0].Value) == 0 { + return nil, ErrAddressNotSet } + var addresses []string if err := json.Unmarshal(etcdResp.Kvs[0].Value, &addresses); err != nil { - return "", fmt.Errorf("failed to unmarshal etcd key: %v", err) + return nil, fmt.Errorf("failed to unmarshal apiserver addresses from etcd: %v", err) } - return addresses[0], nil + return addresses, nil } // GetMembersClientURLs will list through the member lists in etcd and return diff --git a/pkg/server/etcd.go b/pkg/server/etcd.go index 9b1821a452..670b4557d4 100644 --- a/pkg/server/etcd.go +++ b/pkg/server/etcd.go @@ -13,10 +13,11 @@ import ( ) // setETCDLabelsAndAnnotations will set the etcd role label if not exists also it -// sets special annotaitons on the node object which are etcd node id and etcd node +// sets special annotations on the node object which are etcd node id and etcd node // address, the function will also remove the controlplane and master role labels if // they exist on the node func setETCDLabelsAndAnnotations(ctx context.Context, config *Config) error { + <-config.ControlConfig.Runtime.APIServerReady t := time.NewTicker(5 * time.Second) defer t.Stop() for range t.C { diff --git a/pkg/server/router.go b/pkg/server/router.go index 06910cb7ee..20b9ef60b6 100644 --- a/pkg/server/router.go +++ b/pkg/server/router.go @@ -39,7 +39,6 @@ func router(ctx context.Context, config *Config, cfg *cmds.Server) http.Handler prefix := "/v1-" + version.Program authed := mux.NewRouter() authed.Use(authMiddleware(serverConfig, version.Program+":agent")) - authed.NotFoundHandler = apiserver(serverConfig.Runtime) authed.Path(prefix + "/serving-kubelet.crt").Handler(servingKubeletCert(serverConfig, serverConfig.Runtime.ServingKubeletKey, nodeAuth)) authed.Path(prefix + "/client-kubelet.crt").Handler(clientKubeletCert(serverConfig, serverConfig.Runtime.ClientKubeletKey, nodeAuth)) authed.Path(prefix + "/client-kube-proxy.crt").Handler(fileHandler(serverConfig.Runtime.ClientKubeProxyCert, serverConfig.Runtime.ClientKubeProxyKey)) @@ -49,6 +48,12 @@ func router(ctx context.Context, config *Config, cfg *cmds.Server) http.Handler authed.Path(prefix + "/config").Handler(configHandler(serverConfig, cfg)) authed.Path(prefix + "/readyz").Handler(readyzHandler(serverConfig)) + if cfg.DisableAPIServer { + authed.NotFoundHandler = apiserverDisabled() + } else { + authed.NotFoundHandler = apiserver(serverConfig.Runtime) + } + nodeAuthed := mux.NewRouter() nodeAuthed.Use(authMiddleware(serverConfig, "system:nodes")) nodeAuthed.Path(prefix + "/connect").Handler(serverConfig.Runtime.Tunnel) @@ -88,6 +93,16 @@ func apiserver(runtime *config.ControlRuntime) http.Handler { }) } +func apiserverDisabled() http.Handler { + return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { + data := []byte("apiserver disabled") + resp.WriteHeader(http.StatusServiceUnavailable) + resp.Header().Set("Content-Type", "text/plain") + resp.Header().Set("Content-length", strconv.Itoa(len(data))) + resp.Write(data) + }) +} + func cacerts(serverCA string) http.Handler { var ca []byte return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { diff --git a/pkg/server/server.go b/pkg/server/server.go index ed8e016fa7..fea44af041 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -366,7 +366,13 @@ func writeKubeConfig(certs string, config *Config) error { if ip == "" { ip = "127.0.0.1" } - url := fmt.Sprintf("https://%s:%d", ip, config.ControlConfig.HTTPSPort) + port := config.ControlConfig.HTTPSPort + // on servers without a local apiserver, tunnel access via the loadbalancer + if config.ControlConfig.DisableAPIServer { + ip = "127.0.0.1" + port = config.ControlConfig.APIServerPort + } + url := fmt.Sprintf("https://%s:%d", ip, port) kubeConfig, err := HomeKubeConfig(true, config.Rootless) def := true if err != nil { diff --git a/pkg/util/api.go b/pkg/util/api.go index 33651cd390..33b05cbbcb 100644 --- a/pkg/util/api.go +++ b/pkg/util/api.go @@ -18,6 +18,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" coregetter "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" ) @@ -56,6 +57,10 @@ func WaitForAPIServerReady(ctx context.Context, client clientset.Interface, time err := wait.PollImmediateWithContext(ctx, time.Second, timeout, func(ctx context.Context) (bool, error) { healthStatus := 0 + // Idle connections to the apiserver are returned to a global pool between requests. Explicitly + // close these idle connections so that we re-connect through the loadbalancer in case the endpoints + // have changed. + restClient.(*rest.RESTClient).Client.CloseIdleConnections() result := restClient.Get().AbsPath("/readyz").Do(ctx).StatusCode(&healthStatus) if rerr := result.Error(); rerr != nil { lastErr = errors.Wrap(rerr, "failed to get apiserver /readyz status")