From 0b96ca92bc32511b4fe1d1d3b1562d0908157ed5 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Thu, 29 Sep 2022 20:37:50 +0000 Subject: [PATCH] Move servicelb into cloudprovider LoadBalancer interface Signed-off-by: Brad Davidson --- manifests/ccm.yaml | 27 +- pkg/cloudprovider/cloudprovider.go | 92 ++++- pkg/cloudprovider/loadbalancer.go | 53 +++ pkg/cloudprovider/servicelb.go | 642 +++++++++++++++++++++++++++++ pkg/daemons/agent/agent_linux.go | 4 +- pkg/daemons/config/types.go | 3 +- pkg/daemons/control/deps/deps.go | 24 ++ pkg/daemons/control/server.go | 7 +- pkg/daemons/executor/embed.go | 15 +- pkg/deploy/zz_generated_bindata.go | 2 +- pkg/server/server.go | 19 +- pkg/servicelb/controller.go | 626 ---------------------------- 12 files changed, 832 insertions(+), 682 deletions(-) create mode 100644 pkg/cloudprovider/loadbalancer.go create mode 100644 pkg/cloudprovider/servicelb.go delete mode 100644 pkg/servicelb/controller.go diff --git a/manifests/ccm.yaml b/manifests/ccm.yaml index b9bf48ab64..57ec04d631 100644 --- a/manifests/ccm.yaml +++ b/manifests/ccm.yaml @@ -24,21 +24,22 @@ rules: resources: - nodes verbs: - - '*' + - "*" - apiGroups: - "" resources: - nodes/status + - services/status verbs: - patch - apiGroups: - "" resources: - services + - pods verbs: + - get - list - - patch - - update - watch - apiGroups: - "" @@ -49,22 +50,16 @@ rules: - apiGroups: - "" resources: - - persistentvolumes - verbs: - - get - - list - - update - - watch -- apiGroups: - - "" - resources: - - endpoints + - namespaces verbs: - create - get - - list - - watch - - update +- apiGroups: + - apps + resources: + - daemonsets + verbs: + - "*" --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/pkg/cloudprovider/cloudprovider.go b/pkg/cloudprovider/cloudprovider.go index cbc9bb664a..ce388bd733 100644 --- a/pkg/cloudprovider/cloudprovider.go +++ b/pkg/cloudprovider/cloudprovider.go @@ -1,24 +1,112 @@ package cloudprovider import ( + "encoding/json" "io" + "io/ioutil" + "github.com/k3s-io/k3s/pkg/util" "github.com/k3s-io/k3s/pkg/version" + "github.com/rancher/wrangler/pkg/apply" + "github.com/rancher/wrangler/pkg/generated/controllers/apps" + appsclient "github.com/rancher/wrangler/pkg/generated/controllers/apps/v1" + "github.com/rancher/wrangler/pkg/generated/controllers/core" + coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1" + "github.com/rancher/wrangler/pkg/generic" + "github.com/rancher/wrangler/pkg/start" + "github.com/sirupsen/logrus" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" cloudprovider "k8s.io/cloud-provider" ) +// Config describes externally-configurable cloud provider configuration. +// This is normally unmarshalled from a JSON config file. 
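+// A config file might contain, for example (values shown are the defaults
+// set below; the file itself is optional):
+//
+//	{
+//		"lbEnabled": true,
+//		"lbImage": "rancher/klipper-lb:v0.3.5",
+//		"lbNamespace": "kube-system",
+//		"rootless": false
+//	}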
+type Config struct { + LBEnabled bool `json:"lbEnabled"` + LBImage string `json:"lbImage"` + LBNamespace string `json:"lbNamespace"` + Rootless bool `json:"rootless"` +} + type k3s struct { + Config + + client kubernetes.Interface + recorder record.EventRecorder + + processor apply.Apply + daemonsetCache appsclient.DaemonSetCache + nodeCache coreclient.NodeCache + podCache coreclient.PodCache + workqueue workqueue.RateLimitingInterface } var _ cloudprovider.Interface = &k3s{} func init() { cloudprovider.RegisterCloudProvider(version.Program, func(config io.Reader) (cloudprovider.Interface, error) { - return &k3s{}, nil + var err error + k := k3s{ + Config: Config{ + LBEnabled: true, + LBImage: DefaultLBImage, + LBNamespace: DefaultLBNS, + }, + } + + if config != nil { + var bytes []byte + bytes, err = ioutil.ReadAll(config) + if err == nil { + err = json.Unmarshal(bytes, &k.Config) + } + } + + return &k, err }) } func (k *k3s) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, stop <-chan struct{}) { + ctx, _ := wait.ContextForChannel(stop) + config := clientBuilder.ConfigOrDie(controllerName) + k.client = kubernetes.NewForConfigOrDie(config) + + if k.LBEnabled { + // Wrangler controller and caches are only needed if the load balancer controller is enabled. + k.recorder = util.BuildControllerEventRecorder(k.client, controllerName, meta.NamespaceAll) + coreFactory := core.NewFactoryFromConfigOrDie(config) + k.nodeCache = coreFactory.Core().V1().Node().Cache() + + lbCoreFactory := core.NewFactoryFromConfigWithOptionsOrDie(config, &generic.FactoryOptions{Namespace: k.LBNamespace}) + lbAppsFactory := apps.NewFactoryFromConfigWithOptionsOrDie(config, &generic.FactoryOptions{Namespace: k.LBNamespace}) + + processor, err := apply.NewForConfig(config) + if err != nil { + logrus.Fatalf("Failed to create apply processor for %s: %v", controllerName, err) + } + k.processor = processor.WithDynamicLookup().WithCacheTypes(lbAppsFactory.Apps().V1().DaemonSet()) + k.daemonsetCache = lbAppsFactory.Apps().V1().DaemonSet().Cache() + k.podCache = lbCoreFactory.Core().V1().Pod().Cache() + k.workqueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + + if err := k.Register(ctx, coreFactory.Core().V1().Node(), lbCoreFactory.Core().V1().Pod()); err != nil { + logrus.Fatalf("Failed to register %s handlers: %v", controllerName, err) + } + + if err := start.All(ctx, 1, coreFactory, lbCoreFactory, lbAppsFactory); err != nil { + logrus.Fatalf("Failed to start %s controllers: %v", controllerName, err) + } + } else { + // If load-balancer functionality has not been enabled, delete managed daemonsets. + // This uses the raw kubernetes client, as the controllers are not started when the load balancer controller is disabled. 
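+		// Deletion matches on the nodeSelectorLabel that is applied to every DaemonSet
+		// created by this controller, so unrelated DaemonSets in the namespace are untouched.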
+ if err := k.deleteAllDaemonsets(ctx); err != nil { + logrus.Fatalf("Failed to clean up %s daemonsets: %v", controllerName, err) + } + } } func (k *k3s) Instances() (cloudprovider.Instances, bool) { @@ -30,7 +118,7 @@ func (k *k3s) InstancesV2() (cloudprovider.InstancesV2, bool) { } func (k *k3s) LoadBalancer() (cloudprovider.LoadBalancer, bool) { - return nil, false + return k, k.LBEnabled } func (k *k3s) Zones() (cloudprovider.Zones, bool) { diff --git a/pkg/cloudprovider/loadbalancer.go b/pkg/cloudprovider/loadbalancer.go new file mode 100644 index 0000000000..693511bf4c --- /dev/null +++ b/pkg/cloudprovider/loadbalancer.go @@ -0,0 +1,53 @@ +package cloudprovider + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + cloudprovider "k8s.io/cloud-provider" +) + +var _ cloudprovider.LoadBalancer = &k3s{} + +// GetLoadBalancer returns whether the specified load balancer exists, and if so, what its status is. +func (k *k3s) GetLoadBalancer(ctx context.Context, clusterName string, service *corev1.Service) (*corev1.LoadBalancerStatus, bool, error) { + if _, err := k.getDaemonSet(service); err != nil { + if apierrors.IsNotFound(err) { + return nil, false, nil + } + return nil, false, err + } + + status, err := k.getStatus(service) + return status, true, err +} + +// GetLoadBalancerName returns the name of the load balancer. +func (k *k3s) GetLoadBalancerName(ctx context.Context, clusterName string, service *corev1.Service) string { + return generateName(service) +} + +// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer. +// The node list is unused; see the comment on UpdateLoadBalancer for information on why. +// This is called when the Service is created or changes. +func (k *k3s) EnsureLoadBalancer(ctx context.Context, clusterName string, service *corev1.Service, nodes []*corev1.Node) (*corev1.LoadBalancerStatus, error) { + if err := k.deployDaemonSet(ctx, service); err != nil { + return nil, err + } + return nil, cloudprovider.ImplementedElsewhere +} + +// UpdateLoadBalancer updates hosts under the specified load balancer. +// This is not used, as it filters node updates based on criteria not compatible with how our DaemonSet selects +// nodes for inclusion. It also does not provide any opportunity to update the load balancer status. +// https://github.com/kubernetes/kubernetes/blob/v1.25.0/staging/src/k8s.io/cloud-provider/controllers/service/controller.go#L985-L993 +func (k *k3s) UpdateLoadBalancer(ctx context.Context, clusterName string, service *corev1.Service, nodes []*corev1.Node) error { + return cloudprovider.ImplementedElsewhere +} + +// EnsureLoadBalancerDeleted deletes the specified load balancer if it exists, +// returning nil if the load balancer specified either didn't exist or was successfully deleted. 
+func (k *k3s) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *corev1.Service) error { + return k.deleteDaemonSet(ctx, service) +} diff --git a/pkg/cloudprovider/servicelb.go b/pkg/cloudprovider/servicelb.go new file mode 100644 index 0000000000..e3b4682926 --- /dev/null +++ b/pkg/cloudprovider/servicelb.go @@ -0,0 +1,642 @@ +package cloudprovider + +import ( + "context" + "errors" + "fmt" + "sort" + "strconv" + "strings" + "time" + + "github.com/k3s-io/k3s/pkg/version" + "github.com/rancher/wrangler/pkg/condition" + coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1" + "github.com/rancher/wrangler/pkg/merr" + "github.com/rancher/wrangler/pkg/objectset" + "github.com/sirupsen/logrus" + apps "k8s.io/api/apps/v1" + core "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/retry" + ccmapp "k8s.io/cloud-provider/app" + servicehelper "k8s.io/cloud-provider/service/helpers" + utilsnet "k8s.io/utils/net" + utilpointer "k8s.io/utils/pointer" +) + +var ( + finalizerName = "svccontroller." + version.Program + ".cattle.io/daemonset" + svcNameLabel = "svccontroller." + version.Program + ".cattle.io/svcname" + svcNamespaceLabel = "svccontroller." + version.Program + ".cattle.io/svcnamespace" + daemonsetNodeLabel = "svccontroller." + version.Program + ".cattle.io/enablelb" + daemonsetNodePoolLabel = "svccontroller." + version.Program + ".cattle.io/lbpool" + nodeSelectorLabel = "svccontroller." + version.Program + ".cattle.io/nodeselector" + controllerName = ccmapp.DefaultInitFuncConstructors["service"].InitContext.ClientName +) + +const ( + Ready = condition.Cond("Ready") + DefaultLBNS = meta.NamespaceSystem + DefaultLBImage = "rancher/klipper-lb:v0.3.5" +) + +func (k *k3s) Register(ctx context.Context, + nodes coreclient.NodeController, + pods coreclient.PodController, +) error { + nodes.OnChange(ctx, controllerName, k.onChangeNode) + pods.OnChange(ctx, controllerName, k.onChangePod) + + if err := k.createServiceLBNamespace(ctx); err != nil { + return err + } + + go wait.Until(k.runWorker, time.Second, ctx.Done()) + + return k.removeServiceFinalizers(ctx) +} + +// createServiceLBNamespace ensures that the configured namespace exists. +func (k *k3s) createServiceLBNamespace(ctx context.Context) error { + _, err := k.client.CoreV1().Namespaces().Create(ctx, &core.Namespace{ + ObjectMeta: meta.ObjectMeta{ + Name: k.LBNamespace, + }, + }, meta.CreateOptions{}) + if apierrors.IsAlreadyExists(err) { + return nil + } + return err +} + +// onChangePod handles changes to Pods. +// If the pod has labels that tie it to a service, and the pod has an IP assigned, +// enqueue an update to the service's status. +func (k *k3s) onChangePod(key string, pod *core.Pod) (*core.Pod, error) { + if pod == nil { + return nil, nil + } + + serviceName := pod.Labels[svcNameLabel] + if serviceName == "" { + return pod, nil + } + + serviceNamespace := pod.Labels[svcNamespaceLabel] + if serviceNamespace == "" { + return pod, nil + } + + if pod.Status.PodIP == "" { + return pod, nil + } + + k.workqueue.Add(serviceNamespace + "/" + serviceName) + return pod, nil +} + +// onChangeNode handles changes to Nodes. We need to handle this as we may need to kick the DaemonSet +// to add or remove pods from nodes if labels have changed. 
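+// For k3s (version.Program == "k3s") the label key is
+// "svccontroller.k3s.cattle.io/enablelb"; a node can be opted in with,
+// for example (hypothetical node name):
+//
+//	kubectl label node worker-1 svccontroller.k3s.cattle.io/enablelb=true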
+func (k *k3s) onChangeNode(key string, node *core.Node) (*core.Node, error) {
+	if node == nil {
+		return nil, nil
+	}
+	if _, ok := node.Labels[daemonsetNodeLabel]; !ok {
+		return node, nil
+	}
+
+	if err := k.updateDaemonSets(); err != nil {
+		return node, err
+	}
+
+	return node, nil
+}
+
+// runWorker dequeues Service changes from the work queue.
+// We run a lightweight work queue to handle service updates. We don't need the full overhead
+// of a wrangler service controller and shared informer cache, but we do want to run changes
+// through a keyed queue to reduce thrashing when pods are updated. Much of this is cribbed from
+// https://github.com/rancher/lasso/blob/release/v2.5/pkg/controller/controller.go#L173-L215
+func (k *k3s) runWorker() {
+	for k.processNextWorkItem() {
+	}
+}
+
+// processNextWorkItem does work for a single item in the queue,
+// returning a boolean that indicates if the queue should continue
+// to be serviced.
+func (k *k3s) processNextWorkItem() bool {
+	obj, shutdown := k.workqueue.Get()
+
+	if shutdown {
+		return false
+	}
+
+	if err := k.processSingleItem(obj); err != nil && !apierrors.IsConflict(err) {
+		logrus.Errorf("%s: %v", controllerName, err)
+	}
+	return true
+}
+
+// processSingleItem processes a single item from the work queue,
+// requeueing it if the handler fails.
+func (k *k3s) processSingleItem(obj interface{}) error {
+	var (
+		key string
+		ok  bool
+	)
+
+	defer k.workqueue.Done(obj)
+
+	if key, ok = obj.(string); !ok {
+		logrus.Errorf("expected string in workqueue but got %#v", obj)
+		k.workqueue.Forget(obj)
+		return nil
+	}
+	keyParts := strings.SplitN(key, "/", 2)
+	if err := k.updateStatus(keyParts[0], keyParts[1]); err != nil {
+		k.workqueue.AddRateLimited(key)
+		return fmt.Errorf("error updating LoadBalancer Status for %s: %v, requeueing", key, err)
+	}
+
+	k.workqueue.Forget(obj)
+	return nil
+}
+
+// updateStatus updates the load balancer status for the matching service, if it exists and is a
+// LoadBalancer service. The patchStatus function handles checking to see if status needs updating.
+func (k *k3s) updateStatus(namespace, name string) error {
+	svc, err := k.client.CoreV1().Services(namespace).Get(context.TODO(), name, meta.GetOptions{})
+	if err != nil {
+		if apierrors.IsNotFound(err) {
+			return nil
+		}
+		return err
+	}
+
+	if svc.Spec.Type != core.ServiceTypeLoadBalancer {
+		return nil
+	}
+
+	previousStatus := svc.Status.LoadBalancer.DeepCopy()
+	newStatus, err := k.getStatus(svc)
+	if err != nil {
+		return err
+	}
+
+	return k.patchStatus(svc, previousStatus, newStatus)
+}
+
+// getDaemonSet returns the DaemonSet that should exist for the Service.
+func (k *k3s) getDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
+	return k.daemonsetCache.Get(k.LBNamespace, generateName(svc))
+}
+
+// getStatus returns a LoadBalancerStatus listing ingress IPs for all ready pods
+// matching the selected service.
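+// For example, given ready pods on two nodes with internal IPs 10.0.0.1 and
+// 10.0.0.2 (hypothetical addresses), the returned status would list both
+// addresses as ingress IPs.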
+func (k *k3s) getStatus(svc *core.Service) (*core.LoadBalancerStatus, error) { + pods, err := k.podCache.List(k.LBNamespace, labels.SelectorFromSet(map[string]string{ + svcNameLabel: svc.Name, + svcNamespaceLabel: svc.Namespace, + })) + + if err != nil { + return nil, err + } + + expectedIPs, err := k.podIPs(pods, svc) + if err != nil { + return nil, err + } + + sort.Strings(expectedIPs) + + loadbalancer := &core.LoadBalancerStatus{} + for _, ip := range expectedIPs { + loadbalancer.Ingress = append(loadbalancer.Ingress, core.LoadBalancerIngress{ + IP: ip, + }) + } + + return loadbalancer, nil +} + +// patchStatus patches the service status. If the status has not changed, this function is a no-op. +func (k *k3s) patchStatus(svc *core.Service, previousStatus, newStatus *core.LoadBalancerStatus) error { + if servicehelper.LoadBalancerStatusEqual(previousStatus, newStatus) { + return nil + } + + updated := svc.DeepCopy() + updated.Status.LoadBalancer = *newStatus + _, err := servicehelper.PatchService(k.client.CoreV1(), svc, updated) + if err == nil { + if len(newStatus.Ingress) == 0 { + k.recorder.Event(svc, core.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer") + } else { + k.recorder.Eventf(svc, core.EventTypeNormal, "UpdatedLoadBalancer", "Updated LoadBalancer with new IPs: %v -> %v", ingressToString(previousStatus.Ingress), ingressToString(newStatus.Ingress)) + } + } + return err +} + +// podIPs returns a list of IPs for Nodes hosting ServiceLB Pods. +// If at least one node has External IPs available, only external IPs are returned. +// If no nodes have External IPs set, the Internal IPs of all nodes running pods are returned. +func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service) ([]string, error) { + // Go doesn't have sets so we stuff things into a map of bools and then get lists of keys + // to determine the unique set of IPs in use by pods. 
+ extIPs := map[string]bool{} + intIPs := map[string]bool{} + + for _, pod := range pods { + if pod.Spec.NodeName == "" || pod.Status.PodIP == "" { + continue + } + if !Ready.IsTrue(pod) { + continue + } + + node, err := k.nodeCache.Get(pod.Spec.NodeName) + if apierrors.IsNotFound(err) { + continue + } else if err != nil { + return nil, err + } + + for _, addr := range node.Status.Addresses { + if addr.Type == core.NodeExternalIP { + extIPs[addr.Address] = true + } else if addr.Type == core.NodeInternalIP { + intIPs[addr.Address] = true + } + } + } + + keys := func(addrs map[string]bool) (ips []string) { + for k := range addrs { + ips = append(ips, k) + } + return ips + } + + var ips []string + if len(extIPs) > 0 { + ips = keys(extIPs) + } else { + ips = keys(intIPs) + } + + ips, err := filterByIPFamily(ips, svc) + if err != nil { + return nil, err + } + + if len(ips) > 0 && k.Rootless { + return []string{"127.0.0.1"}, nil + } + + return ips, nil +} + +// filterByIPFamily filters ips based on dual-stack parameters of the service +func filterByIPFamily(ips []string, svc *core.Service) ([]string, error) { + var ipFamilyPolicy core.IPFamilyPolicyType + var ipv4Addresses []string + var ipv6Addresses []string + + for _, ip := range ips { + if utilsnet.IsIPv4String(ip) { + ipv4Addresses = append(ipv4Addresses, ip) + } + if utilsnet.IsIPv6String(ip) { + ipv6Addresses = append(ipv6Addresses, ip) + } + } + + if svc.Spec.IPFamilyPolicy != nil { + ipFamilyPolicy = *svc.Spec.IPFamilyPolicy + } + + switch ipFamilyPolicy { + case core.IPFamilyPolicySingleStack: + if svc.Spec.IPFamilies[0] == core.IPv4Protocol { + return ipv4Addresses, nil + } + if svc.Spec.IPFamilies[0] == core.IPv6Protocol { + return ipv6Addresses, nil + } + case core.IPFamilyPolicyPreferDualStack: + if svc.Spec.IPFamilies[0] == core.IPv4Protocol { + ipAddresses := append(ipv4Addresses, ipv6Addresses...) + return ipAddresses, nil + } + if svc.Spec.IPFamilies[0] == core.IPv6Protocol { + ipAddresses := append(ipv6Addresses, ipv4Addresses...) + return ipAddresses, nil + } + case core.IPFamilyPolicyRequireDualStack: + if (len(ipv4Addresses) == 0) || (len(ipv6Addresses) == 0) { + return nil, errors.New("one or more IP families did not have addresses available for service with ipFamilyPolicy=RequireDualStack") + } + if svc.Spec.IPFamilies[0] == core.IPv4Protocol { + ipAddresses := append(ipv4Addresses, ipv6Addresses...) + return ipAddresses, nil + } + if svc.Spec.IPFamilies[0] == core.IPv6Protocol { + ipAddresses := append(ipv6Addresses, ipv4Addresses...) + return ipAddresses, nil + } + } + + return nil, errors.New("unhandled ipFamilyPolicy") +} + +// deployDaemonSet ensures that there is a DaemonSet for the service. +func (k *k3s) deployDaemonSet(ctx context.Context, svc *core.Service) error { + ds, err := k.newDaemonSet(svc) + if err != nil { + return err + } + + defer k.recorder.Eventf(svc, core.EventTypeNormal, "AppliedDaemonSet", "Applied LoadBalancer DaemonSet %s/%s", ds.Namespace, ds.Name) + return k.processor.WithContext(ctx).WithOwner(svc).Apply(objectset.NewObjectSet(ds)) +} + +// deleteDaemonSet ensures that there are no DaemonSets for the given service. 
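+// The DaemonSet name comes from generateName, i.e. "svclb-<service name>-<first
+// 8 characters of the service UID>"; "svclb-traefik-abcd1234" would be a
+// representative (hypothetical) example.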
+func (k *k3s) deleteDaemonSet(ctx context.Context, svc *core.Service) error {
+	name := generateName(svc)
+	if err := k.client.AppsV1().DaemonSets(k.LBNamespace).Delete(ctx, name, meta.DeleteOptions{}); err != nil {
+		if apierrors.IsNotFound(err) {
+			return nil
+		}
+		return err
+	}
+	defer k.recorder.Eventf(svc, core.EventTypeNormal, "DeletedDaemonSet", "Deleted LoadBalancer DaemonSet %s/%s", k.LBNamespace, name)
+	return nil
+}
+
+// newDaemonSet creates a DaemonSet to ensure that ServiceLB pods are run on
+// each eligible node.
+func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
+	name := generateName(svc)
+	oneInt := intstr.FromInt(1)
+
+	sourceRanges, err := servicehelper.GetLoadBalancerSourceRanges(svc)
+	if err != nil {
+		return nil, err
+	}
+
+	ds := &apps.DaemonSet{
+		ObjectMeta: meta.ObjectMeta{
+			Name:      name,
+			Namespace: k.LBNamespace,
+			Labels: map[string]string{
+				nodeSelectorLabel: "false",
+				svcNameLabel:      svc.Name,
+				svcNamespaceLabel: svc.Namespace,
+			},
+		},
+		TypeMeta: meta.TypeMeta{
+			Kind:       "DaemonSet",
+			APIVersion: "apps/v1",
+		},
+		Spec: apps.DaemonSetSpec{
+			Selector: &meta.LabelSelector{
+				MatchLabels: map[string]string{
+					"app": name,
+				},
+			},
+			Template: core.PodTemplateSpec{
+				ObjectMeta: meta.ObjectMeta{
+					Labels: map[string]string{
+						"app":             name,
+						svcNameLabel:      svc.Name,
+						svcNamespaceLabel: svc.Namespace,
+					},
+				},
+				Spec: core.PodSpec{
+					AutomountServiceAccountToken: utilpointer.Bool(false),
+				},
+			},
+			UpdateStrategy: apps.DaemonSetUpdateStrategy{
+				Type: apps.RollingUpdateDaemonSetStrategyType,
+				RollingUpdate: &apps.RollingUpdateDaemonSet{
+					MaxUnavailable: &oneInt,
+				},
+			},
+		},
+	}
+
+	var sysctls []core.Sysctl
+	for _, ipFamily := range svc.Spec.IPFamilies {
+		switch ipFamily {
+		case core.IPv4Protocol:
+			sysctls = append(sysctls, core.Sysctl{Name: "net.ipv4.ip_forward", Value: "1"})
+		case core.IPv6Protocol:
+			sysctls = append(sysctls, core.Sysctl{Name: "net.ipv6.conf.all.forwarding", Value: "1"})
+		}
+	}
+
+	ds.Spec.Template.Spec.SecurityContext = &core.PodSecurityContext{Sysctls: sysctls}
+
+	for _, port := range svc.Spec.Ports {
+		portName := fmt.Sprintf("lb-%s-%d", strings.ToLower(string(port.Protocol)), port.Port)
+		container := core.Container{
+			Name:            portName,
+			Image:           k.LBImage,
+			ImagePullPolicy: core.PullIfNotPresent,
+			Ports: []core.ContainerPort{
+				{
+					Name:          portName,
+					ContainerPort: port.Port,
+					HostPort:      port.Port,
+					Protocol:      port.Protocol,
+				},
+			},
+			Env: []core.EnvVar{
+				{
+					Name:  "SRC_PORT",
+					Value: strconv.Itoa(int(port.Port)),
+				},
+				{
+					Name:  "SRC_RANGES",
+					Value: strings.Join(sourceRanges.StringSlice(), " "),
+				},
+				{
+					Name:  "DEST_PROTO",
+					Value: string(port.Protocol),
+				},
+				{
+					Name:  "DEST_PORT",
+					Value: strconv.Itoa(int(port.Port)),
+				},
+				{
+					Name:  "DEST_IPS",
+					Value: strings.Join(svc.Spec.ClusterIPs, " "),
+				},
+			},
+			SecurityContext: &core.SecurityContext{
+				Capabilities: &core.Capabilities{
+					Add: []core.Capability{
+						"NET_ADMIN",
+					},
+				},
+			},
+		}
+
+		ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container)
+	}
+
+	// Add toleration to node-role.kubernetes.io/master=*:NoSchedule
+	masterToleration := core.Toleration{
+		Key:      "node-role.kubernetes.io/master",
+		Operator: "Exists",
+		Effect:   "NoSchedule",
+	}
+	ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, masterToleration)
+
+	// Add toleration to node-role.kubernetes.io/control-plane=*:NoSchedule
+	controlPlaneToleration := core.Toleration{
+		Key:      "node-role.kubernetes.io/control-plane",
+		Operator: "Exists",
+		Effect:   "NoSchedule",
+	}
+	ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, controlPlaneToleration)
+
+	// Add toleration to CriticalAddonsOnly
+	criticalAddonsOnlyToleration := core.Toleration{
+		Key:      "CriticalAddonsOnly",
+		Operator: "Exists",
+	}
+	ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, criticalAddonsOnlyToleration)
+
+	// Add node selector only if label "svccontroller.k3s.cattle.io/enablelb" exists on the nodes
+	enableNodeSelector, err := k.nodeHasDaemonSetLabel()
+	if err != nil {
+		return nil, err
+	}
+	if enableNodeSelector {
+		ds.Spec.Template.Spec.NodeSelector = map[string]string{
+			daemonsetNodeLabel: "true",
+		}
+		// Add node selector for "svccontroller.k3s.cattle.io/lbpool=<pool>" if the service has the lbpool label
+		if svc.Labels[daemonsetNodePoolLabel] != "" {
+			ds.Spec.Template.Spec.NodeSelector[daemonsetNodePoolLabel] = svc.Labels[daemonsetNodePoolLabel]
+		}
+		ds.Labels[nodeSelectorLabel] = "true"
+	}
+	return ds, nil
+}
+
+// updateDaemonSets ensures that our DaemonSets have a NodeSelector present if one is enabled,
+// and that they do not have one if it is not. Nodes are checked for this label when the DaemonSet is generated,
+// but node labels may change between Service updates and the NodeSelector needs to be updated appropriately.
+func (k *k3s) updateDaemonSets() error {
+	enableNodeSelector, err := k.nodeHasDaemonSetLabel()
+	if err != nil {
+		return err
+	}
+
+	nodeSelector := labels.SelectorFromSet(map[string]string{nodeSelectorLabel: fmt.Sprintf("%t", !enableNodeSelector)})
+	daemonsets, err := k.daemonsetCache.List(k.LBNamespace, nodeSelector)
+	if err != nil {
+		return err
+	}
+
+	for _, ds := range daemonsets {
+		ds.Labels[nodeSelectorLabel] = fmt.Sprintf("%t", enableNodeSelector)
+		ds.Spec.Template.Spec.NodeSelector = map[string]string{}
+		if enableNodeSelector {
+			ds.Spec.Template.Spec.NodeSelector[daemonsetNodeLabel] = "true"
+		}
+		if _, err := k.client.AppsV1().DaemonSets(ds.Namespace).Update(context.TODO(), ds, meta.UpdateOptions{}); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// nodeHasDaemonSetLabel returns true if any node is labeled for inclusion or exclusion
+// from use by ServiceLB. If any node is labeled, only nodes with a label value of "true"
+// will be used.
+func (k *k3s) nodeHasDaemonSetLabel() (bool, error) {
+	selector, err := labels.Parse(daemonsetNodeLabel)
+	if err != nil {
+		return false, err
+	}
+	nodesWithLabel, err := k.nodeCache.List(selector)
+	return len(nodesWithLabel) > 0, err
+}
+
+// deleteAllDaemonsets deletes all daemonsets created by this controller
+func (k *k3s) deleteAllDaemonsets(ctx context.Context) error {
+	return k.client.AppsV1().DaemonSets(k.LBNamespace).DeleteCollection(ctx, meta.DeleteOptions{}, meta.ListOptions{LabelSelector: nodeSelectorLabel})
+}
+
+// removeServiceFinalizers ensures that there are no finalizers left on any services.
+// Previous implementations of the servicelb controller manually added finalizers to services it managed;
+// these need to be removed in order to release ownership to the cloud provider implementation.
+func (k *k3s) removeServiceFinalizers(ctx context.Context) error { + services, err := k.client.CoreV1().Services(meta.NamespaceAll).List(ctx, meta.ListOptions{}) + if err != nil { + return err + } + + var errs merr.Errors + for _, svc := range services.Items { + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + s, err := k.removeFinalizer(ctx, &svc) + svc = *s + return err + }); err != nil { + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return errs + } + return nil +} + +// removeFinalizer ensures that there is not a finalizer for this controller on the Service +func (k *k3s) removeFinalizer(ctx context.Context, svc *core.Service) (*core.Service, error) { + var found bool + for k, v := range svc.Finalizers { + if v != finalizerName { + continue + } + found = true + svc.Finalizers = append(svc.Finalizers[:k], svc.Finalizers[k+1:]...) + } + + if found { + return k.client.CoreV1().Services(svc.Namespace).Update(ctx, svc, meta.UpdateOptions{}) + } + return svc, nil +} + +// generateName generates a distinct name for the DaemonSet based on the service name and UID +func generateName(svc *core.Service) string { + return fmt.Sprintf("svclb-%s-%s", svc.Name, svc.UID[:8]) +} + +// ingressToString converts a list of LoadBalancerIngress entries to strings +func ingressToString(ingresses []core.LoadBalancerIngress) []string { + parts := make([]string, len(ingresses)) + for i, ingress := range ingresses { + if ingress.IP != "" { + parts[i] = ingress.IP + } else { + parts[i] = ingress.Hostname + } + } + return parts +} diff --git a/pkg/daemons/agent/agent_linux.go b/pkg/daemons/agent/agent_linux.go index 3c5431a1af..8d2d1be006 100644 --- a/pkg/daemons/agent/agent_linux.go +++ b/pkg/daemons/agent/agent_linux.go @@ -173,8 +173,8 @@ func kubeletArgs(cfg *config.Agent) map[string]string { argsMap["protect-kernel-defaults"] = "true" } - if !cfg.DisableServiceLB && cfg.EnableIPv6 { - argsMap["allowed-unsafe-sysctls"] = "net.ipv6.conf.all.forwarding" + if !cfg.DisableServiceLB { + argsMap["allowed-unsafe-sysctls"] = "net.ipv4.ip_forward,net.ipv6.conf.all.forwarding" } return argsMap diff --git a/pkg/daemons/config/types.go b/pkg/daemons/config/types.go index 5e4a30c6b9..7294a6455e 100644 --- a/pkg/daemons/config/types.go +++ b/pkg/daemons/config/types.go @@ -308,7 +308,8 @@ type ControlRuntime struct { Tunnel http.Handler Authenticator authenticator.Request - EgressSelectorConfig string + EgressSelectorConfig string + CloudControllerConfig string ClientAuthProxyCert string ClientAuthProxyKey string diff --git a/pkg/daemons/control/deps/deps.go b/pkg/daemons/control/deps/deps.go index d5afef4582..75e44e6bcc 100644 --- a/pkg/daemons/control/deps/deps.go +++ b/pkg/daemons/control/deps/deps.go @@ -19,6 +19,7 @@ import ( "time" "github.com/k3s-io/k3s/pkg/clientaccess" + "github.com/k3s-io/k3s/pkg/cloudprovider" "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/passwd" "github.com/k3s-io/k3s/pkg/token" @@ -138,6 +139,7 @@ func CreateRuntimeCertFiles(config *config.Control) { runtime.ServingKubeletKey = filepath.Join(config.DataDir, "tls", "serving-kubelet.key") runtime.EgressSelectorConfig = filepath.Join(config.DataDir, "etc", "egress-selector-config.yaml") + runtime.CloudControllerConfig = filepath.Join(config.DataDir, "etc", "cloud-config.yaml") runtime.ClientAuthProxyCert = filepath.Join(config.DataDir, "tls", "client-auth-proxy.crt") runtime.ClientAuthProxyKey = filepath.Join(config.DataDir, "tls", "client-auth-proxy.key") @@ -187,6 +189,10 @@ func 
GenServerDeps(config *config.Control) error { return err } + if err := genCloudConfig(config); err != nil { + return err + } + return readTokens(runtime) } @@ -764,3 +770,21 @@ func genEgressSelectorConfig(controlConfig *config.Control) error { } return ioutil.WriteFile(controlConfig.Runtime.EgressSelectorConfig, b, 0600) } + +func genCloudConfig(controlConfig *config.Control) error { + cloudConfig := cloudprovider.Config{ + LBEnabled: !controlConfig.DisableServiceLB, + LBNamespace: controlConfig.ServiceLBNamespace, + LBImage: cloudprovider.DefaultLBImage, + Rootless: controlConfig.Rootless, + } + if controlConfig.SystemDefaultRegistry != "" { + cloudConfig.LBImage = controlConfig.SystemDefaultRegistry + "/" + cloudConfig.LBImage + } + b, err := json.Marshal(cloudConfig) + if err != nil { + return err + } + return ioutil.WriteFile(controlConfig.Runtime.CloudControllerConfig, b, 0600) + +} diff --git a/pkg/daemons/control/server.go b/pkg/daemons/control/server.go index 079872042b..d5afa75a1f 100644 --- a/pkg/daemons/control/server.go +++ b/pkg/daemons/control/server.go @@ -302,6 +302,7 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control) error { "profiling": "false", "allocate-node-cidrs": "true", "cloud-provider": version.Program, + "cloud-config": runtime.CloudControllerConfig, "cluster-cidr": util.JoinIPNets(cfg.ClusterIPRanges), "configure-cloud-routes": "false", "kubeconfig": runtime.KubeConfigCloudController, @@ -371,9 +372,9 @@ func checkForCloudControllerPrivileges(ctx context.Context, runtime *config.Cont User: version.Program + "-cloud-controller-manager", ResourceAttributes: &authorizationv1.ResourceAttributes{ Namespace: metav1.NamespaceSystem, - Verb: "get", - Resource: "configmaps", - Name: "extension-apiserver-authentication", + Verb: "*", + Resource: "daemonsets", + Group: "apps", }, }, } diff --git a/pkg/daemons/executor/embed.go b/pkg/daemons/executor/embed.go index cfba2bce4f..58430c7f8d 100644 --- a/pkg/daemons/executor/embed.go +++ b/pkg/daemons/executor/embed.go @@ -18,7 +18,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/authentication/authenticator" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -164,27 +163,17 @@ func (*Embedded) CloudControllerManager(ctx context.Context, ccmRBACReady <-chan } cloudInitializer := func(config *cloudcontrollerconfig.CompletedConfig) cloudprovider.Interface { - cloud, err := ccm.InitCloudProvider(version.Program, "") + cloud, err := ccm.InitCloudProvider(version.Program, config.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile) if err != nil { logrus.Fatalf("Cloud provider could not be initialized: %v", err) } if cloud == nil { logrus.Fatalf("Cloud provider is nil") } - - cloud.Initialize(config.ClientBuilder, make(chan struct{})) - if informerUserCloud, ok := cloud.(ccm.InformerUser); ok { - informerUserCloud.SetInformers(config.SharedInformers) - } - return cloud } - controllerInitializers := ccmapp.DefaultInitFuncConstructors - delete(controllerInitializers, "service") - delete(controllerInitializers, "route") - - command := ccmapp.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, controllerInitializers, cliflag.NamedFlagSets{}, wait.NeverStop) + command := ccmapp.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, ccmapp.DefaultInitFuncConstructors, cliflag.NamedFlagSets{}, ctx.Done()) 
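+	// The default init func constructors are now passed through unmodified; the
+	// upstream service controller runs and drives the cloud provider's LoadBalancer
+	// implementation, instead of being deleted from the map as before.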
command.SetArgs(args) go func() { diff --git a/pkg/deploy/zz_generated_bindata.go b/pkg/deploy/zz_generated_bindata.go index a9f99dffdb..a26744707e 100644 --- a/pkg/deploy/zz_generated_bindata.go +++ b/pkg/deploy/zz_generated_bindata.go @@ -91,7 +91,7 @@ func (fi bindataFileInfo) Sys() interface{} { return nil } -var _ccmYaml = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x94\x41\x8f\x13\x31\x0c\x85\xef\xf9\x15\xd1\x5e\x56\x42\x4a\x57\x88\x0b\x9a\x23\x1c\xb8\xaf\x04\x77\x37\x79\x74\x43\x33\x71\x14\x3b\xb3\xc0\xaf\x47\xe9\x2c\x68\x99\xa1\x55\x5b\x40\x70\x8b\x2a\xfb\x7b\xcf\xcf\xf5\x50\x89\x1f\x50\x25\x72\x1e\x6c\xdd\x92\xdf\x50\xd3\x07\xae\xf1\x2b\x69\xe4\xbc\xd9\xbf\x96\x4d\xe4\xbb\xe9\xa5\xd9\xc7\x1c\x06\xfb\x36\x35\x51\xd4\x7b\x4e\x30\x23\x94\x02\x29\x0d\xc6\xda\x4c\x23\x06\xbb\x7f\x25\xce\x27\x6e\xc1\x79\xce\x5a\x39\x25\x54\x37\x52\xa6\x1d\xaa\xa9\x2d\x41\x06\xe3\x2c\x95\xf8\xae\x72\x2b\xd2\x1b\x9d\xf5\xcc\x35\xc4\xfc\x5c\xcf\x58\x5b\x21\xdc\xaa\xc7\x53\x51\x02\x09\xc4\x58\x3b\xa1\x6e\x9f\x7e\xdb\x41\x67\x40\x05\x29\x0e\xcf\x56\x42\x7f\xae\x34\x6e\x6e\xd6\x48\x4c\xc8\xba\x40\x3e\x43\x15\x52\xff\x70\x31\x34\x73\x58\xda\xbc\x7d\x71\x7b\x41\xef\x9d\x28\x69\x5b\x20\x66\x2f\x67\x41\x04\x75\x8a\x7e\xe9\x21\x45\xd1\x5f\x4f\xd5\x9f\x8f\x17\xe3\xc9\x7b\x6e\xc7\xd2\x3b\x0b\x54\xfa\x9f\x4e\x14\x59\x27\x4e\x6d\x3c\xb6\xdb\x1f\xc6\xaf\xb3\x8b\x1c\x0a\xc7\x53\x6b\x5e\x09\x3d\xae\xf6\xee\x9c\xb9\xfe\x4a\xde\xc4\x1c\x62\xde\x5d\x7c\x2c\x9c\x70\x8f\x8f\xbd\xfa\xfb\x98\x27\x94\x8d\xb5\xeb\xf3\x3c\x4b\x47\xda\xf6\x13\xbc\x1e\xee\x72\x46\xbc\x17\xd4\xf3\x7a\xe7\x22\x29\xe4\x7b\x65\xdb\xc2\xc9\x17\x51\x8c\xff\x24\x31\xd7\xf9\x2e\x20\x61\x47\xca\x7f\x34\xc0\x79\xaa\x61\x21\xf0\xbf\x24\xf7\x9b\x91\x21\x6b\xf4\x07\xb2\xab\xa0\x70\xca\xdc\x95\x91\xfe\x94\x25\x3e\x2b\x72\x9f\xcd\x51\x89\xfd\x63\x72\xd4\xc6\x5f\xc9\xf7\x5b\x00\x00\x00\xff\xff\xc2\xa7\x17\xb8\xee\x06\x00\x00") +var _ccmYaml = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x94\x4f\x8f\x13\x31\x0c\xc5\xef\xf9\x14\x51\x8f\x48\xe9\x0a\x71\x41\x73\x84\x03\xf7\x95\xe0\xee\x26\x8f\x6e\x68\x26\x8e\x62\xa7\xfc\xf9\xf4\x68\x66\x76\xc5\x30\xa3\xae\x3a\x05\xc4\xde\x2c\x2b\xfe\xf9\xf9\x39\x09\x95\xf8\x09\x55\x22\xe7\xce\xd6\x03\xf9\x3d\x35\x7d\xe0\x1a\x7f\x90\x46\xce\xfb\xd3\x5b\xd9\x47\xbe\x3b\xbf\x36\xa7\x98\x43\x67\xdf\xa7\x26\x8a\x7a\xcf\x09\xa6\x87\x52\x20\xa5\xce\x58\x9b\xa9\x47\x67\x4f\x6f\xc4\xf9\xc4\x2d\x38\xcf\x59\x2b\xa7\x84\xea\x7a\xca\x74\x44\x35\xb5\x25\x48\x67\x9c\xa5\x12\x3f\x54\x6e\x45\x86\x42\x67\x3d\x73\x0d\x31\xcf\xfb\x19\x6b\x2b\x84\x5b\xf5\x78\x3c\x94\x40\x02\x31\xd6\x9e\x51\x0f\x8f\xb9\x23\x74\x02\x54\x90\x62\x0c\x5b\x09\x43\xb8\xea\xb1\xdb\xad\x91\x38\x23\xeb\x02\x39\x43\x15\x52\xff\xb0\x19\x9a\x39\x2c\x65\xee\x5e\xed\x36\xd4\xde\x89\x92\x36\x19\x13\x82\x7a\x8e\x7e\x9e\x9b\x61\x27\x7d\x57\x81\x9f\x38\x53\x1d\x87\x0b\x3e\xa6\x28\x53\xf0\x75\x33\x9a\xbc\xe7\x76\xc9\xcd\xeb\x86\xa7\x1e\x52\xc8\x2f\xdd\x9b\x6d\x64\xd0\xb9\x62\x51\x29\xb2\xa6\x05\x42\xcf\x59\xb0\x54\x34\xee\xc2\x39\x73\xfb\xad\x7f\x17\x73\x88\xf9\xb8\xf9\xf2\x73\xc2\x3d\x3e\x0f\xa7\x9f\x06\x78\xa6\xb3\xb1\x76\xfd\xdc\xae\xea\x23\xed\xf0\x05\x5e\xc7\x77\x36\x21\x3e\x0a\xea\x75\xb5\xf6\xd7\x12\x3a\x7b\x6a\x07\x38\xf9\x2e\x8a\xfe\xbf\x38\xe6\x06\xbe\x0b\x48\x38\x92\xf2\x5f\x35\x70\x9a\xaa\x5b\x34\x78\x29\xce\xfd\xa1\x65\xc8\x1a\xfd\x48\x76\x15\x14\x9e\x13\x77\xa3\xa5\xbf\x79\x89\x6f\x8a\x3c\xcc\xe6\xa8\xc4\xe1\x33\xb8\x28\xe3\x9f\xf8\xfb\x33\x00\x00\xff\xff\x4c\x59\x10\xd5\xbe\x06\x00\x00") func ccmYamlBytes() ([]byte, error) { return bindataRead( diff --git 
a/pkg/server/server.go b/pkg/server/server.go index fa0aca8837..6d0f981f38 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -23,7 +23,6 @@ import ( "github.com/k3s-io/k3s/pkg/nodepassword" "github.com/k3s-io/k3s/pkg/rootlessports" "github.com/k3s-io/k3s/pkg/secretsencrypt" - "github.com/k3s-io/k3s/pkg/servicelb" "github.com/k3s-io/k3s/pkg/static" "github.com/k3s-io/k3s/pkg/util" "github.com/k3s-io/k3s/pkg/version" @@ -187,10 +186,9 @@ func coreControllers(ctx context.Context, sc *Context, config *Config) error { return err } - // apply SystemDefaultRegistry setting to Helm and ServiceLB before starting controllers + // apply SystemDefaultRegistry setting to Helm before starting controllers if config.ControlConfig.SystemDefaultRegistry != "" { helm.DefaultJobImage = config.ControlConfig.SystemDefaultRegistry + "/" + helm.DefaultJobImage - servicelb.DefaultLBImage = config.ControlConfig.SystemDefaultRegistry + "/" + servicelb.DefaultLBImage } if !config.ControlConfig.DisableHelmController { @@ -205,21 +203,6 @@ func coreControllers(ctx context.Context, sc *Context, config *Config) error { sc.Core.Core().V1().ConfigMap()) } - if err := servicelb.Register(ctx, - sc.K8s, - sc.Apply, - sc.Apps.Apps().V1().DaemonSet(), - sc.Apps.Apps().V1().Deployment(), - sc.Core.Core().V1().Node(), - sc.Core.Core().V1().Pod(), - sc.Core.Core().V1().Service(), - sc.Core.Core().V1().Endpoints(), - config.ControlConfig.ServiceLBNamespace, - !config.ControlConfig.DisableServiceLB, - config.ControlConfig.Rootless); err != nil { - return err - } - if config.ControlConfig.EncryptSecrets { if err := secretsencrypt.Register(ctx, sc.K8s, diff --git a/pkg/servicelb/controller.go b/pkg/servicelb/controller.go deleted file mode 100644 index 1d7506528f..0000000000 --- a/pkg/servicelb/controller.go +++ /dev/null @@ -1,626 +0,0 @@ -package servicelb - -import ( - "context" - "errors" - "fmt" - "sort" - "strconv" - "strings" - - util "github.com/k3s-io/k3s/pkg/util" - "github.com/k3s-io/k3s/pkg/version" - "github.com/rancher/wrangler/pkg/apply" - "github.com/rancher/wrangler/pkg/condition" - appclient "github.com/rancher/wrangler/pkg/generated/controllers/apps/v1" - coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1" - "github.com/rancher/wrangler/pkg/objectset" - "github.com/rancher/wrangler/pkg/relatedresource" - "github.com/rancher/wrangler/pkg/slice" - apps "k8s.io/api/apps/v1" - core "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - meta "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/client-go/kubernetes" - v1getter "k8s.io/client-go/kubernetes/typed/apps/v1" - coregetter "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/record" - utilsnet "k8s.io/utils/net" - utilpointer "k8s.io/utils/pointer" -) - -var ( - finalizerName = "svccontroller." + version.Program + ".cattle.io/daemonset" - svcNameLabel = "svccontroller." + version.Program + ".cattle.io/svcname" - svcNamespaceLabel = "svccontroller." + version.Program + ".cattle.io/svcnamespace" - daemonsetNodeLabel = "svccontroller." + version.Program + ".cattle.io/enablelb" - daemonsetNodePoolLabel = "svccontroller." + version.Program + ".cattle.io/lbpool" - nodeSelectorLabel = "svccontroller." 
+ version.Program + ".cattle.io/nodeselector" - DefaultLBImage = "rancher/klipper-lb:v0.3.5" -) - -const ( - Ready = condition.Cond("Ready") - ControllerName = "svccontroller" -) - -func Register(ctx context.Context, - kubernetes kubernetes.Interface, - apply apply.Apply, - daemonSetController appclient.DaemonSetController, - deployments appclient.DeploymentController, - nodes coreclient.NodeController, - pods coreclient.PodController, - services coreclient.ServiceController, - endpoints coreclient.EndpointsController, - klipperLBNamespace string, - enabled, rootless bool) error { - h := &handler{ - rootless: rootless, - enabled: enabled, - klipperLBNamespace: klipperLBNamespace, - nodeCache: nodes.Cache(), - podCache: pods.Cache(), - deploymentCache: deployments.Cache(), - processor: apply.WithSetID(ControllerName).WithCacheTypes(daemonSetController), - serviceCache: services.Cache(), - services: kubernetes.CoreV1(), - daemonsets: kubernetes.AppsV1(), - deployments: kubernetes.AppsV1(), - recorder: util.BuildControllerEventRecorder(kubernetes, ControllerName, meta.NamespaceAll), - } - - services.OnChange(ctx, ControllerName, h.onChangeService) - nodes.OnChange(ctx, ControllerName, h.onChangeNode) - relatedresource.Watch(ctx, ControllerName+"-watcher", - h.onResourceChange, - services, - pods, - endpoints) - - if enabled { - if err := createServiceLBNamespace(ctx, h.klipperLBNamespace, kubernetes); err != nil { - return err - } - } - - return nil -} - -type handler struct { - rootless bool - klipperLBNamespace string - enabled bool - nodeCache coreclient.NodeCache - podCache coreclient.PodCache - deploymentCache appclient.DeploymentCache - processor apply.Apply - serviceCache coreclient.ServiceCache - services coregetter.ServicesGetter - daemonsets v1getter.DaemonSetsGetter - deployments v1getter.DeploymentsGetter - recorder record.EventRecorder -} - -func createServiceLBNamespace(ctx context.Context, ns string, k8s kubernetes.Interface) error { - _, err := k8s.CoreV1().Namespaces().Get(ctx, ns, meta.GetOptions{}) - if apierrors.IsNotFound(err) { - _, err := k8s.CoreV1().Namespaces().Create(ctx, &core.Namespace{ - ObjectMeta: meta.ObjectMeta{ - Name: ns, - }, - }, meta.CreateOptions{}) - return err - } - return err -} - -func (h *handler) onResourceChange(name, namespace string, obj runtime.Object) ([]relatedresource.Key, error) { - if ep, ok := obj.(*core.Endpoints); ok { - return []relatedresource.Key{ - { - Name: ep.Name, - Namespace: ep.Namespace, - }, - }, nil - } - - pod, ok := obj.(*core.Pod) - if !ok { - return nil, nil - } - - serviceName := pod.Labels[svcNameLabel] - if serviceName == "" { - return nil, nil - } - - serviceNamespace := pod.Labels[svcNamespaceLabel] - if serviceNamespace == "" { - return nil, nil - } - - if pod.Status.PodIP == "" { - return nil, nil - } - - return []relatedresource.Key{ - { - Name: serviceName, - Namespace: serviceNamespace, - }, - }, nil -} - -// onChangeService handles changes to Services. -func (h *handler) onChangeService(key string, svc *core.Service) (*core.Service, error) { - if svc == nil { - return nil, nil - } - - if err := h.deployPod(svc); err != nil { - return svc, err - } - - // Don't return service because we don't want another update - _, err := h.updateService(svc) - return nil, err -} - -// onChangeNode handles changes to Nodes. We need to handle this as we may need to kick the DaemonSet -// to add or remove pods from nodes if labels have changed. 
-func (h *handler) onChangeNode(key string, node *core.Node) (*core.Node, error) { - if node == nil { - return nil, nil - } - if _, ok := node.Labels[daemonsetNodeLabel]; !ok { - return node, nil - } - - if err := h.updateDaemonSets(); err != nil { - return node, err - } - - return node, nil -} - -// updateService ensures that the Service ingress IP address list is in sync -// with the Nodes actually running pods for this service. -func (h *handler) updateService(svc *core.Service) (runtime.Object, error) { - if !h.enabled || svc.DeletionTimestamp != nil || svc.Spec.Type != core.ServiceTypeLoadBalancer { - return h.removeFinalizer(svc) - } - - pods, err := h.podCache.List(h.klipperLBNamespace, labels.SelectorFromSet(map[string]string{ - svcNameLabel: svc.Name, - svcNamespaceLabel: svc.Namespace, - })) - - if err != nil { - return svc, err - } - - existingIPs := serviceIPs(svc) - expectedIPs, err := h.podIPs(pods, svc) - if err != nil { - return svc, err - } - - sort.Strings(expectedIPs) - sort.Strings(existingIPs) - - if slice.StringsEqual(expectedIPs, existingIPs) { - return svc, nil - } - - svc = svc.DeepCopy() - svc, err = h.addFinalizer(svc) - if err != nil { - return svc, err - } - - svc.Status.LoadBalancer.Ingress = nil - for _, ip := range expectedIPs { - svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, core.LoadBalancerIngress{ - IP: ip, - }) - } - - defer h.recorder.Eventf(svc, core.EventTypeNormal, "UpdatedIngressIP", "LoadBalancer Ingress IP addresses updated: %s", strings.Join(expectedIPs, ", ")) - return h.services.Services(svc.Namespace).UpdateStatus(context.TODO(), svc, meta.UpdateOptions{}) -} - -// serviceIPs returns the list of ingress IP addresses from the Service -func serviceIPs(svc *core.Service) []string { - var ips []string - - for _, ingress := range svc.Status.LoadBalancer.Ingress { - if ingress.IP != "" { - ips = append(ips, ingress.IP) - } - } - - return ips -} - -// podIPs returns a list of IPs for Nodes hosting ServiceLB Pods. -// If at least one node has External IPs available, only external IPs are returned. -// If no nodes have External IPs set, the Internal IPs of all nodes running pods are returned. -func (h *handler) podIPs(pods []*core.Pod, svc *core.Service) ([]string, error) { - // Go doesn't have sets so we stuff things into a map of bools and then get lists of keys - // to determine the unique set of IPs in use by pods. 
- extIPs := map[string]bool{} - intIPs := map[string]bool{} - - for _, pod := range pods { - if pod.Spec.NodeName == "" || pod.Status.PodIP == "" { - continue - } - if !Ready.IsTrue(pod) { - continue - } - - node, err := h.nodeCache.Get(pod.Spec.NodeName) - if apierrors.IsNotFound(err) { - continue - } else if err != nil { - return nil, err - } - - for _, addr := range node.Status.Addresses { - if addr.Type == core.NodeExternalIP { - extIPs[addr.Address] = true - } else if addr.Type == core.NodeInternalIP { - intIPs[addr.Address] = true - } - } - } - - keys := func(addrs map[string]bool) (ips []string) { - for k := range addrs { - ips = append(ips, k) - } - return ips - } - - var ips []string - if len(extIPs) > 0 { - ips = keys(extIPs) - } else { - ips = keys(intIPs) - } - - ips, err := filterByIPFamily(ips, svc) - if err != nil { - return nil, err - } - - if len(ips) > 0 && h.rootless { - return []string{"127.0.0.1"}, nil - } - - return ips, nil -} - -// filterByIPFamily filters ips based on dual-stack parameters of the service -func filterByIPFamily(ips []string, svc *core.Service) ([]string, error) { - var ipFamilyPolicy core.IPFamilyPolicyType - var ipv4Addresses []string - var ipv6Addresses []string - - for _, ip := range ips { - if utilsnet.IsIPv4String(ip) { - ipv4Addresses = append(ipv4Addresses, ip) - } - if utilsnet.IsIPv6String(ip) { - ipv6Addresses = append(ipv6Addresses, ip) - } - } - - if svc.Spec.IPFamilyPolicy != nil { - ipFamilyPolicy = *svc.Spec.IPFamilyPolicy - } - - switch ipFamilyPolicy { - case core.IPFamilyPolicySingleStack: - if svc.Spec.IPFamilies[0] == core.IPv4Protocol { - return ipv4Addresses, nil - } - if svc.Spec.IPFamilies[0] == core.IPv6Protocol { - return ipv6Addresses, nil - } - case core.IPFamilyPolicyPreferDualStack: - if svc.Spec.IPFamilies[0] == core.IPv4Protocol { - ipAddresses := append(ipv4Addresses, ipv6Addresses...) - return ipAddresses, nil - } - if svc.Spec.IPFamilies[0] == core.IPv6Protocol { - ipAddresses := append(ipv6Addresses, ipv4Addresses...) - return ipAddresses, nil - } - case core.IPFamilyPolicyRequireDualStack: - if (len(ipv4Addresses) == 0) || (len(ipv6Addresses) == 0) { - return nil, errors.New("one or more IP families did not have addresses available for service with ipFamilyPolicy=RequireDualStack") - } - if svc.Spec.IPFamilies[0] == core.IPv4Protocol { - ipAddresses := append(ipv4Addresses, ipv6Addresses...) - return ipAddresses, nil - } - if svc.Spec.IPFamilies[0] == core.IPv6Protocol { - ipAddresses := append(ipv6Addresses, ipv4Addresses...) - return ipAddresses, nil - } - } - - return nil, errors.New("unhandled ipFamilyPolicy") -} - -// deployPod ensures that there is a DaemonSet for the service. -// It also ensures that any legacy Deployments from older versions of ServiceLB are deleted. -func (h *handler) deployPod(svc *core.Service) error { - if err := h.deleteOldDeployments(svc); err != nil { - return err - } - - if !h.enabled || svc.DeletionTimestamp != nil || svc.Spec.Type != core.ServiceTypeLoadBalancer || svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" { - return h.deletePod(svc) - } - - ds, err := h.newDaemonSet(svc) - if err != nil { - return err - } - - objs := objectset.NewObjectSet() - if ds != nil { - objs.Add(ds) - defer h.recorder.Eventf(svc, core.EventTypeNormal, "AppliedDaemonSet", "Applied LoadBalancer DaemonSet %s/%s", ds.Namespace, ds.Name) - } - return h.processor.WithOwner(svc).Apply(objs) -} - -// deletePod ensures that there are no DaemonSets for the given service. 
-func (h *handler) deletePod(svc *core.Service) error { - name := generateName(svc) - if err := h.daemonsets.DaemonSets(h.klipperLBNamespace).Delete(context.TODO(), name, meta.DeleteOptions{}); err != nil { - if apierrors.IsNotFound(err) { - return nil - } - return err - } - defer h.recorder.Eventf(svc, core.EventTypeNormal, "DeletedDaemonSet", "Deleted LoadBalancer DaemonSet %s/%s", h.klipperLBNamespace, name) - return nil -} - -// newDaemonSet creates a DaemonSet to ensure that ServiceLB pods are run on -// each eligible node. -func (h *handler) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { - name := generateName(svc) - oneInt := intstr.FromInt(1) - - // If ipv6 is present, we must enable ipv6 forwarding in the manifest - var ipv6Switch bool - for _, ipFamily := range svc.Spec.IPFamilies { - if ipFamily == core.IPv6Protocol { - ipv6Switch = true - } - } - - ds := &apps.DaemonSet{ - ObjectMeta: meta.ObjectMeta{ - Name: name, - Namespace: h.klipperLBNamespace, - Labels: map[string]string{ - nodeSelectorLabel: "false", - svcNameLabel: svc.Name, - svcNamespaceLabel: svc.Namespace, - }, - }, - TypeMeta: meta.TypeMeta{ - Kind: "DaemonSet", - APIVersion: "apps/v1", - }, - Spec: apps.DaemonSetSpec{ - Selector: &meta.LabelSelector{ - MatchLabels: map[string]string{ - "app": name, - }, - }, - Template: core.PodTemplateSpec{ - ObjectMeta: meta.ObjectMeta{ - Labels: map[string]string{ - "app": name, - svcNameLabel: svc.Name, - svcNamespaceLabel: svc.Namespace, - }, - }, - Spec: core.PodSpec{ - AutomountServiceAccountToken: utilpointer.Bool(false), - }, - }, - UpdateStrategy: apps.DaemonSetUpdateStrategy{ - Type: apps.RollingUpdateDaemonSetStrategyType, - RollingUpdate: &apps.RollingUpdateDaemonSet{ - MaxUnavailable: &oneInt, - }, - }, - }, - } - - if ipv6Switch { - // Add security context to enable ipv6 forwarding - securityContext := &core.PodSecurityContext{ - Sysctls: []core.Sysctl{ - { - Name: "net.ipv6.conf.all.forwarding", - Value: "1", - }, - }, - } - ds.Spec.Template.Spec.SecurityContext = securityContext - } - - for _, port := range svc.Spec.Ports { - portName := fmt.Sprintf("lb-%s-%d", strings.ToLower(string(port.Protocol)), port.Port) - container := core.Container{ - Name: portName, - Image: DefaultLBImage, - ImagePullPolicy: core.PullIfNotPresent, - Ports: []core.ContainerPort{ - { - Name: portName, - ContainerPort: port.Port, - HostPort: port.Port, - Protocol: port.Protocol, - }, - }, - Env: []core.EnvVar{ - { - Name: "SRC_PORT", - Value: strconv.Itoa(int(port.Port)), - }, - { - Name: "DEST_PROTO", - Value: string(port.Protocol), - }, - { - Name: "DEST_PORT", - Value: strconv.Itoa(int(port.Port)), - }, - { - Name: "DEST_IPS", - Value: strings.Join(svc.Spec.ClusterIPs, " "), - }, - }, - SecurityContext: &core.SecurityContext{ - Capabilities: &core.Capabilities{ - Add: []core.Capability{ - "NET_ADMIN", - }, - }, - }, - } - - ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container) - } - - // Add toleration to noderole.kubernetes.io/master=*:NoSchedule - masterToleration := core.Toleration{ - Key: "node-role.kubernetes.io/master", - Operator: "Exists", - Effect: "NoSchedule", - } - ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, masterToleration) - - // Add toleration to noderole.kubernetes.io/control-plane=*:NoSchedule - controlPlaneToleration := core.Toleration{ - Key: "node-role.kubernetes.io/control-plane", - Operator: "Exists", - Effect: "NoSchedule", - } - ds.Spec.Template.Spec.Tolerations = 
append(ds.Spec.Template.Spec.Tolerations, controlPlaneToleration) - - // Add toleration to CriticalAddonsOnly - criticalAddonsOnlyToleration := core.Toleration{ - Key: "CriticalAddonsOnly", - Operator: "Exists", - } - ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, criticalAddonsOnlyToleration) - - // Add node selector only if label "svccontroller.k3s.cattle.io/enablelb" exists on the nodes - selector, err := labels.Parse(daemonsetNodeLabel) - if err != nil { - return nil, err - } - nodesWithLabel, err := h.nodeCache.List(selector) - if err != nil { - return nil, err - } - if len(nodesWithLabel) > 0 { - ds.Spec.Template.Spec.NodeSelector = map[string]string{ - daemonsetNodeLabel: "true", - } - // Add node selector for "svccontroller.k3s.cattle.io/lbpool=" if service has lbpool label - if svc.Labels[daemonsetNodePoolLabel] != "" { - ds.Spec.Template.Spec.NodeSelector[daemonsetNodePoolLabel] = svc.Labels[daemonsetNodePoolLabel] - } - ds.Labels[nodeSelectorLabel] = "true" - } - return ds, nil -} - -func (h *handler) updateDaemonSets() error { - daemonsets, err := h.daemonsets.DaemonSets("").List(context.TODO(), meta.ListOptions{ - LabelSelector: nodeSelectorLabel + "=false", - }) - if err != nil { - return err - } - - for _, ds := range daemonsets.Items { - ds.Spec.Template.Spec.NodeSelector = map[string]string{ - daemonsetNodeLabel: "true", - } - ds.Labels[nodeSelectorLabel] = "true" - if _, err := h.daemonsets.DaemonSets(ds.Namespace).Update(context.TODO(), &ds, meta.UpdateOptions{}); err != nil { - return err - } - } - - return nil -} - -// deleteOldDeployments ensures that there are no legacy Deployments for ServiceLB pods. -// ServiceLB used to use Deployments before switching to DaemonSets in 875ba28 -func (h *handler) deleteOldDeployments(svc *core.Service) error { - name := fmt.Sprintf("svclb-%s", svc.Name) - if _, err := h.deploymentCache.Get(svc.Namespace, name); err != nil { - if apierrors.IsNotFound(err) { - return nil - } - return err - } - return h.deployments.Deployments(svc.Namespace).Delete(context.TODO(), name, meta.DeleteOptions{}) -} - -// addFinalizer ensures that there is a finalizer for this controller on the Service -func (h *handler) addFinalizer(svc *core.Service) (*core.Service, error) { - if !h.hasFinalizer(svc) { - svc.Finalizers = append(svc.Finalizers, finalizerName) - return h.services.Services(svc.Namespace).Update(context.TODO(), svc, meta.UpdateOptions{}) - } - return svc, nil -} - -// removeFinalizer ensures that there is not a finalizer for this controller on the Service -func (h *handler) removeFinalizer(svc *core.Service) (*core.Service, error) { - if !h.hasFinalizer(svc) { - return svc, nil - } - - for k, v := range svc.Finalizers { - if v != finalizerName { - continue - } - svc.Finalizers = append(svc.Finalizers[:k], svc.Finalizers[k+1:]...) - } - return h.services.Services(svc.Namespace).Update(context.TODO(), svc, meta.UpdateOptions{}) -} - -// hasFinalizer returns a boolean indicating whether or not there is a finalizer for this controller on the Service -func (h *handler) hasFinalizer(svc *core.Service) bool { - for _, finalizer := range svc.Finalizers { - if finalizer == finalizerName { - return true - } - } - return false -} - -// generateName generates a distinct name for the DaemonSet based on the service name and UID -func generateName(svc *core.Service) string { - return fmt.Sprintf("svclb-%s-%s", svc.Name, svc.UID[:8]) -}