From 369b81b45e28303b35f4ffc6a6c40e9d320eb578 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Thu, 26 Jan 2023 23:48:12 +0000 Subject: [PATCH] Honor Service ExternalTrafficPolicy Signed-off-by: Brad Davidson --- manifests/ccm.yaml | 8 ++ pkg/cloudprovider/cloudprovider.go | 9 +- pkg/cloudprovider/servicelb.go | 219 ++++++++++++++++++----------- pkg/daemons/control/server.go | 6 +- 4 files changed, 157 insertions(+), 85 deletions(-) diff --git a/manifests/ccm.yaml b/manifests/ccm.yaml index f66a68ce1c..d72b7fded9 100644 --- a/manifests/ccm.yaml +++ b/manifests/ccm.yaml @@ -67,6 +67,14 @@ rules: - daemonsets verbs: - "*" +- apiGroups: + - "discovery.k8s.io" + resources: + - endpointslices + verbs: + - get + - list + - watch --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/pkg/cloudprovider/cloudprovider.go b/pkg/cloudprovider/cloudprovider.go index a44e10b52c..236bc97ef8 100644 --- a/pkg/cloudprovider/cloudprovider.go +++ b/pkg/cloudprovider/cloudprovider.go @@ -12,6 +12,8 @@ import ( appsclient "github.com/rancher/wrangler/pkg/generated/controllers/apps/v1" "github.com/rancher/wrangler/pkg/generated/controllers/core" coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1" + "github.com/rancher/wrangler/pkg/generated/controllers/discovery" + discoveryclient "github.com/rancher/wrangler/pkg/generated/controllers/discovery/v1" "github.com/rancher/wrangler/pkg/generic" "github.com/rancher/wrangler/pkg/start" "github.com/sirupsen/logrus" @@ -41,6 +43,7 @@ type k3s struct { processor apply.Apply daemonsetCache appsclient.DaemonSetCache + endpointsCache discoveryclient.EndpointSliceCache nodeCache coreclient.NodeCache podCache coreclient.PodCache workqueue workqueue.RateLimitingInterface @@ -89,6 +92,7 @@ func (k *k3s) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, st lbCoreFactory := core.NewFactoryFromConfigWithOptionsOrDie(config, &generic.FactoryOptions{Namespace: k.LBNamespace}) lbAppsFactory := apps.NewFactoryFromConfigWithOptionsOrDie(config, &generic.FactoryOptions{Namespace: k.LBNamespace}) + lbDiscFactory := discovery.NewFactoryFromConfigOrDie(config) processor, err := apply.NewForConfig(config) if err != nil { @@ -96,14 +100,15 @@ func (k *k3s) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, st } k.processor = processor.WithDynamicLookup().WithCacheTypes(lbAppsFactory.Apps().V1().DaemonSet()) k.daemonsetCache = lbAppsFactory.Apps().V1().DaemonSet().Cache() + k.endpointsCache = lbDiscFactory.Discovery().V1().EndpointSlice().Cache() k.podCache = lbCoreFactory.Core().V1().Pod().Cache() k.workqueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) - if err := k.Register(ctx, coreFactory.Core().V1().Node(), lbCoreFactory.Core().V1().Pod()); err != nil { + if err := k.Register(ctx, coreFactory.Core().V1().Node(), lbCoreFactory.Core().V1().Pod(), lbDiscFactory.Discovery().V1().EndpointSlice()); err != nil { logrus.Fatalf("Failed to register %s handlers: %v", controllerName, err) } - if err := start.All(ctx, 1, coreFactory, lbCoreFactory, lbAppsFactory); err != nil { + if err := start.All(ctx, 1, coreFactory, lbCoreFactory, lbAppsFactory, lbDiscFactory); err != nil { logrus.Fatalf("Failed to start %s controllers: %v", controllerName, err) } } else { diff --git a/pkg/cloudprovider/servicelb.go b/pkg/cloudprovider/servicelb.go index dfc70bcae5..89478f2069 100644 --- a/pkg/cloudprovider/servicelb.go +++ b/pkg/cloudprovider/servicelb.go @@ -12,11 +12,13 @@ import ( 
"github.com/k3s-io/k3s/pkg/version" "github.com/rancher/wrangler/pkg/condition" coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1" + discoveryclient "github.com/rancher/wrangler/pkg/generated/controllers/discovery/v1" "github.com/rancher/wrangler/pkg/merr" "github.com/rancher/wrangler/pkg/objectset" "github.com/sirupsen/logrus" apps "k8s.io/api/apps/v1" core "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -48,9 +50,11 @@ const ( func (k *k3s) Register(ctx context.Context, nodes coreclient.NodeController, pods coreclient.PodController, + endpointslices discoveryclient.EndpointSliceController, ) error { nodes.OnChange(ctx, controllerName, k.onChangeNode) pods.OnChange(ctx, controllerName, k.onChangePod) + endpointslices.OnChange(ctx, controllerName, k.onChangeEndpointSlice) if err := k.createServiceLBNamespace(ctx); err != nil { return err @@ -135,6 +139,22 @@ func (k *k3s) onChangeNode(key string, node *core.Node) (*core.Node, error) { return node, nil } +// onChangeEndpointSlice handles changes to EndpointSlices. This is used to ensure that LoadBalancer +// addresses only list Nodes with ready Pods, when their ExternalTrafficPolicy is set to Local. +func (k *k3s) onChangeEndpointSlice(key string, eps *discovery.EndpointSlice) (*discovery.EndpointSlice, error) { + if eps == nil { + return nil, nil + } + + serviceName, ok := eps.Labels[discovery.LabelServiceName] + if !ok { + return eps, nil + } + + k.workqueue.Add(eps.Namespace + "/" + serviceName) + return eps, nil +} + // runWorker dequeues Service changes from the work queue // We run a lightweight work queue to handle service updates. We don't need the full overhead // of a wrangler service controller and shared informer cache, but we do want to run changes @@ -219,16 +239,37 @@ func (k *k3s) getDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { // getStatus returns a LoadBalancerStatus listing ingress IPs for all ready pods // matching the selected service. func (k *k3s) getStatus(svc *core.Service) (*core.LoadBalancerStatus, error) { - pods, err := k.podCache.List(k.LBNamespace, labels.SelectorFromSet(map[string]string{ + var readyNodes map[string]bool + + if servicehelper.RequestsOnlyLocalTraffic(svc) { + readyNodes = map[string]bool{} + eps, err := k.endpointsCache.List(svc.Namespace, labels.SelectorFromSet(labels.Set{ + discovery.LabelServiceName: svc.Name, + })) + if err != nil { + return nil, err + } + + for _, ep := range eps { + for _, endpoint := range ep.Endpoints { + isPod := endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod" + isReady := endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready + if isPod && isReady && endpoint.NodeName != nil { + readyNodes[*endpoint.NodeName] = true + } + } + } + } + + pods, err := k.podCache.List(k.LBNamespace, labels.SelectorFromSet(labels.Set{ svcNameLabel: svc.Name, svcNamespaceLabel: svc.Namespace, })) - if err != nil { return nil, err } - expectedIPs, err := k.podIPs(pods, svc) + expectedIPs, err := k.podIPs(pods, svc, readyNodes) if err != nil { return nil, err } @@ -267,7 +308,7 @@ func (k *k3s) patchStatus(svc *core.Service, previousStatus, newStatus *core.Loa // podIPs returns a list of IPs for Nodes hosting ServiceLB Pods. // If at least one node has External IPs available, only external IPs are returned. // If no nodes have External IPs set, the Internal IPs of all nodes running pods are returned. 
-func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service) ([]string, error) { +func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service, readyNodes map[string]bool) ([]string, error) { // Go doesn't have sets so we stuff things into a map of bools and then get lists of keys // to determine the unique set of IPs in use by pods. extIPs := map[string]bool{} @@ -280,6 +321,9 @@ func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service) ([]string, error) { if !Ready.IsTrue(pod) { continue } + if readyNodes != nil && !readyNodes[pod.Spec.NodeName] { + continue + } node, err := k.nodeCache.Get(pod.Spec.NodeName) if apierrors.IsNotFound(err) { @@ -405,54 +449,12 @@ func (k *k3s) deleteDaemonSet(ctx context.Context, svc *core.Service) error { func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { name := generateName(svc) oneInt := intstr.FromInt(1) - + localTraffic := servicehelper.RequestsOnlyLocalTraffic(svc) sourceRanges, err := servicehelper.GetLoadBalancerSourceRanges(svc) if err != nil { return nil, err } - ds := &apps.DaemonSet{ - ObjectMeta: meta.ObjectMeta{ - Name: name, - Namespace: k.LBNamespace, - Labels: map[string]string{ - nodeSelectorLabel: "false", - svcNameLabel: svc.Name, - svcNamespaceLabel: svc.Namespace, - }, - }, - TypeMeta: meta.TypeMeta{ - Kind: "DaemonSet", - APIVersion: "apps/v1", - }, - Spec: apps.DaemonSetSpec{ - Selector: &meta.LabelSelector{ - MatchLabels: map[string]string{ - "app": name, - }, - }, - Template: core.PodTemplateSpec{ - ObjectMeta: meta.ObjectMeta{ - Labels: map[string]string{ - "app": name, - svcNameLabel: svc.Name, - svcNamespaceLabel: svc.Namespace, - }, - }, - Spec: core.PodSpec{ - ServiceAccountName: "svclb", - AutomountServiceAccountToken: utilpointer.Bool(false), - }, - }, - UpdateStrategy: apps.DaemonSetUpdateStrategy{ - Type: apps.RollingUpdateDaemonSetStrategyType, - RollingUpdate: &apps.RollingUpdateDaemonSet{ - MaxUnavailable: &oneInt, - }, - }, - }, - } - var sysctls []core.Sysctl for _, ipFamily := range svc.Spec.IPFamilies { switch ipFamily { @@ -463,7 +465,66 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { } } - ds.Spec.Template.Spec.SecurityContext = &core.PodSecurityContext{Sysctls: sysctls} + ds := &apps.DaemonSet{ + ObjectMeta: meta.ObjectMeta{ + Name: name, + Namespace: k.LBNamespace, + Labels: labels.Set{ + nodeSelectorLabel: "false", + svcNameLabel: svc.Name, + svcNamespaceLabel: svc.Namespace, + }, + }, + TypeMeta: meta.TypeMeta{ + Kind: "DaemonSet", + APIVersion: "apps/v1", + }, + Spec: apps.DaemonSetSpec{ + Selector: &meta.LabelSelector{ + MatchLabels: labels.Set{ + "app": name, + }, + }, + Template: core.PodTemplateSpec{ + ObjectMeta: meta.ObjectMeta{ + Labels: labels.Set{ + "app": name, + svcNameLabel: svc.Name, + svcNamespaceLabel: svc.Namespace, + }, + }, + Spec: core.PodSpec{ + ServiceAccountName: "svclb", + AutomountServiceAccountToken: utilpointer.Bool(false), + SecurityContext: &core.PodSecurityContext{ + Sysctls: sysctls, + }, + Tolerations: []core.Toleration{ + { + Key: "node-role.kubernetes.io/master", + Operator: "Exists", + Effect: "NoSchedule", + }, + { + Key: "node-role.kubernetes.io/control-plane", + Operator: "Exists", + Effect: "NoSchedule", + }, + { + Key: "CriticalAddonsOnly", + Operator: "Exists", + }, + }, + }, + }, + UpdateStrategy: apps.DaemonSetUpdateStrategy{ + Type: apps.RollingUpdateDaemonSetStrategyType, + RollingUpdate: &apps.RollingUpdateDaemonSet{ + MaxUnavailable: &oneInt, + }, + }, + }, + } for _, port := range svc.Spec.Ports { portName := 
fmt.Sprintf("lb-%s-%d", strings.ToLower(string(port.Protocol)), port.Port) @@ -492,14 +553,6 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { Name: "DEST_PROTO", Value: string(port.Protocol), }, - { - Name: "DEST_PORT", - Value: strconv.Itoa(int(port.Port)), - }, - { - Name: "DEST_IPS", - Value: strings.Join(svc.Spec.ClusterIPs, " "), - }, }, SecurityContext: &core.SecurityContext{ Capabilities: &core.Capabilities{ @@ -510,32 +563,37 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { }, } + if localTraffic { + container.Env = append(container.Env, + core.EnvVar{ + Name: "DEST_PORT", + Value: strconv.Itoa(int(port.NodePort)), + }, + core.EnvVar{ + Name: "DEST_IPS", + ValueFrom: &core.EnvVarSource{ + FieldRef: &core.ObjectFieldSelector{ + FieldPath: "status.hostIP", + }, + }, + }, + ) + } else { + container.Env = append(container.Env, + core.EnvVar{ + Name: "DEST_PORT", + Value: strconv.Itoa(int(port.Port)), + }, + core.EnvVar{ + Name: "DEST_IPS", + Value: strings.Join(svc.Spec.ClusterIPs, " "), + }, + ) + } + ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container) } - // Add toleration to noderole.kubernetes.io/master=*:NoSchedule - masterToleration := core.Toleration{ - Key: "node-role.kubernetes.io/master", - Operator: "Exists", - Effect: "NoSchedule", - } - ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, masterToleration) - - // Add toleration to noderole.kubernetes.io/control-plane=*:NoSchedule - controlPlaneToleration := core.Toleration{ - Key: "node-role.kubernetes.io/control-plane", - Operator: "Exists", - Effect: "NoSchedule", - } - ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, controlPlaneToleration) - - // Add toleration to CriticalAddonsOnly - criticalAddonsOnlyToleration := core.Toleration{ - Key: "CriticalAddonsOnly", - Operator: "Exists", - } - ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, criticalAddonsOnlyToleration) - // Add node selector only if label "svccontroller.k3s.cattle.io/enablelb" exists on the nodes enableNodeSelector, err := k.nodeHasDaemonSetLabel() if err != nil { @@ -551,6 +609,7 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { } ds.Labels[nodeSelectorLabel] = "true" } + return ds, nil } @@ -563,7 +622,7 @@ func (k *k3s) updateDaemonSets() error { return err } - nodeSelector := labels.SelectorFromSet(map[string]string{nodeSelectorLabel: fmt.Sprintf("%t", !enableNodeSelector)}) + nodeSelector := labels.SelectorFromSet(labels.Set{nodeSelectorLabel: fmt.Sprintf("%t", !enableNodeSelector)}) daemonsets, err := k.daemonsetCache.List(k.LBNamespace, nodeSelector) if err != nil { return err diff --git a/pkg/daemons/control/server.go b/pkg/daemons/control/server.go index 97872a48c6..0104cfa02c 100644 --- a/pkg/daemons/control/server.go +++ b/pkg/daemons/control/server.go @@ -371,9 +371,9 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control) error { func checkForCloudControllerPrivileges(ctx context.Context, runtime *config.ControlRuntime, timeout time.Duration) error { return util.WaitForRBACReady(ctx, runtime.KubeConfigAdmin, timeout, authorizationv1.ResourceAttributes{ Namespace: metav1.NamespaceSystem, - Verb: "*", - Resource: "daemonsets", - Group: "apps", + Verb: "watch", + Resource: "endpointslices", + Group: "discovery.k8s.io", }, version.Program+"-cloud-controller-manager") }