diff --git a/pkg/agent/config/config.go b/pkg/agent/config/config.go index fb699f87eb..d5cfa4ebab 100644 --- a/pkg/agent/config/config.go +++ b/pkg/agent/config/config.go @@ -362,7 +362,7 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N // If the supervisor and externally-facing apiserver are not on the same port, tell the proxy where to find the apiserver. if controlConfig.SupervisorPort != controlConfig.HTTPSPort { isIPv6 := utilsnet.IsIPv6(net.ParseIP([]string{envInfo.NodeIP.String()}[0])) - if err := proxy.SetAPIServerPort(ctx, controlConfig.HTTPSPort, isIPv6); err != nil { + if err := proxy.SetAPIServerPort(controlConfig.HTTPSPort, isIPv6); err != nil { return nil, errors.Wrapf(err, "failed to setup access to API Server port %d on at %s", controlConfig.HTTPSPort, proxy.SupervisorURL()) } } diff --git a/pkg/agent/loadbalancer/loadbalancer.go b/pkg/agent/loadbalancer/loadbalancer.go index f47f4c38a3..2348fcb087 100644 --- a/pkg/agent/loadbalancer/loadbalancer.go +++ b/pkg/agent/loadbalancer/loadbalancer.go @@ -16,7 +16,10 @@ import ( // server tracks the connections to a server, so that they can be closed when the server is removed. type server struct { + // This mutex protects access to the connections map. All direct access to the map should be protected by it. mutex sync.Mutex + address string + healthCheck func() bool connections map[net.Conn]struct{} } @@ -31,7 +34,9 @@ type serverConn struct { // actually balance connections, but instead fails over to a new server only // when a connection attempt to the currently selected server fails. type LoadBalancer struct { - mutex sync.Mutex + // This mutex protects access to servers map and randomServers list. + // All direct access to the servers map/list should be protected by it. 
+ mutex sync.RWMutex proxy *tcpproxy.Proxy serviceName string @@ -123,28 +128,11 @@ func New(ctx context.Context, dataDir, serviceName, serverURL string, lbServerPo } logrus.Infof("Running load balancer %s %s -> %v [default: %s]", serviceName, lb.localAddress, lb.ServerAddresses, lb.defaultServerAddress) + go lb.runHealthChecks(ctx) + return lb, nil } -func (lb *LoadBalancer) SetDefault(serverAddress string) { - lb.mutex.Lock() - defer lb.mutex.Unlock() - - _, hasOriginalServer := sortServers(lb.ServerAddresses, lb.defaultServerAddress) - // if the old default server is not currently in use, remove it from the server map - if server := lb.servers[lb.defaultServerAddress]; server != nil && !hasOriginalServer { - defer server.closeAll() - delete(lb.servers, lb.defaultServerAddress) - } - // if the new default server doesn't have an entry in the map, add one - if _, ok := lb.servers[serverAddress]; !ok { - lb.servers[serverAddress] = &server{connections: make(map[net.Conn]struct{})} - } - - lb.defaultServerAddress = serverAddress - logrus.Infof("Updated load balancer %s default server address -> %s", lb.serviceName, serverAddress) -} - func (lb *LoadBalancer) Update(serverAddresses []string) { if lb == nil { return @@ -166,7 +154,10 @@ func (lb *LoadBalancer) LoadBalancerServerURL() string { return lb.localServerURL } -func (lb *LoadBalancer) dialContext(ctx context.Context, network, address string) (net.Conn, error) { +func (lb *LoadBalancer) dialContext(ctx context.Context, network, _ string) (net.Conn, error) { + lb.mutex.RLock() + defer lb.mutex.RUnlock() + startIndex := lb.nextServerIndex for { targetServer := lb.currentServerAddress @@ -174,7 +165,7 @@ func (lb *LoadBalancer) dialContext(ctx context.Context, network, address string server := lb.servers[targetServer] if server == nil || targetServer == "" { logrus.Debugf("Nil server for load balancer %s: %s", lb.serviceName, targetServer) - } else { + } else if server.healthCheck() { conn, err := server.dialContext(ctx, network, targetServer) if err == nil { return conn, nil diff --git a/pkg/agent/loadbalancer/servers.go b/pkg/agent/loadbalancer/servers.go index 116cc246c4..78ee88d74f 100644 --- a/pkg/agent/loadbalancer/servers.go +++ b/pkg/agent/loadbalancer/servers.go @@ -8,6 +8,7 @@ import ( "net/url" "os" "strconv" + "time" "github.com/k3s-io/k3s/pkg/version" http_dialer "github.com/mwitkow/go-http-dialer" @@ -17,6 +18,7 @@ import ( "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" ) var defaultDialer proxy.Dialer = &net.Dialer{} @@ -73,7 +75,11 @@ func (lb *LoadBalancer) setServers(serverAddresses []string) bool { for addedServer := range newAddresses.Difference(curAddresses) { logrus.Infof("Adding server to load balancer %s: %s", lb.serviceName, addedServer) - lb.servers[addedServer] = &server{connections: make(map[net.Conn]struct{})} + lb.servers[addedServer] = &server{ + address: addedServer, + connections: make(map[net.Conn]struct{}), + healthCheck: func() bool { return true }, + } } for removedServer := range curAddresses.Difference(newAddresses) { @@ -106,8 +112,8 @@ func (lb *LoadBalancer) setServers(serverAddresses []string) bool { } func (lb *LoadBalancer) nextServer(failedServer string) (string, error) { - lb.mutex.Lock() - defer lb.mutex.Unlock() + lb.mutex.RLock() + defer lb.mutex.RUnlock() if len(lb.randomServers) == 0 { return "", errors.New("No servers in load balancer proxy list") @@ -162,10 +168,12 @@ func (s *server) closeAll() { s.mutex.Lock() defer 
s.mutex.Unlock() - logrus.Debugf("Closing %d connections to load balancer server", len(s.connections)) - for conn := range s.connections { - // Close the connection in a goroutine so that we don't hold the lock while doing so. - go conn.Close() + if l := len(s.connections); l > 0 { + logrus.Infof("Closing %d connections to load balancer server %s", len(s.connections), s.address) + for conn := range s.connections { + // Close the connection in a goroutine so that we don't hold the lock while doing so. + go conn.Close() + } } } @@ -178,3 +186,55 @@ func (sc *serverConn) Close() error { delete(sc.server.connections, sc) return sc.Conn.Close() } + +// SetDefault sets the selected address as the default / fallback address +func (lb *LoadBalancer) SetDefault(serverAddress string) { + lb.mutex.Lock() + defer lb.mutex.Unlock() + + _, hasOriginalServer := sortServers(lb.ServerAddresses, lb.defaultServerAddress) + // if the old default server is not currently in use, remove it from the server map + if server := lb.servers[lb.defaultServerAddress]; server != nil && !hasOriginalServer { + defer server.closeAll() + delete(lb.servers, lb.defaultServerAddress) + } + // if the new default server doesn't have an entry in the map, add one + if _, ok := lb.servers[serverAddress]; !ok { + lb.servers[serverAddress] = &server{ + address: serverAddress, + healthCheck: func() bool { return true }, + connections: make(map[net.Conn]struct{}), + } + } + + lb.defaultServerAddress = serverAddress + logrus.Infof("Updated load balancer %s default server address -> %s", lb.serviceName, serverAddress) +} + +// SetHealthCheck adds a health-check callback to an address, replacing the default no-op function. +func (lb *LoadBalancer) SetHealthCheck(address string, healthCheck func() bool) { + lb.mutex.Lock() + defer lb.mutex.Unlock() + + if server := lb.servers[address]; server != nil { + logrus.Debugf("Added health check for load balancer %s: %s", lb.serviceName, address) + server.healthCheck = healthCheck + } else { + logrus.Errorf("Failed to add health check for load balancer %s: no server found for %s", lb.serviceName, address) + } +} + +// runHealthChecks periodically health-checks all servers. Any servers that fail the health-check will have their +// connections closed, to force clients to switch over to a healthy server. +func (lb *LoadBalancer) runHealthChecks(ctx context.Context) { + wait.Until(func() { + lb.mutex.RLock() + defer lb.mutex.RUnlock() + for _, server := range lb.servers { + if !server.healthCheck() { + defer server.closeAll() + } + } + }, time.Second, ctx.Done()) + logrus.Debugf("Stopped health checking for load balancer %s", lb.serviceName) +} diff --git a/pkg/agent/proxy/apiproxy.go b/pkg/agent/proxy/apiproxy.go index 0cdc583d26..becc2a0def 100644 --- a/pkg/agent/proxy/apiproxy.go +++ b/pkg/agent/proxy/apiproxy.go @@ -2,6 +2,7 @@ package proxy import ( "context" + "net" sysnet "net" "net/url" "strconv" @@ -14,13 +15,14 @@ import ( type Proxy interface { Update(addresses []string) - SetAPIServerPort(ctx context.Context, port int, isIPv6 bool) error + SetAPIServerPort(port int, isIPv6 bool) error SetSupervisorDefault(address string) IsSupervisorLBEnabled() bool SupervisorURL() string SupervisorAddresses() []string APIServerURL() string IsAPIServerLBEnabled() bool + SetHealthCheck(address string, healthCheck func() bool) } // NewSupervisorProxy sets up a new proxy for retrieving supervisor and apiserver addresses. 
If @@ -38,6 +40,7 @@ func NewSupervisorProxy(ctx context.Context, lbEnabled bool, dataDir, supervisor supervisorURL: supervisorURL, apiServerURL: supervisorURL, lbServerPort: lbServerPort, + context: ctx, } if lbEnabled { @@ -70,6 +73,7 @@ type proxy struct { apiServerEnabled bool apiServerURL string + apiServerPort string supervisorURL string supervisorPort string initialSupervisorURL string @@ -78,6 +82,7 @@ type proxy struct { apiServerLB *loadbalancer.LoadBalancer supervisorLB *loadbalancer.LoadBalancer + context context.Context } func (p *proxy) Update(addresses []string) { @@ -96,6 +101,18 @@ func (p *proxy) Update(addresses []string) { p.supervisorAddresses = supervisorAddresses } +func (p *proxy) SetHealthCheck(address string, healthCheck func() bool) { + if p.supervisorLB != nil { + p.supervisorLB.SetHealthCheck(address, healthCheck) + } + + if p.apiServerLB != nil { + host, _, _ := net.SplitHostPort(address) + address = net.JoinHostPort(host, p.apiServerPort) + p.apiServerLB.SetHealthCheck(address, healthCheck) + } +} + func (p *proxy) setSupervisorPort(addresses []string) []string { var newAddresses []string for _, address := range addresses { @@ -114,12 +131,13 @@ func (p *proxy) setSupervisorPort(addresses []string) []string { // load-balancing is enabled, another load-balancer is started on a port one below the supervisor // load-balancer, and the address of this load-balancer is returned instead of the actual apiserver // addresses. -func (p *proxy) SetAPIServerPort(ctx context.Context, port int, isIPv6 bool) error { +func (p *proxy) SetAPIServerPort(port int, isIPv6 bool) error { u, err := url.Parse(p.initialSupervisorURL) if err != nil { return errors.Wrapf(err, "failed to parse server URL %s", p.initialSupervisorURL) } - u.Host = sysnet.JoinHostPort(u.Hostname(), strconv.Itoa(port)) + p.apiServerPort = strconv.Itoa(port) + u.Host = sysnet.JoinHostPort(u.Hostname(), p.apiServerPort) p.apiServerURL = u.String() p.apiServerEnabled = true @@ -129,7 +147,7 @@ func (p *proxy) SetAPIServerPort(ctx context.Context, port int, isIPv6 bool) err if lbServerPort != 0 { lbServerPort = lbServerPort - 1 } - lb, err := loadbalancer.New(ctx, p.dataDir, loadbalancer.APIServerServiceName, p.apiServerURL, lbServerPort, isIPv6) + lb, err := loadbalancer.New(p.context, p.dataDir, loadbalancer.APIServerServiceName, p.apiServerURL, lbServerPort, isIPv6) if err != nil { return err } diff --git a/pkg/agent/tunnel/tunnel.go b/pkg/agent/tunnel/tunnel.go index 0f6acb9da6..6245d52910 100644 --- a/pkg/agent/tunnel/tunnel.go +++ b/pkg/agent/tunnel/tunnel.go @@ -3,6 +3,7 @@ package tunnel import ( "context" "crypto/tls" + "errors" "fmt" "net" "os" @@ -289,7 +290,9 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan disconnect := map[string]context.CancelFunc{} for _, address := range proxy.SupervisorAddresses() { if _, ok := disconnect[address]; !ok { - disconnect[address] = a.connect(ctx, wg, address, tlsConfig) + conn := a.connect(ctx, wg, address, tlsConfig) + disconnect[address] = conn.cancel + proxy.SetHealthCheck(address, conn.connected) } } @@ -361,7 +364,9 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan for _, address := range proxy.SupervisorAddresses() { validEndpoint[address] = true if _, ok := disconnect[address]; !ok { - disconnect[address] = a.connect(ctx, nil, address, tlsConfig) + conn := a.connect(ctx, nil, address, tlsConfig) + disconnect[address] = conn.cancel + proxy.SetHealthCheck(address, conn.connected) } } @@ -403,32 
+408,54 @@ func (a *agentTunnel) authorized(ctx context.Context, proto, address string) boo return false } +type agentConnection struct { + cancel context.CancelFunc + connected func() bool +} + // connect initiates a connection to the remotedialer server. Incoming dial requests from // the server will be checked by the authorizer function prior to being fulfilled. -func (a *agentTunnel) connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string, tlsConfig *tls.Config) context.CancelFunc { +func (a *agentTunnel) connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string, tlsConfig *tls.Config) agentConnection { wsURL := fmt.Sprintf("wss://%s/v1-"+version.Program+"/connect", address) ws := &websocket.Dialer{ TLSClientConfig: tlsConfig, } + // Assume that the connection to the server will succeed, to avoid failing health checks while attempting to connect. + // If we cannot connect, connected will be set to false when the initial connection attempt fails. + connected := true + once := sync.Once{} if waitGroup != nil { waitGroup.Add(1) } ctx, cancel := context.WithCancel(rootCtx) + auth := func(proto, address string) bool { + return a.authorized(rootCtx, proto, address) + } + onConnect := func(_ context.Context, _ *remotedialer.Session) error { + connected = true + logrus.WithField("url", wsURL).Info("Remotedialer connected to proxy") + if waitGroup != nil { + once.Do(waitGroup.Done) + } + return nil + } + + // Start remotedialer connect loop in a goroutine to ensure a connection to the target server go func() { for { - remotedialer.ClientConnect(ctx, wsURL, nil, ws, func(proto, address string) bool { - return a.authorized(rootCtx, proto, address) - }, func(_ context.Context, _ *remotedialer.Session) error { - if waitGroup != nil { - once.Do(waitGroup.Done) - } - return nil - }) - + // ConnectToProxy blocks until error or context cancellation + err := remotedialer.ConnectToProxy(ctx, wsURL, nil, auth, ws, onConnect) + connected = false + if err != nil && !errors.Is(err, context.Canceled) { + logrus.WithField("url", wsURL).WithError(err).Error("Remotedialer proxy error; reconnecting...") + // wait between reconnection attempts to avoid hammering the server + time.Sleep(endpointDebounceDelay) + } + // If the context has been cancelled, exit the goroutine instead of retrying if ctx.Err() != nil { if waitGroup != nil { once.Do(waitGroup.Done) @@ -438,7 +465,10 @@ func (a *agentTunnel) connect(rootCtx context.Context, waitGroup *sync.WaitGroup } }() - return cancel + return agentConnection{ + cancel: cancel, + connected: func() bool { return connected }, + } } // isKubeletPort returns true if the connection is to a reserved TCP port on a loopback address. 
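The tunnel changes above feed a per-address connected() callback into the proxy's load balancers through SetHealthCheck. A minimal sketch of how that LoadBalancer API is consumed, assuming only the exported functions shown in the hunks above; the data directory, port, addresses, and the local health flag are placeholders standing in for the remotedialer connect loop:

package main

import (
	"context"
	"fmt"

	"github.com/k3s-io/k3s/pkg/agent/loadbalancer"
)

func main() {
	ctx := context.Background()

	// Start a local listener that proxies to the default server (placeholder values).
	lb, err := loadbalancer.New(ctx, "/tmp/k3s-agent", loadbalancer.APIServerServiceName, "https://10.0.0.1:6443", 6444, false)
	if err != nil {
		panic(err)
	}

	// Add a second backend and attach a health-check callback to it. Per the patch,
	// callbacks default to func() bool { return true } until SetHealthCheck replaces them.
	lb.Update([]string{"10.0.0.1:6443", "10.0.0.2:6443"})
	healthy := true // in the agent this is flipped by the remotedialer connect loop
	lb.SetHealthCheck("10.0.0.2:6443", func() bool { return healthy })

	// runHealthChecks (started by New) polls every second; when a callback reports false,
	// connections to that backend are closed and dialContext stops selecting it.
	fmt.Println("load balancer listening on", lb.LoadBalancerServerURL())
}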
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index d87c2d9fa6..22894bfa95 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -56,7 +56,7 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) { clientURL.Host = clientURL.Hostname() + ":2379" clientURLs = append(clientURLs, clientURL.String()) } - etcdProxy, err := etcd.NewETCDProxy(ctx, true, c.config.DataDir, clientURLs[0], utilsnet.IsIPv6CIDR(c.config.ServiceIPRanges[0])) + etcdProxy, err := etcd.NewETCDProxy(ctx, c.config.SupervisorPort, c.config.DataDir, clientURLs[0], utilsnet.IsIPv6CIDR(c.config.ServiceIPRanges[0])) if err != nil { return nil, err } diff --git a/pkg/cluster/managed.go b/pkg/cluster/managed.go index e88ed79512..dcf140e577 100644 --- a/pkg/cluster/managed.go +++ b/pkg/cluster/managed.go @@ -126,7 +126,7 @@ func (c *Cluster) assignManagedDriver(ctx context.Context) error { return nil } -// setupEtcdProxy periodically updates the etcd proxy with the current list of +// setupEtcdProxy starts a goroutine to periodically update the etcd proxy with the current list of // cluster client URLs, as retrieved from etcd. func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) { if c.managedDB == nil { @@ -138,7 +138,7 @@ func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) { for range t.C { newAddresses, err := c.managedDB.GetMembersClientURLs(ctx) if err != nil { - logrus.Warnf("failed to get etcd client URLs: %v", err) + logrus.Warnf("Failed to get etcd client URLs: %v", err) continue } // client URLs are a full URI, but the proxy only wants host:port @@ -146,7 +146,7 @@ func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) { for _, address := range newAddresses { u, err := url.Parse(address) if err != nil { - logrus.Warnf("failed to parse etcd client URL: %v", err) + logrus.Warnf("Failed to parse etcd client URL: %v", err) continue } hosts = append(hosts, u.Host) @@ -162,7 +162,7 @@ func (c *Cluster) deleteNodePasswdSecret(ctx context.Context) { secretsClient := c.config.Runtime.Core.Core().V1().Secret() if err := nodepassword.Delete(secretsClient, nodeName); err != nil { if apierrors.IsNotFound(err) { - logrus.Debugf("node password secret is not found for node %s", nodeName) + logrus.Debugf("Node password secret is not found for node %s", nodeName) return } logrus.Warnf("failed to delete old node password secret: %v", err) diff --git a/pkg/etcd/etcdproxy.go b/pkg/etcd/etcdproxy.go index 6b0cc966f5..40bee876b1 100644 --- a/pkg/etcd/etcdproxy.go +++ b/pkg/etcd/etcdproxy.go @@ -2,10 +2,18 @@ package etcd import ( "context" + "crypto/tls" + "fmt" + "net" + "net/http" "net/url" + "strconv" + "time" "github.com/k3s-io/k3s/pkg/agent/loadbalancer" "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/util/wait" ) type Proxy interface { @@ -15,9 +23,17 @@ type Proxy interface { ETCDServerURL() string } +var httpClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + }, +} + // NewETCDProxy initializes a new proxy structure that contain a load balancer // which listens on port 2379 and proxy between etcd cluster members -func NewETCDProxy(ctx context.Context, enabled bool, dataDir, etcdURL string, isIPv6 bool) (Proxy, error) { +func NewETCDProxy(ctx context.Context, supervisorPort int, dataDir, etcdURL string, isIPv6 bool) (Proxy, error) { u, err := url.Parse(etcdURL) if err != nil { return nil, errors.Wrap(err, "failed to parse 
etcd client URL") @@ -27,16 +43,16 @@ func NewETCDProxy(ctx context.Context, enabled bool, dataDir, etcdURL string, is dataDir: dataDir, initialETCDURL: etcdURL, etcdURL: etcdURL, + supervisorPort: supervisorPort, + disconnect: map[string]context.CancelFunc{}, } - if enabled { - lb, err := loadbalancer.New(ctx, dataDir, loadbalancer.ETCDServerServiceName, etcdURL, 2379, isIPv6) - if err != nil { - return nil, err - } - e.etcdLB = lb - e.etcdLBURL = lb.LoadBalancerServerURL() + lb, err := loadbalancer.New(ctx, dataDir, loadbalancer.ETCDServerServiceName, etcdURL, 2379, isIPv6) + if err != nil { + return nil, err } + e.etcdLB = lb + e.etcdLBURL = lb.LoadBalancerServerURL() e.fallbackETCDAddress = u.Host e.etcdPort = u.Port() @@ -48,17 +64,34 @@ type etcdproxy struct { dataDir string etcdLBURL string + supervisorPort int initialETCDURL string etcdURL string etcdPort string fallbackETCDAddress string etcdAddresses []string etcdLB *loadbalancer.LoadBalancer + disconnect map[string]context.CancelFunc } func (e *etcdproxy) Update(addresses []string) { - if e.etcdLB != nil { - e.etcdLB.Update(addresses) + e.etcdLB.Update(addresses) + + validEndpoint := map[string]bool{} + for _, address := range e.etcdLB.ServerAddresses { + validEndpoint[address] = true + if _, ok := e.disconnect[address]; !ok { + ctx, cancel := context.WithCancel(context.Background()) + e.disconnect[address] = cancel + e.etcdLB.SetHealthCheck(address, e.createHealthCheck(ctx, address)) + } + } + + for address, cancel := range e.disconnect { + if !validEndpoint[address] { + cancel() + delete(e.disconnect, address) + } } } @@ -76,3 +109,35 @@ func (e *etcdproxy) ETCDAddresses() []string { func (e *etcdproxy) ETCDServerURL() string { return e.etcdURL } + +// start a polling routine that makes periodic requests to the etcd node's supervisor port. +// If the request fails, the node is marked unhealthy. +func (e etcdproxy) createHealthCheck(ctx context.Context, address string) func() bool { + // Assume that the connection to the server will succeed, to avoid failing health checks while attempting to connect. + // If we cannot connect, connected will be set to false when the initial connection attempt fails. + connected := true + + host, _, _ := net.SplitHostPort(address) + url := fmt.Sprintf("https://%s/ping", net.JoinHostPort(host, strconv.Itoa(e.supervisorPort))) + + go wait.JitterUntilWithContext(ctx, func(ctx context.Context) { + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + resp, err := httpClient.Do(req) + var statusCode int + if resp != nil { + statusCode = resp.StatusCode + } + if err != nil || statusCode != http.StatusOK { + logrus.Debugf("Health check %s failed: %v (StatusCode: %d)", url, err, statusCode) + connected = false + } else { + connected = true + } + }, 5*time.Second, 1.0, true) + + return func() bool { + return connected + } +}