diff --git a/pkg/agent/config/config.go b/pkg/agent/config/config.go index c29cdcd95f..37d90da88d 100644 --- a/pkg/agent/config/config.go +++ b/pkg/agent/config/config.go @@ -406,6 +406,7 @@ func get(envInfo *cmds.Agent, proxy proxy.Proxy) (*config.Node, error) { SELinux: envInfo.EnableSELinux, ContainerRuntimeEndpoint: envInfo.ContainerRuntimeEndpoint, FlannelBackend: controlConfig.FlannelBackend, + ServerHTTPSPort: controlConfig.HTTPSPort, } nodeConfig.FlannelIface = flannelIface nodeConfig.Images = filepath.Join(envInfo.DataDir, "agent", "images") diff --git a/pkg/agent/loadbalancer/loadbalancer.go b/pkg/agent/loadbalancer/loadbalancer.go index 134e38a4c2..01490781b4 100644 --- a/pkg/agent/loadbalancer/loadbalancer.go +++ b/pkg/agent/loadbalancer/loadbalancer.go @@ -5,6 +5,7 @@ import ( "errors" "net" "path/filepath" + "strconv" "sync" "github.com/google/tcpproxy" @@ -26,15 +27,19 @@ type LoadBalancer struct { randomServers []string currentServerAddress string nextServerIndex int + Listener net.Listener } +const RandomPort = 0 + var ( SupervisorServiceName = version.Program + "-agent-load-balancer" APIServerServiceName = version.Program + "-api-server-agent-load-balancer" + ETCDServerServiceName = version.Program + "-etcd-server-load-balancer" ) -func New(dataDir, serviceName, serverURL string) (_lb *LoadBalancer, _err error) { - listener, err := net.Listen("tcp", "127.0.0.1:0") +func New(dataDir, serviceName, serverURL string, lbServerPort int) (_lb *LoadBalancer, _err error) { + listener, err := net.Listen("tcp", "127.0.0.1:"+strconv.Itoa(lbServerPort)) defer func() { if _err != nil { logrus.Warnf("Error starting load balancer: %s", _err) diff --git a/pkg/agent/loadbalancer/loadbalancer_test.go b/pkg/agent/loadbalancer/loadbalancer_test.go index 41aab80865..44326366d9 100644 --- a/pkg/agent/loadbalancer/loadbalancer_test.go +++ b/pkg/agent/loadbalancer/loadbalancer_test.go @@ -105,7 +105,7 @@ func TestFailOver(t *testing.T) { DataDir: tmpDir, } - lb, err := New(cfg.DataDir, SupervisorServiceName, cfg.ServerURL) + lb, err := New(cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort) if err != nil { assertEqual(t, err, nil) } @@ -156,7 +156,7 @@ func TestFailFast(t *testing.T) { DataDir: tmpDir, } - lb, err := New(cfg.DataDir, SupervisorServiceName, cfg.ServerURL) + lb, err := New(cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort) if err != nil { assertEqual(t, err, nil) } diff --git a/pkg/agent/proxy/apiproxy.go b/pkg/agent/proxy/apiproxy.go index e818f2774c..573d217fb4 100644 --- a/pkg/agent/proxy/apiproxy.go +++ b/pkg/agent/proxy/apiproxy.go @@ -17,19 +17,21 @@ type Proxy interface { SupervisorURL() string SupervisorAddresses() []string APIServerURL() string + IsAPIServerLBEnabled() bool } -func NewAPIProxy(enabled bool, dataDir, supervisorURL string) (Proxy, error) { - p := &proxy{ +func NewAPIProxy(enabled bool, dataDir, supervisorURL string, lbServerPort int) (Proxy, error) { + p := proxy{ lbEnabled: enabled, dataDir: dataDir, initialSupervisorURL: supervisorURL, supervisorURL: supervisorURL, apiServerURL: supervisorURL, + lbServerPort: lbServerPort, } if enabled { - lb, err := loadbalancer.New(dataDir, loadbalancer.SupervisorServiceName, supervisorURL) + lb, err := loadbalancer.New(dataDir, loadbalancer.SupervisorServiceName, supervisorURL, p.lbServerPort) if err != nil { return nil, err } @@ -45,12 +47,13 @@ func NewAPIProxy(enabled bool, dataDir, supervisorURL string) (Proxy, error) { p.fallbackSupervisorAddress = u.Host p.supervisorPort = u.Port() - return 
p, nil + return &p, nil } type proxy struct { - dataDir string - lbEnabled bool + dataDir string + lbEnabled bool + lbServerPort int initialSupervisorURL string supervisorURL string @@ -71,14 +74,12 @@ func (p *proxy) Update(addresses []string) { if p.apiServerEnabled { supervisorAddresses = p.setSupervisorPort(supervisorAddresses) } - if p.apiServerLB != nil { p.apiServerLB.Update(apiServerAddresses) } if p.supervisorLB != nil { p.supervisorLB.Update(supervisorAddresses) } - p.supervisorAddresses = supervisorAddresses } @@ -106,7 +107,11 @@ func (p *proxy) StartAPIServerProxy(port int) error { p.apiServerEnabled = true if p.lbEnabled { - lb, err := loadbalancer.New(p.dataDir, loadbalancer.APIServerServiceName, p.apiServerURL) + lbServerPort := p.lbServerPort + if lbServerPort != 0 { + lbServerPort = lbServerPort + 1 + } + lb, err := loadbalancer.New(p.dataDir, loadbalancer.APIServerServiceName, p.apiServerURL, lbServerPort) if err != nil { return err } @@ -131,3 +136,7 @@ func (p *proxy) SupervisorAddresses() []string { func (p *proxy) APIServerURL() string { return p.apiServerURL } + +func (p *proxy) IsAPIServerLBEnabled() bool { + return p.apiServerLB != nil +} diff --git a/pkg/agent/run.go b/pkg/agent/run.go index 3749b13405..cd7d428fbe 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io/ioutil" + "net/url" "os" "path/filepath" "strings" @@ -89,12 +90,7 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error { return err } } - - if err := tunnel.Setup(ctx, nodeConfig, proxy); err != nil { - return err - } - - if err := agent.Agent(&nodeConfig.AgentConfig); err != nil { + if err := setupTunnelAndRunAgent(ctx, nodeConfig, cfg, proxy); err != nil { return err } @@ -148,7 +144,7 @@ func Run(ctx context.Context, cfg cmds.Agent) error { return err } - proxy, err := proxy.NewAPIProxy(!cfg.DisableLoadBalancer, agentDir, cfg.ServerURL) + proxy, err := proxy.NewAPIProxy(!cfg.DisableLoadBalancer, agentDir, cfg.ServerURL, cfg.LBServerPort) if err != nil { return err } @@ -167,7 +163,6 @@ func Run(ctx context.Context, cfg cmds.Agent) error { cfg.Token = newToken.String() break } - systemd.SdNotify(true, "READY=1\n") return run(ctx, cfg, proxy) } @@ -300,3 +295,39 @@ func updateAddressLabels(agentConfig *daemonconfig.Agent, nodeLabels map[string] result = labels.Merge(nodeLabels, result) return result, !equality.Semantic.DeepEqual(nodeLabels, result) } + +// setupTunnelAndRunAgent starts the tunnel setup before starting the kubelet and kube-proxy. +// There is a special case for etcd-only agents: they wait until the API address can be read from +// the address channel, then update the proxy with the server addresses. In rke2 the agent must be +// started before the tunnel is set up, to allow the kubelet to start first and run the pods. +func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) error { + var agentRan bool + if cfg.ETCDAgent { + // only in rke2: run the agent before the tunnel is set up; this is checked again later in the function + if proxy.IsAPIServerLBEnabled() { + if err := agent.Agent(&nodeConfig.AgentConfig); err != nil { + return err + } + agentRan = true + } + select { + case address := <-cfg.APIAddressCh: + cfg.ServerURL = address + u, err := url.Parse(cfg.ServerURL) + if err != nil { + logrus.Warn(err) + } + proxy.Update([]string{fmt.Sprintf("%s:%d", u.Hostname(), nodeConfig.ServerHTTPSPort)}) + case <-ctx.Done(): + return ctx.Err() + } + } + + if err := tunnel.Setup(ctx,
nodeConfig, proxy); err != nil { + return err + } + if !agentRan { + return agent.Agent(&nodeConfig.AgentConfig) + } + return nil +} diff --git a/pkg/agent/tunnel/tunnel.go b/pkg/agent/tunnel/tunnel.go index 11cfa137d7..28e740662d 100644 --- a/pkg/agent/tunnel/tunnel.go +++ b/pkg/agent/tunnel/tunnel.go @@ -75,7 +75,10 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error { endpoint, _ := client.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{}) if endpoint != nil { - proxy.Update(getAddresses(endpoint)) + addresses := getAddresses(endpoint) + if len(addresses) > 0 { + proxy.Update(addresses) + } } disconnect := map[string]context.CancelFunc{} diff --git a/pkg/apiaddresses/controller.go b/pkg/apiaddresses/controller.go new file mode 100644 index 0000000000..122e067aa6 --- /dev/null +++ b/pkg/apiaddresses/controller.go @@ -0,0 +1,87 @@ +package apiaddresses + +import ( + "bytes" + "context" + "encoding/json" + "net" + "strconv" + + "github.com/rancher/k3s/pkg/daemons/config" + "github.com/rancher/k3s/pkg/etcd" + "github.com/rancher/k3s/pkg/version" + controllerv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1" + etcdv3 "go.etcd.io/etcd/clientv3" + v1 "k8s.io/api/core/v1" +) + +type EndpointsControllerGetter func() controllerv1.EndpointsController + +func Register(ctx context.Context, runtime *config.ControlRuntime, endpoints controllerv1.EndpointsController) error { + h := &handler{ + endpointsController: endpoints, + runtime: runtime, + ctx: ctx, + } + endpoints.OnChange(ctx, version.Program+"-apiserver-lb-controller", h.sync) + + cl, err := etcd.GetClient(h.ctx, h.runtime, "https://127.0.0.1:2379") + if err != nil { + return err + } + + h.etcdClient = cl + + return nil +} + +type handler struct { + endpointsController controllerv1.EndpointsController + runtime *config.ControlRuntime + ctx context.Context + etcdClient *etcdv3.Client +} + +// sync updates the version.Program/apiaddresses etcd key with the list of +// API server endpoint addresses found in the kubernetes service in the default namespace +func (h *handler) sync(key string, endpoint *v1.Endpoints) (*v1.Endpoints, error) { + if endpoint == nil { + return nil, nil + } + + if endpoint.Namespace != "default" || endpoint.Name != "kubernetes" { + return nil, nil + } + + w := &bytes.Buffer{} + if err := json.NewEncoder(w).Encode(getAddresses(endpoint)); err != nil { + return nil, err + } + + _, err := h.etcdClient.Put(h.ctx, etcd.AddressKey, w.String()) + if err != nil { + return nil, err + } + + return endpoint, nil +} + +func getAddresses(endpoint *v1.Endpoints) []string { + serverAddresses := []string{} + if endpoint == nil { + return serverAddresses + } + for _, subset := range endpoint.Subsets { + var port string + if len(subset.Ports) > 0 { + port = strconv.Itoa(int(subset.Ports[0].Port)) + } + if port == "" { + port = "443" + } + for _, address := range subset.Addresses { + serverAddresses = append(serverAddresses, net.JoinHostPort(address.IP, port)) + } + } + return serverAddresses +} diff --git a/pkg/cli/cmds/agent.go b/pkg/cli/cmds/agent.go index c9470ce8ee..3f031ff628 100644 --- a/pkg/cli/cmds/agent.go +++ b/pkg/cli/cmds/agent.go @@ -14,7 +14,10 @@ type Agent struct { TokenFile string ClusterSecret string ServerURL string + APIAddressCh chan string DisableLoadBalancer bool + ETCDAgent bool + LBServerPort int ResolvConf string DataDir string NodeIP string @@ -155,6 +158,14 @@ var ( Destination: &AgentConfig.EnableSELinux, EnvVar:
version.ProgramUpper + "_SELINUX", } + LBServerPort = cli.IntFlag{ + Name: "lb-server-port", + Usage: "(agent/node) Internal Loadbalancer port", + Hidden: false, + Destination: &AgentConfig.LBServerPort, + EnvVar: version.ProgramUpper + "_LB_SERVER_PORT", + Value: 0, + } ) func CheckSELinuxFlags(ctx *cli.Context) error { diff --git a/pkg/cli/cmds/etcd.go b/pkg/cli/cmds/etcd.go deleted file mode 100644 index f51acb7bb4..0000000000 --- a/pkg/cli/cmds/etcd.go +++ /dev/null @@ -1,7 +0,0 @@ -// +build !no_etcd - -package cmds - -const ( - hideClusterFlags = false -) diff --git a/pkg/cli/cmds/server.go b/pkg/cli/cmds/server.go index 080124cc02..a7859b4cfd 100644 --- a/pkg/cli/cmds/server.go +++ b/pkg/cli/cmds/server.go @@ -10,6 +10,7 @@ import ( const ( defaultSnapshotRentention = 5 defaultSnapshotIntervalHours = 12 + hideClusterFlags = true ) type Server struct { @@ -54,6 +55,9 @@ type Server struct { DisableCCM bool DisableNPC bool DisableKubeProxy bool + DisableAPIServer bool + DisableControllerManager bool + DisableETCD bool ClusterInit bool ClusterReset bool ClusterResetRestorePath string @@ -280,6 +284,21 @@ func NewServerCommand(action func(*cli.Context) error) cli.Command { Usage: "(components) Disable " + version.Program + " default network policy controller", Destination: &ServerConfig.DisableNPC, }, + cli.BoolFlag{ + Name: "disable-api-server", + Usage: "(components) Disable running api server", + Destination: &ServerConfig.DisableAPIServer, + }, + cli.BoolFlag{ + Name: "disable-controller-manager", + Usage: "(components) Disable running kube-controller-manager", + Destination: &ServerConfig.DisableControllerManager, + }, + cli.BoolFlag{ + Name: "disable-etcd", + Usage: "(components) Disable running etcd", + Destination: &ServerConfig.DisableETCD, + }, NodeNameFlag, WithNodeIDFlag, NodeLabels, diff --git a/pkg/cli/server/server.go b/pkg/cli/server/server.go index 83c5bcdf1f..86558ac321 100644 --- a/pkg/cli/server/server.go +++ b/pkg/cli/server/server.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "strings" + "time" systemd "github.com/coreos/go-systemd/daemon" "github.com/erikdubbelboer/gspt" @@ -14,6 +15,7 @@ import ( "github.com/rancher/k3s/pkg/agent" "github.com/rancher/k3s/pkg/cli/cmds" "github.com/rancher/k3s/pkg/datadir" + "github.com/rancher/k3s/pkg/etcd" "github.com/rancher/k3s/pkg/netutil" "github.com/rancher/k3s/pkg/rootless" "github.com/rancher/k3s/pkg/server" @@ -31,6 +33,10 @@ import ( _ "github.com/mattn/go-sqlite3" // ensure we have sqlite ) +const ( + lbServerPort = 6444 +) + func Run(app *cli.Context) error { if err := cmds.InitLogging(); err != nil { return err @@ -86,7 +92,6 @@ func run(app *cli.Context, cfg *cmds.Server) error { serverConfig.ControlConfig.DataDir = cfg.DataDir serverConfig.ControlConfig.KubeConfigOutput = cfg.KubeConfigOutput serverConfig.ControlConfig.KubeConfigMode = cfg.KubeConfigMode - serverConfig.ControlConfig.NoScheduler = cfg.DisableScheduler serverConfig.Rootless = cfg.Rootless serverConfig.ControlConfig.SANs = knownIPs(cfg.TLSSan) serverConfig.ControlConfig.BindAddress = cfg.BindAddress @@ -109,6 +114,10 @@ func run(app *cli.Context, cfg *cmds.Server) error { serverConfig.ControlConfig.DisableCCM = cfg.DisableCCM serverConfig.ControlConfig.DisableNPC = cfg.DisableNPC serverConfig.ControlConfig.DisableKubeProxy = cfg.DisableKubeProxy + serverConfig.ControlConfig.DisableETCD = cfg.DisableETCD + serverConfig.ControlConfig.DisableAPIServer = cfg.DisableAPIServer + serverConfig.ControlConfig.DisableScheduler = cfg.DisableScheduler + 
serverConfig.ControlConfig.DisableControllerManager = cfg.DisableControllerManager serverConfig.ControlConfig.ClusterInit = cfg.ClusterInit serverConfig.ControlConfig.EncryptSecrets = cfg.EncryptSecrets serverConfig.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName @@ -119,7 +128,7 @@ func run(app *cli.Context, cfg *cmds.Server) error { serverConfig.ControlConfig.EtcdExposeMetrics = cfg.EtcdExposeMetrics if cfg.ClusterResetRestorePath != "" && !cfg.ClusterReset { - return errors.New("Invalid flag use. --cluster-reset required with --cluster-reset-restore-path") + return errors.New("invalid flag use. --cluster-reset required with --cluster-reset-restore-path") } serverConfig.ControlConfig.ClusterReset = cfg.ClusterReset @@ -129,6 +138,13 @@ func run(app *cli.Context, cfg *cmds.Server) error { serverConfig.ControlConfig.SupervisorPort = serverConfig.ControlConfig.HTTPSPort } + if serverConfig.ControlConfig.DisableAPIServer { + serverConfig.ControlConfig.APIServerPort = lbServerPort + if serverConfig.ControlConfig.SupervisorPort != serverConfig.ControlConfig.HTTPSPort { + serverConfig.ControlConfig.APIServerPort = lbServerPort + 1 + } + } + if cmds.AgentConfig.FlannelIface != "" && cmds.AgentConfig.NodeIP == "" { cmds.AgentConfig.NodeIP = netutil.GetIPFromInterface(cmds.AgentConfig.FlannelIface) } @@ -245,6 +261,7 @@ func run(app *cli.Context, cfg *cmds.Server) error { os.Unsetenv("NOTIFY_SOCKET") ctx := signals.SetupSignalHandler(context.Background()) + if err := server.StartServer(ctx, &serverConfig); err != nil { return err } @@ -280,13 +297,24 @@ func run(app *cli.Context, cfg *cmds.Server) error { agentConfig.DataDir = filepath.Dir(serverConfig.ControlConfig.DataDir) agentConfig.ServerURL = url agentConfig.Token = token - agentConfig.DisableLoadBalancer = true + agentConfig.DisableLoadBalancer = !serverConfig.ControlConfig.DisableAPIServer + agentConfig.ETCDAgent = serverConfig.ControlConfig.DisableAPIServer + agentConfig.Rootless = cfg.Rootless + if agentConfig.Rootless { // let agent specify Rootless kubelet flags, but not unshare twice agentConfig.RootlessAlreadyUnshared = true } + if serverConfig.ControlConfig.DisableAPIServer { + // set LBServerPort to a predefined port so the kubeconfigs are initialized with the right address + agentConfig.LBServerPort = lbServerPort + // initialize the APIAddressCh channel for receiving the API address from etcd + agentConfig.APIAddressCh = make(chan string, 1) + setAPIAddressChannel(ctx, &serverConfig, &agentConfig) + defer close(agentConfig.APIAddressCh) + } return agent.Run(ctx, agentConfig) } @@ -311,3 +339,30 @@ func getArgValueFromList(searchArg string, argList []string) string { } return value } + +// setAPIAddressChannel attempts to read the API address key from etcd and, when it succeeds, +// sends the value on the APIAddressCh channel. The function works for both k3s and rke2: in +// k3s it blocks the return to agent.Run until the API address is available, whereas in rke2 +// it does not block and runs the lookup in a goroutine +func setAPIAddressChannel(ctx context.Context, serverConfig *server.Config, agentConfig *cmds.Agent) { + // in the rke2 case, start a goroutine to check etcd for the server IP + if serverConfig.ControlConfig.HTTPSPort != serverConfig.ControlConfig.SupervisorPort { + go getAPIAddressFromEtcd(ctx, serverConfig, agentConfig) + return + } + getAPIAddressFromEtcd(ctx, serverConfig, agentConfig) +} + +func getAPIAddressFromEtcd(ctx context.Context, serverConfig
*server.Config, agentConfig *cmds.Agent) { + t := time.NewTicker(5 * time.Second) + defer t.Stop() + for range t.C { + serverAddress, err := etcd.GetAPIServerURLFromETCD(ctx, &serverConfig.ControlConfig) + if err == nil { + agentConfig.ServerURL = "https://" + serverAddress + agentConfig.APIAddressCh <- agentConfig.ServerURL + break + } + logrus.Warn(err) + } +} diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 12310adb4f..1ffec23e62 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -2,6 +2,7 @@ package cluster import ( "context" + "net/url" "strings" "github.com/k3s-io/kine/pkg/client" @@ -10,6 +11,7 @@ import ( "github.com/rancher/k3s/pkg/clientaccess" "github.com/rancher/k3s/pkg/cluster/managed" "github.com/rancher/k3s/pkg/daemons/config" + "github.com/rancher/k3s/pkg/etcd" ) type Cluster struct { @@ -34,6 +36,31 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) { return nil, errors.Wrap(err, "init cluster datastore and https") } + if c.config.DisableETCD { + ready := make(chan struct{}) + defer close(ready) + + // try to get /db/info urls first before attempting to use join url + clientURLs, _, err := etcd.ClientURLs(ctx, c.clientAccessInfo) + if err != nil { + return nil, err + } + if len(clientURLs) < 1 { + clientURL, err := url.Parse(c.config.JoinURL) + if err != nil { + return nil, err + } + clientURL.Host = clientURL.Hostname() + ":2379" + clientURLs = append(clientURLs, clientURL.String()) + } + etcdProxy, err := etcd.NewETCDProxy(true, c.config.DataDir, clientURLs[0]) + if err != nil { + return nil, err + } + c.setupEtcdProxy(ctx, etcdProxy) + return ready, nil + } + // start managed database (if necessary) if err := c.start(ctx); err != nil { return nil, errors.Wrap(err, "start managed database") diff --git a/pkg/cluster/managed.go b/pkg/cluster/managed.go index 542ae33514..6a9de29509 100644 --- a/pkg/cluster/managed.go +++ b/pkg/cluster/managed.go @@ -126,3 +126,23 @@ func (c *Cluster) assignManagedDriver(ctx context.Context) error { return nil } + +// setupEtcdProxy +func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) { + if c.managedDB == nil { + return + } + go func() { + t := time.NewTicker(30 * time.Second) + defer t.Stop() + for range t.C { + newAddresses, err := c.managedDB.GetMembersClientURLs(ctx) + if err != nil { + logrus.Warnf("failed to get etcd client URLs: %v", err) + continue + } + etcdProxy.Update(newAddresses) + + } + }() +} diff --git a/pkg/cluster/managed/drivers.go b/pkg/cluster/managed/drivers.go index a92c60c3e5..ac43c9e2d3 100644 --- a/pkg/cluster/managed/drivers.go +++ b/pkg/cluster/managed/drivers.go @@ -22,6 +22,7 @@ type Driver interface { Restore(ctx context.Context) error EndpointName() string Snapshot(ctx context.Context, config *config.Control) error + GetMembersClientURLs(ctx context.Context) ([]string, error) } func RegisterDriver(d Driver) { diff --git a/pkg/daemons/agent/agent.go b/pkg/daemons/agent/agent.go index d3b8a3a5b6..5c360557d9 100644 --- a/pkg/daemons/agent/agent.go +++ b/pkg/daemons/agent/agent.go @@ -30,7 +30,6 @@ func Agent(config *config.Agent) error { logs.InitLogs() defer logs.FlushLogs() - if err := startKubelet(config); err != nil { return err } diff --git a/pkg/daemons/config/types.go b/pkg/daemons/config/types.go index edad438d86..bc5888e43f 100644 --- a/pkg/daemons/config/types.go +++ b/pkg/daemons/config/types.go @@ -38,6 +38,7 @@ type Node struct { AgentConfig Agent CACerts []byte Certificate *tls.Certificate + ServerHTTPSPort int } type 
Containerd struct { @@ -115,7 +116,6 @@ type Control struct { Skips map[string]bool Disables map[string]bool Datastore endpoint.Config - NoScheduler bool ExtraAPIArgs []string ExtraControllerArgs []string ExtraCloudControllerArgs []string @@ -128,6 +128,10 @@ type Control struct { DisableCCM bool DisableNPC bool DisableKubeProxy bool + DisableAPIServer bool + DisableControllerManager bool + DisableScheduler bool + DisableETCD bool ClusterInit bool ClusterReset bool ClusterResetRestorePath string diff --git a/pkg/daemons/control/server.go b/pkg/daemons/control/server.go index 86cc1c61ba..149ee72fde 100644 --- a/pkg/daemons/control/server.go +++ b/pkg/daemons/control/server.go @@ -95,15 +95,20 @@ func Server(ctx context.Context, cfg *config.Control) error { cfg.Runtime.Tunnel = setupTunnel() util.DisableProxyHostnameCheck = true - auth, handler, err := apiServer(ctx, cfg, runtime) - if err != nil { - return err - } + var auth authenticator.Request + var handler http.Handler + var err error - if err := waitForAPIServerInBackground(ctx, runtime); err != nil { - return err - } + if !cfg.DisableAPIServer { + auth, handler, err = apiServer(ctx, cfg, runtime) + if err != nil { + return err + } + if err := waitForAPIServerInBackground(ctx, runtime); err != nil { + return err + } + } basicAuth, err := basicAuthenticator(runtime.PasswdFile) if err != nil { return err @@ -112,14 +117,15 @@ func Server(ctx context.Context, cfg *config.Control) error { runtime.Authenticator = combineAuthenticators(basicAuth, auth) runtime.Handler = handler - if !cfg.NoScheduler { + if !cfg.DisableScheduler { if err := scheduler(cfg, runtime); err != nil { return err } } - - if err := controllerManager(cfg, runtime); err != nil { - return err + if !cfg.DisableControllerManager { + if err := controllerManager(cfg, runtime); err != nil { + return err + } } if !cfg.DisableCCM { @@ -976,7 +982,7 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control, runtime *c select { case <-ctx.Done(): logrus.Fatalf("cloud-controller-manager context canceled: %v", ctx.Err()) - case <-time.After(time.Second): + case <-time.After(5 * time.Second): continue } } diff --git a/pkg/etcd/controller.go b/pkg/etcd/controller.go index 7e417efc44..5da364aafa 100644 --- a/pkg/etcd/controller.go +++ b/pkg/etcd/controller.go @@ -11,8 +11,8 @@ import ( ) const ( - nodeID = "etcd.k3s.cattle.io/node-name" - nodeAddress = "etcd.k3s.cattle.io/node-address" + NodeID = "etcd.k3s.cattle.io/node-name" + NodeAddress = "etcd.k3s.cattle.io/node-address" master = "node-role.kubernetes.io/master" controlPlane = "node-role.kubernetes.io/control-plane" etcdRole = "node-role.kubernetes.io/etcd" @@ -56,10 +56,11 @@ func (h *handler) sync(key string, node *v1.Node) (*v1.Node, error) { } func (h *handler) handleSelf(node *v1.Node) (*v1.Node, error) { - if node.Annotations[nodeID] == h.etcd.name && - node.Annotations[nodeAddress] == h.etcd.address && + if node.Annotations[NodeID] == h.etcd.name && + node.Annotations[NodeAddress] == h.etcd.address && node.Labels[etcdRole] == "true" && - node.Labels[controlPlane] == "true" { + node.Labels[controlPlane] == "true" || + h.etcd.config.DisableETCD { return node, nil } @@ -67,8 +68,8 @@ func (h *handler) handleSelf(node *v1.Node) (*v1.Node, error) { if node.Annotations == nil { node.Annotations = map[string]string{} } - node.Annotations[nodeID] = h.etcd.name - node.Annotations[nodeAddress] = h.etcd.address + node.Annotations[NodeID] = h.etcd.name + node.Annotations[NodeAddress] = h.etcd.address 
node.Labels[etcdRole] = "true" node.Labels[master] = "true" node.Labels[controlPlane] = "true" @@ -81,11 +82,10 @@ func (h *handler) onRemove(key string, node *v1.Node) (*v1.Node, error) { return node, nil } - id := node.Annotations[nodeID] - address := node.Annotations[nodeAddress] - if address == "" { + id := node.Annotations[NodeID] + address, ok := node.Annotations[NodeAddress] + if !ok { return node, nil } - return node, h.etcd.removePeer(h.ctx, id, address) } diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 0e220a7f9c..971d4aa412 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -58,14 +58,18 @@ func NewETCD() *ETCD { } } -var learnerProgressKey = version.Program + "/etcd/learnerProgress" +var ( + learnerProgressKey = version.Program + "/etcd/learnerProgress" + // AddressKey will contain the value of api addresses list + AddressKey = version.Program + "/apiaddresses" +) const ( snapshotPrefix = "etcd-snapshot-" endpoint = "https://127.0.0.1:2379" testTimeout = time.Second * 10 manageTickerTime = time.Second * 15 - learnerMaxStallTime = time.Minute * 1 + learnerMaxStallTime = time.Minute * 5 // defaultDialTimeout is intentionally short so that connections timeout within the testTimeout defined above defaultDialTimeout = 2 * time.Second @@ -206,11 +210,6 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e return errors.Wrapf(err, "configuration validation failed") } - e.config.Runtime.ClusterControllerStart = func(ctx context.Context) error { - Register(ctx, e, e.config.Runtime.Core.Core().V1().Node()) - return nil - } - if !e.config.EtcdDisableSnapshots { e.setSnapshotFunction(ctx) e.cron.Start() @@ -246,12 +245,12 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e // join attempts to add a member to an existing cluster func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) error { - clientURLs, memberList, err := e.clientURLs(ctx, clientAccessInfo) + clientURLs, memberList, err := ClientURLs(ctx, clientAccessInfo) if err != nil { return err } - client, err := getClient(ctx, e.runtime, clientURLs...) + client, err := GetClient(ctx, e.runtime, clientURLs...) if err != nil { return err } @@ -316,13 +315,13 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt e.config = config e.runtime = config.Runtime - client, err := getClient(ctx, e.runtime, endpoint) + client, err := GetClient(ctx, e.runtime, endpoint) if err != nil { return nil, err } e.client = client - address, err := getAdvertiseAddress(config.PrivateIP) + address, err := GetAdvertiseAddress(config.PrivateIP) if err != nil { return nil, err } @@ -335,6 +334,10 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt if err := e.setName(false); err != nil { return nil, err } + e.config.Runtime.ClusterControllerStart = func(ctx context.Context) error { + Register(ctx, e, e.config.Runtime.Core.Core().V1().Node()) + return nil + } tombstoneFile := filepath.Join(etcdDBDir(e.config), "tombstone") if _, err := os.Stat(tombstoneFile); err == nil { @@ -405,7 +408,7 @@ func (e *ETCD) infoHandler() http.Handler { } // getClient returns an etcd client connected to the specified endpoints -func getClient(ctx context.Context, runtime *config.ControlRuntime, endpoints ...string) (*etcd.Client, error) { +func GetClient(ctx context.Context, runtime *config.ControlRuntime, endpoints ...string) (*etcd.Client, error) { cfg, err := getClientConfig(ctx, runtime, endpoints...) 
if err != nil { return nil, err @@ -450,7 +453,7 @@ func toTLSConfig(runtime *config.ControlRuntime) (*tls.Config, error) { } // getAdvertiseAddress returns the IP address best suited for advertising to clients -func getAdvertiseAddress(advertiseIP string) (string, error) { +func GetAdvertiseAddress(advertiseIP string) (string, error) { ip := advertiseIP if ip == "" { ipAddr, err := utilnet.ChooseHostInterface() @@ -684,7 +687,7 @@ func (e *ETCD) setLearnerProgress(ctx context.Context, status *learnerProgress) } // clientURLs returns a list of all non-learner etcd cluster member client access URLs -func (e *ETCD) clientURLs(ctx context.Context, clientAccessInfo *clientaccess.Info) ([]string, Members, error) { +func ClientURLs(ctx context.Context, clientAccessInfo *clientaccess.Info) ([]string, Members, error) { var memberList Members resp, err := clientaccess.Get("/db/info", clientAccessInfo) if err != nil { @@ -737,7 +740,7 @@ func (e *ETCD) preSnapshotSetup(ctx context.Context, config *config.Control) err if e.config == nil { e.config = config } - client, err := getClient(ctx, e.config.Runtime, endpoint) + client, err := GetClient(ctx, e.config.Runtime, endpoint) if err != nil { return err } @@ -892,3 +895,49 @@ func backupDirWithRetention(dir string, maxBackupRetention int) (string, error) } return backupDir, nil } + +// GetAPIServerURLFromETCD attempts to fetch the version.Program/apiaddresses key from etcd; +// when it succeeds it parses the stored list and returns the first address +func GetAPIServerURLFromETCD(ctx context.Context, cfg *config.Control) (string, error) { + if cfg.Runtime == nil { + return "", fmt.Errorf("runtime is not ready yet") + } + cl, err := GetClient(ctx, cfg.Runtime, endpoint) + if err != nil { + return "", err + } + etcdResp, err := cl.KV.Get(ctx, AddressKey) + if err != nil { + return "", err + } + + if etcdResp.Count < 1 { + return "", fmt.Errorf("server addresses are not yet set") + } + var addresses []string + if err := json.Unmarshal(etcdResp.Kvs[0].Value, &addresses); err != nil { + return "", fmt.Errorf("failed to unmarshal etcd key: %v", err) + } + + return addresses[0], nil +} + +// GetMembersClientURLs iterates over the etcd cluster member list and returns +// a combined list of client URLs for each member in the cluster +func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) { + ctx, cancel := context.WithTimeout(ctx, testTimeout) + defer cancel() + + members, err := e.client.MemberList(ctx) + if err != nil { + return nil, err + } + + var memberUrls []string + for _, member := range members.Members { + for _, clientURL := range member.ClientURLs { + memberUrls = append(memberUrls, string(clientURL)) + } + } + return memberUrls, nil +} diff --git a/pkg/etcd/etcdproxy.go b/pkg/etcd/etcdproxy.go new file mode 100644 index 0000000000..4380635783 --- /dev/null +++ b/pkg/etcd/etcdproxy.go @@ -0,0 +1,76 @@ +package etcd + +import ( + "net/url" + + "github.com/pkg/errors" + "github.com/rancher/k3s/pkg/agent/loadbalancer" +) + +type Proxy interface { + Update(addresses []string) + ETCDURL() string + ETCDAddresses() []string + ETCDServerURL() string +} + +// NewETCDProxy initializes a new proxy structure that contains a load balancer +// listening on port 2379 and proxying to etcd cluster members +func NewETCDProxy(enabled bool, dataDir, etcdURL string) (Proxy, error) { + e := &etcdproxy{ + dataDir: dataDir, + initialETCDURL: etcdURL, + etcdURL: etcdURL, + } + + if enabled { + lb, err := loadbalancer.New(dataDir,
loadbalancer.ETCDServerServiceName, etcdURL, 2379) + if err != nil { + return nil, err + } + e.etcdLB = lb + e.etcdLBURL = lb.LoadBalancerServerURL() + } + + u, err := url.Parse(e.initialETCDURL) + if err != nil { + return nil, errors.Wrap(err, "failed to parse etcd client URL") + } + e.fallbackETCDAddress = u.Host + e.etcdPort = u.Port() + + return e, nil +} + +type etcdproxy struct { + dataDir string + etcdLBURL string + + initialETCDURL string + etcdURL string + etcdPort string + fallbackETCDAddress string + etcdAddresses []string + etcdLB *loadbalancer.LoadBalancer +} + +func (e *etcdproxy) Update(addresses []string) { + if e.etcdLB != nil { + e.etcdLB.Update(addresses) + } +} + +func (e *etcdproxy) ETCDURL() string { + return e.etcdURL +} + +func (e *etcdproxy) ETCDAddresses() []string { + if len(e.etcdAddresses) > 0 { + return e.etcdAddresses + } + return []string{e.fallbackETCDAddress} +} + +func (e *etcdproxy) ETCDServerURL() string { + return e.etcdURL +} diff --git a/pkg/server/etcd.go b/pkg/server/etcd.go new file mode 100644 index 0000000000..9ed44711e8 --- /dev/null +++ b/pkg/server/etcd.go @@ -0,0 +1,108 @@ +package server + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "time" + + "github.com/rancher/k3s/pkg/etcd" + "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// setETCDLabelsAndAnnotations sets the etcd role label on the node if it is not present, and +// sets special annotations on the node object for the etcd node ID and etcd node +// address. The function also removes the control-plane and master role labels if +// they exist on the node +func setETCDLabelsAndAnnotations(ctx context.Context, config *Config) error { + t := time.NewTicker(5 * time.Second) + defer t.Stop() + for range t.C { + controlConfig := &config.ControlConfig + + sc, err := newContext(ctx, controlConfig.Runtime.KubeConfigAdmin) + if err != nil { + logrus.Infof("Failed to set etcd role label: %v", err) + continue + } + + if err := stageFiles(ctx, sc, controlConfig); err != nil { + logrus.Infof("Failed to set etcd role label: %v", err) + continue + } + + if err := sc.Start(ctx); err != nil { + logrus.Infof("Failed to set etcd role label: %v", err) + continue + } + + controlConfig.Runtime.Core = sc.Core + nodes := sc.Core.Core().V1().Node() + + nodeName := os.Getenv("NODE_NAME") + if nodeName == "" { + logrus.Info("Failed to set etcd role label: node name not set") + continue + } + node, err := nodes.Get(nodeName, metav1.GetOptions{}) + if err != nil { + logrus.Infof("Failed to set etcd role label: %v", err) + continue + } + + if node.Labels == nil { + node.Labels = make(map[string]string) + } + + // remove the control-plane and master role labels if they exist + var controlRoleLabelExists bool + if _, ok := node.Labels[MasterRoleLabelKey]; ok { + delete(node.Labels, MasterRoleLabelKey) + controlRoleLabelExists = true + } + if _, ok := node.Labels[ControlPlaneRoleLabelKey]; ok { + delete(node.Labels, ControlPlaneRoleLabelKey) + controlRoleLabelExists = true + } + + if v, ok := node.Labels[ETCDRoleLabelKey]; ok && v == "true" && !controlRoleLabelExists { + break + } + + node.Labels[ETCDRoleLabelKey] = "true" + + // this is a replacement for the etcd controller's handleSelf function + if node.Annotations == nil { + node.Annotations = map[string]string{} + } + fileName := filepath.Join(controlConfig.DataDir, "db", "etcd", "name") + + data, err := ioutil.ReadFile(fileName) + if err != nil { + logrus.Infof("Waiting for etcd node name file to be available: %v", err) + continue + } + 
etcdNodeName := string(data) + node.Annotations[etcd.NodeID] = etcdNodeName + + address, err := etcd.GetAdvertiseAddress(controlConfig.PrivateIP) + if err != nil { + logrus.Infof("Waiting for etcd node address to be available: %v", err) + continue + } + node.Annotations[etcd.NodeAddress] = address + + _, err = nodes.Update(node) + if err == nil { + logrus.Infof("Successfully set etcd role label and annotations on node %s", nodeName) + break + } + select { + case <-ctx.Done(): + return ctx.Err() + } + } + return nil +} diff --git a/pkg/server/server.go b/pkg/server/server.go index c13ac98612..f0f7505aa5 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -17,6 +17,7 @@ import ( "github.com/k3s-io/helm-controller/pkg/helm" "github.com/pkg/errors" + "github.com/rancher/k3s/pkg/apiaddresses" "github.com/rancher/k3s/pkg/clientaccess" "github.com/rancher/k3s/pkg/daemons/config" "github.com/rancher/k3s/pkg/daemons/control" @@ -40,6 +41,7 @@ import ( const ( MasterRoleLabelKey = "node-role.kubernetes.io/master" ControlPlaneRoleLabelKey = "node-role.kubernetes.io/control-plane" + ETCDRoleLabelKey = "node-role.kubernetes.io/etcd" ) func ResolveDataDir(dataDir string) (string, error) { @@ -62,7 +64,11 @@ func StartServer(ctx context.Context, config *Config) error { config.ControlConfig.Runtime.Handler = router(ctx, config) - go startOnAPIServerReady(ctx, config) + if config.ControlConfig.DisableAPIServer { + go setETCDLabelsAndAnnotations(ctx, config) + } else { + go startOnAPIServerReady(ctx, config) + } for _, hook := range config.StartupHooks { if err := hook(ctx, config.ControlConfig.Runtime.APIServerReady, config.ControlConfig.Runtime.KubeConfigAdmin); err != nil { @@ -137,9 +143,8 @@ func runControllers(ctx context.Context, config *Config) error { panic(err) } } - if !config.DisableAgent { - go setControlPlaneRoleLabel(ctx, sc.Core.Core().V1().Node()) - } + + go setControlPlaneRoleLabel(ctx, sc.Core.Core().V1().Node(), config) go setClusterDNSConfig(ctx, config, sc.Core.Core().V1().ConfigMap()) @@ -185,6 +190,10 @@ func coreControllers(ctx context.Context, sc *Context, config *Config) error { return err } + if err := apiaddresses.Register(ctx, config.ControlConfig.Runtime, sc.Core.Core().V1().Endpoints()); err != nil { + return err + } + if config.Rootless { return rootlessports.Register(ctx, sc.Core.Core().V1().Service(), !config.DisableServiceLB, config.ControlConfig.HTTPSPort) } @@ -420,7 +429,10 @@ func isSymlink(config string) bool { return false } -func setControlPlaneRoleLabel(ctx context.Context, nodes v1.NodeClient) error { +func setControlPlaneRoleLabel(ctx context.Context, nodes v1.NodeClient, config *Config) error { + if config.DisableAgent || config.ControlConfig.DisableAPIServer { + return nil + } for { nodeName := os.Getenv("NODE_NAME") if nodeName == "" { @@ -434,7 +446,15 @@ func setControlPlaneRoleLabel(ctx context.Context, nodes v1.NodeClient) error { time.Sleep(1 * time.Second) continue } - if v, ok := node.Labels[ControlPlaneRoleLabelKey]; ok && v == "true" { + // remove etcd label if etcd is disabled + var etcdRoleLabelExists bool + if config.ControlConfig.DisableETCD { + if _, ok := node.Labels[ETCDRoleLabelKey]; ok { + delete(node.Labels, ETCDRoleLabelKey) + etcdRoleLabelExists = true + } + } + if v, ok := node.Labels[ControlPlaneRoleLabelKey]; ok && v == "true" && !etcdRoleLabelExists { break } if node.Labels == nil { @@ -442,6 +462,7 @@ func setControlPlaneRoleLabel(ctx context.Context, nodes v1.NodeClient) error { } node.Labels[ControlPlaneRoleLabelKey] = 
"true" node.Labels[MasterRoleLabelKey] = "true" + _, err = nodes.Update(node) if err == nil { logrus.Infof("Control-plane role label has been set successfully on node: %s", nodeName)