Add health-check support to loadbalancer
* Adds support for health-checking loadbalancer servers. If a health check fails when dialing, all existing connections to the server will be closed.
* Wires up a remotedialer tunnel connectivity check as the health check for supervisor/apiserver connections.
* Wires up a simple ping request to the supervisor port as the health check for etcd connections.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
This commit is contained in:
parent edb0440017
commit c51d7bfbd1
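For orientation before the diff, the call pattern this commit expects of callers can be condensed into a short sketch. This is illustrative only and not part of the commit: loadbalancer.New, Update, SetHealthCheck, and ServerAddresses are the API shown in the hunks below, while the service name, the listen port, and the atomic "healthy" flag are placeholder assumptions standing in for the real probes (the remotedialer connectivity flag and the etcd /ping poller).

// Illustrative sketch, not commit code. Assumes imports of "context", "sync/atomic",
// and "github.com/k3s-io/k3s/pkg/agent/loadbalancer".
func startBalancer(ctx context.Context, dataDir string, servers []string) (*loadbalancer.LoadBalancer, error) {
    // Service name and listen port (0 = pick a free port) are placeholders for this example.
    lb, err := loadbalancer.New(ctx, dataDir, loadbalancer.ETCDServerServiceName, "https://"+servers[0], 0, false)
    if err != nil {
        return nil, err
    }
    lb.Update(servers)

    for _, address := range servers {
        var healthy int32 = 1 // assume healthy until a probe fails, matching the commit's approach
        // A real caller flips this flag from its own probe loop, e.g.
        // atomic.StoreInt32(&healthy, 0) when a tunnel or ping check fails.
        lb.SetHealthCheck(address, func() bool { return atomic.LoadInt32(&healthy) == 1 })
    }
    return lb, nil
}

Once a registered health check starts returning false, runHealthChecks (added below) closes that server's open connections and dialContext skips it on the next dial, so clients fail over without waiting for a TCP timeout.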
@@ -362,7 +362,7 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
    // If the supervisor and externally-facing apiserver are not on the same port, tell the proxy where to find the apiserver.
    if controlConfig.SupervisorPort != controlConfig.HTTPSPort {
        isIPv6 := utilsnet.IsIPv6(net.ParseIP([]string{envInfo.NodeIP.String()}[0]))
        if err := proxy.SetAPIServerPort(ctx, controlConfig.HTTPSPort, isIPv6); err != nil {
        if err := proxy.SetAPIServerPort(controlConfig.HTTPSPort, isIPv6); err != nil {
            return nil, errors.Wrapf(err, "failed to setup access to API Server port %d on at %s", controlConfig.HTTPSPort, proxy.SupervisorURL())
        }
    }
@@ -16,7 +16,10 @@ import (

// server tracks the connections to a server, so that they can be closed when the server is removed.
type server struct {
    // This mutex protects access to the connections map. All direct access to the map should be protected by it.
    mutex sync.Mutex
    address string
    healthCheck func() bool
    connections map[net.Conn]struct{}
}

@@ -31,7 +34,9 @@ type serverConn struct {
// actually balance connections, but instead fails over to a new server only
// when a connection attempt to the currently selected server fails.
type LoadBalancer struct {
    mutex sync.Mutex
    // This mutex protects access to servers map and randomServers list.
    // All direct access to the servers map/list should be protected by it.
    mutex sync.RWMutex
    proxy *tcpproxy.Proxy

    serviceName string
@@ -123,28 +128,11 @@ func New(ctx context.Context, dataDir, serviceName, serverURL string, lbServerPo
    }
    logrus.Infof("Running load balancer %s %s -> %v [default: %s]", serviceName, lb.localAddress, lb.ServerAddresses, lb.defaultServerAddress)

    go lb.runHealthChecks(ctx)

    return lb, nil
}

func (lb *LoadBalancer) SetDefault(serverAddress string) {
    lb.mutex.Lock()
    defer lb.mutex.Unlock()

    _, hasOriginalServer := sortServers(lb.ServerAddresses, lb.defaultServerAddress)
    // if the old default server is not currently in use, remove it from the server map
    if server := lb.servers[lb.defaultServerAddress]; server != nil && !hasOriginalServer {
        defer server.closeAll()
        delete(lb.servers, lb.defaultServerAddress)
    }
    // if the new default server doesn't have an entry in the map, add one
    if _, ok := lb.servers[serverAddress]; !ok {
        lb.servers[serverAddress] = &server{connections: make(map[net.Conn]struct{})}
    }

    lb.defaultServerAddress = serverAddress
    logrus.Infof("Updated load balancer %s default server address -> %s", lb.serviceName, serverAddress)
}

func (lb *LoadBalancer) Update(serverAddresses []string) {
    if lb == nil {
        return
@@ -166,7 +154,10 @@ func (lb *LoadBalancer) LoadBalancerServerURL() string {
    return lb.localServerURL
}

func (lb *LoadBalancer) dialContext(ctx context.Context, network, address string) (net.Conn, error) {
func (lb *LoadBalancer) dialContext(ctx context.Context, network, _ string) (net.Conn, error) {
    lb.mutex.RLock()
    defer lb.mutex.RUnlock()

    startIndex := lb.nextServerIndex
    for {
        targetServer := lb.currentServerAddress
@@ -174,7 +165,7 @@ func (lb *LoadBalancer) dialContext(ctx context.Context, network, address string
        server := lb.servers[targetServer]
        if server == nil || targetServer == "" {
            logrus.Debugf("Nil server for load balancer %s: %s", lb.serviceName, targetServer)
        } else {
        } else if server.healthCheck() {
            conn, err := server.dialContext(ctx, network, targetServer)
            if err == nil {
                return conn, nil
@@ -8,6 +8,7 @@ import (
    "net/url"
    "os"
    "strconv"
    "time"

    "github.com/k3s-io/k3s/pkg/version"
    http_dialer "github.com/mwitkow/go-http-dialer"
@@ -17,6 +18,7 @@ import (

    "github.com/sirupsen/logrus"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/apimachinery/pkg/util/wait"
)

var defaultDialer proxy.Dialer = &net.Dialer{}
@@ -73,7 +75,11 @@ func (lb *LoadBalancer) setServers(serverAddresses []string) bool {

    for addedServer := range newAddresses.Difference(curAddresses) {
        logrus.Infof("Adding server to load balancer %s: %s", lb.serviceName, addedServer)
        lb.servers[addedServer] = &server{connections: make(map[net.Conn]struct{})}
        lb.servers[addedServer] = &server{
            address: addedServer,
            connections: make(map[net.Conn]struct{}),
            healthCheck: func() bool { return true },
        }
    }

    for removedServer := range curAddresses.Difference(newAddresses) {
@@ -106,8 +112,8 @@ func (lb *LoadBalancer) setServers(serverAddresses []string) bool {
}

func (lb *LoadBalancer) nextServer(failedServer string) (string, error) {
    lb.mutex.Lock()
    defer lb.mutex.Unlock()
    lb.mutex.RLock()
    defer lb.mutex.RUnlock()

    if len(lb.randomServers) == 0 {
        return "", errors.New("No servers in load balancer proxy list")
@@ -162,10 +168,12 @@ func (s *server) closeAll() {
    s.mutex.Lock()
    defer s.mutex.Unlock()

    logrus.Debugf("Closing %d connections to load balancer server", len(s.connections))
    for conn := range s.connections {
        // Close the connection in a goroutine so that we don't hold the lock while doing so.
        go conn.Close()
    if l := len(s.connections); l > 0 {
        logrus.Infof("Closing %d connections to load balancer server %s", len(s.connections), s.address)
        for conn := range s.connections {
            // Close the connection in a goroutine so that we don't hold the lock while doing so.
            go conn.Close()
        }
    }
}

@@ -178,3 +186,55 @@ func (sc *serverConn) Close() error {
    delete(sc.server.connections, sc)
    return sc.Conn.Close()
}

// SetDefault sets the selected address as the default / fallback address
func (lb *LoadBalancer) SetDefault(serverAddress string) {
    lb.mutex.Lock()
    defer lb.mutex.Unlock()

    _, hasOriginalServer := sortServers(lb.ServerAddresses, lb.defaultServerAddress)
    // if the old default server is not currently in use, remove it from the server map
    if server := lb.servers[lb.defaultServerAddress]; server != nil && !hasOriginalServer {
        defer server.closeAll()
        delete(lb.servers, lb.defaultServerAddress)
    }
    // if the new default server doesn't have an entry in the map, add one
    if _, ok := lb.servers[serverAddress]; !ok {
        lb.servers[serverAddress] = &server{
            address: serverAddress,
            healthCheck: func() bool { return true },
            connections: make(map[net.Conn]struct{}),
        }
    }

    lb.defaultServerAddress = serverAddress
    logrus.Infof("Updated load balancer %s default server address -> %s", lb.serviceName, serverAddress)
}

// SetHealthCheck adds a health-check callback to an address, replacing the default no-op function.
func (lb *LoadBalancer) SetHealthCheck(address string, healthCheck func() bool) {
    lb.mutex.Lock()
    defer lb.mutex.Unlock()

    if server := lb.servers[address]; server != nil {
        logrus.Debugf("Added health check for load balancer %s: %s", lb.serviceName, address)
        server.healthCheck = healthCheck
    } else {
        logrus.Errorf("Failed to add health check for load balancer %s: no server found for %s", lb.serviceName, address)
    }
}

// runHealthChecks periodically health-checks all servers. Any servers that fail the health-check will have their
// connections closed, to force clients to switch over to a healthy server.
func (lb *LoadBalancer) runHealthChecks(ctx context.Context) {
    wait.Until(func() {
        lb.mutex.RLock()
        defer lb.mutex.RUnlock()
        for _, server := range lb.servers {
            if !server.healthCheck() {
                defer server.closeAll()
            }
        }
    }, time.Second, ctx.Done())
    logrus.Debugf("Stopped health checking for load balancer %s", lb.serviceName)
}
@@ -2,6 +2,7 @@ package proxy

import (
    "context"
    "net"
    sysnet "net"
    "net/url"
    "strconv"
@@ -14,13 +15,14 @@ import (

type Proxy interface {
    Update(addresses []string)
    SetAPIServerPort(ctx context.Context, port int, isIPv6 bool) error
    SetAPIServerPort(port int, isIPv6 bool) error
    SetSupervisorDefault(address string)
    IsSupervisorLBEnabled() bool
    SupervisorURL() string
    SupervisorAddresses() []string
    APIServerURL() string
    IsAPIServerLBEnabled() bool
    SetHealthCheck(address string, healthCheck func() bool)
}

// NewSupervisorProxy sets up a new proxy for retrieving supervisor and apiserver addresses. If
@@ -38,6 +40,7 @@ func NewSupervisorProxy(ctx context.Context, lbEnabled bool, dataDir, supervisor
        supervisorURL: supervisorURL,
        apiServerURL: supervisorURL,
        lbServerPort: lbServerPort,
        context: ctx,
    }

    if lbEnabled {
@@ -70,6 +73,7 @@ type proxy struct {
    apiServerEnabled bool

    apiServerURL string
    apiServerPort string
    supervisorURL string
    supervisorPort string
    initialSupervisorURL string
@@ -78,6 +82,7 @@ type proxy struct {

    apiServerLB *loadbalancer.LoadBalancer
    supervisorLB *loadbalancer.LoadBalancer
    context context.Context
}

func (p *proxy) Update(addresses []string) {
@@ -96,6 +101,18 @@ func (p *proxy) Update(addresses []string) {
    p.supervisorAddresses = supervisorAddresses
}

func (p *proxy) SetHealthCheck(address string, healthCheck func() bool) {
    if p.supervisorLB != nil {
        p.supervisorLB.SetHealthCheck(address, healthCheck)
    }

    if p.apiServerLB != nil {
        host, _, _ := net.SplitHostPort(address)
        address = net.JoinHostPort(host, p.apiServerPort)
        p.apiServerLB.SetHealthCheck(address, healthCheck)
    }
}

func (p *proxy) setSupervisorPort(addresses []string) []string {
    var newAddresses []string
    for _, address := range addresses {
@@ -114,12 +131,13 @@ func (p *proxy) setSupervisorPort(addresses []string) []string {
// load-balancing is enabled, another load-balancer is started on a port one below the supervisor
// load-balancer, and the address of this load-balancer is returned instead of the actual apiserver
// addresses.
func (p *proxy) SetAPIServerPort(ctx context.Context, port int, isIPv6 bool) error {
func (p *proxy) SetAPIServerPort(port int, isIPv6 bool) error {
    u, err := url.Parse(p.initialSupervisorURL)
    if err != nil {
        return errors.Wrapf(err, "failed to parse server URL %s", p.initialSupervisorURL)
    }
    u.Host = sysnet.JoinHostPort(u.Hostname(), strconv.Itoa(port))
    p.apiServerPort = strconv.Itoa(port)
    u.Host = sysnet.JoinHostPort(u.Hostname(), p.apiServerPort)

    p.apiServerURL = u.String()
    p.apiServerEnabled = true
@@ -129,7 +147,7 @@ func (p *proxy) SetAPIServerPort(ctx context.Context, port int, isIPv6 bool) err
    if lbServerPort != 0 {
        lbServerPort = lbServerPort - 1
    }
    lb, err := loadbalancer.New(ctx, p.dataDir, loadbalancer.APIServerServiceName, p.apiServerURL, lbServerPort, isIPv6)
    lb, err := loadbalancer.New(p.context, p.dataDir, loadbalancer.APIServerServiceName, p.apiServerURL, lbServerPort, isIPv6)
    if err != nil {
        return err
    }
@@ -3,6 +3,7 @@ package tunnel
import (
    "context"
    "crypto/tls"
    "errors"
    "fmt"
    "net"
    "os"
@@ -289,7 +290,9 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan
    disconnect := map[string]context.CancelFunc{}
    for _, address := range proxy.SupervisorAddresses() {
        if _, ok := disconnect[address]; !ok {
            disconnect[address] = a.connect(ctx, wg, address, tlsConfig)
            conn := a.connect(ctx, wg, address, tlsConfig)
            disconnect[address] = conn.cancel
            proxy.SetHealthCheck(address, conn.connected)
        }
    }

@@ -361,7 +364,9 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan
            for _, address := range proxy.SupervisorAddresses() {
                validEndpoint[address] = true
                if _, ok := disconnect[address]; !ok {
                    disconnect[address] = a.connect(ctx, nil, address, tlsConfig)
                    conn := a.connect(ctx, nil, address, tlsConfig)
                    disconnect[address] = conn.cancel
                    proxy.SetHealthCheck(address, conn.connected)
                }
            }

@@ -403,32 +408,54 @@ func (a *agentTunnel) authorized(ctx context.Context, proto, address string) boo
    return false
}

type agentConnection struct {
    cancel context.CancelFunc
    connected func() bool
}

// connect initiates a connection to the remotedialer server. Incoming dial requests from
// the server will be checked by the authorizer function prior to being fulfilled.
func (a *agentTunnel) connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string, tlsConfig *tls.Config) context.CancelFunc {
func (a *agentTunnel) connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string, tlsConfig *tls.Config) agentConnection {
    wsURL := fmt.Sprintf("wss://%s/v1-"+version.Program+"/connect", address)
    ws := &websocket.Dialer{
        TLSClientConfig: tlsConfig,
    }

    // Assume that the connection to the server will succeed, to avoid failing health checks while attempting to connect.
    // If we cannot connect, connected will be set to false when the initial connection attempt fails.
    connected := true

    once := sync.Once{}
    if waitGroup != nil {
        waitGroup.Add(1)
    }

    ctx, cancel := context.WithCancel(rootCtx)
    auth := func(proto, address string) bool {
        return a.authorized(rootCtx, proto, address)
    }

    onConnect := func(_ context.Context, _ *remotedialer.Session) error {
        connected = true
        logrus.WithField("url", wsURL).Info("Remotedialer connected to proxy")
        if waitGroup != nil {
            once.Do(waitGroup.Done)
        }
        return nil
    }

    // Start remotedialer connect loop in a goroutine to ensure a connection to the target server
    go func() {
        for {
            remotedialer.ClientConnect(ctx, wsURL, nil, ws, func(proto, address string) bool {
                return a.authorized(rootCtx, proto, address)
            }, func(_ context.Context, _ *remotedialer.Session) error {
                if waitGroup != nil {
                    once.Do(waitGroup.Done)
                }
                return nil
            })

            // ConnectToProxy blocks until error or context cancellation
            err := remotedialer.ConnectToProxy(ctx, wsURL, nil, auth, ws, onConnect)
            connected = false
            if err != nil && !errors.Is(err, context.Canceled) {
                logrus.WithField("url", wsURL).WithError(err).Error("Remotedialer proxy error; reconecting...")
                // wait between reconnection attempts to avoid hammering the server
                time.Sleep(endpointDebounceDelay)
            }
            // If the context has been cancelled, exit the goroutine instead of retrying
            if ctx.Err() != nil {
                if waitGroup != nil {
                    once.Do(waitGroup.Done)
@@ -438,7 +465,10 @@ func (a *agentTunnel) connect(rootCtx context.Context, waitGroup *sync.WaitGroup
        }
    }()

    return cancel
    return agentConnection{
        cancel: cancel,
        connected: func() bool { return connected },
    }
}

// isKubeletPort returns true if the connection is to a reserved TCP port on a loopback address.
@@ -56,7 +56,7 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
        clientURL.Host = clientURL.Hostname() + ":2379"
        clientURLs = append(clientURLs, clientURL.String())
    }
    etcdProxy, err := etcd.NewETCDProxy(ctx, true, c.config.DataDir, clientURLs[0], utilsnet.IsIPv6CIDR(c.config.ServiceIPRanges[0]))
    etcdProxy, err := etcd.NewETCDProxy(ctx, c.config.SupervisorPort, c.config.DataDir, clientURLs[0], utilsnet.IsIPv6CIDR(c.config.ServiceIPRanges[0]))
    if err != nil {
        return nil, err
    }
@@ -126,7 +126,7 @@ func (c *Cluster) assignManagedDriver(ctx context.Context) error {
    return nil
}

// setupEtcdProxy periodically updates the etcd proxy with the current list of
// setupEtcdProxy starts a goroutine to periodically update the etcd proxy with the current list of
// cluster client URLs, as retrieved from etcd.
func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) {
    if c.managedDB == nil {
@@ -138,7 +138,7 @@ func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) {
        for range t.C {
            newAddresses, err := c.managedDB.GetMembersClientURLs(ctx)
            if err != nil {
                logrus.Warnf("failed to get etcd client URLs: %v", err)
                logrus.Warnf("Failed to get etcd client URLs: %v", err)
                continue
            }
            // client URLs are a full URI, but the proxy only wants host:port
@@ -146,7 +146,7 @@ func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) {
            for _, address := range newAddresses {
                u, err := url.Parse(address)
                if err != nil {
                    logrus.Warnf("failed to parse etcd client URL: %v", err)
                    logrus.Warnf("Failed to parse etcd client URL: %v", err)
                    continue
                }
                hosts = append(hosts, u.Host)
@@ -162,7 +162,7 @@ func (c *Cluster) deleteNodePasswdSecret(ctx context.Context) {
    secretsClient := c.config.Runtime.Core.Core().V1().Secret()
    if err := nodepassword.Delete(secretsClient, nodeName); err != nil {
        if apierrors.IsNotFound(err) {
            logrus.Debugf("node password secret is not found for node %s", nodeName)
            logrus.Debugf("Node password secret is not found for node %s", nodeName)
            return
        }
        logrus.Warnf("failed to delete old node password secret: %v", err)
@@ -2,10 +2,18 @@ package etcd

import (
    "context"
    "crypto/tls"
    "fmt"
    "net"
    "net/http"
    "net/url"
    "strconv"
    "time"

    "github.com/k3s-io/k3s/pkg/agent/loadbalancer"
    "github.com/pkg/errors"
    "github.com/sirupsen/logrus"
    "k8s.io/apimachinery/pkg/util/wait"
)

type Proxy interface {
@@ -15,9 +23,17 @@ type Proxy interface {
    ETCDServerURL() string
}

var httpClient = &http.Client{
    Transport: &http.Transport{
        TLSClientConfig: &tls.Config{
            InsecureSkipVerify: true,
        },
    },
}

// NewETCDProxy initializes a new proxy structure that contain a load balancer
// which listens on port 2379 and proxy between etcd cluster members
func NewETCDProxy(ctx context.Context, enabled bool, dataDir, etcdURL string, isIPv6 bool) (Proxy, error) {
func NewETCDProxy(ctx context.Context, supervisorPort int, dataDir, etcdURL string, isIPv6 bool) (Proxy, error) {
    u, err := url.Parse(etcdURL)
    if err != nil {
        return nil, errors.Wrap(err, "failed to parse etcd client URL")
@@ -27,16 +43,16 @@ func NewETCDProxy(ctx context.Context, enabled bool, dataDir, etcdURL string, is
        dataDir: dataDir,
        initialETCDURL: etcdURL,
        etcdURL: etcdURL,
        supervisorPort: supervisorPort,
        disconnect: map[string]context.CancelFunc{},
    }

    if enabled {
        lb, err := loadbalancer.New(ctx, dataDir, loadbalancer.ETCDServerServiceName, etcdURL, 2379, isIPv6)
        if err != nil {
            return nil, err
        }
        e.etcdLB = lb
        e.etcdLBURL = lb.LoadBalancerServerURL()
    lb, err := loadbalancer.New(ctx, dataDir, loadbalancer.ETCDServerServiceName, etcdURL, 2379, isIPv6)
    if err != nil {
        return nil, err
    }
    e.etcdLB = lb
    e.etcdLBURL = lb.LoadBalancerServerURL()

    e.fallbackETCDAddress = u.Host
    e.etcdPort = u.Port()
@@ -48,17 +64,34 @@ type etcdproxy struct {
    dataDir string
    etcdLBURL string

    supervisorPort int
    initialETCDURL string
    etcdURL string
    etcdPort string
    fallbackETCDAddress string
    etcdAddresses []string
    etcdLB *loadbalancer.LoadBalancer
    disconnect map[string]context.CancelFunc
}

func (e *etcdproxy) Update(addresses []string) {
    if e.etcdLB != nil {
        e.etcdLB.Update(addresses)
    e.etcdLB.Update(addresses)

    validEndpoint := map[string]bool{}
    for _, address := range e.etcdLB.ServerAddresses {
        validEndpoint[address] = true
        if _, ok := e.disconnect[address]; !ok {
            ctx, cancel := context.WithCancel(context.Background())
            e.disconnect[address] = cancel
            e.etcdLB.SetHealthCheck(address, e.createHealthCheck(ctx, address))
        }
    }

    for address, cancel := range e.disconnect {
        if !validEndpoint[address] {
            cancel()
            delete(e.disconnect, address)
        }
    }
}

@@ -76,3 +109,35 @@ func (e *etcdproxy) ETCDAddresses() []string {
func (e *etcdproxy) ETCDServerURL() string {
    return e.etcdURL
}

// start a polling routine that makes periodic requests to the etcd node's supervisor port.
// If the request fails, the node is marked unhealthy.
func (e etcdproxy) createHealthCheck(ctx context.Context, address string) func() bool {
    // Assume that the connection to the server will succeed, to avoid failing health checks while attempting to connect.
    // If we cannot connect, connected will be set to false when the initial connection attempt fails.
    connected := true

    host, _, _ := net.SplitHostPort(address)
    url := fmt.Sprintf("https://%s/ping", net.JoinHostPort(host, strconv.Itoa(e.supervisorPort)))

    go wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
        ctx, cancel := context.WithTimeout(ctx, time.Second)
        defer cancel()
        req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
        resp, err := httpClient.Do(req)
        var statusCode int
        if resp != nil {
            statusCode = resp.StatusCode
        }
        if err != nil || statusCode != http.StatusOK {
            logrus.Debugf("Health check %s failed: %v (StatusCode: %d)", url, err, statusCode)
            connected = false
        } else {
            connected = true
        }
    }, 5*time.Second, 1.0, true)

    return func() bool {
        return connected
    }
}
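As a reading aid rather than part of the commit, the poll in createHealthCheck above amounts to an HTTPS GET with a one-second timeout against the target node's supervisor /ping endpoint every five seconds, with certificate verification disabled since only reachability matters; anything other than a 200 response marks the server unhealthy until the next poll succeeds. A hypothetical one-shot version of a single probe (the address and port in the comment are placeholders; assumes imports of "crypto/tls", "net/http", and "time"):

// Hypothetical one-shot equivalent of the periodic probe; not commit code.
func pingSupervisor(addr string) bool { // addr is a placeholder such as "10.0.0.1:9345"
    client := &http.Client{
        Timeout:   time.Second,
        Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
    }
    resp, err := client.Get("https://" + addr + "/ping")
    if resp != nil {
        defer resp.Body.Close()
    }
    return err == nil && resp.StatusCode == http.StatusOK
}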