Brad Davidson f970e49b7d Wait for apiserver to become healthy before starting agent controllers
It is possible that the apiserver may serve read requests but not allow
writes yet, in which case flannel will crash on startup when trying to
configure the subnet manager.

Fix this by waiting for the apiserver to become fully ready before
starting flannel and the network policy controller.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
2021-02-26 19:28:53 -08:00

338 lines
8.8 KiB

package agent
import (
cgroupsv2 "github.com/containerd/cgroups/v2"
systemd "github.com/coreos/go-systemd/daemon"
daemonconfig "github.com/rancher/k3s/pkg/daemons/config"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
var (
InternalIPLabel = version.Program + ".io/internal-ip"
ExternalIPLabel = version.Program + ".io/external-ip"
HostnameLabel = version.Program + ".io/hostname"
// dockershimSock is the CRI endpoint written to crictl.yaml when no
// ContainerRuntimeEndpoint is configured and the Docker runtime is selected.
const dockershimSock = "unix:///var/run/dockershim.sock"

// containerdSock is the CRI endpoint written to crictl.yaml when no
// ContainerRuntimeEndpoint is configured and Docker is not selected.
const containerdSock = "unix:///run/k3s/containerd/containerd.sock"
// setupCriCtlConfig writes <DataDir>/agent/etc/crictl.yaml so that crictl
// talks to the node's container runtime.
//
// The runtime endpoint is nodeConfig.ContainerRuntimeEndpoint when it is set.
// Otherwise it is the dockershim socket when cfg.Docker is true, or the
// embedded containerd socket in every other case.
func setupCriCtlConfig(cfg cmds.Agent, nodeConfig *daemonconfig.Node) error {
	cre := nodeConfig.ContainerRuntimeEndpoint
	if cre == "" {
		switch {
		case cfg.Docker:
			cre = dockershimSock
		default:
			cre = containerdSock
		}
	}

	agentConfDir := filepath.Join(cfg.DataDir, "agent", "etc")
	// MkdirAll succeeds without changes when the directory already exists,
	// so no separate os.Stat existence check is needed.
	if err := os.MkdirAll(agentConfDir, 0700); err != nil {
		return err
	}

	crp := "runtime-endpoint: " + cre + "\n"
	return ioutil.WriteFile(filepath.Join(agentConfDir, "crictl.yaml"), []byte(crp), 0600)
}
// run starts the agent components in order and then blocks until ctx is
// cancelled:
//
//  1. load the node configuration and write the crictl config;
//  2. prepare flannel (unless disabled), and start the embedded containerd when
//     neither Docker nor a custom ContainerRuntimeEndpoint is configured;
//  3. set up the tunnel and start the kubelet/kube-proxy agent;
//  4. wait for the apiserver to become healthy, then start flannel, configure
//     the Node object, and start the network policy controller (unless disabled).
//
// It returns the first startup error, or ctx.Err() once ctx is done.
func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
	nodeConfig := config.Get(ctx, cfg, proxy)

	if err := setupCriCtlConfig(cfg, nodeConfig); err != nil {
		return err
	}

	if !nodeConfig.NoFlannel {
		if err := flannel.Prepare(ctx, nodeConfig); err != nil {
			return err
		}
	}

	if !nodeConfig.Docker && nodeConfig.ContainerRuntimeEndpoint == "" {
		if err := containerd.Run(ctx, nodeConfig); err != nil {
			return err
		}
	}

	if err := setupTunnelAndRunAgent(ctx, nodeConfig, cfg, proxy); err != nil {
		return err
	}

	// Named client (not coreClient) to avoid shadowing the coreClient function.
	client, err := coreClient(nodeConfig.AgentConfig.KubeConfigKubelet)
	if err != nil {
		return err
	}

	// The apiserver can serve reads before it accepts writes, and flannel's
	// subnet manager needs to write. Wait for it to become healthy first. A
	// timeout is logged rather than treated as fatal, so startup still proceeds.
	if err := app.WaitForAPIServer(client, 30*time.Second); err != nil {
		logrus.Warnf("Failed waiting for apiserver to become healthy: %v", err)
	}

	if !nodeConfig.NoFlannel {
		if err := flannel.Run(ctx, nodeConfig, client.CoreV1().Nodes()); err != nil {
			return err
		}
	}

	if err := configureNode(ctx, &nodeConfig.AgentConfig, client.CoreV1().Nodes()); err != nil {
		return err
	}

	if !nodeConfig.AgentConfig.DisableNPC {
		if err := netpol.Run(ctx, nodeConfig); err != nil {
			return err
		}
	}

	// Keep the agent alive until shutdown. Returning here while ctx is still
	// live would report success (nil) and let the caller exit.
	<-ctx.Done()
	return ctx.Err()
}
func coreClient(cfg string) (kubernetes.Interface, error) {
restConfig, err := clientcmd.BuildConfigFromFlags("", cfg)
if err != nil {
return nil, err
return kubernetes.NewForConfig(restConfig)
// Run is the agent entry point. It performs the following steps:
//
//  1. Validate the host cgroup setup.
//  2. Enter rootless mode if requested and not already unshared.
//  3. Create <DataDir>/agent.
//  4. Build the API proxy.
//  5. Validate the join token against the supervisor, retrying every 2 seconds
//     until it succeeds or ctx is cancelled.
//  6. Signal systemd readiness.
//  7. Hand off to run, which blocks until ctx is done.
func Run(ctx context.Context, cfg cmds.Agent) error {
	if err := validate(); err != nil {
		return err
	}

	if cfg.Rootless && !cfg.RootlessAlreadyUnshared {
		if err := rootless.Rootless(cfg.DataDir); err != nil {
			return err
		}
	}

	agentDir := filepath.Join(cfg.DataDir, "agent")
	if err := os.MkdirAll(agentDir, 0700); err != nil {
		return err
	}

	// Named agentProxy so the proxy package stays addressable in this scope.
	agentProxy, err := proxy.NewAPIProxy(!cfg.DisableLoadBalancer, agentDir, cfg.ServerURL, cfg.LBServerPort)
	if err != nil {
		return err
	}

	for {
		newToken, err := clientaccess.ParseAndValidateTokenForUser(agentProxy.SupervisorURL(), cfg.Token, "node")
		if err == nil {
			cfg.Token = newToken.String()
			break
		}
		// newToken is unusable on error: report it and retry after a delay,
		// unless we are shutting down.
		logrus.Error(err)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(2 * time.Second):
		}
	}

	// Readiness notification is best-effort; its result is deliberately ignored.
	systemd.SdNotify(true, "READY=1\n")
	return run(ctx, cfg, agentProxy)
}
func validate() error {
if cgroups.Mode() == cgroups.Unified {
return validateCgroupsV2()
return validateCgroupsV1()
// validateCgroupsV1 reads /proc/self/cgroup and checks for the cgroup v1
// controllers the agent relies on. A missing cpuset controller only logs a
// warning. A missing memory controller is returned as an error.
func validateCgroupsV1() error {
	data, err := ioutil.ReadFile("/proc/self/cgroup")
	if err != nil {
		return err
	}

	controllers := cgroupV1Controllers(string(data))
	if !controllers["cpuset"] {
		logrus.Warn(`Failed to find cpuset cgroup, you may need to add "cgroup_enable=cpuset" to your linux cmdline (/boot/cmdline.txt on a Raspberry Pi)`)
	}
	if !controllers["memory"] {
		// Returned rather than also logged here; the caller reports it.
		return errors.New(`failed to find memory cgroup, you may need to add "cgroup_memory=1 cgroup_enable=memory" to your linux cmdline (/boot/cmdline.txt on a Raspberry Pi)`)
	}
	return nil
}

// cgroupV1Controllers parses /proc/<pid>/cgroup content, whose lines have the
// form "hierarchy-ID:controller-list:cgroup-path". It returns the set of
// controller names found in the controller-list fields. Only the
// controller-list field is inspected, so controller names that appear in a
// cgroup path are not matched.
func cgroupV1Controllers(procCgroup string) map[string]bool {
	controllers := make(map[string]bool)
	for _, line := range strings.Split(procCgroup, "\n") {
		fields := strings.SplitN(line, ":", 3)
		if len(fields) != 3 {
			continue
		}
		for _, name := range strings.Split(fields[1], ",") {
			if name != "" {
				controllers[name] = true
			}
		}
	}
	return controllers
}
// validateCgroupsV2 loads the cgroup v2 manager rooted at /sys/fs/cgroup and
// checks that the root lists the cpu, cpuset and memory controllers. It
// returns an error naming the first required controller that is missing.
func validateCgroupsV2() error {
	manager, err := cgroupsv2.LoadManager("/sys/fs/cgroup", "/")
	if err != nil {
		return err
	}

	available, err := manager.RootControllers()
	if err != nil {
		return err
	}

	present := make(map[string]bool, len(available))
	for _, name := range available {
		present[name] = true
	}

	required := []string{"cpu", "cpuset", "memory"}
	for _, name := range required {
		if !present[name] {
			return fmt.Errorf("failed to find %s cgroup (v2)", name)
		}
	}
	return nil
}
// configureNode waits until this node's Node object can be fetched from the
// apiserver, then reconciles it. It merges in the configured node labels and,
// unless agentConfig.DisableCCM is set, the address/hostname labels. It also
// applies the annotations from nodeconfig.SetNodeConfigAnnotations, and updates
// the Node only if something changed.
//
// Failed Get calls are retried once per second; the wait is logged every
// 30th failed Get. Failed Update calls are retried once per second as well.
// Both retries stop with ctx.Err() when ctx is cancelled. Errors from
// SetNodeConfigAnnotations are returned immediately.
func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes v1.NodeInterface) error {
	count := 0
	for {
		node, err := nodes.Get(ctx, agentConfig.NodeName, metav1.GetOptions{})
		if err != nil {
			if count%30 == 0 {
				logrus.Infof("Waiting for kubelet to be ready on node %s: %v", agentConfig.NodeName, err)
			}
			count++
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Second):
			}
			continue
		}

		newLabels, updateMutables := updateMutableLabels(agentConfig, node.Labels)

		updateAddresses := !agentConfig.DisableCCM
		if updateAddresses {
			newLabels, updateAddresses = updateAddressLabels(agentConfig, newLabels)
		}

		// inject node config
		updateNode, err := nodeconfig.SetNodeConfigAnnotations(node)
		if err != nil {
			return err
		}
		if updateAddresses || updateMutables {
			node.Labels = newLabels
			updateNode = true
		}

		if !updateNode {
			logrus.Infof("labels have already been set on node: %s", agentConfig.NodeName)
			return nil
		}

		if _, err := nodes.Update(ctx, node, metav1.UpdateOptions{}); err != nil {
			logrus.Infof("Failed to update node %s: %v", agentConfig.NodeName, err)
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Second):
			}
			// Re-fetch the node so the next attempt works from its latest state.
			continue
		}
		logrus.Infof("labels have been set successfully on node: %s", agentConfig.NodeName)
		return nil
	}
}
func updateMutableLabels(agentConfig *daemonconfig.Agent, nodeLabels map[string]string) (map[string]string, bool) {
result := map[string]string{}
for _, m := range agentConfig.NodeLabels {
var (
v string
p = strings.SplitN(m, `=`, 2)
k = p[0]
if len(p) > 1 {
v = p[1]
result[k] = v
result = labels.Merge(nodeLabels, result)
return result, !equality.Semantic.DeepEqual(nodeLabels, result)
func updateAddressLabels(agentConfig *daemonconfig.Agent, nodeLabels map[string]string) (map[string]string, bool) {
result := map[string]string{
InternalIPLabel: agentConfig.NodeIP,
HostnameLabel: agentConfig.NodeName,
if agentConfig.NodeExternalIP != "" {
result[ExternalIPLabel] = agentConfig.NodeExternalIP
result = labels.Merge(nodeLabels, result)
return result, !equality.Semantic.DeepEqual(nodeLabels, result)
// setupTunnelAndRunAgent sets up the agent tunnel (tunnel.Setup) and starts
// the kubelet/kube-proxy agent (agent.Agent). Normally the tunnel is set up
// first and the agent is started afterwards.
//
// etcd-only agents (cfg.ETCDAgent) are a special case. The function first
// waits for an API server address on cfg.APIAddressCh, or returns ctx.Err()
// if ctx is cancelled. It then points the load-balancer proxy at that
// address's host on nodeConfig.ServerHTTPSPort. When the API server load
// balancer is also enabled (which the inline comment attributes to rke2), the
// agent is started before the tunnel is set up. This lets the kubelet come up
// and start pods first.
func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) error {
	var agentRan bool
	if cfg.ETCDAgent {
		// only in rke2 run the agent before the tunnel setup and check for that later in the function
		if proxy.IsAPIServerLBEnabled() {
			if err := agent.Agent(&nodeConfig.AgentConfig); err != nil {
				return err
			}
			agentRan = true
		}
		select {
		case address := <-cfg.APIAddressCh:
			cfg.ServerURL = address
			u, err := url.Parse(cfg.ServerURL)
			if err != nil {
				// u is nil on error; continuing would panic in u.Hostname().
				return fmt.Errorf("invalid API server address %q: %w", address, err)
			}
			host := u.Hostname()
			// Hostname strips the brackets from IPv6 literals. Restore them so
			// the host:port string is unambiguous (the same rule net.JoinHostPort applies).
			if strings.Contains(host, ":") {
				host = "[" + host + "]"
			}
			proxy.Update([]string{fmt.Sprintf("%s:%d", host, nodeConfig.ServerHTTPSPort)})
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	if err := tunnel.Setup(ctx, nodeConfig, proxy); err != nil {
		return err
	}
	if !agentRan {
		return agent.Agent(&nodeConfig.AgentConfig)
	}
	return nil
}