k3s/pkg/agent/run.go
Brad Davidson f970e49b7d Wait for apiserver to become healthy before starting agent controllers
It is possible that the apiserver may serve read requests but not allow
writes yet, in which case flannel will crash on startup when trying to
configure the subnet manager.

Fix this by waiting for the apiserver to become fully ready before
starting flannel and the network policy controller.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
2021-02-26 19:28:53 -08:00
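
In outline, the change gates flannel.Run and netpol.Run in run() behind app.WaitForAPIServer; the relevant portion of the file below reads roughly:

    app.WaitForAPIServer(coreClient, 30*time.Second)

    if !nodeConfig.NoFlannel {
        if err := flannel.Run(ctx, nodeConfig, coreClient.CoreV1().Nodes()); err != nil {
            return err
        }
    }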

package agent

import (
    "context"
    "errors"
    "fmt"
    "io/ioutil"
    "net/url"
    "os"
    "path/filepath"
    "strings"
    "time"

    "github.com/containerd/cgroups"
    cgroupsv2 "github.com/containerd/cgroups/v2"
    systemd "github.com/coreos/go-systemd/daemon"
    "github.com/rancher/k3s/pkg/agent/config"
    "github.com/rancher/k3s/pkg/agent/containerd"
    "github.com/rancher/k3s/pkg/agent/flannel"
    "github.com/rancher/k3s/pkg/agent/netpol"
    "github.com/rancher/k3s/pkg/agent/proxy"
    "github.com/rancher/k3s/pkg/agent/syssetup"
    "github.com/rancher/k3s/pkg/agent/tunnel"
    "github.com/rancher/k3s/pkg/cli/cmds"
    "github.com/rancher/k3s/pkg/clientaccess"
    "github.com/rancher/k3s/pkg/daemons/agent"
    daemonconfig "github.com/rancher/k3s/pkg/daemons/config"
    "github.com/rancher/k3s/pkg/nodeconfig"
    "github.com/rancher/k3s/pkg/rootless"
    "github.com/rancher/k3s/pkg/version"
    "github.com/sirupsen/logrus"
    "k8s.io/apimachinery/pkg/api/equality"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/kubernetes"
    v1 "k8s.io/client-go/kubernetes/typed/core/v1"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/controller-manager/app"
)

var (
    InternalIPLabel = version.Program + ".io/internal-ip"
    ExternalIPLabel = version.Program + ".io/external-ip"
    HostnameLabel   = version.Program + ".io/hostname"
)

const (
    dockershimSock = "unix:///var/run/dockershim.sock"
    containerdSock = "unix:///run/k3s/containerd/containerd.sock"
)

// setupCriCtlConfig creates the crictl config file and populates it
// with the given data from config.
func setupCriCtlConfig(cfg cmds.Agent, nodeConfig *daemonconfig.Node) error {
    cre := nodeConfig.ContainerRuntimeEndpoint
    if cre == "" {
        switch {
        case cfg.Docker:
            cre = dockershimSock
        default:
            cre = containerdSock
        }
    }

    agentConfDir := filepath.Join(cfg.DataDir, "agent", "etc")
    if _, err := os.Stat(agentConfDir); os.IsNotExist(err) {
        if err := os.MkdirAll(agentConfDir, 0700); err != nil {
            return err
        }
    }

    crp := "runtime-endpoint: " + cre + "\n"
    return ioutil.WriteFile(agentConfDir+"/crictl.yaml", []byte(crp), 0600)
}
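
// run writes the crictl config, optionally prepares flannel and starts the embedded containerd,
// sets up the tunnel and starts the kubelet and kube-proxy, waits for the apiserver, and then
// starts flannel, node configuration, and the network policy controller. It blocks until the
// context is cancelled.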
func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
    nodeConfig := config.Get(ctx, cfg, proxy)

    if err := setupCriCtlConfig(cfg, nodeConfig); err != nil {
        return err
    }

    if !nodeConfig.NoFlannel {
        if err := flannel.Prepare(ctx, nodeConfig); err != nil {
            return err
        }
    }

    if !nodeConfig.Docker && nodeConfig.ContainerRuntimeEndpoint == "" {
        if err := containerd.Run(ctx, nodeConfig); err != nil {
            return err
        }
    }

    if err := setupTunnelAndRunAgent(ctx, nodeConfig, cfg, proxy); err != nil {
        return err
    }

    coreClient, err := coreClient(nodeConfig.AgentConfig.KubeConfigKubelet)
    if err != nil {
        return err
    }
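
    // Wait (up to 30 seconds) for the apiserver to become fully healthy, not just able to serve
    // reads, so that flannel and the network policy controller are not started against an
    // apiserver that still rejects writes.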
    app.WaitForAPIServer(coreClient, 30*time.Second)

    if !nodeConfig.NoFlannel {
        if err := flannel.Run(ctx, nodeConfig, coreClient.CoreV1().Nodes()); err != nil {
            return err
        }
    }

    if err := configureNode(ctx, &nodeConfig.AgentConfig, coreClient.CoreV1().Nodes()); err != nil {
        return err
    }

    if !nodeConfig.AgentConfig.DisableNPC {
        if err := netpol.Run(ctx, nodeConfig); err != nil {
            return err
        }
    }

    <-ctx.Done()
    return ctx.Err()
}
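
// coreClient builds a Kubernetes clientset from the given kubeconfig path.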
func coreClient(cfg string) (kubernetes.Interface, error) {
    restConfig, err := clientcmd.BuildConfigFromFlags("", cfg)
    if err != nil {
        return nil, err
    }

    return kubernetes.NewForConfig(restConfig)
}
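
// Run is the agent entrypoint. It validates cgroup support, performs host system setup,
// optionally sets up rootless mode, creates the supervisor proxy, retries token validation
// until it succeeds, notifies systemd, and then hands off to run.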
func Run(ctx context.Context, cfg cmds.Agent) error {
    if err := validate(); err != nil {
        return err
    }

    syssetup.Configure()

    if cfg.Rootless && !cfg.RootlessAlreadyUnshared {
        if err := rootless.Rootless(cfg.DataDir); err != nil {
            return err
        }
    }

    agentDir := filepath.Join(cfg.DataDir, "agent")
    if err := os.MkdirAll(agentDir, 0700); err != nil {
        return err
    }

    proxy, err := proxy.NewAPIProxy(!cfg.DisableLoadBalancer, agentDir, cfg.ServerURL, cfg.LBServerPort)
    if err != nil {
        return err
    }
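
    // Keep retrying token validation against the supervisor every two seconds until it succeeds
    // or the context is cancelled.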
    for {
        newToken, err := clientaccess.ParseAndValidateTokenForUser(proxy.SupervisorURL(), cfg.Token, "node")
        if err != nil {
            logrus.Error(err)
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(2 * time.Second):
            }
            continue
        }
        cfg.Token = newToken.String()
        break
    }
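
    // The token is valid; notify systemd that the agent is ready before starting the controllers.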
    systemd.SdNotify(true, "READY=1\n")

    return run(ctx, cfg, proxy)
}
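
// validate checks that the required cgroup controllers are available, using the v1 or v2
// hierarchy depending on how the host is configured.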
func validate() error {
    if cgroups.Mode() == cgroups.Unified {
        return validateCgroupsV2()
    }
    return validateCgroupsV1()
}
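
// validateCgroupsV1 warns if the cpuset controller is missing and returns an error if the
// memory controller is missing.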
func validateCgroupsV1() error {
    cgroups, err := ioutil.ReadFile("/proc/self/cgroup")
    if err != nil {
        return err
    }

    if !strings.Contains(string(cgroups), "cpuset") {
        logrus.Warn(`Failed to find cpuset cgroup, you may need to add "cgroup_enable=cpuset" to your linux cmdline (/boot/cmdline.txt on a Raspberry Pi)`)
    }

    if !strings.Contains(string(cgroups), "memory") {
        // msg deliberately omits the leading letter so it can be prefixed with an uppercase "F"
        // for the log message and a lowercase "f" for the returned error.
        msg := "ailed to find memory cgroup, you may need to add \"cgroup_memory=1 cgroup_enable=memory\" to your linux cmdline (/boot/cmdline.txt on a Raspberry Pi)"
        logrus.Error("F" + msg)
        return errors.New("f" + msg)
    }

    return nil
}
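
// validateCgroupsV2 verifies that the cpu, cpuset, and memory controllers are enabled on the
// unified (v2) cgroup root.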
func validateCgroupsV2() error {
    manager, err := cgroupsv2.LoadManager("/sys/fs/cgroup", "/")
    if err != nil {
        return err
    }

    controllers, err := manager.RootControllers()
    if err != nil {
        return err
    }

    m := make(map[string]struct{})
    for _, controller := range controllers {
        m[controller] = struct{}{}
    }

    for _, controller := range []string{"cpu", "cpuset", "memory"} {
        if _, ok := m[controller]; !ok {
            return fmt.Errorf("failed to find %s cgroup (v2)", controller)
        }
    }

    return nil
}
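
// configureNode waits for the node object to be registered by the kubelet, then applies the
// configured labels, address labels, and node-config annotation, retrying the update until it
// succeeds or the context is cancelled.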
func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes v1.NodeInterface) error {
    count := 0
    for {
        node, err := nodes.Get(ctx, agentConfig.NodeName, metav1.GetOptions{})
        if err != nil {
            if count%30 == 0 {
                logrus.Infof("Waiting for kubelet to be ready on node %s: %v", agentConfig.NodeName, err)
            }
            count++
            time.Sleep(1 * time.Second)
            continue
        }

        newLabels, updateMutables := updateMutableLabels(agentConfig, node.Labels)

        updateAddresses := !agentConfig.DisableCCM
        if updateAddresses {
            newLabels, updateAddresses = updateAddressLabels(agentConfig, newLabels)
        }

        // inject node config
        updateNode, err := nodeconfig.SetNodeConfigAnnotations(node)
        if err != nil {
            return err
        }

        if updateAddresses || updateMutables {
            node.Labels = newLabels
            updateNode = true
        }

        if updateNode {
            if _, err := nodes.Update(ctx, node, metav1.UpdateOptions{}); err != nil {
                logrus.Infof("Failed to update node %s: %v", agentConfig.NodeName, err)
                select {
                case <-ctx.Done():
                    return ctx.Err()
                case <-time.After(time.Second):
                    continue
                }
            }
            logrus.Infof("Labels have been set successfully on node: %s", agentConfig.NodeName)
        } else {
            logrus.Infof("Labels have already been set on node: %s", agentConfig.NodeName)
        }

        break
    }
    return nil
}
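
// updateMutableLabels merges the labels configured on the agent (agentConfig.NodeLabels) into
// the node's existing labels and reports whether anything changed.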
func updateMutableLabels(agentConfig *daemonconfig.Agent, nodeLabels map[string]string) (map[string]string, bool) {
    result := map[string]string{}

    for _, m := range agentConfig.NodeLabels {
        var (
            v string
            p = strings.SplitN(m, `=`, 2)
            k = p[0]
        )
        if len(p) > 1 {
            v = p[1]
        }
        result[k] = v
    }
    result = labels.Merge(nodeLabels, result)
    return result, !equality.Semantic.DeepEqual(nodeLabels, result)
}
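
// updateAddressLabels merges the internal IP, hostname, and (if set) external IP address labels
// into the node's existing labels and reports whether anything changed.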
func updateAddressLabels(agentConfig *daemonconfig.Agent, nodeLabels map[string]string) (map[string]string, bool) {
    result := map[string]string{
        InternalIPLabel: agentConfig.NodeIP,
        HostnameLabel:   agentConfig.NodeName,
    }
    if agentConfig.NodeExternalIP != "" {
        result[ExternalIPLabel] = agentConfig.NodeExternalIP
    }

    result = labels.Merge(nodeLabels, result)
    return result, !equality.Semantic.DeepEqual(nodeLabels, result)
}

// setupTunnelAndRunAgent normally sets up the tunnel to the server before starting the kubelet
// and kube-proxy. There is a special case for etcd-only agents: they wait until an apiserver
// address arrives on the address channel and then update the proxy with the server addresses.
// In RKE2, the agent is started before the tunnel is set up so that the kubelet can come up
// first and start the pods.
func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) error {
    var agentRan bool
    if cfg.ETCDAgent {
        // Only in RKE2: run the agent before the tunnel is set up; this is checked again below.
        if proxy.IsAPIServerLBEnabled() {
            if err := agent.Agent(&nodeConfig.AgentConfig); err != nil {
                return err
            }
            agentRan = true
        }
        select {
        case address := <-cfg.APIAddressCh:
            cfg.ServerURL = address
            u, err := url.Parse(cfg.ServerURL)
            if err != nil {
                logrus.Warn(err)
            }
            proxy.Update([]string{fmt.Sprintf("%s:%d", u.Hostname(), nodeConfig.ServerHTTPSPort)})
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    if err := tunnel.Setup(ctx, nodeConfig, proxy); err != nil {
        return err
    }

    if !agentRan {
        return agent.Agent(&nodeConfig.AgentConfig)
    }
    return nil
}