Fix adding etcd-only node to existing cluster

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
Brad Davidson 2022-02-16 14:19:58 -08:00 committed by Brad Davidson
parent 7f4aa59014
commit 5014c9e0e8
13 changed files with 164 additions and 93 deletions

View File

@@ -19,16 +19,17 @@ type LoadBalancer struct {
dialer *net.Dialer
proxy *tcpproxy.Proxy
configFile string
localAddress string
localServerURL string
originalServerAddress string
ServerURL string
ServerAddresses []string
randomServers []string
currentServerAddress string
nextServerIndex int
Listener net.Listener
serviceName string
configFile string
localAddress string
localServerURL string
defaultServerAddress string
ServerURL string
ServerAddresses []string
randomServers []string
currentServerAddress string
nextServerIndex int
Listener net.Listener
}
const RandomPort = 0
@@ -55,26 +56,27 @@ func New(ctx context.Context, dataDir, serviceName, serverURL string, lbServerPo
}
localAddress := listener.Addr().String()
originalServerAddress, localServerURL, err := parseURL(serverURL, localAddress)
defaultServerAddress, localServerURL, err := parseURL(serverURL, localAddress)
if err != nil {
return nil, err
}
if serverURL == localServerURL {
logrus.Debugf("Initial server URL for load balancer points at local server URL - starting with empty original server address")
originalServerAddress = ""
logrus.Debugf("Initial server URL for load balancer %s points at local server URL - starting with empty default server address", serviceName)
defaultServerAddress = ""
}
lb := &LoadBalancer{
dialer: &net.Dialer{},
configFile: filepath.Join(dataDir, "etc", serviceName+".json"),
localAddress: localAddress,
localServerURL: localServerURL,
originalServerAddress: originalServerAddress,
ServerURL: serverURL,
serviceName: serviceName,
dialer: &net.Dialer{},
configFile: filepath.Join(dataDir, "etc", serviceName+".json"),
localAddress: localAddress,
localServerURL: localServerURL,
defaultServerAddress: defaultServerAddress,
ServerURL: serverURL,
}
lb.setServers([]string{lb.originalServerAddress})
lb.setServers([]string{lb.defaultServerAddress})
lb.proxy = &tcpproxy.Proxy{
ListenFunc: func(string, string) (net.Listener, error) {
@@ -93,11 +95,16 @@ func New(ctx context.Context, dataDir, serviceName, serverURL string, lbServerPo
if err := lb.proxy.Start(); err != nil {
return nil, err
}
logrus.Infof("Running load balancer %s -> %v", lb.localAddress, lb.randomServers)
logrus.Infof("Running load balancer %s %s -> %v", serviceName, lb.localAddress, lb.randomServers)
return lb, nil
}
func (lb *LoadBalancer) SetDefault(serverAddress string) {
logrus.Infof("Updating load balancer %s default server address -> %s", lb.serviceName, serverAddress)
lb.defaultServerAddress = serverAddress
}
func (lb *LoadBalancer) Update(serverAddresses []string) {
if lb == nil {
return
@@ -105,10 +112,10 @@ func (lb *LoadBalancer) Update(serverAddresses []string) {
if !lb.setServers(serverAddresses) {
return
}
logrus.Infof("Updating load balancer server addresses -> %v", lb.randomServers)
logrus.Infof("Updating load balancer %s server addresses -> %v", lb.serviceName, lb.randomServers)
if err := lb.writeConfig(); err != nil {
logrus.Warnf("Error updating load balancer config: %s", err)
logrus.Warnf("Error updating load balancer %s config: %s", lb.serviceName, err)
}
}
@@ -128,14 +135,14 @@ func (lb *LoadBalancer) dialContext(ctx context.Context, network, address string
if err == nil {
return conn, nil
}
logrus.Debugf("Dial error from load balancer: %s", err)
logrus.Debugf("Dial error from load balancer %s: %s", lb.serviceName, err)
newServer, err := lb.nextServer(targetServer)
if err != nil {
return nil, err
}
if targetServer != newServer {
logrus.Debugf("Dial server in load balancer failed over to %s", newServer)
logrus.Debugf("Dial server in load balancer %s failed over to %s", lb.serviceName, newServer)
}
if ctx.Err() != nil {
return nil, ctx.Err()
@@ -156,7 +163,7 @@ func onDialError(src net.Conn, dstDialErr error) {
src.Close()
}
// ResetLoadBalancer will delete the local state file for the load balacner on disk
// ResetLoadBalancer will delete the local state file for the load balancer on disk
func ResetLoadBalancer(dataDir, serviceName string) error {
stateFile := filepath.Join(dataDir, "etc", serviceName+".json")
if err := os.Remove(stateFile); err != nil {

View File

@@ -7,7 +7,7 @@ import (
)
func (lb *LoadBalancer) setServers(serverAddresses []string) bool {
serverAddresses, hasOriginalServer := sortServers(serverAddresses, lb.originalServerAddress)
serverAddresses, hasOriginalServer := sortServers(serverAddresses, lb.defaultServerAddress)
if len(serverAddresses) == 0 {
return false
}
@@ -25,7 +25,7 @@ func (lb *LoadBalancer) setServers(serverAddresses []string) bool {
lb.randomServers[i], lb.randomServers[j] = lb.randomServers[j], lb.randomServers[i]
})
if !hasOriginalServer {
lb.randomServers = append(lb.randomServers, lb.originalServerAddress)
lb.randomServers = append(lb.randomServers, lb.defaultServerAddress)
}
lb.currentServerAddress = lb.randomServers[0]
lb.nextServerIndex = 1

View File

@@ -15,6 +15,7 @@ import (
type Proxy interface {
Update(addresses []string)
SetAPIServerPort(ctx context.Context, port int) error
SetSupervisorDefault(address string)
SupervisorURL() string
SupervisorAddresses() []string
APIServerURL() string
@@ -135,6 +136,28 @@ func (p *proxy) SetAPIServerPort(ctx context.Context, port int) error {
return nil
}
// SetSupervisorDefault updates the default (fallback) address for the connection to the
// supervisor. This is most useful on k3s nodes without apiservers, where the local
// supervisor must be used to bootstrap the agent config, but then switched over to
// another node running an apiserver once one is available.
func (p *proxy) SetSupervisorDefault(address string) {
host, port, err := sysnet.SplitHostPort(address)
if err != nil {
logrus.Errorf("Failed to parse address %s, dropping: %v", address, err)
return
}
if p.apiServerEnabled {
port = p.supervisorPort
address = sysnet.JoinHostPort(host, port)
}
p.fallbackSupervisorAddress = address
if p.supervisorLB == nil {
p.supervisorURL = "https://" + address
} else {
p.supervisorLB.SetDefault(address)
}
}
func (p *proxy) SupervisorURL() string {
return p.supervisorURL
}
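
SetSupervisorDefault's address handling is plain net-package plumbing: when the node runs a local apiserver, the learned address keeps its host but is rewritten to the supervisor port. A runnable sketch of just that rewrite; the 9345 supervisor port is an assumption for illustration:

```go
package main

import (
	"fmt"
	"net"
)

func main() {
	address := "172.16.0.10:6443" // learned apiserver address
	supervisorPort := "9345"      // assumed supervisor port

	host, _, err := net.SplitHostPort(address)
	if err != nil {
		// Mirrors the handling above: unparseable addresses are dropped.
		fmt.Println("dropping address:", err)
		return
	}
	fmt.Println("https://" + net.JoinHostPort(host, supervisorPort))
	// Output: https://172.16.0.10:9345
}
```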

View File

@@ -3,9 +3,10 @@ package agent
import (
"context"
"fmt"
"net/url"
"net"
"os"
"path/filepath"
"strconv"
"strings"
"time"
@@ -350,17 +351,8 @@ func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node,
}
agentRan = true
}
select {
case address := <-cfg.APIAddressCh:
cfg.ServerURL = address
u, err := url.Parse(cfg.ServerURL)
if err != nil {
logrus.Warn(err)
}
proxy.Update([]string{fmt.Sprintf("%s:%d", u.Hostname(), nodeConfig.ServerHTTPSPort)})
case <-ctx.Done():
return ctx.Err()
if err := waitForAPIServerAddresses(ctx, nodeConfig, cfg, proxy); err != nil {
return err
}
} else if cfg.ClusterReset && proxy.IsAPIServerLBEnabled() {
// If we're doing a cluster-reset on RKE2, the kubelet needs to be started early to clean
@@ -379,3 +371,26 @@ func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node,
}
return nil
}
func waitForAPIServerAddresses(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) error {
for {
select {
case <-time.After(5 * time.Second):
logrus.Info("Waiting for apiserver addresses")
case addresses := <-cfg.APIAddressCh:
for i, a := range addresses {
host, _, err := net.SplitHostPort(a)
if err == nil {
addresses[i] = net.JoinHostPort(host, strconv.Itoa(nodeConfig.ServerHTTPSPort))
if i == 0 {
proxy.SetSupervisorDefault(addresses[i])
}
}
}
proxy.Update(addresses)
return nil
case <-ctx.Done():
return ctx.Err()
}
}
}
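
The three-way select in waitForAPIServerAddresses is a reusable pattern: block on the data channel, log on a timer so a stalled dependency stays visible, and unblock promptly on shutdown. A self-contained sketch of the same structure with shortened intervals:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func waitFor(ctx context.Context, ch <-chan []string) ([]string, error) {
	for {
		select {
		case <-time.After(time.Second):
			// Periodic log only; the timer re-arms on each iteration.
			fmt.Println("waiting for apiserver addresses")
		case v := <-ch:
			return v, nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}

func main() {
	ch := make(chan []string)
	go func() {
		time.Sleep(2500 * time.Millisecond)
		ch <- []string{"10.0.0.1:6443"}
	}()
	fmt.Println(waitFor(context.Background(), ch))
}
```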

View File

@@ -14,7 +14,7 @@ type Agent struct {
TokenFile string
ClusterSecret string
ServerURL string
APIAddressCh chan string
APIAddressCh chan []string
DisableLoadBalancer bool
DisableServiceLB bool
ETCDAgent bool

View File

@@ -394,13 +394,6 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
serverConfig.ControlConfig.DisableScheduler = true
serverConfig.ControlConfig.DisableCCM = true
// only close the agentReady channel in case of k3s restoration, because k3s does not start
// the agent until server returns successfully, unlike rke2's agent which starts in parallel
// with the server
if serverConfig.ControlConfig.SupervisorPort == serverConfig.ControlConfig.HTTPSPort {
close(agentReady)
}
dataDir, err := datadir.LocalHome(cfg.DataDir, false)
if err != nil {
return err
@@ -484,10 +477,12 @@
}
if serverConfig.ControlConfig.DisableAPIServer {
if cfg.ServerURL != "" {
agentConfig.ServerURL = cfg.ServerURL
}
// initialize the apiAddress Channel for receiving the api address from etcd
agentConfig.APIAddressCh = make(chan string, 1)
setAPIAddressChannel(ctx, &serverConfig, &agentConfig)
defer close(agentConfig.APIAddressCh)
agentConfig.APIAddressCh = make(chan []string)
go getAPIAddressFromEtcd(ctx, serverConfig, agentConfig)
}
return agent.Run(ctx, agentConfig)
}
@@ -533,29 +528,19 @@ func getArgValueFromList(searchArg string, argList []string) string {
return value
}
// setAPIAddressChannel will try to get the api address key from etcd and when it succeed it will
// set the APIAddressCh channel with its value, the function works for both k3s and rke2 in case
// of k3s we block returning back to the agent.Run until we get the api address, however in rke2
// the code will not block operation and will run the operation in a goroutine
func setAPIAddressChannel(ctx context.Context, serverConfig *server.Config, agentConfig *cmds.Agent) {
// start a goroutine to check for the server ip if set from etcd in case of rke2
if serverConfig.ControlConfig.HTTPSPort != serverConfig.ControlConfig.SupervisorPort {
go getAPIAddressFromEtcd(ctx, serverConfig, agentConfig)
return
}
getAPIAddressFromEtcd(ctx, serverConfig, agentConfig)
}
func getAPIAddressFromEtcd(ctx context.Context, serverConfig *server.Config, agentConfig *cmds.Agent) {
t := time.NewTicker(5 * time.Second)
defer t.Stop()
for range t.C {
serverAddress, err := etcd.GetAPIServerURLFromETCD(ctx, &serverConfig.ControlConfig)
if err == nil {
agentConfig.ServerURL = "https://" + serverAddress
agentConfig.APIAddressCh <- agentConfig.ServerURL
func getAPIAddressFromEtcd(ctx context.Context, serverConfig server.Config, agentConfig cmds.Agent) {
defer close(agentConfig.APIAddressCh)
for {
toCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
serverAddresses, err := etcd.GetAPIServerURLsFromETCD(toCtx, &serverConfig.ControlConfig)
if err == nil && len(serverAddresses) > 0 {
agentConfig.APIAddressCh <- serverAddresses
break
}
logrus.Warn(err)
if !errors.Is(err, etcd.ErrAddressNotSet) {
logrus.Warnf("Failed to get apiserver address from etcd: %v", err)
}
<-toCtx.Done()
}
}
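
Note how the per-attempt timeout above doubles as the retry delay: after a failed fetch, <-toCtx.Done() sleeps out whatever remains of the 5-second window, so attempts are spaced at most one interval apart without a separate ticker. A compact illustration with a stand-in fetch and shortened intervals:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errNotSet = errors.New("addresses not yet set")

// fetch is a stand-in for etcd.GetAPIServerURLsFromETCD: it fails until
// the given readiness time has passed.
func fetch(ctx context.Context, ready time.Time) ([]string, error) {
	if time.Now().Before(ready) {
		return nil, errNotSet
	}
	return []string{"10.0.0.1:6443"}, nil
}

func main() {
	ctx := context.Background()
	ready := time.Now().Add(700 * time.Millisecond)
	for {
		toCtx, cancel := context.WithTimeout(ctx, 300*time.Millisecond)
		addrs, err := fetch(toCtx, ready)
		if err == nil && len(addrs) > 0 {
			cancel()
			fmt.Println("got addresses:", addrs)
			return
		}
		if !errors.Is(err, errNotSet) {
			fmt.Println("fetch failed:", err)
		}
		// Sleep out the remainder of this attempt's window before retrying.
		<-toCtx.Done()
		cancel()
	}
}
```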

View File

@@ -356,6 +356,7 @@ func (c *Cluster) ReconcileBootstrapData(ctx context.Context, buf io.ReadSeeker,
buf.Seek(0, 0)
}
logrus.Debugf("One or more certificate directories do not exist; writing data to disk from datastore")
return bootstrap.WriteToDiskFromStorage(files, crb)
}

View File

@@ -55,10 +55,12 @@ func Server(ctx context.Context, cfg *config.Control) error {
if err := apiServer(ctx, cfg); err != nil {
return err
}
}
if err := waitForAPIServerInBackground(ctx, cfg.Runtime); err != nil {
return err
}
// Wait for an apiserver to become available before starting additional controllers,
// even if we're not running an apiserver locally.
if err := waitForAPIServerInBackground(ctx, cfg.Runtime); err != nil {
return err
}
if !cfg.DisableScheduler {

View File

@@ -76,6 +76,8 @@ var (
NodeNameAnnotation = "etcd." + version.Program + ".cattle.io/node-name"
NodeAddressAnnotation = "etcd." + version.Program + ".cattle.io/node-address"
ErrAddressNotSet = errors.New("apiserver addresses not yet set")
)
type NodeControllerGetter func() controllerv1.NodeController
@@ -283,7 +285,7 @@ func (e *ETCD) Reset(ctx context.Context, rebootstrap func() error) error {
// Start starts the datastore
func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
existingCluster, err := e.IsInitialized(ctx, e.config)
isInitialized, err := e.IsInitialized(ctx, e.config)
if err != nil {
return errors.Wrapf(err, "configuration validation failed")
}
@@ -295,7 +297,7 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
go e.manageLearners(ctx)
if existingCluster {
if isInitialized {
//check etcd dir permission
etcdDir := DBDir(e.config)
info, err := os.Stat(etcdDir)
@@ -319,9 +321,18 @@
}
go func() {
<-e.config.Runtime.AgentReady
if err := e.join(ctx, clientAccessInfo); err != nil {
logrus.Fatalf("ETCD join failed: %v", err)
for {
select {
case <-time.After(30 * time.Second):
logrus.Infof("Waiting for agent to become ready before joining ETCD cluster")
case <-e.config.Runtime.AgentReady:
if err := e.join(ctx, clientAccessInfo); err != nil {
logrus.Fatalf("ETCD join failed: %v", err)
}
return
case <-ctx.Done():
return
}
}
}()
@@ -1786,27 +1797,27 @@ func backupDirWithRetention(dir string, maxBackupRetention int) (string, error)
return backupDir, nil
}
// GetAPIServerURLFromETCD will try to fetch the version.Program/apiaddresses key from etcd
// when it succeed it will parse the first address in the list and return back an address
func GetAPIServerURLFromETCD(ctx context.Context, cfg *config.Control) (string, error) {
// GetAPIServerURLsFromETCD will try to fetch the version.Program/apiaddresses key from etcd
func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]string, error) {
cl, err := GetClient(ctx, cfg.Runtime, endpoint)
if err != nil {
return "", err
return nil, err
}
etcdResp, err := cl.KV.Get(ctx, AddressKey)
if err != nil {
return "", err
return nil, err
}
if etcdResp.Count < 1 {
return "", fmt.Errorf("servers addresses are not yet set")
if etcdResp.Count == 0 || len(etcdResp.Kvs[0].Value) == 0 {
return nil, ErrAddressNotSet
}
var addresses []string
if err := json.Unmarshal(etcdResp.Kvs[0].Value, &addresses); err != nil {
return "", fmt.Errorf("failed to unmarshal etcd key: %v", err)
return nil, fmt.Errorf("failed to unmarshal apiserver addresses from etcd: %v", err)
}
return addresses[0], nil
return addresses, nil
}
// GetMembersClientURLs will list through the member lists in etcd and return
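
GetAPIServerURLsFromETCD now returns the whole list so the caller can feed every apiserver address into the loadbalancer, rather than pinning to the first entry. The stored value is just a JSON-encoded list of host:port strings, so decoding is a single Unmarshal call; the payload below is illustrative:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Example payload as stored under the apiaddresses key.
	value := []byte(`["10.0.0.1:6443","10.0.0.2:6443"]`)

	var addresses []string
	if err := json.Unmarshal(value, &addresses); err != nil {
		fmt.Println("failed to unmarshal apiserver addresses:", err)
		return
	}
	fmt.Println(addresses) // [10.0.0.1:6443 10.0.0.2:6443]
}
```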

View File

@@ -13,10 +13,11 @@ import (
)
// setETCDLabelsAndAnnotations will set the etcd role label if not exists also it
// sets special annotaitons on the node object which are etcd node id and etcd node
// sets special annotations on the node object which are etcd node id and etcd node
// address, the function will also remove the controlplane and master role labels if
// they exist on the node
func setETCDLabelsAndAnnotations(ctx context.Context, config *Config) error {
<-config.ControlConfig.Runtime.APIServerReady
t := time.NewTicker(5 * time.Second)
defer t.Stop()
for range t.C {

View File

@@ -39,7 +39,6 @@ func router(ctx context.Context, config *Config, cfg *cmds.Server) http.Handler
prefix := "/v1-" + version.Program
authed := mux.NewRouter()
authed.Use(authMiddleware(serverConfig, version.Program+":agent"))
authed.NotFoundHandler = apiserver(serverConfig.Runtime)
authed.Path(prefix + "/serving-kubelet.crt").Handler(servingKubeletCert(serverConfig, serverConfig.Runtime.ServingKubeletKey, nodeAuth))
authed.Path(prefix + "/client-kubelet.crt").Handler(clientKubeletCert(serverConfig, serverConfig.Runtime.ClientKubeletKey, nodeAuth))
authed.Path(prefix + "/client-kube-proxy.crt").Handler(fileHandler(serverConfig.Runtime.ClientKubeProxyCert, serverConfig.Runtime.ClientKubeProxyKey))
@@ -49,6 +48,12 @@
authed.Path(prefix + "/config").Handler(configHandler(serverConfig, cfg))
authed.Path(prefix + "/readyz").Handler(readyzHandler(serverConfig))
if cfg.DisableAPIServer {
authed.NotFoundHandler = apiserverDisabled()
} else {
authed.NotFoundHandler = apiserver(serverConfig.Runtime)
}
nodeAuthed := mux.NewRouter()
nodeAuthed.Use(authMiddleware(serverConfig, "system:nodes"))
nodeAuthed.Path(prefix + "/connect").Handler(serverConfig.Runtime.Tunnel)
@@ -88,6 +93,16 @@ func apiserver(runtime *config.ControlRuntime) http.Handler {
})
}
func apiserverDisabled() http.Handler {
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
data := []byte("apiserver disabled")
resp.WriteHeader(http.StatusServiceUnavailable)
resp.Header().Set("Content-Type", "text/plain")
resp.Header().Set("Content-length", strconv.Itoa(len(data)))
resp.Write(data)
})
}
func cacerts(serverCA string) http.Handler {
var ca []byte
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
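
The new apiserverDisabled handler can be exercised without a full server via httptest. A sketch that mirrors the handler added above, with the headers set before WriteHeader since net/http ignores headers written after the status line:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strconv"
)

func apiserverDisabled() http.Handler {
	return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
		data := []byte("apiserver disabled")
		resp.Header().Set("Content-Type", "text/plain")
		resp.Header().Set("Content-Length", strconv.Itoa(len(data)))
		resp.WriteHeader(http.StatusServiceUnavailable)
		resp.Write(data)
	})
}

func main() {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/apis", nil)
	apiserverDisabled().ServeHTTP(rec, req)
	body, _ := io.ReadAll(rec.Result().Body)
	fmt.Println(rec.Code, string(body)) // 503 apiserver disabled
}
```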

View File

@@ -366,7 +366,13 @@ func writeKubeConfig(certs string, config *Config) error {
if ip == "" {
ip = "127.0.0.1"
}
url := fmt.Sprintf("https://%s:%d", ip, config.ControlConfig.HTTPSPort)
port := config.ControlConfig.HTTPSPort
// on servers without a local apiserver, tunnel access via the loadbalancer
if config.ControlConfig.DisableAPIServer {
ip = "127.0.0.1"
port = config.ControlConfig.APIServerPort
}
url := fmt.Sprintf("https://%s:%d", ip, port)
kubeConfig, err := HomeKubeConfig(true, config.Rootless)
def := true
if err != nil {

View File

@@ -18,6 +18,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
coregetter "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
)
@@ -56,6 +57,10 @@ func WaitForAPIServerReady(ctx context.Context, client clientset.Interface, time
err := wait.PollImmediateWithContext(ctx, time.Second, timeout, func(ctx context.Context) (bool, error) {
healthStatus := 0
// Idle connections to the apiserver are returned to a global pool between requests. Explicitly
// close these idle connections so that we re-connect through the loadbalancer in case the endpoints
// have changed.
restClient.(*rest.RESTClient).Client.CloseIdleConnections()
result := restClient.Get().AbsPath("/readyz").Do(ctx).StatusCode(&healthStatus)
if rerr := result.Error(); rerr != nil {
lastErr = errors.Wrap(rerr, "failed to get apiserver /readyz status")
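
CloseIdleConnections is what forces traffic back through the loadbalancer here: a kept-alive connection would otherwise keep pinning requests to the old endpoint even after the server list changes. A minimal sketch of the probe-then-reconnect pattern; the endpoint URL is an assumption for illustration:

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

func main() {
	client := &http.Client{Timeout: 5 * time.Second}
	for i := 0; i < 3; i++ {
		// Drop pooled connections so the next request dials fresh and the
		// loadbalancer can route it to a live endpoint.
		client.CloseIdleConnections()
		resp, err := client.Get("https://127.0.0.1:6443/readyz") // assumed endpoint
		if err != nil {
			fmt.Println("readyz probe failed:", err)
			time.Sleep(time.Second)
			continue
		}
		resp.Body.Close()
		fmt.Println("readyz status:", resp.StatusCode)
	}
}
```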