mirror of
https://github.com/k3s-io/k3s.git
synced 2024-06-07 19:41:36 +00:00
Watch the local Node object instead of get/sleep looping
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
This commit is contained in:
parent
3fe460d080
commit
5acd0b9008
@ -2,20 +2,21 @@ package flannel
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
"github.com/rancher/k3s/pkg/agent/util"
|
"github.com/rancher/k3s/pkg/agent/util"
|
||||||
"github.com/rancher/k3s/pkg/daemons/config"
|
"github.com/rancher/k3s/pkg/daemons/config"
|
||||||
"github.com/rancher/k3s/pkg/version"
|
"github.com/rancher/k3s/pkg/version"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
|
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
utilsnet "k8s.io/utils/net"
|
utilsnet "k8s.io/utils/net"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -87,27 +88,14 @@ func Prepare(ctx context.Context, nodeConfig *config.Node) error {
|
|||||||
return createFlannelConf(nodeConfig)
|
return createFlannelConf(nodeConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Run(ctx context.Context, nodeConfig *config.Node, nodes v1.NodeInterface) error {
|
func Run(ctx context.Context, nodeConfig *config.Node, nodes typedcorev1.NodeInterface) error {
|
||||||
nodeName := nodeConfig.AgentConfig.NodeName
|
if err := waitForPodCIDR(ctx, nodeConfig.AgentConfig.NodeName, nodes); err != nil {
|
||||||
|
return errors.Wrap(err, "failed to wait for PodCIDR assignment")
|
||||||
for {
|
|
||||||
node, err := nodes.Get(ctx, nodeName, metav1.GetOptions{})
|
|
||||||
if err == nil && node.Spec.PodCIDR != "" {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if err == nil {
|
|
||||||
logrus.Info("Waiting for node " + nodeName + " CIDR not assigned yet")
|
|
||||||
} else {
|
|
||||||
logrus.Infof("Waiting for node %s: %v", nodeName, err)
|
|
||||||
}
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
}
|
}
|
||||||
logrus.Info("Node CIDR assigned for: " + nodeName)
|
|
||||||
|
|
||||||
netMode, err := findNetMode(nodeConfig.AgentConfig.ClusterCIDRs)
|
netMode, err := findNetMode(nodeConfig.AgentConfig.ClusterCIDRs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Fatalf("Error checking netMode")
|
return errors.Wrap(err, "failed to check netMode for flannel")
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
err := flannel(ctx, nodeConfig.FlannelIface, nodeConfig.FlannelConf, nodeConfig.AgentConfig.KubeConfigKubelet, netMode)
|
err := flannel(ctx, nodeConfig.FlannelIface, nodeConfig.FlannelConf, nodeConfig.AgentConfig.KubeConfigKubelet, netMode)
|
||||||
@ -119,6 +107,28 @@ func Run(ctx context.Context, nodeConfig *config.Node, nodes v1.NodeInterface) e
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// waitForPodCIDR watches nodes with this node's name, and returns when the PodCIDR has been set.
|
||||||
|
func waitForPodCIDR(ctx context.Context, nodeName string, nodes typedcorev1.NodeInterface) error {
|
||||||
|
fieldSelector := fields.Set{metav1.ObjectNameField: nodeName}.String()
|
||||||
|
watch, err := nodes.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer watch.Stop()
|
||||||
|
|
||||||
|
for ev := range watch.ResultChan() {
|
||||||
|
node, ok := ev.Object.(*corev1.Node)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("could not convert event object to node: %v", ev)
|
||||||
|
}
|
||||||
|
if node.Spec.PodCIDR != "" {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logrus.Info("PodCIDR assigned for node " + nodeName)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func createCNIConf(dir string) error {
|
func createCNIConf(dir string) error {
|
||||||
if dir == "" {
|
if dir == "" {
|
||||||
return nil
|
return nil
|
||||||
|
@ -29,11 +29,13 @@ import (
|
|||||||
"github.com/rancher/k3s/pkg/rootless"
|
"github.com/rancher/k3s/pkg/rootless"
|
||||||
"github.com/rancher/k3s/pkg/util"
|
"github.com/rancher/k3s/pkg/util"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/equality"
|
"k8s.io/apimachinery/pkg/api/equality"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
"k8s.io/client-go/tools/clientcmd"
|
"k8s.io/client-go/tools/clientcmd"
|
||||||
app2 "k8s.io/kubernetes/cmd/kube-proxy/app"
|
app2 "k8s.io/kubernetes/cmd/kube-proxy/app"
|
||||||
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
|
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
|
||||||
@ -106,16 +108,16 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
|
|||||||
|
|
||||||
util.WaitForAPIServerReady(coreClient, 30*time.Second)
|
util.WaitForAPIServerReady(coreClient, 30*time.Second)
|
||||||
|
|
||||||
|
if err := configureNode(ctx, &nodeConfig.AgentConfig, coreClient.CoreV1().Nodes()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if !nodeConfig.NoFlannel {
|
if !nodeConfig.NoFlannel {
|
||||||
if err := flannel.Run(ctx, nodeConfig, coreClient.CoreV1().Nodes()); err != nil {
|
if err := flannel.Run(ctx, nodeConfig, coreClient.CoreV1().Nodes()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := configureNode(ctx, &nodeConfig.AgentConfig, coreClient.CoreV1().Nodes()); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !nodeConfig.AgentConfig.DisableNPC {
|
if !nodeConfig.AgentConfig.DisableNPC {
|
||||||
if err := netpol.Run(ctx, nodeConfig); err != nil {
|
if err := netpol.Run(ctx, nodeConfig); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -220,17 +222,18 @@ func Run(ctx context.Context, cfg cmds.Agent) error {
|
|||||||
return run(ctx, cfg, proxy)
|
return run(ctx, cfg, proxy)
|
||||||
}
|
}
|
||||||
|
|
||||||
func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes v1.NodeInterface) error {
|
func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes typedcorev1.NodeInterface) error {
|
||||||
count := 0
|
fieldSelector := fields.Set{metav1.ObjectNameField: agentConfig.NodeName}.String()
|
||||||
for {
|
watch, err := nodes.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
|
||||||
node, err := nodes.Get(ctx, agentConfig.NodeName, metav1.GetOptions{})
|
if err != nil {
|
||||||
if err != nil {
|
return err
|
||||||
if count%30 == 0 {
|
}
|
||||||
logrus.Infof("Waiting for kubelet to be ready on node %s: %v", agentConfig.NodeName, err)
|
defer watch.Stop()
|
||||||
}
|
|
||||||
count++
|
for ev := range watch.ResultChan() {
|
||||||
time.Sleep(1 * time.Second)
|
node, ok := ev.Object.(*corev1.Node)
|
||||||
continue
|
if !ok {
|
||||||
|
return fmt.Errorf("could not convert event object to node: %v", ev)
|
||||||
}
|
}
|
||||||
|
|
||||||
updateNode := false
|
updateNode := false
|
||||||
@ -260,12 +263,7 @@ func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes v
|
|||||||
if updateNode {
|
if updateNode {
|
||||||
if _, err := nodes.Update(ctx, node, metav1.UpdateOptions{}); err != nil {
|
if _, err := nodes.Update(ctx, node, metav1.UpdateOptions{}); err != nil {
|
||||||
logrus.Infof("Failed to update node %s: %v", agentConfig.NodeName, err)
|
logrus.Infof("Failed to update node %s: %v", agentConfig.NodeName, err)
|
||||||
select {
|
continue
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
case <-time.After(time.Second):
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
logrus.Infof("labels have been set successfully on node: %s", agentConfig.NodeName)
|
logrus.Infof("labels have been set successfully on node: %s", agentConfig.NodeName)
|
||||||
} else {
|
} else {
|
||||||
|
Loading…
Reference in New Issue
Block a user