Add disable flags for control components (#2900)

* Add disable flags to control components

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* golint

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* more fixes

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* fixes to disable flags

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* Add comments to functions

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* Fix joining problem

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* more fixes

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* golint

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* fix ticker

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* fix role labels

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* more fixes

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>
This commit is contained in:
Hussein Galal 2021-02-12 17:35:57 +02:00 committed by GitHub
parent 21d1690d5d
commit 5749f66aa3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 603 additions and 78 deletions

View File

@ -406,6 +406,7 @@ func get(envInfo *cmds.Agent, proxy proxy.Proxy) (*config.Node, error) {
SELinux: envInfo.EnableSELinux,
ContainerRuntimeEndpoint: envInfo.ContainerRuntimeEndpoint,
FlannelBackend: controlConfig.FlannelBackend,
ServerHTTPSPort: controlConfig.HTTPSPort,
}
nodeConfig.FlannelIface = flannelIface
nodeConfig.Images = filepath.Join(envInfo.DataDir, "agent", "images")

View File

@ -5,6 +5,7 @@ import (
"errors"
"net"
"path/filepath"
"strconv"
"sync"
"github.com/google/tcpproxy"
@ -26,15 +27,19 @@ type LoadBalancer struct {
randomServers []string
currentServerAddress string
nextServerIndex int
Listener net.Listener
}
const RandomPort = 0
var (
SupervisorServiceName = version.Program + "-agent-load-balancer"
APIServerServiceName = version.Program + "-api-server-agent-load-balancer"
ETCDServerServiceName = version.Program + "-etcd-server-load-balancer"
)
func New(dataDir, serviceName, serverURL string) (_lb *LoadBalancer, _err error) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
func New(dataDir, serviceName, serverURL string, lbServerPort int) (_lb *LoadBalancer, _err error) {
listener, err := net.Listen("tcp", "127.0.0.1:"+strconv.Itoa(lbServerPort))
defer func() {
if _err != nil {
logrus.Warnf("Error starting load balancer: %s", _err)

View File

@ -105,7 +105,7 @@ func TestFailOver(t *testing.T) {
DataDir: tmpDir,
}
lb, err := New(cfg.DataDir, SupervisorServiceName, cfg.ServerURL)
lb, err := New(cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort)
if err != nil {
assertEqual(t, err, nil)
}
@ -156,7 +156,7 @@ func TestFailFast(t *testing.T) {
DataDir: tmpDir,
}
lb, err := New(cfg.DataDir, SupervisorServiceName, cfg.ServerURL)
lb, err := New(cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort)
if err != nil {
assertEqual(t, err, nil)
}

View File

@ -17,19 +17,21 @@ type Proxy interface {
SupervisorURL() string
SupervisorAddresses() []string
APIServerURL() string
IsAPIServerLBEnabled() bool
}
func NewAPIProxy(enabled bool, dataDir, supervisorURL string) (Proxy, error) {
p := &proxy{
func NewAPIProxy(enabled bool, dataDir, supervisorURL string, lbServerPort int) (Proxy, error) {
p := proxy{
lbEnabled: enabled,
dataDir: dataDir,
initialSupervisorURL: supervisorURL,
supervisorURL: supervisorURL,
apiServerURL: supervisorURL,
lbServerPort: lbServerPort,
}
if enabled {
lb, err := loadbalancer.New(dataDir, loadbalancer.SupervisorServiceName, supervisorURL)
lb, err := loadbalancer.New(dataDir, loadbalancer.SupervisorServiceName, supervisorURL, p.lbServerPort)
if err != nil {
return nil, err
}
@ -45,12 +47,13 @@ func NewAPIProxy(enabled bool, dataDir, supervisorURL string) (Proxy, error) {
p.fallbackSupervisorAddress = u.Host
p.supervisorPort = u.Port()
return p, nil
return &p, nil
}
type proxy struct {
dataDir string
lbEnabled bool
dataDir string
lbEnabled bool
lbServerPort int
initialSupervisorURL string
supervisorURL string
@ -71,14 +74,12 @@ func (p *proxy) Update(addresses []string) {
if p.apiServerEnabled {
supervisorAddresses = p.setSupervisorPort(supervisorAddresses)
}
if p.apiServerLB != nil {
p.apiServerLB.Update(apiServerAddresses)
}
if p.supervisorLB != nil {
p.supervisorLB.Update(supervisorAddresses)
}
p.supervisorAddresses = supervisorAddresses
}
@ -106,7 +107,11 @@ func (p *proxy) StartAPIServerProxy(port int) error {
p.apiServerEnabled = true
if p.lbEnabled {
lb, err := loadbalancer.New(p.dataDir, loadbalancer.APIServerServiceName, p.apiServerURL)
lbServerPort := p.lbServerPort
if lbServerPort != 0 {
lbServerPort = lbServerPort + 1
}
lb, err := loadbalancer.New(p.dataDir, loadbalancer.APIServerServiceName, p.apiServerURL, lbServerPort)
if err != nil {
return err
}
@ -131,3 +136,7 @@ func (p *proxy) SupervisorAddresses() []string {
func (p *proxy) APIServerURL() string {
return p.apiServerURL
}
func (p *proxy) IsAPIServerLBEnabled() bool {
return p.apiServerLB != nil
}

View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"net/url"
"os"
"path/filepath"
"strings"
@ -89,12 +90,7 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
return err
}
}
if err := tunnel.Setup(ctx, nodeConfig, proxy); err != nil {
return err
}
if err := agent.Agent(&nodeConfig.AgentConfig); err != nil {
if err := setupTunnelAndRunAgent(ctx, nodeConfig, cfg, proxy); err != nil {
return err
}
@ -148,7 +144,7 @@ func Run(ctx context.Context, cfg cmds.Agent) error {
return err
}
proxy, err := proxy.NewAPIProxy(!cfg.DisableLoadBalancer, agentDir, cfg.ServerURL)
proxy, err := proxy.NewAPIProxy(!cfg.DisableLoadBalancer, agentDir, cfg.ServerURL, cfg.LBServerPort)
if err != nil {
return err
}
@ -167,7 +163,6 @@ func Run(ctx context.Context, cfg cmds.Agent) error {
cfg.Token = newToken.String()
break
}
systemd.SdNotify(true, "READY=1\n")
return run(ctx, cfg, proxy)
}
@ -300,3 +295,39 @@ func updateAddressLabels(agentConfig *daemonconfig.Agent, nodeLabels map[string]
result = labels.Merge(nodeLabels, result)
return result, !equality.Semantic.DeepEqual(nodeLabels, result)
}
// setupTunnelAndRunAgent starts the tunnel before starting kubelet and kube-proxy.
// There is a special case for etcd-only agents: wait until the API server address
// arrives on cfg.APIAddressCh and point the proxy at it. In rke2 (detected via the
// API server load balancer being enabled) the agent is started before the tunnel is
// set up, to allow kubelet to start first and bring up the pods.
func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) error {
	var agentRan bool
	if cfg.ETCDAgent {
		// Only in rke2: run the agent before the tunnel setup; remembered below
		// so it is not started twice.
		if proxy.IsAPIServerLBEnabled() {
			if err := agent.Agent(&nodeConfig.AgentConfig); err != nil {
				return err
			}
			agentRan = true
		}
		select {
		case address := <-cfg.APIAddressCh:
			cfg.ServerURL = address
			u, err := url.Parse(cfg.ServerURL)
			if err != nil {
				// The address is unusable; bail out instead of dereferencing a
				// nil *url.URL below (the previous code only logged a warning
				// and then panicked on u.Hostname()).
				return err
			}
			proxy.Update([]string{fmt.Sprintf("%s:%d", u.Hostname(), nodeConfig.ServerHTTPSPort)})
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	if err := tunnel.Setup(ctx, nodeConfig, proxy); err != nil {
		return err
	}
	if !agentRan {
		return agent.Agent(&nodeConfig.AgentConfig)
	}
	return nil
}

View File

@ -75,7 +75,10 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
endpoint, _ := client.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{})
if endpoint != nil {
proxy.Update(getAddresses(endpoint))
addresses := getAddresses(endpoint)
if len(addresses) > 0 {
proxy.Update(getAddresses(endpoint))
}
}
disconnect := map[string]context.CancelFunc{}

View File

@ -0,0 +1,87 @@
package apiaddresses
import (
"bytes"
"context"
"encoding/json"
"net"
"strconv"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/etcd"
"github.com/rancher/k3s/pkg/version"
controllerv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1"
etcdv3 "go.etcd.io/etcd/clientv3"
v1 "k8s.io/api/core/v1"
)
type EndpointsControllerGetter func() controllerv1.EndpointsController
// Register wires up the apiaddresses controller: it creates an etcd client for
// the handler and registers the sync handler for endpoint changes.
func Register(ctx context.Context, runtime *config.ControlRuntime, endpoints controllerv1.EndpointsController) error {
	// Create the etcd client BEFORE registering the OnChange handler: sync uses
	// h.etcdClient, and registering first would allow a change event to fire
	// while the field is still nil.
	cl, err := etcd.GetClient(ctx, runtime, "https://127.0.0.1:2379")
	if err != nil {
		return err
	}
	h := &handler{
		endpointsController: endpoints,
		runtime:             runtime,
		ctx:                 ctx,
		etcdClient:          cl,
	}
	endpoints.OnChange(ctx, version.Program+"-apiserver-lb-controller", h.sync)
	return nil
}
type handler struct {
endpointsController controllerv1.EndpointsController
runtime *config.ControlRuntime
ctx context.Context
etcdClient *etcdv3.Client
}
// sync updates the version.Program/apiaddresses etcd key with the list of
// API server endpoint addresses taken from the "kubernetes" endpoint in the
// "default" namespace.
func (h *handler) sync(key string, endpoint *v1.Endpoints) (*v1.Endpoints, error) {
	if endpoint == nil {
		return nil, nil
	}
	// Only the default/kubernetes endpoint carries the API server addresses.
	// The previous condition used &&, which let any endpoint in the default
	// namespace (or any endpoint named "kubernetes") overwrite the key.
	if endpoint.Namespace != "default" || endpoint.Name != "kubernetes" {
		return nil, nil
	}
	w := &bytes.Buffer{}
	if err := json.NewEncoder(w).Encode(getAddresses(endpoint)); err != nil {
		return nil, err
	}
	if _, err := h.etcdClient.Put(h.ctx, etcd.AddressKey, w.String()); err != nil {
		return nil, err
	}
	return endpoint, nil
}
// getAddresses flattens the endpoint subsets into a list of host:port strings.
// When a subset does not specify a port, 443 is assumed.
func getAddresses(endpoint *v1.Endpoints) []string {
	addresses := []string{}
	if endpoint == nil {
		return addresses
	}
	for _, subset := range endpoint.Subsets {
		// Default to 443 unless the subset provides an explicit port.
		port := "443"
		if len(subset.Ports) > 0 {
			port = strconv.Itoa(int(subset.Ports[0].Port))
		}
		for _, addr := range subset.Addresses {
			addresses = append(addresses, net.JoinHostPort(addr.IP, port))
		}
	}
	return addresses
}

View File

@ -14,7 +14,10 @@ type Agent struct {
TokenFile string
ClusterSecret string
ServerURL string
APIAddressCh chan string
DisableLoadBalancer bool
ETCDAgent bool
LBServerPort int
ResolvConf string
DataDir string
NodeIP string
@ -155,6 +158,14 @@ var (
Destination: &AgentConfig.EnableSELinux,
EnvVar: version.ProgramUpper + "_SELINUX",
}
LBServerPort = cli.IntFlag{
Name: "lb-server-port",
Usage: "(agent/node) Internal Loadbalancer port",
Hidden: false,
Destination: &AgentConfig.LBServerPort,
EnvVar: version.ProgramUpper + "_LB_SERVER_PORT",
Value: 0,
}
)
func CheckSELinuxFlags(ctx *cli.Context) error {

View File

@ -1,7 +0,0 @@
// +build !no_etcd
package cmds
const (
hideClusterFlags = false
)

View File

@ -10,6 +10,7 @@ import (
const (
defaultSnapshotRentention = 5
defaultSnapshotIntervalHours = 12
hideClusterFlags = true
)
type Server struct {
@ -54,6 +55,9 @@ type Server struct {
DisableCCM bool
DisableNPC bool
DisableKubeProxy bool
DisableAPIServer bool
DisableControllerManager bool
DisableETCD bool
ClusterInit bool
ClusterReset bool
ClusterResetRestorePath string
@ -280,6 +284,21 @@ func NewServerCommand(action func(*cli.Context) error) cli.Command {
Usage: "(components) Disable " + version.Program + " default network policy controller",
Destination: &ServerConfig.DisableNPC,
},
cli.BoolFlag{
Name: "disable-api-server",
Usage: "(components) Disable running api server",
Destination: &ServerConfig.DisableAPIServer,
},
cli.BoolFlag{
Name: "disable-controller-manager",
Usage: "(components) Disable running kube-controller-manager",
Destination: &ServerConfig.DisableControllerManager,
},
cli.BoolFlag{
Name: "disable-etcd",
Usage: "(components) Disable running etcd",
Destination: &ServerConfig.DisableETCD,
},
NodeNameFlag,
WithNodeIDFlag,
NodeLabels,

View File

@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"strings"
"time"
systemd "github.com/coreos/go-systemd/daemon"
"github.com/erikdubbelboer/gspt"
@ -14,6 +15,7 @@ import (
"github.com/rancher/k3s/pkg/agent"
"github.com/rancher/k3s/pkg/cli/cmds"
"github.com/rancher/k3s/pkg/datadir"
"github.com/rancher/k3s/pkg/etcd"
"github.com/rancher/k3s/pkg/netutil"
"github.com/rancher/k3s/pkg/rootless"
"github.com/rancher/k3s/pkg/server"
@ -31,6 +33,10 @@ import (
_ "github.com/mattn/go-sqlite3" // ensure we have sqlite
)
const (
lbServerPort = 6444
)
func Run(app *cli.Context) error {
if err := cmds.InitLogging(); err != nil {
return err
@ -86,7 +92,6 @@ func run(app *cli.Context, cfg *cmds.Server) error {
serverConfig.ControlConfig.DataDir = cfg.DataDir
serverConfig.ControlConfig.KubeConfigOutput = cfg.KubeConfigOutput
serverConfig.ControlConfig.KubeConfigMode = cfg.KubeConfigMode
serverConfig.ControlConfig.NoScheduler = cfg.DisableScheduler
serverConfig.Rootless = cfg.Rootless
serverConfig.ControlConfig.SANs = knownIPs(cfg.TLSSan)
serverConfig.ControlConfig.BindAddress = cfg.BindAddress
@ -109,6 +114,10 @@ func run(app *cli.Context, cfg *cmds.Server) error {
serverConfig.ControlConfig.DisableCCM = cfg.DisableCCM
serverConfig.ControlConfig.DisableNPC = cfg.DisableNPC
serverConfig.ControlConfig.DisableKubeProxy = cfg.DisableKubeProxy
serverConfig.ControlConfig.DisableETCD = cfg.DisableETCD
serverConfig.ControlConfig.DisableAPIServer = cfg.DisableAPIServer
serverConfig.ControlConfig.DisableScheduler = cfg.DisableScheduler
serverConfig.ControlConfig.DisableControllerManager = cfg.DisableControllerManager
serverConfig.ControlConfig.ClusterInit = cfg.ClusterInit
serverConfig.ControlConfig.EncryptSecrets = cfg.EncryptSecrets
serverConfig.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName
@ -119,7 +128,7 @@ func run(app *cli.Context, cfg *cmds.Server) error {
serverConfig.ControlConfig.EtcdExposeMetrics = cfg.EtcdExposeMetrics
if cfg.ClusterResetRestorePath != "" && !cfg.ClusterReset {
return errors.New("Invalid flag use. --cluster-reset required with --cluster-reset-restore-path")
return errors.New("invalid flag use. --cluster-reset required with --cluster-reset-restore-path")
}
serverConfig.ControlConfig.ClusterReset = cfg.ClusterReset
@ -129,6 +138,13 @@ func run(app *cli.Context, cfg *cmds.Server) error {
serverConfig.ControlConfig.SupervisorPort = serverConfig.ControlConfig.HTTPSPort
}
if serverConfig.ControlConfig.DisableAPIServer {
serverConfig.ControlConfig.APIServerPort = lbServerPort
if serverConfig.ControlConfig.SupervisorPort != serverConfig.ControlConfig.HTTPSPort {
serverConfig.ControlConfig.APIServerPort = lbServerPort + 1
}
}
if cmds.AgentConfig.FlannelIface != "" && cmds.AgentConfig.NodeIP == "" {
cmds.AgentConfig.NodeIP = netutil.GetIPFromInterface(cmds.AgentConfig.FlannelIface)
}
@ -245,6 +261,7 @@ func run(app *cli.Context, cfg *cmds.Server) error {
os.Unsetenv("NOTIFY_SOCKET")
ctx := signals.SetupSignalHandler(context.Background())
if err := server.StartServer(ctx, &serverConfig); err != nil {
return err
}
@ -280,13 +297,24 @@ func run(app *cli.Context, cfg *cmds.Server) error {
agentConfig.DataDir = filepath.Dir(serverConfig.ControlConfig.DataDir)
agentConfig.ServerURL = url
agentConfig.Token = token
agentConfig.DisableLoadBalancer = true
agentConfig.DisableLoadBalancer = !serverConfig.ControlConfig.DisableAPIServer
agentConfig.ETCDAgent = serverConfig.ControlConfig.DisableAPIServer
agentConfig.Rootless = cfg.Rootless
if agentConfig.Rootless {
// let agent specify Rootless kubelet flags, but not unshare twice
agentConfig.RootlessAlreadyUnshared = true
}
if serverConfig.ControlConfig.DisableAPIServer {
// setting LBServerPort to a prespecified port to initialize the kubeconfigs with the right address
agentConfig.LBServerPort = lbServerPort
// initialize the apiAddress Channel for receiving the api address from etcd
agentConfig.APIAddressCh = make(chan string, 1)
setAPIAddressChannel(ctx, &serverConfig, &agentConfig)
defer close(agentConfig.APIAddressCh)
}
return agent.Run(ctx, agentConfig)
}
@ -311,3 +339,30 @@ func getArgValueFromList(searchArg string, argList []string) string {
}
return value
}
// setAPIAddressChannel delivers the API server address (read from etcd) on
// agentConfig.APIAddressCh. For k3s (HTTPS and supervisor ports equal) the call
// blocks until the address is available; for rke2 (split ports) the lookup runs
// in a goroutine so it does not block returning to agent.Run.
func setAPIAddressChannel(ctx context.Context, serverConfig *server.Config, agentConfig *cmds.Agent) {
	if serverConfig.ControlConfig.HTTPSPort == serverConfig.ControlConfig.SupervisorPort {
		// k3s: block here until the address has been published.
		getAPIAddressFromEtcd(ctx, serverConfig, agentConfig)
		return
	}
	// rke2: resolve the address in the background.
	go getAPIAddressFromEtcd(ctx, serverConfig, agentConfig)
}
// getAPIAddressFromEtcd polls etcd every 5 seconds until the API server address
// key becomes readable, then publishes the resulting https URL on
// agentConfig.APIAddressCh. It returns early when ctx is canceled, instead of
// polling forever during shutdown as the previous version did.
func getAPIAddressFromEtcd(ctx context.Context, serverConfig *server.Config, agentConfig *cmds.Agent) {
	t := time.NewTicker(5 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			// Shutting down; stop polling.
			return
		case <-t.C:
			serverAddress, err := etcd.GetAPIServerURLFromETCD(ctx, &serverConfig.ControlConfig)
			if err != nil {
				logrus.Warn(err)
				continue
			}
			agentConfig.ServerURL = "https://" + serverAddress
			agentConfig.APIAddressCh <- agentConfig.ServerURL
			return
		}
	}
}

View File

@ -2,6 +2,7 @@ package cluster
import (
"context"
"net/url"
"strings"
"github.com/k3s-io/kine/pkg/client"
@ -10,6 +11,7 @@ import (
"github.com/rancher/k3s/pkg/clientaccess"
"github.com/rancher/k3s/pkg/cluster/managed"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/etcd"
)
type Cluster struct {
@ -34,6 +36,31 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
return nil, errors.Wrap(err, "init cluster datastore and https")
}
if c.config.DisableETCD {
ready := make(chan struct{})
defer close(ready)
// try to get /db/info urls first before attempting to use join url
clientURLs, _, err := etcd.ClientURLs(ctx, c.clientAccessInfo)
if err != nil {
return nil, err
}
if len(clientURLs) < 1 {
clientURL, err := url.Parse(c.config.JoinURL)
if err != nil {
return nil, err
}
clientURL.Host = clientURL.Hostname() + ":2379"
clientURLs = append(clientURLs, clientURL.String())
}
etcdProxy, err := etcd.NewETCDProxy(true, c.config.DataDir, clientURLs[0])
if err != nil {
return nil, err
}
c.setupEtcdProxy(ctx, etcdProxy)
return ready, nil
}
// start managed database (if necessary)
if err := c.start(ctx); err != nil {
return nil, errors.Wrap(err, "start managed database")

View File

@ -126,3 +126,23 @@ func (c *Cluster) assignManagedDriver(ctx context.Context) error {
return nil
}
// setupEtcdProxy starts a background goroutine that refreshes the etcd proxy's
// backend list with the current etcd cluster member client URLs every 30 seconds.
// It is a no-op when the cluster is not running a managed (embedded) database.
func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) {
	if c.managedDB == nil {
		return
	}
	go func() {
		t := time.NewTicker(30 * time.Second)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				// Stop refreshing on shutdown so the goroutine does not leak.
				return
			case <-t.C:
				newAddresses, err := c.managedDB.GetMembersClientURLs(ctx)
				if err != nil {
					logrus.Warnf("failed to get etcd client URLs: %v", err)
					continue
				}
				etcdProxy.Update(newAddresses)
			}
		}
	}()
}

View File

@ -22,6 +22,7 @@ type Driver interface {
Restore(ctx context.Context) error
EndpointName() string
Snapshot(ctx context.Context, config *config.Control) error
GetMembersClientURLs(ctx context.Context) ([]string, error)
}
func RegisterDriver(d Driver) {

View File

@ -30,7 +30,6 @@ func Agent(config *config.Agent) error {
logs.InitLogs()
defer logs.FlushLogs()
if err := startKubelet(config); err != nil {
return err
}

View File

@ -38,6 +38,7 @@ type Node struct {
AgentConfig Agent
CACerts []byte
Certificate *tls.Certificate
ServerHTTPSPort int
}
type Containerd struct {
@ -115,7 +116,6 @@ type Control struct {
Skips map[string]bool
Disables map[string]bool
Datastore endpoint.Config
NoScheduler bool
ExtraAPIArgs []string
ExtraControllerArgs []string
ExtraCloudControllerArgs []string
@ -128,6 +128,10 @@ type Control struct {
DisableCCM bool
DisableNPC bool
DisableKubeProxy bool
DisableAPIServer bool
DisableControllerManager bool
DisableScheduler bool
DisableETCD bool
ClusterInit bool
ClusterReset bool
ClusterResetRestorePath string

View File

@ -95,15 +95,20 @@ func Server(ctx context.Context, cfg *config.Control) error {
cfg.Runtime.Tunnel = setupTunnel()
util.DisableProxyHostnameCheck = true
auth, handler, err := apiServer(ctx, cfg, runtime)
if err != nil {
return err
}
var auth authenticator.Request
var handler http.Handler
var err error
if err := waitForAPIServerInBackground(ctx, runtime); err != nil {
return err
}
if !cfg.DisableAPIServer {
auth, handler, err = apiServer(ctx, cfg, runtime)
if err != nil {
return err
}
if err := waitForAPIServerInBackground(ctx, runtime); err != nil {
return err
}
}
basicAuth, err := basicAuthenticator(runtime.PasswdFile)
if err != nil {
return err
@ -112,14 +117,15 @@ func Server(ctx context.Context, cfg *config.Control) error {
runtime.Authenticator = combineAuthenticators(basicAuth, auth)
runtime.Handler = handler
if !cfg.NoScheduler {
if !cfg.DisableScheduler {
if err := scheduler(cfg, runtime); err != nil {
return err
}
}
if err := controllerManager(cfg, runtime); err != nil {
return err
if !cfg.DisableControllerManager {
if err := controllerManager(cfg, runtime); err != nil {
return err
}
}
if !cfg.DisableCCM {
@ -976,7 +982,7 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control, runtime *c
select {
case <-ctx.Done():
logrus.Fatalf("cloud-controller-manager context canceled: %v", ctx.Err())
case <-time.After(time.Second):
case <-time.After(5 * time.Second):
continue
}
}

View File

@ -11,8 +11,8 @@ import (
)
const (
nodeID = "etcd.k3s.cattle.io/node-name"
nodeAddress = "etcd.k3s.cattle.io/node-address"
NodeID = "etcd.k3s.cattle.io/node-name"
NodeAddress = "etcd.k3s.cattle.io/node-address"
master = "node-role.kubernetes.io/master"
controlPlane = "node-role.kubernetes.io/control-plane"
etcdRole = "node-role.kubernetes.io/etcd"
@ -56,10 +56,11 @@ func (h *handler) sync(key string, node *v1.Node) (*v1.Node, error) {
}
func (h *handler) handleSelf(node *v1.Node) (*v1.Node, error) {
if node.Annotations[nodeID] == h.etcd.name &&
node.Annotations[nodeAddress] == h.etcd.address &&
if node.Annotations[NodeID] == h.etcd.name &&
node.Annotations[NodeAddress] == h.etcd.address &&
node.Labels[etcdRole] == "true" &&
node.Labels[controlPlane] == "true" {
node.Labels[controlPlane] == "true" ||
h.etcd.config.DisableETCD {
return node, nil
}
@ -67,8 +68,8 @@ func (h *handler) handleSelf(node *v1.Node) (*v1.Node, error) {
if node.Annotations == nil {
node.Annotations = map[string]string{}
}
node.Annotations[nodeID] = h.etcd.name
node.Annotations[nodeAddress] = h.etcd.address
node.Annotations[NodeID] = h.etcd.name
node.Annotations[NodeAddress] = h.etcd.address
node.Labels[etcdRole] = "true"
node.Labels[master] = "true"
node.Labels[controlPlane] = "true"
@ -81,11 +82,10 @@ func (h *handler) onRemove(key string, node *v1.Node) (*v1.Node, error) {
return node, nil
}
id := node.Annotations[nodeID]
address := node.Annotations[nodeAddress]
if address == "" {
id := node.Annotations[NodeID]
address, ok := node.Annotations[NodeAddress]
if !ok {
return node, nil
}
return node, h.etcd.removePeer(h.ctx, id, address)
}

View File

@ -58,14 +58,18 @@ func NewETCD() *ETCD {
}
}
var learnerProgressKey = version.Program + "/etcd/learnerProgress"
var (
learnerProgressKey = version.Program + "/etcd/learnerProgress"
// AddressKey will contain the value of api addresses list
AddressKey = version.Program + "/apiaddresses"
)
const (
snapshotPrefix = "etcd-snapshot-"
endpoint = "https://127.0.0.1:2379"
testTimeout = time.Second * 10
manageTickerTime = time.Second * 15
learnerMaxStallTime = time.Minute * 1
learnerMaxStallTime = time.Minute * 5
// defaultDialTimeout is intentionally short so that connections timeout within the testTimeout defined above
defaultDialTimeout = 2 * time.Second
@ -206,11 +210,6 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e
return errors.Wrapf(err, "configuration validation failed")
}
e.config.Runtime.ClusterControllerStart = func(ctx context.Context) error {
Register(ctx, e, e.config.Runtime.Core.Core().V1().Node())
return nil
}
if !e.config.EtcdDisableSnapshots {
e.setSnapshotFunction(ctx)
e.cron.Start()
@ -246,12 +245,12 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e
// join attempts to add a member to an existing cluster
func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
clientURLs, memberList, err := e.clientURLs(ctx, clientAccessInfo)
clientURLs, memberList, err := ClientURLs(ctx, clientAccessInfo)
if err != nil {
return err
}
client, err := getClient(ctx, e.runtime, clientURLs...)
client, err := GetClient(ctx, e.runtime, clientURLs...)
if err != nil {
return err
}
@ -316,13 +315,13 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt
e.config = config
e.runtime = config.Runtime
client, err := getClient(ctx, e.runtime, endpoint)
client, err := GetClient(ctx, e.runtime, endpoint)
if err != nil {
return nil, err
}
e.client = client
address, err := getAdvertiseAddress(config.PrivateIP)
address, err := GetAdvertiseAddress(config.PrivateIP)
if err != nil {
return nil, err
}
@ -335,6 +334,10 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt
if err := e.setName(false); err != nil {
return nil, err
}
e.config.Runtime.ClusterControllerStart = func(ctx context.Context) error {
Register(ctx, e, e.config.Runtime.Core.Core().V1().Node())
return nil
}
tombstoneFile := filepath.Join(etcdDBDir(e.config), "tombstone")
if _, err := os.Stat(tombstoneFile); err == nil {
@ -405,7 +408,7 @@ func (e *ETCD) infoHandler() http.Handler {
}
// getClient returns an etcd client connected to the specified endpoints
func getClient(ctx context.Context, runtime *config.ControlRuntime, endpoints ...string) (*etcd.Client, error) {
func GetClient(ctx context.Context, runtime *config.ControlRuntime, endpoints ...string) (*etcd.Client, error) {
cfg, err := getClientConfig(ctx, runtime, endpoints...)
if err != nil {
return nil, err
@ -450,7 +453,7 @@ func toTLSConfig(runtime *config.ControlRuntime) (*tls.Config, error) {
}
// getAdvertiseAddress returns the IP address best suited for advertising to clients
func getAdvertiseAddress(advertiseIP string) (string, error) {
func GetAdvertiseAddress(advertiseIP string) (string, error) {
ip := advertiseIP
if ip == "" {
ipAddr, err := utilnet.ChooseHostInterface()
@ -684,7 +687,7 @@ func (e *ETCD) setLearnerProgress(ctx context.Context, status *learnerProgress)
}
// clientURLs returns a list of all non-learner etcd cluster member client access URLs
func (e *ETCD) clientURLs(ctx context.Context, clientAccessInfo *clientaccess.Info) ([]string, Members, error) {
func ClientURLs(ctx context.Context, clientAccessInfo *clientaccess.Info) ([]string, Members, error) {
var memberList Members
resp, err := clientaccess.Get("/db/info", clientAccessInfo)
if err != nil {
@ -737,7 +740,7 @@ func (e *ETCD) preSnapshotSetup(ctx context.Context, config *config.Control) err
if e.config == nil {
e.config = config
}
client, err := getClient(ctx, e.config.Runtime, endpoint)
client, err := GetClient(ctx, e.config.Runtime, endpoint)
if err != nil {
return err
}
@ -892,3 +895,49 @@ func backupDirWithRetention(dir string, maxBackupRetention int) (string, error)
}
return backupDir, nil
}
// GetAPIServerURLFromETCD will try to fetch the version.Program/apiaddresses key
// from etcd; when it succeeds it parses the stored list and returns its first
// address. An error is returned when the runtime is not yet initialized, the key
// is absent or empty, or the stored value cannot be decoded.
func GetAPIServerURLFromETCD(ctx context.Context, cfg *config.Control) (string, error) {
	if cfg.Runtime == nil {
		return "", fmt.Errorf("runtime is not ready yet")
	}
	cl, err := GetClient(ctx, cfg.Runtime, endpoint)
	if err != nil {
		return "", err
	}
	etcdResp, err := cl.KV.Get(ctx, AddressKey)
	if err != nil {
		return "", err
	}
	if etcdResp.Count < 1 {
		return "", fmt.Errorf("servers addresses are not yet set")
	}
	var addresses []string
	if err := json.Unmarshal(etcdResp.Kvs[0].Value, &addresses); err != nil {
		return "", fmt.Errorf("failed to unmarshal etcd key: %v", err)
	}
	// The key may legitimately hold an empty list (no ready endpoints yet);
	// indexing addresses[0] unguarded would panic in that case.
	if len(addresses) == 0 {
		return "", fmt.Errorf("servers addresses are not yet set")
	}
	return addresses[0], nil
}
// GetMembersClientURLs queries the etcd member list and returns the combined
// client URLs of every member in the cluster. The lookup is bounded by
// testTimeout.
func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) {
	ctx, cancel := context.WithTimeout(ctx, testTimeout)
	defer cancel()

	resp, err := e.client.MemberList(ctx)
	if err != nil {
		return nil, err
	}

	var urls []string
	for _, m := range resp.Members {
		for _, u := range m.ClientURLs {
			urls = append(urls, string(u))
		}
	}
	return urls, nil
}

76
pkg/etcd/etcdproxy.go Normal file
View File

@ -0,0 +1,76 @@
package etcd
import (
"net/url"
"github.com/pkg/errors"
"github.com/rancher/k3s/pkg/agent/loadbalancer"
)
// Proxy is the interface implemented by the etcd proxy, which forwards client
// connections to etcd cluster members.
type Proxy interface {
	// Update replaces the set of backend etcd member addresses.
	Update(addresses []string)
	// ETCDURL returns the etcd client URL the proxy was configured with.
	ETCDURL() string
	// ETCDAddresses returns the known etcd member addresses, or a fallback
	// derived from the initial URL when none have been recorded.
	ETCDAddresses() []string
	// ETCDServerURL returns the etcd server URL.
	ETCDServerURL() string
}
// NewETCDProxy initializes a new proxy structure that contains a load balancer
// which listens on port 2379 and proxies between etcd cluster members.
func NewETCDProxy(enabled bool, dataDir, etcdURL string) (Proxy, error) {
	// Validate the URL before opening any listener, so a malformed URL does not
	// leave a bound load balancer behind when we return an error.
	u, err := url.Parse(etcdURL)
	if err != nil {
		return nil, errors.Wrap(err, "failed to parse etcd client URL")
	}

	e := &etcdproxy{
		dataDir:             dataDir,
		initialETCDURL:      etcdURL,
		etcdURL:             etcdURL,
		fallbackETCDAddress: u.Host,
		etcdPort:            u.Port(),
	}

	if enabled {
		lb, err := loadbalancer.New(dataDir, loadbalancer.ETCDServerServiceName, etcdURL, 2379)
		if err != nil {
			return nil, err
		}
		e.etcdLB = lb
		e.etcdLBURL = lb.LoadBalancerServerURL()
	}
	return e, nil
}
// etcdproxy is the default Proxy implementation, optionally backed by a
// load balancer across etcd cluster members.
type etcdproxy struct {
	dataDir             string
	etcdLBURL           string // client URL of the local load balancer, set only when enabled
	initialETCDURL      string // etcd URL the proxy was created with
	etcdURL             string
	etcdPort            string // port parsed from the initial etcd URL
	fallbackETCDAddress string // host from the initial etcd URL, returned when no addresses are known
	etcdAddresses       []string
	etcdLB              *loadbalancer.LoadBalancer // nil when the load balancer is disabled
}
// Update passes the new etcd member addresses on to the load balancer, when one
// is in use.
func (e *etcdproxy) Update(addresses []string) {
	if e.etcdLB == nil {
		return
	}
	e.etcdLB.Update(addresses)
}
// ETCDURL returns the etcd client URL the proxy was configured with.
func (e *etcdproxy) ETCDURL() string {
	return e.etcdURL
}
// ETCDAddresses returns the known member addresses, falling back to the host
// from the initial etcd URL when none have been recorded yet.
func (e *etcdproxy) ETCDAddresses() []string {
	if len(e.etcdAddresses) == 0 {
		return []string{e.fallbackETCDAddress}
	}
	return e.etcdAddresses
}
// ETCDServerURL returns the etcd server URL (currently the same value ETCDURL
// returns).
func (e *etcdproxy) ETCDServerURL() string {
	return e.etcdURL
}

108
pkg/server/etcd.go Normal file
View File

@ -0,0 +1,108 @@
package server
import (
"context"
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/rancher/k3s/pkg/etcd"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// setETCDLabelsAndAnnotations will set the etcd role label if not exists; it also
// sets special annotations on the node object which are the etcd node id and etcd
// node address. The function will also remove the controlplane and master role
// labels if they exist on the node. It retries every 5 seconds until it succeeds
// or ctx is canceled.
func setETCDLabelsAndAnnotations(ctx context.Context, config *Config) error {
	t := time.NewTicker(5 * time.Second)
	defer t.Stop()
	for {
		// Wait for the next tick, aborting promptly on shutdown. The previous
		// version ended the loop body with a select containing only a
		// ctx.Done() case, which blocked the retry loop forever after a failed
		// node update.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.C:
		}

		controlConfig := &config.ControlConfig

		sc, err := newContext(ctx, controlConfig.Runtime.KubeConfigAdmin)
		if err != nil {
			logrus.Infof("Failed to set etcd role label: %v", err)
			continue
		}
		if err := stageFiles(ctx, sc, controlConfig); err != nil {
			logrus.Infof("Failed to set etcd role label: %v", err)
			continue
		}
		if err := sc.Start(ctx); err != nil {
			logrus.Infof("Failed to set etcd role label: %v", err)
			continue
		}
		controlConfig.Runtime.Core = sc.Core
		nodes := sc.Core.Core().V1().Node()

		nodeName := os.Getenv("NODE_NAME")
		if nodeName == "" {
			logrus.Info("Failed to set etcd role label: node name not set")
			continue
		}
		node, err := nodes.Get(nodeName, metav1.GetOptions{})
		if err != nil {
			logrus.Infof("Failed to set etcd role label: %v", err)
			continue
		}

		if node.Labels == nil {
			node.Labels = make(map[string]string)
		}

		// Remove the control-plane/master role labels if present; this node is
		// etcd-only.
		var controlRoleLabelExists bool
		if _, ok := node.Labels[MasterRoleLabelKey]; ok {
			delete(node.Labels, MasterRoleLabelKey)
			controlRoleLabelExists = true
		}
		if _, ok := node.Labels[ControlPlaneRoleLabelKey]; ok {
			delete(node.Labels, ControlPlaneRoleLabelKey)
			controlRoleLabelExists = true
		}

		// Nothing to do when the etcd label is already set and no control-plane
		// labels needed removing.
		if v, ok := node.Labels[ETCDRoleLabelKey]; ok && v == "true" && !controlRoleLabelExists {
			break
		}
		node.Labels[ETCDRoleLabelKey] = "true"

		// This replaces the etcd controller's handleSelf function: set the etcd
		// node name and address annotations directly.
		if node.Annotations == nil {
			node.Annotations = map[string]string{}
		}
		fileName := filepath.Join(controlConfig.DataDir, "db", "etcd", "name")
		data, err := ioutil.ReadFile(fileName)
		if err != nil {
			logrus.Infof("Waiting for etcd node name file to be available: %v", err)
			continue
		}
		node.Annotations[etcd.NodeID] = string(data)

		address, err := etcd.GetAdvertiseAddress(controlConfig.PrivateIP)
		if err != nil {
			logrus.Infof("Waiting for etcd node address to be available: %v", err)
			continue
		}
		node.Annotations[etcd.NodeAddress] = address

		if _, err := nodes.Update(node); err != nil {
			// Retry on the next tick instead of blocking until shutdown.
			logrus.Infof("Failed to set etcd role label: %v", err)
			continue
		}
		logrus.Infof("Successfully set etcd role label and annotations on node %s", nodeName)
		break
	}
	return nil
}

View File

@ -17,6 +17,7 @@ import (
"github.com/k3s-io/helm-controller/pkg/helm"
"github.com/pkg/errors"
"github.com/rancher/k3s/pkg/apiaddresses"
"github.com/rancher/k3s/pkg/clientaccess"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/daemons/control"
@ -40,6 +41,7 @@ import (
const (
MasterRoleLabelKey = "node-role.kubernetes.io/master"
ControlPlaneRoleLabelKey = "node-role.kubernetes.io/control-plane"
ETCDRoleLabelKey = "node-role.kubernetes.io/etcd"
)
func ResolveDataDir(dataDir string) (string, error) {
@ -62,7 +64,11 @@ func StartServer(ctx context.Context, config *Config) error {
config.ControlConfig.Runtime.Handler = router(ctx, config)
go startOnAPIServerReady(ctx, config)
if config.ControlConfig.DisableAPIServer {
go setETCDLabelsAndAnnotations(ctx, config)
} else {
go startOnAPIServerReady(ctx, config)
}
for _, hook := range config.StartupHooks {
if err := hook(ctx, config.ControlConfig.Runtime.APIServerReady, config.ControlConfig.Runtime.KubeConfigAdmin); err != nil {
@ -137,9 +143,8 @@ func runControllers(ctx context.Context, config *Config) error {
panic(err)
}
}
if !config.DisableAgent {
go setControlPlaneRoleLabel(ctx, sc.Core.Core().V1().Node())
}
go setControlPlaneRoleLabel(ctx, sc.Core.Core().V1().Node(), config)
go setClusterDNSConfig(ctx, config, sc.Core.Core().V1().ConfigMap())
@ -185,6 +190,10 @@ func coreControllers(ctx context.Context, sc *Context, config *Config) error {
return err
}
if err := apiaddresses.Register(ctx, config.ControlConfig.Runtime, sc.Core.Core().V1().Endpoints()); err != nil {
return err
}
if config.Rootless {
return rootlessports.Register(ctx, sc.Core.Core().V1().Service(), !config.DisableServiceLB, config.ControlConfig.HTTPSPort)
}
@ -420,7 +429,10 @@ func isSymlink(config string) bool {
return false
}
func setControlPlaneRoleLabel(ctx context.Context, nodes v1.NodeClient) error {
func setControlPlaneRoleLabel(ctx context.Context, nodes v1.NodeClient, config *Config) error {
if config.DisableAgent || config.ControlConfig.DisableAPIServer {
return nil
}
for {
nodeName := os.Getenv("NODE_NAME")
if nodeName == "" {
@ -434,7 +446,15 @@ func setControlPlaneRoleLabel(ctx context.Context, nodes v1.NodeClient) error {
time.Sleep(1 * time.Second)
continue
}
if v, ok := node.Labels[ControlPlaneRoleLabelKey]; ok && v == "true" {
// remove etcd label if etcd is disabled
var etcdRoleLabelExists bool
if config.ControlConfig.DisableETCD {
if _, ok := node.Labels[ETCDRoleLabelKey]; ok {
delete(node.Labels, ETCDRoleLabelKey)
etcdRoleLabelExists = true
}
}
if v, ok := node.Labels[ControlPlaneRoleLabelKey]; ok && v == "true" && !etcdRoleLabelExists {
break
}
if node.Labels == nil {
@ -442,6 +462,7 @@ func setControlPlaneRoleLabel(ctx context.Context, nodes v1.NodeClient) error {
}
node.Labels[ControlPlaneRoleLabelKey] = "true"
node.Labels[MasterRoleLabelKey] = "true"
_, err = nodes.Update(node)
if err == nil {
logrus.Infof("Control-plane role label has been set successfully on node: %s", nodeName)