mirror of
https://github.com/k3s-io/k3s.git
synced 2024-06-07 19:41:36 +00:00
f410fc7d1e
Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>
379 lines
10 KiB
Go
379 lines
10 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/url"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/containerd/cgroups"
|
|
cgroupsv2 "github.com/containerd/cgroups/v2"
|
|
systemd "github.com/coreos/go-systemd/daemon"
|
|
"github.com/pkg/errors"
|
|
"github.com/rancher/k3s/pkg/agent/config"
|
|
"github.com/rancher/k3s/pkg/agent/containerd"
|
|
"github.com/rancher/k3s/pkg/agent/flannel"
|
|
"github.com/rancher/k3s/pkg/agent/netpol"
|
|
"github.com/rancher/k3s/pkg/agent/proxy"
|
|
"github.com/rancher/k3s/pkg/agent/syssetup"
|
|
"github.com/rancher/k3s/pkg/agent/tunnel"
|
|
"github.com/rancher/k3s/pkg/cli/cmds"
|
|
"github.com/rancher/k3s/pkg/clientaccess"
|
|
cp "github.com/rancher/k3s/pkg/cloudprovider"
|
|
"github.com/rancher/k3s/pkg/daemons/agent"
|
|
daemonconfig "github.com/rancher/k3s/pkg/daemons/config"
|
|
"github.com/rancher/k3s/pkg/nodeconfig"
|
|
"github.com/rancher/k3s/pkg/rootless"
|
|
"github.com/rancher/k3s/pkg/util"
|
|
"github.com/sirupsen/logrus"
|
|
"k8s.io/apimachinery/pkg/api/equality"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
"k8s.io/client-go/kubernetes"
|
|
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
"k8s.io/client-go/tools/clientcmd"
|
|
"k8s.io/controller-manager/app"
|
|
utilsnet "k8s.io/utils/net"
|
|
)
|
|
|
|
const (
|
|
dockershimSock = "unix:///var/run/dockershim.sock"
|
|
containerdSock = "unix:///run/k3s/containerd/containerd.sock"
|
|
)
|
|
|
|
// setupCriCtlConfig creates the crictl config file and populates it
|
|
// with the given data from config.
|
|
func setupCriCtlConfig(cfg cmds.Agent, nodeConfig *daemonconfig.Node) error {
|
|
cre := nodeConfig.ContainerRuntimeEndpoint
|
|
if cre == "" {
|
|
switch {
|
|
case cfg.Docker:
|
|
cre = dockershimSock
|
|
default:
|
|
cre = containerdSock
|
|
}
|
|
}
|
|
|
|
agentConfDir := filepath.Join(cfg.DataDir, "agent", "etc")
|
|
if _, err := os.Stat(agentConfDir); os.IsNotExist(err) {
|
|
if err := os.MkdirAll(agentConfDir, 0700); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
crp := "runtime-endpoint: " + cre + "\n"
|
|
return ioutil.WriteFile(agentConfDir+"/crictl.yaml", []byte(crp), 0600)
|
|
}
|
|
|
|
func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
|
|
nodeConfig := config.Get(ctx, cfg, proxy)
|
|
|
|
dualCluster, err := utilsnet.IsDualStackCIDRs(nodeConfig.AgentConfig.ClusterCIDRs)
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to validate cluster-cidr")
|
|
}
|
|
dualService, err := utilsnet.IsDualStackCIDRs(nodeConfig.AgentConfig.ServiceCIDRs)
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to validate service-cidr")
|
|
}
|
|
dualNode, err := utilsnet.IsDualStackIPs(nodeConfig.AgentConfig.NodeIPs)
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to validate node-ip")
|
|
}
|
|
|
|
syssetup.Configure(dualCluster || dualService || dualNode)
|
|
|
|
if err := setupCriCtlConfig(cfg, nodeConfig); err != nil {
|
|
return err
|
|
}
|
|
|
|
if !nodeConfig.NoFlannel {
|
|
if err := flannel.Prepare(ctx, nodeConfig); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if !nodeConfig.Docker && nodeConfig.ContainerRuntimeEndpoint == "" {
|
|
if err := containerd.Run(ctx, nodeConfig); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if err := setupTunnelAndRunAgent(ctx, nodeConfig, cfg, proxy); err != nil {
|
|
return err
|
|
}
|
|
|
|
coreClient, err := coreClient(nodeConfig.AgentConfig.KubeConfigKubelet)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
app.WaitForAPIServer(coreClient, 30*time.Second)
|
|
|
|
if !nodeConfig.NoFlannel {
|
|
if err := flannel.Run(ctx, nodeConfig, coreClient.CoreV1().Nodes()); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := configureNode(ctx, &nodeConfig.AgentConfig, coreClient.CoreV1().Nodes()); err != nil {
|
|
return err
|
|
}
|
|
|
|
if !nodeConfig.AgentConfig.DisableNPC {
|
|
if err := netpol.Run(ctx, nodeConfig); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
<-ctx.Done()
|
|
return ctx.Err()
|
|
}
|
|
|
|
func coreClient(cfg string) (kubernetes.Interface, error) {
|
|
restConfig, err := clientcmd.BuildConfigFromFlags("", cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return kubernetes.NewForConfig(restConfig)
|
|
}
|
|
|
|
func Run(ctx context.Context, cfg cmds.Agent) error {
|
|
if err := validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if cfg.Rootless && !cfg.RootlessAlreadyUnshared {
|
|
if err := rootless.Rootless(cfg.DataDir); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
agentDir := filepath.Join(cfg.DataDir, "agent")
|
|
if err := os.MkdirAll(agentDir, 0700); err != nil {
|
|
return err
|
|
}
|
|
|
|
proxy, err := proxy.NewSupervisorProxy(ctx, !cfg.DisableLoadBalancer, agentDir, cfg.ServerURL, cfg.LBServerPort)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
newToken, err := clientaccess.ParseAndValidateTokenForUser(proxy.SupervisorURL(), cfg.Token, "node")
|
|
if err != nil {
|
|
logrus.Error(err)
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-time.After(2 * time.Second):
|
|
}
|
|
continue
|
|
}
|
|
cfg.Token = newToken.String()
|
|
break
|
|
}
|
|
systemd.SdNotify(true, "READY=1\n")
|
|
return run(ctx, cfg, proxy)
|
|
}
|
|
|
|
func validate() error {
|
|
if cgroups.Mode() == cgroups.Unified {
|
|
return validateCgroupsV2()
|
|
}
|
|
return validateCgroupsV1()
|
|
}
|
|
|
|
func validateCgroupsV1() error {
|
|
cgroups, err := ioutil.ReadFile("/proc/self/cgroup")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !strings.Contains(string(cgroups), "cpuset") {
|
|
logrus.Warn(`Failed to find cpuset cgroup, you may need to add "cgroup_enable=cpuset" to your linux cmdline (/boot/cmdline.txt on a Raspberry Pi)`)
|
|
}
|
|
|
|
if !strings.Contains(string(cgroups), "memory") {
|
|
msg := "ailed to find memory cgroup, you may need to add \"cgroup_memory=1 cgroup_enable=memory\" to your linux cmdline (/boot/cmdline.txt on a Raspberry Pi)"
|
|
logrus.Error("F" + msg)
|
|
return errors.New("f" + msg)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func validateCgroupsV2() error {
|
|
manager, err := cgroupsv2.LoadManager("/sys/fs/cgroup", "/")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
controllers, err := manager.RootControllers()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m := make(map[string]struct{})
|
|
for _, controller := range controllers {
|
|
m[controller] = struct{}{}
|
|
}
|
|
for _, controller := range []string{"cpu", "cpuset", "memory"} {
|
|
if _, ok := m[controller]; !ok {
|
|
return fmt.Errorf("failed to find %s cgroup (v2)", controller)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes v1.NodeInterface) error {
|
|
count := 0
|
|
for {
|
|
node, err := nodes.Get(ctx, agentConfig.NodeName, metav1.GetOptions{})
|
|
if err != nil {
|
|
if count%30 == 0 {
|
|
logrus.Infof("Waiting for kubelet to be ready on node %s: %v", agentConfig.NodeName, err)
|
|
}
|
|
count++
|
|
time.Sleep(1 * time.Second)
|
|
continue
|
|
}
|
|
|
|
updateNode := false
|
|
if labels, changed := updateMutableLabels(agentConfig, node.Labels); changed {
|
|
node.Labels = labels
|
|
updateNode = true
|
|
}
|
|
|
|
if !agentConfig.DisableCCM {
|
|
if annotations, changed := updateAddressAnnotations(agentConfig, node.Annotations); changed {
|
|
node.Annotations = annotations
|
|
updateNode = true
|
|
}
|
|
if labels, changed := updateLegacyAddressLabels(agentConfig, node.Labels); changed {
|
|
node.Labels = labels
|
|
updateNode = true
|
|
}
|
|
}
|
|
|
|
// inject node config
|
|
if changed, err := nodeconfig.SetNodeConfigAnnotations(node); err != nil {
|
|
return err
|
|
} else if changed {
|
|
updateNode = true
|
|
}
|
|
|
|
if updateNode {
|
|
if _, err := nodes.Update(ctx, node, metav1.UpdateOptions{}); err != nil {
|
|
logrus.Infof("Failed to update node %s: %v", agentConfig.NodeName, err)
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-time.After(time.Second):
|
|
continue
|
|
}
|
|
}
|
|
logrus.Infof("labels have been set successfully on node: %s", agentConfig.NodeName)
|
|
} else {
|
|
logrus.Infof("labels have already set on node: %s", agentConfig.NodeName)
|
|
}
|
|
|
|
break
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func updateMutableLabels(agentConfig *daemonconfig.Agent, nodeLabels map[string]string) (map[string]string, bool) {
|
|
result := map[string]string{}
|
|
|
|
for _, m := range agentConfig.NodeLabels {
|
|
var (
|
|
v string
|
|
p = strings.SplitN(m, `=`, 2)
|
|
k = p[0]
|
|
)
|
|
if len(p) > 1 {
|
|
v = p[1]
|
|
}
|
|
result[k] = v
|
|
}
|
|
result = labels.Merge(nodeLabels, result)
|
|
return result, !equality.Semantic.DeepEqual(nodeLabels, result)
|
|
}
|
|
|
|
func updateLegacyAddressLabels(agentConfig *daemonconfig.Agent, nodeLabels map[string]string) (map[string]string, bool) {
|
|
ls := labels.Set(nodeLabels)
|
|
if ls.Has(cp.InternalIPKey) || ls.Has(cp.HostnameKey) {
|
|
result := map[string]string{
|
|
cp.InternalIPKey: agentConfig.NodeIP,
|
|
cp.HostnameKey: agentConfig.NodeName,
|
|
}
|
|
|
|
if agentConfig.NodeExternalIP != "" {
|
|
result[cp.ExternalIPKey] = agentConfig.NodeExternalIP
|
|
}
|
|
|
|
result = labels.Merge(nodeLabels, result)
|
|
return result, !equality.Semantic.DeepEqual(nodeLabels, result)
|
|
}
|
|
return nil, false
|
|
}
|
|
|
|
func updateAddressAnnotations(agentConfig *daemonconfig.Agent, nodeAnnotations map[string]string) (map[string]string, bool) {
|
|
result := map[string]string{
|
|
cp.InternalIPKey: util.JoinIPs(agentConfig.NodeIPs),
|
|
cp.HostnameKey: agentConfig.NodeName,
|
|
}
|
|
|
|
if agentConfig.NodeExternalIP != "" {
|
|
result[cp.ExternalIPKey] = util.JoinIPs(agentConfig.NodeExternalIPs)
|
|
}
|
|
|
|
result = labels.Merge(nodeAnnotations, result)
|
|
return result, !equality.Semantic.DeepEqual(nodeAnnotations, result)
|
|
}
|
|
|
|
// setupTunnelAndRunAgent should start the setup tunnel before starting kubelet and kubeproxy
|
|
// there are special case for etcd agents, it will wait until it can find the apiaddress from
|
|
// the address channel and update the proxy with the servers addresses, if in rke2 we need to
|
|
// start the agent before the tunnel is setup to allow kubelet to start first and start the pods
|
|
func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) error {
|
|
var agentRan bool
|
|
if cfg.ETCDAgent {
|
|
// only in rke2 run the agent before the tunnel setup and check for that later in the function
|
|
if proxy.IsAPIServerLBEnabled() {
|
|
if err := agent.Agent(&nodeConfig.AgentConfig); err != nil {
|
|
return err
|
|
}
|
|
agentRan = true
|
|
}
|
|
select {
|
|
case address := <-cfg.APIAddressCh:
|
|
cfg.ServerURL = address
|
|
u, err := url.Parse(cfg.ServerURL)
|
|
if err != nil {
|
|
logrus.Warn(err)
|
|
}
|
|
proxy.Update([]string{fmt.Sprintf("%s:%d", u.Hostname(), nodeConfig.ServerHTTPSPort)})
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
} else if cfg.ClusterReset && proxy.IsAPIServerLBEnabled() {
|
|
if err := agent.Agent(&nodeConfig.AgentConfig); err != nil {
|
|
return err
|
|
}
|
|
agentRan = true
|
|
}
|
|
|
|
if err := tunnel.Setup(ctx, nodeConfig, proxy); err != nil {
|
|
return err
|
|
}
|
|
if !agentRan {
|
|
return agent.Agent(&nodeConfig.AgentConfig)
|
|
}
|
|
return nil
|
|
}
|