diff --git a/pkg/agent/flannel/setup.go b/pkg/agent/flannel/setup.go index c80fd9b716..c735ea9c75 100644 --- a/pkg/agent/flannel/setup.go +++ b/pkg/agent/flannel/setup.go @@ -2,20 +2,21 @@ package flannel import ( "context" - "errors" "fmt" "net" "os" "path/filepath" "strings" - "time" + "github.com/pkg/errors" "github.com/rancher/k3s/pkg/agent/util" "github.com/rancher/k3s/pkg/daemons/config" "github.com/rancher/k3s/pkg/version" "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - v1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/apimachinery/pkg/fields" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" utilsnet "k8s.io/utils/net" ) @@ -87,27 +88,14 @@ func Prepare(ctx context.Context, nodeConfig *config.Node) error { return createFlannelConf(nodeConfig) } -func Run(ctx context.Context, nodeConfig *config.Node, nodes v1.NodeInterface) error { - nodeName := nodeConfig.AgentConfig.NodeName - - for { - node, err := nodes.Get(ctx, nodeName, metav1.GetOptions{}) - if err == nil && node.Spec.PodCIDR != "" { - break - } - if err == nil { - logrus.Info("Waiting for node " + nodeName + " CIDR not assigned yet") - } else { - logrus.Infof("Waiting for node %s: %v", nodeName, err) - } - time.Sleep(2 * time.Second) +func Run(ctx context.Context, nodeConfig *config.Node, nodes typedcorev1.NodeInterface) error { + if err := waitForPodCIDR(ctx, nodeConfig.AgentConfig.NodeName, nodes); err != nil { + return errors.Wrap(err, "failed to wait for PodCIDR assignment") } - logrus.Info("Node CIDR assigned for: " + nodeName) netMode, err := findNetMode(nodeConfig.AgentConfig.ClusterCIDRs) if err != nil { - logrus.Fatalf("Error checking netMode") - return err + return errors.Wrap(err, "failed to check netMode for flannel") } go func() { err := flannel(ctx, nodeConfig.FlannelIface, nodeConfig.FlannelConf, nodeConfig.AgentConfig.KubeConfigKubelet, netMode) @@ -119,6 +107,28 @@ func Run(ctx context.Context, nodeConfig *config.Node, nodes v1.NodeInterface) e return nil } +// waitForPodCIDR watches nodes with this node's name, and returns when the PodCIDR has been set. +func waitForPodCIDR(ctx context.Context, nodeName string, nodes typedcorev1.NodeInterface) error { + fieldSelector := fields.Set{metav1.ObjectNameField: nodeName}.String() + watch, err := nodes.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector}) + if err != nil { + return err + } + defer watch.Stop() + + for ev := range watch.ResultChan() { + node, ok := ev.Object.(*corev1.Node) + if !ok { + return fmt.Errorf("could not convert event object to node: %v", ev) + } + if node.Spec.PodCIDR != "" { + break + } + } + logrus.Info("PodCIDR assigned for node " + nodeName) + return nil +} + func createCNIConf(dir string) error { if dir == "" { return nil diff --git a/pkg/agent/run.go b/pkg/agent/run.go index 20246a06de..6f9d6cb749 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -29,11 +29,13 @@ import ( "github.com/rancher/k3s/pkg/rootless" "github.com/rancher/k3s/pkg/util" "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" - v1 "k8s.io/client-go/kubernetes/typed/core/v1" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/clientcmd" app2 "k8s.io/kubernetes/cmd/kube-proxy/app" kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config" @@ -106,16 +108,16 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error { util.WaitForAPIServerReady(coreClient, 30*time.Second) + if err := configureNode(ctx, &nodeConfig.AgentConfig, coreClient.CoreV1().Nodes()); err != nil { + return err + } + if !nodeConfig.NoFlannel { if err := flannel.Run(ctx, nodeConfig, coreClient.CoreV1().Nodes()); err != nil { return err } } - if err := configureNode(ctx, &nodeConfig.AgentConfig, coreClient.CoreV1().Nodes()); err != nil { - return err - } - if !nodeConfig.AgentConfig.DisableNPC { if err := netpol.Run(ctx, nodeConfig); err != nil { return err @@ -220,17 +222,18 @@ func Run(ctx context.Context, cfg cmds.Agent) error { return run(ctx, cfg, proxy) } -func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes v1.NodeInterface) error { - count := 0 - for { - node, err := nodes.Get(ctx, agentConfig.NodeName, metav1.GetOptions{}) - if err != nil { - if count%30 == 0 { - logrus.Infof("Waiting for kubelet to be ready on node %s: %v", agentConfig.NodeName, err) - } - count++ - time.Sleep(1 * time.Second) - continue +func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes typedcorev1.NodeInterface) error { + fieldSelector := fields.Set{metav1.ObjectNameField: agentConfig.NodeName}.String() + watch, err := nodes.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector}) + if err != nil { + return err + } + defer watch.Stop() + + for ev := range watch.ResultChan() { + node, ok := ev.Object.(*corev1.Node) + if !ok { + return fmt.Errorf("could not convert event object to node: %v", ev) } updateNode := false @@ -260,12 +263,7 @@ func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes v if updateNode { if _, err := nodes.Update(ctx, node, metav1.UpdateOptions{}); err != nil { logrus.Infof("Failed to update node %s: %v", agentConfig.NodeName, err) - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(time.Second): - continue - } + continue } logrus.Infof("labels have been set successfully on node: %s", agentConfig.NodeName) } else {