Move servicelb into cloudprovider LoadBalancer interface

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
Brad Davidson 2022-09-29 20:37:50 +00:00 committed by Brad Davidson
parent a15e7e8b68
commit 0b96ca92bc
12 changed files with 832 additions and 682 deletions

View File

@ -24,21 +24,22 @@ rules:
resources:
- nodes
verbs:
-- '*'
+- "*"
- apiGroups:
- ""
resources:
- nodes/status
- services/status
verbs:
- patch
- apiGroups:
- ""
resources:
- services
- pods
verbs:
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
@ -49,22 +50,16 @@ rules:
- apiGroups:
- ""
resources:
- persistentvolumes
verbs:
- get
- list
- update
- watch
- apiGroups:
- ""
resources:
- endpoints
- namespaces
verbs:
- create
- get
- list
- watch
- update
- apiGroups:
- apps
resources:
- daemonsets
verbs:
- "*"
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding

View File

@ -1,24 +1,112 @@
package cloudprovider
import (
"encoding/json"
"io"
"io/ioutil"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/generated/controllers/apps"
appsclient "github.com/rancher/wrangler/pkg/generated/controllers/apps/v1"
"github.com/rancher/wrangler/pkg/generated/controllers/core"
coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
"github.com/rancher/wrangler/pkg/generic"
"github.com/rancher/wrangler/pkg/start"
"github.com/sirupsen/logrus"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
)
// Config describes externally-configurable cloud provider configuration.
// This is normally unmarshalled from a JSON config file.
type Config struct {
LBEnabled bool `json:"lbEnabled"`
LBImage string `json:"lbImage"`
LBNamespace string `json:"lbNamespace"`
Rootless bool `json:"rootless"`
}
type k3s struct {
Config
client kubernetes.Interface
recorder record.EventRecorder
processor apply.Apply
daemonsetCache appsclient.DaemonSetCache
nodeCache coreclient.NodeCache
podCache coreclient.PodCache
workqueue workqueue.RateLimitingInterface
}
var _ cloudprovider.Interface = &k3s{}
func init() {
cloudprovider.RegisterCloudProvider(version.Program, func(config io.Reader) (cloudprovider.Interface, error) {
-return &k3s{}, nil
var err error
k := k3s{
Config: Config{
LBEnabled: true,
LBImage: DefaultLBImage,
LBNamespace: DefaultLBNS,
},
}
if config != nil {
var bytes []byte
bytes, err = ioutil.ReadAll(config)
if err == nil {
err = json.Unmarshal(bytes, &k.Config)
}
}
return &k, err
})
}
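
For reference, a cloud-config payload of the shape genCloudConfig (later in this commit) writes, and which the init function above consumes, can be decoded like this. A standalone sketch with illustrative values, not part of the commit:

package main

import (
	"encoding/json"
	"fmt"
)

// Config mirrors the struct above; copied here so the sketch stands alone.
type Config struct {
	LBEnabled   bool   `json:"lbEnabled"`
	LBImage     string `json:"lbImage"`
	LBNamespace string `json:"lbNamespace"`
	Rootless    bool   `json:"rootless"`
}

func main() {
	// Payload of the shape genCloudConfig writes to the cloud-config file;
	// the values here are illustrative.
	data := []byte(`{"lbEnabled":true,"lbImage":"rancher/klipper-lb:v0.3.5","lbNamespace":"kube-system","rootless":false}`)
	var cfg Config
	if err := json.Unmarshal(data, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {LBEnabled:true LBImage:rancher/klipper-lb:v0.3.5 LBNamespace:kube-system Rootless:false}
}
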
func (k *k3s) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, stop <-chan struct{}) {
ctx, _ := wait.ContextForChannel(stop)
config := clientBuilder.ConfigOrDie(controllerName)
k.client = kubernetes.NewForConfigOrDie(config)
if k.LBEnabled {
// Wrangler controller and caches are only needed if the load balancer controller is enabled.
k.recorder = util.BuildControllerEventRecorder(k.client, controllerName, meta.NamespaceAll)
coreFactory := core.NewFactoryFromConfigOrDie(config)
k.nodeCache = coreFactory.Core().V1().Node().Cache()
lbCoreFactory := core.NewFactoryFromConfigWithOptionsOrDie(config, &generic.FactoryOptions{Namespace: k.LBNamespace})
lbAppsFactory := apps.NewFactoryFromConfigWithOptionsOrDie(config, &generic.FactoryOptions{Namespace: k.LBNamespace})
processor, err := apply.NewForConfig(config)
if err != nil {
logrus.Fatalf("Failed to create apply processor for %s: %v", controllerName, err)
}
k.processor = processor.WithDynamicLookup().WithCacheTypes(lbAppsFactory.Apps().V1().DaemonSet())
k.daemonsetCache = lbAppsFactory.Apps().V1().DaemonSet().Cache()
k.podCache = lbCoreFactory.Core().V1().Pod().Cache()
k.workqueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
if err := k.Register(ctx, coreFactory.Core().V1().Node(), lbCoreFactory.Core().V1().Pod()); err != nil {
logrus.Fatalf("Failed to register %s handlers: %v", controllerName, err)
}
if err := start.All(ctx, 1, coreFactory, lbCoreFactory, lbAppsFactory); err != nil {
logrus.Fatalf("Failed to start %s controllers: %v", controllerName, err)
}
} else {
// If load-balancer functionality has not been enabled, delete managed daemonsets.
// This uses the raw kubernetes client, as the controllers are not started when the load balancer controller is disabled.
if err := k.deleteAllDaemonsets(ctx); err != nil {
logrus.Fatalf("Failed to clean up %s daemonsets: %v", controllerName, err)
}
}
}
func (k *k3s) Instances() (cloudprovider.Instances, bool) {
@ -30,7 +118,7 @@ func (k *k3s) InstancesV2() (cloudprovider.InstancesV2, bool) {
}
func (k *k3s) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
-return nil, false
+return k, k.LBEnabled
}
func (k *k3s) Zones() (cloudprovider.Zones, bool) {

View File

@ -0,0 +1,53 @@
package cloudprovider
import (
"context"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
cloudprovider "k8s.io/cloud-provider"
)
var _ cloudprovider.LoadBalancer = &k3s{}
// GetLoadBalancer returns whether the specified load balancer exists, and if so, what its status is.
func (k *k3s) GetLoadBalancer(ctx context.Context, clusterName string, service *corev1.Service) (*corev1.LoadBalancerStatus, bool, error) {
if _, err := k.getDaemonSet(service); err != nil {
if apierrors.IsNotFound(err) {
return nil, false, nil
}
return nil, false, err
}
status, err := k.getStatus(service)
return status, true, err
}
// GetLoadBalancerName returns the name of the load balancer.
func (k *k3s) GetLoadBalancerName(ctx context.Context, clusterName string, service *corev1.Service) string {
return generateName(service)
}
// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer.
// The node list is unused; see the comment on UpdateLoadBalancer for information on why.
// This is called when the Service is created or changes.
func (k *k3s) EnsureLoadBalancer(ctx context.Context, clusterName string, service *corev1.Service, nodes []*corev1.Node) (*corev1.LoadBalancerStatus, error) {
if err := k.deployDaemonSet(ctx, service); err != nil {
return nil, err
}
return nil, cloudprovider.ImplementedElsewhere
}
// UpdateLoadBalancer updates hosts under the specified load balancer.
// This is not used, as it filters node updates based on criteria not compatible with how our DaemonSet selects
// nodes for inclusion. It also does not provide any opportunity to update the load balancer status.
// https://github.com/kubernetes/kubernetes/blob/v1.25.0/staging/src/k8s.io/cloud-provider/controllers/service/controller.go#L985-L993
func (k *k3s) UpdateLoadBalancer(ctx context.Context, clusterName string, service *corev1.Service, nodes []*corev1.Node) error {
return cloudprovider.ImplementedElsewhere
}
// EnsureLoadBalancerDeleted deletes the specified load balancer if it exists,
// returning nil if the load balancer specified either didn't exist or was successfully deleted.
func (k *k3s) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *corev1.Service) error {
return k.deleteDaemonSet(ctx, service)
}
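
EnsureLoadBalancer above intentionally returns the cloudprovider.ImplementedElsewhere sentinel after applying the DaemonSet, which tells the upstream service controller to skip its own provisioning and status handling. A minimal standalone sketch of how that sentinel is distinguished (illustrative, not part of the commit):

package main

import (
	"errors"
	"fmt"

	cloudprovider "k8s.io/cloud-provider"
)

func main() {
	// EnsureLoadBalancer returns this sentinel after applying the DaemonSet;
	// the upstream service controller performs an equivalent comparison and
	// leaves further work to the cloud provider's own controllers.
	err := cloudprovider.ImplementedElsewhere
	if errors.Is(err, cloudprovider.ImplementedElsewhere) {
		fmt.Println("handled by an alternate controller; nothing more to do")
	}
}
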

View File

@ -0,0 +1,642 @@
package cloudprovider
import (
"context"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"time"
"github.com/k3s-io/k3s/pkg/version"
"github.com/rancher/wrangler/pkg/condition"
coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
"github.com/rancher/wrangler/pkg/merr"
"github.com/rancher/wrangler/pkg/objectset"
"github.com/sirupsen/logrus"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
ccmapp "k8s.io/cloud-provider/app"
servicehelper "k8s.io/cloud-provider/service/helpers"
utilsnet "k8s.io/utils/net"
utilpointer "k8s.io/utils/pointer"
)
var (
finalizerName = "svccontroller." + version.Program + ".cattle.io/daemonset"
svcNameLabel = "svccontroller." + version.Program + ".cattle.io/svcname"
svcNamespaceLabel = "svccontroller." + version.Program + ".cattle.io/svcnamespace"
daemonsetNodeLabel = "svccontroller." + version.Program + ".cattle.io/enablelb"
daemonsetNodePoolLabel = "svccontroller." + version.Program + ".cattle.io/lbpool"
nodeSelectorLabel = "svccontroller." + version.Program + ".cattle.io/nodeselector"
controllerName = ccmapp.DefaultInitFuncConstructors["service"].InitContext.ClientName
)
const (
Ready = condition.Cond("Ready")
DefaultLBNS = meta.NamespaceSystem
DefaultLBImage = "rancher/klipper-lb:v0.3.5"
)
func (k *k3s) Register(ctx context.Context,
nodes coreclient.NodeController,
pods coreclient.PodController,
) error {
nodes.OnChange(ctx, controllerName, k.onChangeNode)
pods.OnChange(ctx, controllerName, k.onChangePod)
if err := k.createServiceLBNamespace(ctx); err != nil {
return err
}
go wait.Until(k.runWorker, time.Second, ctx.Done())
return k.removeServiceFinalizers(ctx)
}
// createServiceLBNamespace ensures that the configured namespace exists.
func (k *k3s) createServiceLBNamespace(ctx context.Context) error {
_, err := k.client.CoreV1().Namespaces().Create(ctx, &core.Namespace{
ObjectMeta: meta.ObjectMeta{
Name: k.LBNamespace,
},
}, meta.CreateOptions{})
if apierrors.IsAlreadyExists(err) {
return nil
}
return err
}
// onChangePod handles changes to Pods.
// If the pod has labels that tie it to a service, and the pod has an IP assigned,
// enqueue an update to the service's status.
func (k *k3s) onChangePod(key string, pod *core.Pod) (*core.Pod, error) {
if pod == nil {
return nil, nil
}
serviceName := pod.Labels[svcNameLabel]
if serviceName == "" {
return pod, nil
}
serviceNamespace := pod.Labels[svcNamespaceLabel]
if serviceNamespace == "" {
return pod, nil
}
if pod.Status.PodIP == "" {
return pod, nil
}
k.workqueue.Add(serviceNamespace + "/" + serviceName)
return pod, nil
}
// onChangeNode handles changes to Nodes. We need to handle this as we may need to kick the DaemonSet
// to add or remove pods from nodes if labels have changed.
func (k *k3s) onChangeNode(key string, node *core.Node) (*core.Node, error) {
if node == nil {
return nil, nil
}
if _, ok := node.Labels[daemonsetNodeLabel]; !ok {
return node, nil
}
if err := k.updateDaemonSets(); err != nil {
return node, err
}
return node, nil
}
// runWorker dequeues Service changes from the work queue
// We run a lightweight work queue to handle service updates. We don't need the full overhead
// of a wrangler service controller and shared informer cache, but we do want to run changes
// through a keyed queue to reduce thrashing when pods are updated. Much of this is cribbed from
// https://github.com/rancher/lasso/blob/release/v2.5/pkg/controller/controller.go#L173-L215
func (k *k3s) runWorker() {
for k.processNextWorkItem() {
}
}
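
The keyed queue described above relies on client-go's workqueue deduplicating identical keys: a burst of pod events for one service collapses into a single pending status update. A standalone sketch of that behavior (illustrative, not part of the commit):

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	// Rapid pod events for the same service enqueue one logical work item:
	// identical keys are deduplicated while they sit in the queue.
	q.Add("default/nginx")
	q.Add("default/nginx")
	q.Add("default/nginx")
	fmt.Println(q.Len()) // 1
}
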
// processNextWorkItem does work for a single item in the queue,
// returning a boolean that indicates if the queue should continue
// to be serviced.
func (k *k3s) processNextWorkItem() bool {
obj, shutdown := k.workqueue.Get()
if shutdown {
return false
}
if err := k.processSingleItem(obj); err != nil && !apierrors.IsConflict(err) {
logrus.Errorf("%s: %v", controllerName, err)
}
return true
}
// processSingleItem processes a single item from the work queue,
// requeueing it if the handler fails.
func (k *k3s) processSingleItem(obj interface{}) error {
var (
key string
ok bool
)
defer k.workqueue.Done(obj)
if key, ok = obj.(string); !ok {
logrus.Errorf("expected string in workqueue but got %#v", obj)
k.workqueue.Forget(obj)
return nil
}
keyParts := strings.SplitN(key, "/", 2)
if err := k.updateStatus(keyParts[0], keyParts[1]); err != nil {
k.workqueue.AddRateLimited(key)
return fmt.Errorf("error updating LoadBalancer Status for %s: %v, requeueing", key, err)
}
k.workqueue.Forget(obj)
return nil
}
// updateStatus updates the load balancer status for the matching service, if it exists and is a
// LoadBalancer service. The patchStatus function handles checking to see if status needs updating.
func (k *k3s) updateStatus(namespace, name string) error {
svc, err := k.client.CoreV1().Services(namespace).Get(context.TODO(), name, meta.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
if svc.Spec.Type != core.ServiceTypeLoadBalancer {
return nil
}
previousStatus := svc.Status.LoadBalancer.DeepCopy()
newStatus, err := k.getStatus(svc)
if err != nil {
return err
}
return k.patchStatus(svc, previousStatus, newStatus)
}
// getDaemonSet returns the DaemonSet that should exist for the Service.
func (k *k3s) getDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
return k.daemonsetCache.Get(k.LBNamespace, generateName(svc))
}
// getStatus returns a LoadBalancerStatus listing ingress IPs for all ready pods
// matching the selected service.
func (k *k3s) getStatus(svc *core.Service) (*core.LoadBalancerStatus, error) {
pods, err := k.podCache.List(k.LBNamespace, labels.SelectorFromSet(map[string]string{
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
}))
if err != nil {
return nil, err
}
expectedIPs, err := k.podIPs(pods, svc)
if err != nil {
return nil, err
}
sort.Strings(expectedIPs)
loadbalancer := &core.LoadBalancerStatus{}
for _, ip := range expectedIPs {
loadbalancer.Ingress = append(loadbalancer.Ingress, core.LoadBalancerIngress{
IP: ip,
})
}
return loadbalancer, nil
}
// patchStatus patches the service status. If the status has not changed, this function is a no-op.
func (k *k3s) patchStatus(svc *core.Service, previousStatus, newStatus *core.LoadBalancerStatus) error {
if servicehelper.LoadBalancerStatusEqual(previousStatus, newStatus) {
return nil
}
updated := svc.DeepCopy()
updated.Status.LoadBalancer = *newStatus
_, err := servicehelper.PatchService(k.client.CoreV1(), svc, updated)
if err == nil {
if len(newStatus.Ingress) == 0 {
k.recorder.Event(svc, core.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
} else {
k.recorder.Eventf(svc, core.EventTypeNormal, "UpdatedLoadBalancer", "Updated LoadBalancer with new IPs: %v -> %v", ingressToString(previousStatus.Ingress), ingressToString(newStatus.Ingress))
}
}
return err
}
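
patchStatus short-circuits on servicehelper.LoadBalancerStatusEqual, so unchanged statuses never reach the API server. A standalone sketch of the check (illustrative, not part of the commit):

package main

import (
	"fmt"

	core "k8s.io/api/core/v1"
	servicehelper "k8s.io/cloud-provider/service/helpers"
)

func main() {
	prev := &core.LoadBalancerStatus{Ingress: []core.LoadBalancerIngress{{IP: "10.0.0.1"}}}
	next := &core.LoadBalancerStatus{Ingress: []core.LoadBalancerIngress{{IP: "10.0.0.1"}}}
	// Equal statuses short-circuit patchStatus before any API write happens.
	fmt.Println(servicehelper.LoadBalancerStatusEqual(prev, next)) // true
}
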
// podIPs returns a list of IPs for Nodes hosting ServiceLB Pods.
// If at least one node has External IPs available, only external IPs are returned.
// If no nodes have External IPs set, the Internal IPs of all nodes running pods are returned.
func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service) ([]string, error) {
// Go doesn't have sets so we stuff things into a map of bools and then get lists of keys
// to determine the unique set of IPs in use by pods.
extIPs := map[string]bool{}
intIPs := map[string]bool{}
for _, pod := range pods {
if pod.Spec.NodeName == "" || pod.Status.PodIP == "" {
continue
}
if !Ready.IsTrue(pod) {
continue
}
node, err := k.nodeCache.Get(pod.Spec.NodeName)
if apierrors.IsNotFound(err) {
continue
} else if err != nil {
return nil, err
}
for _, addr := range node.Status.Addresses {
if addr.Type == core.NodeExternalIP {
extIPs[addr.Address] = true
} else if addr.Type == core.NodeInternalIP {
intIPs[addr.Address] = true
}
}
}
keys := func(addrs map[string]bool) (ips []string) {
for k := range addrs {
ips = append(ips, k)
}
return ips
}
var ips []string
if len(extIPs) > 0 {
ips = keys(extIPs)
} else {
ips = keys(intIPs)
}
ips, err := filterByIPFamily(ips, svc)
if err != nil {
return nil, err
}
if len(ips) > 0 && k.Rootless {
return []string{"127.0.0.1"}, nil
}
return ips, nil
}
// filterByIPFamily filters ips based on dual-stack parameters of the service
func filterByIPFamily(ips []string, svc *core.Service) ([]string, error) {
var ipFamilyPolicy core.IPFamilyPolicyType
var ipv4Addresses []string
var ipv6Addresses []string
for _, ip := range ips {
if utilsnet.IsIPv4String(ip) {
ipv4Addresses = append(ipv4Addresses, ip)
}
if utilsnet.IsIPv6String(ip) {
ipv6Addresses = append(ipv6Addresses, ip)
}
}
if svc.Spec.IPFamilyPolicy != nil {
ipFamilyPolicy = *svc.Spec.IPFamilyPolicy
}
switch ipFamilyPolicy {
case core.IPFamilyPolicySingleStack:
if svc.Spec.IPFamilies[0] == core.IPv4Protocol {
return ipv4Addresses, nil
}
if svc.Spec.IPFamilies[0] == core.IPv6Protocol {
return ipv6Addresses, nil
}
case core.IPFamilyPolicyPreferDualStack:
if svc.Spec.IPFamilies[0] == core.IPv4Protocol {
ipAddresses := append(ipv4Addresses, ipv6Addresses...)
return ipAddresses, nil
}
if svc.Spec.IPFamilies[0] == core.IPv6Protocol {
ipAddresses := append(ipv6Addresses, ipv4Addresses...)
return ipAddresses, nil
}
case core.IPFamilyPolicyRequireDualStack:
if (len(ipv4Addresses) == 0) || (len(ipv6Addresses) == 0) {
return nil, errors.New("one or more IP families did not have addresses available for service with ipFamilyPolicy=RequireDualStack")
}
if svc.Spec.IPFamilies[0] == core.IPv4Protocol {
ipAddresses := append(ipv4Addresses, ipv6Addresses...)
return ipAddresses, nil
}
if svc.Spec.IPFamilies[0] == core.IPv6Protocol {
ipAddresses := append(ipv6Addresses, ipv4Addresses...)
return ipAddresses, nil
}
}
return nil, errors.New("unhandled ipFamilyPolicy")
}
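
A test-style sketch of the ordering rules above; hypothetical and in-package, since filterByIPFamily is unexported. With PreferDualStack, addresses of the service's primary family sort first:

package cloudprovider

import (
	"reflect"
	"testing"

	core "k8s.io/api/core/v1"
)

// TestFilterByIPFamilyPreferDualStack sketches the PreferDualStack branch:
// the service's primary family (here IPv6) leads the returned slice.
func TestFilterByIPFamilyPreferDualStack(t *testing.T) {
	policy := core.IPFamilyPolicyType(core.IPFamilyPolicyPreferDualStack)
	svc := &core.Service{
		Spec: core.ServiceSpec{
			IPFamilyPolicy: &policy,
			IPFamilies:     []core.IPFamily{core.IPv6Protocol, core.IPv4Protocol},
		},
	}
	got, err := filterByIPFamily([]string{"10.0.0.1", "2001:db8::1"}, svc)
	if err != nil {
		t.Fatal(err)
	}
	want := []string{"2001:db8::1", "10.0.0.1"}
	if !reflect.DeepEqual(got, want) {
		t.Fatalf("got %v, want %v", got, want)
	}
}
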
// deployDaemonSet ensures that there is a DaemonSet for the service.
func (k *k3s) deployDaemonSet(ctx context.Context, svc *core.Service) error {
ds, err := k.newDaemonSet(svc)
if err != nil {
return err
}
defer k.recorder.Eventf(svc, core.EventTypeNormal, "AppliedDaemonSet", "Applied LoadBalancer DaemonSet %s/%s", ds.Namespace, ds.Name)
return k.processor.WithContext(ctx).WithOwner(svc).Apply(objectset.NewObjectSet(ds))
}
// deleteDaemonSet ensures that there are no DaemonSets for the given service.
func (k *k3s) deleteDaemonSet(ctx context.Context, svc *core.Service) error {
name := generateName(svc)
if err := k.client.AppsV1().DaemonSets(k.LBNamespace).Delete(ctx, name, meta.DeleteOptions{}); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
defer k.recorder.Eventf(svc, core.EventTypeNormal, "DeletedDaemonSet", "Deleted LoadBalancer DaemonSet %s/%s", k.LBNamespace, name)
return nil
}
// newDaemonSet creates a DaemonSet to ensure that ServiceLB pods are run on
// each eligible node.
func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
name := generateName(svc)
oneInt := intstr.FromInt(1)
sourceRanges, err := servicehelper.GetLoadBalancerSourceRanges(svc)
if err != nil {
return nil, err
}
ds := &apps.DaemonSet{
ObjectMeta: meta.ObjectMeta{
Name: name,
Namespace: k.LBNamespace,
Labels: map[string]string{
nodeSelectorLabel: "false",
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
},
},
TypeMeta: meta.TypeMeta{
Kind: "DaemonSet",
APIVersion: "apps/v1",
},
Spec: apps.DaemonSetSpec{
Selector: &meta.LabelSelector{
MatchLabels: map[string]string{
"app": name,
},
},
Template: core.PodTemplateSpec{
ObjectMeta: meta.ObjectMeta{
Labels: map[string]string{
"app": name,
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
},
},
Spec: core.PodSpec{
AutomountServiceAccountToken: utilpointer.Bool(false),
},
},
UpdateStrategy: apps.DaemonSetUpdateStrategy{
Type: apps.RollingUpdateDaemonSetStrategyType,
RollingUpdate: &apps.RollingUpdateDaemonSet{
MaxUnavailable: &oneInt,
},
},
},
}
var sysctls []core.Sysctl
for _, ipFamily := range svc.Spec.IPFamilies {
switch ipFamily {
case core.IPv4Protocol:
sysctls = append(sysctls, core.Sysctl{Name: "net.ipv4.ip_forward", Value: "1"})
case core.IPv6Protocol:
sysctls = append(sysctls, core.Sysctl{Name: "net.ipv6.conf.all.forwarding", Value: "1"})
}
}
ds.Spec.Template.Spec.SecurityContext = &core.PodSecurityContext{Sysctls: sysctls}
for _, port := range svc.Spec.Ports {
portName := fmt.Sprintf("lb-%s-%d", strings.ToLower(string(port.Protocol)), port.Port)
container := core.Container{
Name: portName,
Image: k.LBImage,
ImagePullPolicy: core.PullIfNotPresent,
Ports: []core.ContainerPort{
{
Name: portName,
ContainerPort: port.Port,
HostPort: port.Port,
Protocol: port.Protocol,
},
},
Env: []core.EnvVar{
{
Name: "SRC_PORT",
Value: strconv.Itoa(int(port.Port)),
},
{
Name: "SRC_RANGES",
Value: strings.Join(sourceRanges.StringSlice(), " "),
},
{
Name: "DEST_PROTO",
Value: string(port.Protocol),
},
{
Name: "DEST_PORT",
Value: strconv.Itoa(int(port.Port)),
},
{
Name: "DEST_IPS",
Value: strings.Join(svc.Spec.ClusterIPs, " "),
},
},
SecurityContext: &core.SecurityContext{
Capabilities: &core.Capabilities{
Add: []core.Capability{
"NET_ADMIN",
},
},
},
}
ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container)
}
// Add toleration to node-role.kubernetes.io/master=*:NoSchedule
masterToleration := core.Toleration{
Key: "node-role.kubernetes.io/master",
Operator: "Exists",
Effect: "NoSchedule",
}
ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, masterToleration)
// Add toleration to node-role.kubernetes.io/control-plane=*:NoSchedule
controlPlaneToleration := core.Toleration{
Key: "node-role.kubernetes.io/control-plane",
Operator: "Exists",
Effect: "NoSchedule",
}
ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, controlPlaneToleration)
// Add toleration to CriticalAddonsOnly
criticalAddonsOnlyToleration := core.Toleration{
Key: "CriticalAddonsOnly",
Operator: "Exists",
}
ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, criticalAddonsOnlyToleration)
// Add node selector only if label "svccontroller.k3s.cattle.io/enablelb" exists on the nodes
enableNodeSelector, err := k.nodeHasDaemonSetLabel()
if err != nil {
return nil, err
}
if enableNodeSelector {
ds.Spec.Template.Spec.NodeSelector = map[string]string{
daemonsetNodeLabel: "true",
}
// Add node selector for "svccontroller.k3s.cattle.io/lbpool=<pool>" if service has lbpool label
if svc.Labels[daemonsetNodePoolLabel] != "" {
ds.Spec.Template.Spec.NodeSelector[daemonsetNodePoolLabel] = svc.Labels[daemonsetNodePoolLabel]
}
ds.Labels[nodeSelectorLabel] = "true"
}
return ds, nil
}
// updateDaemonSets ensures that our DaemonSets have a NodeSelector present if one is enabled,
// and do not have one if it is not. Nodes are checked for this label when the DaemonSet is generated,
// but node labels may change between Service updates and the NodeSelector needs to be updated appropriately.
func (k *k3s) updateDaemonSets() error {
enableNodeSelector, err := k.nodeHasDaemonSetLabel()
if err != nil {
return err
}
nodeSelector := labels.SelectorFromSet(map[string]string{nodeSelectorLabel: fmt.Sprintf("%t", !enableNodeSelector)})
daemonsets, err := k.daemonsetCache.List(k.LBNamespace, nodeSelector)
if err != nil {
return err
}
for _, ds := range daemonsets {
ds.Labels[nodeSelectorLabel] = fmt.Sprintf("%t", enableNodeSelector)
ds.Spec.Template.Spec.NodeSelector = map[string]string{}
if enableNodeSelector {
ds.Spec.Template.Spec.NodeSelector[daemonsetNodeLabel] = "true"
}
if _, err := k.client.AppsV1().DaemonSets(ds.Namespace).Update(context.TODO(), ds, meta.UpdateOptions{}); err != nil {
return err
}
}
return nil
}
// nodeHasDaemonSetLabel returns true if any node is labeled for inclusion or exclusion
// from use by ServiceLB. If any node is labeled, only nodes with a label value of "true"
// will be used.
func (k *k3s) nodeHasDaemonSetLabel() (bool, error) {
selector, err := labels.Parse(daemonsetNodeLabel)
if err != nil {
return false, err
}
nodesWithLabel, err := k.nodeCache.List(selector)
return len(nodesWithLabel) > 0, err
}
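
nodeHasDaemonSetLabel parses the bare label key, which yields an existence selector: a node labeled enablelb=false still counts as "labeled" and flips the cluster into selector mode, while only the DaemonSet's NodeSelector pins the value to "true". A standalone sketch (illustrative, not part of the commit):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// A bare key parses to an Exists requirement: any value matches.
	sel, err := labels.Parse("svccontroller.k3s.cattle.io/enablelb")
	if err != nil {
		panic(err)
	}
	fmt.Println(sel.Matches(labels.Set{"svccontroller.k3s.cattle.io/enablelb": "false"})) // true
	fmt.Println(sel.Matches(labels.Set{"unrelated": "label"}))                            // false
}
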
// deleteAllDaemonsets deletes all daemonsets created by this controller
func (k *k3s) deleteAllDaemonsets(ctx context.Context) error {
return k.client.AppsV1().DaemonSets(k.LBNamespace).DeleteCollection(ctx, meta.DeleteOptions{}, meta.ListOptions{LabelSelector: nodeSelectorLabel})
}
// removeServiceFinalizers ensures that there are no finalizers left on any services.
// Previous implementations of the servicelb controller manually added finalizers to services it managed;
// these need to be removed in order to release ownership to the cloud provider implementation.
func (k *k3s) removeServiceFinalizers(ctx context.Context) error {
services, err := k.client.CoreV1().Services(meta.NamespaceAll).List(ctx, meta.ListOptions{})
if err != nil {
return err
}
var errs merr.Errors
for _, svc := range services.Items {
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
s, err := k.removeFinalizer(ctx, &svc)
svc = *s
return err
}); err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return errs
}
return nil
}
// removeFinalizer ensures that there is not a finalizer for this controller on the Service
func (k *k3s) removeFinalizer(ctx context.Context, svc *core.Service) (*core.Service, error) {
var found bool
for k, v := range svc.Finalizers {
if v != finalizerName {
continue
}
found = true
svc.Finalizers = append(svc.Finalizers[:k], svc.Finalizers[k+1:]...)
}
if found {
return k.client.CoreV1().Services(svc.Namespace).Update(ctx, svc, meta.UpdateOptions{})
}
return svc, nil
}
// generateName generates a distinct name for the DaemonSet based on the service name and UID
func generateName(svc *core.Service) string {
return fmt.Sprintf("svclb-%s-%s", svc.Name, svc.UID[:8])
}
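
A standalone sketch of the naming formula, using a hypothetical service UID (illustrative, not part of the commit):

package main

import (
	"fmt"

	core "k8s.io/api/core/v1"
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

func main() {
	svc := &core.Service{ObjectMeta: meta.ObjectMeta{
		Name: "nginx",
		UID:  types.UID("f13e8bd3-9c2a-4f4b-a0d1-2f7e9f6a1b2c"), // hypothetical UID
	}}
	// Same formula as generateName: the service name plus the UID's first
	// eight characters keeps names stable across updates but distinct
	// across delete/recreate cycles.
	fmt.Printf("svclb-%s-%s\n", svc.Name, svc.UID[:8]) // svclb-nginx-f13e8bd3
}
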
// ingressToString converts a list of LoadBalancerIngress entries to strings
func ingressToString(ingresses []core.LoadBalancerIngress) []string {
parts := make([]string, len(ingresses))
for i, ingress := range ingresses {
if ingress.IP != "" {
parts[i] = ingress.IP
} else {
parts[i] = ingress.Hostname
}
}
return parts
}

View File

@ -173,8 +173,8 @@ func kubeletArgs(cfg *config.Agent) map[string]string {
argsMap["protect-kernel-defaults"] = "true"
}
-if !cfg.DisableServiceLB && cfg.EnableIPv6 {
-argsMap["allowed-unsafe-sysctls"] = "net.ipv6.conf.all.forwarding"
+if !cfg.DisableServiceLB {
+argsMap["allowed-unsafe-sysctls"] = "net.ipv4.ip_forward,net.ipv6.conf.all.forwarding"
}
return argsMap

View File

@ -309,6 +309,7 @@ type ControlRuntime struct {
Authenticator authenticator.Request
EgressSelectorConfig string
CloudControllerConfig string
ClientAuthProxyCert string
ClientAuthProxyKey string

View File

@ -19,6 +19,7 @@ import (
"time"
"github.com/k3s-io/k3s/pkg/clientaccess"
"github.com/k3s-io/k3s/pkg/cloudprovider"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/passwd"
"github.com/k3s-io/k3s/pkg/token"
@ -138,6 +139,7 @@ func CreateRuntimeCertFiles(config *config.Control) {
runtime.ServingKubeletKey = filepath.Join(config.DataDir, "tls", "serving-kubelet.key")
runtime.EgressSelectorConfig = filepath.Join(config.DataDir, "etc", "egress-selector-config.yaml")
runtime.CloudControllerConfig = filepath.Join(config.DataDir, "etc", "cloud-config.yaml")
runtime.ClientAuthProxyCert = filepath.Join(config.DataDir, "tls", "client-auth-proxy.crt")
runtime.ClientAuthProxyKey = filepath.Join(config.DataDir, "tls", "client-auth-proxy.key")
@ -187,6 +189,10 @@ func GenServerDeps(config *config.Control) error {
return err
}
if err := genCloudConfig(config); err != nil {
return err
}
return readTokens(runtime)
}
@ -764,3 +770,21 @@ func genEgressSelectorConfig(controlConfig *config.Control) error {
}
return ioutil.WriteFile(controlConfig.Runtime.EgressSelectorConfig, b, 0600)
}
func genCloudConfig(controlConfig *config.Control) error {
cloudConfig := cloudprovider.Config{
LBEnabled: !controlConfig.DisableServiceLB,
LBNamespace: controlConfig.ServiceLBNamespace,
LBImage: cloudprovider.DefaultLBImage,
Rootless: controlConfig.Rootless,
}
if controlConfig.SystemDefaultRegistry != "" {
cloudConfig.LBImage = controlConfig.SystemDefaultRegistry + "/" + cloudConfig.LBImage
}
b, err := json.Marshal(cloudConfig)
if err != nil {
return err
}
return ioutil.WriteFile(controlConfig.Runtime.CloudControllerConfig, b, 0600)
}

View File

@ -302,6 +302,7 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control) error {
"profiling": "false",
"allocate-node-cidrs": "true",
"cloud-provider": version.Program,
"cloud-config": runtime.CloudControllerConfig,
"cluster-cidr": util.JoinIPNets(cfg.ClusterIPRanges),
"configure-cloud-routes": "false",
"kubeconfig": runtime.KubeConfigCloudController,
@ -371,9 +372,9 @@ func checkForCloudControllerPrivileges(ctx context.Context, runtime *config.Cont
User: version.Program + "-cloud-controller-manager",
ResourceAttributes: &authorizationv1.ResourceAttributes{
Namespace: metav1.NamespaceSystem,
Verb: "get",
Resource: "configmaps",
Name: "extension-apiserver-authentication",
Verb: "*",
Resource: "daemonsets",
Group: "apps",
},
},
}

View File

@ -18,7 +18,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/authentication/authenticator"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@ -164,27 +163,17 @@ func (*Embedded) CloudControllerManager(ctx context.Context, ccmRBACReady <-chan
}
cloudInitializer := func(config *cloudcontrollerconfig.CompletedConfig) cloudprovider.Interface {
cloud, err := ccm.InitCloudProvider(version.Program, "")
cloud, err := ccm.InitCloudProvider(version.Program, config.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile)
if err != nil {
logrus.Fatalf("Cloud provider could not be initialized: %v", err)
}
if cloud == nil {
logrus.Fatalf("Cloud provider is nil")
}
cloud.Initialize(config.ClientBuilder, make(chan struct{}))
if informerUserCloud, ok := cloud.(ccm.InformerUser); ok {
informerUserCloud.SetInformers(config.SharedInformers)
}
return cloud
}
-controllerInitializers := ccmapp.DefaultInitFuncConstructors
-delete(controllerInitializers, "service")
-delete(controllerInitializers, "route")
-command := ccmapp.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, controllerInitializers, cliflag.NamedFlagSets{}, wait.NeverStop)
+command := ccmapp.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, ccmapp.DefaultInitFuncConstructors, cliflag.NamedFlagSets{}, ctx.Done())
command.SetArgs(args)
go func() {

View File

@ -91,7 +91,7 @@ func (fi bindataFileInfo) Sys() interface{} {
return nil
}
var _ccmYaml = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x94\x41\x8f\x13\x31\x0c\x85\xef\xf9\x15\xd1\x5e\x56\x42\x4a\x57\x88\x0b\x9a\x23\x1c\xb8\xaf\x04\x77\x37\x79\x74\x43\x33\x71\x14\x3b\xb3\xc0\xaf\x47\xe9\x2c\x68\x99\xa1\x55\x5b\x40\x70\x8b\x2a\xfb\x7b\xcf\xcf\xf5\x50\x89\x1f\x50\x25\x72\x1e\x6c\xdd\x92\xdf\x50\xd3\x07\xae\xf1\x2b\x69\xe4\xbc\xd9\xbf\x96\x4d\xe4\xbb\xe9\xa5\xd9\xc7\x1c\x06\xfb\x36\x35\x51\xd4\x7b\x4e\x30\x23\x94\x02\x29\x0d\xc6\xda\x4c\x23\x06\xbb\x7f\x25\xce\x27\x6e\xc1\x79\xce\x5a\x39\x25\x54\x37\x52\xa6\x1d\xaa\xa9\x2d\x41\x06\xe3\x2c\x95\xf8\xae\x72\x2b\xd2\x1b\x9d\xf5\xcc\x35\xc4\xfc\x5c\xcf\x58\x5b\x21\xdc\xaa\xc7\x53\x51\x02\x09\xc4\x58\x3b\xa1\x6e\x9f\x7e\xdb\x41\x67\x40\x05\x29\x0e\xcf\x56\x42\x7f\xae\x34\x6e\x6e\xd6\x48\x4c\xc8\xba\x40\x3e\x43\x15\x52\xff\x70\x31\x34\x73\x58\xda\xbc\x7d\x71\x7b\x41\xef\x9d\x28\x69\x5b\x20\x66\x2f\x67\x41\x04\x75\x8a\x7e\xe9\x21\x45\xd1\x5f\x4f\xd5\x9f\x8f\x17\xe3\xc9\x7b\x6e\xc7\xd2\x3b\x0b\x54\xfa\x9f\x4e\x14\x59\x27\x4e\x6d\x3c\xb6\xdb\x1f\xc6\xaf\xb3\x8b\x1c\x0a\xc7\x53\x6b\x5e\x09\x3d\xae\xf6\xee\x9c\xb9\xfe\x4a\xde\xc4\x1c\x62\xde\x5d\x7c\x2c\x9c\x70\x8f\x8f\xbd\xfa\xfb\x98\x27\x94\x8d\xb5\xeb\xf3\x3c\x4b\x47\xda\xf6\x13\xbc\x1e\xee\x72\x46\xbc\x17\xd4\xf3\x7a\xe7\x22\x29\xe4\x7b\x65\xdb\xc2\xc9\x17\x51\x8c\xff\x24\x31\xd7\xf9\x2e\x20\x61\x47\xca\x7f\x34\xc0\x79\xaa\x61\x21\xf0\xbf\x24\xf7\x9b\x91\x21\x6b\xf4\x07\xb2\xab\xa0\x70\xca\xdc\x95\x91\xfe\x94\x25\x3e\x2b\x72\x9f\xcd\x51\x89\xfd\x63\x72\xd4\xc6\x5f\xc9\xf7\x5b\x00\x00\x00\xff\xff\xc2\xa7\x17\xb8\xee\x06\x00\x00")
var _ccmYaml = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x94\x4f\x8f\x13\x31\x0c\xc5\xef\xf9\x14\x51\x8f\x48\xe9\x0a\x71\x41\x73\x84\x03\xf7\x95\xe0\xee\x26\x8f\x6e\x68\x26\x8e\x62\xa7\xfc\xf9\xf4\x68\x66\x76\xc5\x30\xa3\xae\x3a\x05\xc4\xde\x2c\x2b\xfe\xf9\xf9\x39\x09\x95\xf8\x09\x55\x22\xe7\xce\xd6\x03\xf9\x3d\x35\x7d\xe0\x1a\x7f\x90\x46\xce\xfb\xd3\x5b\xd9\x47\xbe\x3b\xbf\x36\xa7\x98\x43\x67\xdf\xa7\x26\x8a\x7a\xcf\x09\xa6\x87\x52\x20\xa5\xce\x58\x9b\xa9\x47\x67\x4f\x6f\xc4\xf9\xc4\x2d\x38\xcf\x59\x2b\xa7\x84\xea\x7a\xca\x74\x44\x35\xb5\x25\x48\x67\x9c\xa5\x12\x3f\x54\x6e\x45\x86\x42\x67\x3d\x73\x0d\x31\xcf\xfb\x19\x6b\x2b\x84\x5b\xf5\x78\x3c\x94\x40\x02\x31\xd6\x9e\x51\x0f\x8f\xb9\x23\x74\x02\x54\x90\x62\x0c\x5b\x09\x43\xb8\xea\xb1\xdb\xad\x91\x38\x23\xeb\x02\x39\x43\x15\x52\xff\xb0\x19\x9a\x39\x2c\x65\xee\x5e\xed\x36\xd4\xde\x89\x92\x36\x19\x13\x82\x7a\x8e\x7e\x9e\x9b\x61\x27\x7d\x57\x81\x9f\x38\x53\x1d\x87\x0b\x3e\xa6\x28\x53\xf0\x75\x33\x9a\xbc\xe7\x76\xc9\xcd\xeb\x86\xa7\x1e\x52\xc8\x2f\xdd\x9b\x6d\x64\xd0\xb9\x62\x51\x29\xb2\xa6\x05\x42\xcf\x59\xb0\x54\x34\xee\xc2\x39\x73\xfb\xad\x7f\x17\x73\x88\xf9\xb8\xf9\xf2\x73\xc2\x3d\x3e\x0f\xa7\x9f\x06\x78\xa6\xb3\xb1\x76\xfd\xdc\xae\xea\x23\xed\xf0\x05\x5e\xc7\x77\x36\x21\x3e\x0a\xea\x75\xb5\xf6\xd7\x12\x3a\x7b\x6a\x07\x38\xf9\x2e\x8a\xfe\xbf\x38\xe6\x06\xbe\x0b\x48\x38\x92\xf2\x5f\x35\x70\x9a\xaa\x5b\x34\x78\x29\xce\xfd\xa1\x65\xc8\x1a\xfd\x48\x76\x15\x14\x9e\x13\x77\xa3\xa5\xbf\x79\x89\x6f\x8a\x3c\xcc\xe6\xa8\xc4\xe1\x33\xb8\x28\xe3\x9f\xf8\xfb\x33\x00\x00\xff\xff\x4c\x59\x10\xd5\xbe\x06\x00\x00")
func ccmYamlBytes() ([]byte, error) {
return bindataRead(

View File

@ -23,7 +23,6 @@ import (
"github.com/k3s-io/k3s/pkg/nodepassword"
"github.com/k3s-io/k3s/pkg/rootlessports"
"github.com/k3s-io/k3s/pkg/secretsencrypt"
"github.com/k3s-io/k3s/pkg/servicelb"
"github.com/k3s-io/k3s/pkg/static"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
@ -187,10 +186,9 @@ func coreControllers(ctx context.Context, sc *Context, config *Config) error {
return err
}
-// apply SystemDefaultRegistry setting to Helm and ServiceLB before starting controllers
+// apply SystemDefaultRegistry setting to Helm before starting controllers
if config.ControlConfig.SystemDefaultRegistry != "" {
helm.DefaultJobImage = config.ControlConfig.SystemDefaultRegistry + "/" + helm.DefaultJobImage
-servicelb.DefaultLBImage = config.ControlConfig.SystemDefaultRegistry + "/" + servicelb.DefaultLBImage
}
if !config.ControlConfig.DisableHelmController {
@ -205,21 +203,6 @@ func coreControllers(ctx context.Context, sc *Context, config *Config) error {
sc.Core.Core().V1().ConfigMap())
}
-if err := servicelb.Register(ctx,
-sc.K8s,
-sc.Apply,
-sc.Apps.Apps().V1().DaemonSet(),
-sc.Apps.Apps().V1().Deployment(),
-sc.Core.Core().V1().Node(),
-sc.Core.Core().V1().Pod(),
-sc.Core.Core().V1().Service(),
-sc.Core.Core().V1().Endpoints(),
-config.ControlConfig.ServiceLBNamespace,
-!config.ControlConfig.DisableServiceLB,
-config.ControlConfig.Rootless); err != nil {
-return err
-}
if config.ControlConfig.EncryptSecrets {
if err := secretsencrypt.Register(ctx,
sc.K8s,

View File

@ -1,626 +0,0 @@
package servicelb
import (
"context"
"errors"
"fmt"
"sort"
"strconv"
"strings"
util "github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/condition"
appclient "github.com/rancher/wrangler/pkg/generated/controllers/apps/v1"
coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
"github.com/rancher/wrangler/pkg/objectset"
"github.com/rancher/wrangler/pkg/relatedresource"
"github.com/rancher/wrangler/pkg/slice"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
v1getter "k8s.io/client-go/kubernetes/typed/apps/v1"
coregetter "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
utilsnet "k8s.io/utils/net"
utilpointer "k8s.io/utils/pointer"
)
var (
finalizerName = "svccontroller." + version.Program + ".cattle.io/daemonset"
svcNameLabel = "svccontroller." + version.Program + ".cattle.io/svcname"
svcNamespaceLabel = "svccontroller." + version.Program + ".cattle.io/svcnamespace"
daemonsetNodeLabel = "svccontroller." + version.Program + ".cattle.io/enablelb"
daemonsetNodePoolLabel = "svccontroller." + version.Program + ".cattle.io/lbpool"
nodeSelectorLabel = "svccontroller." + version.Program + ".cattle.io/nodeselector"
DefaultLBImage = "rancher/klipper-lb:v0.3.5"
)
const (
Ready = condition.Cond("Ready")
ControllerName = "svccontroller"
)
func Register(ctx context.Context,
kubernetes kubernetes.Interface,
apply apply.Apply,
daemonSetController appclient.DaemonSetController,
deployments appclient.DeploymentController,
nodes coreclient.NodeController,
pods coreclient.PodController,
services coreclient.ServiceController,
endpoints coreclient.EndpointsController,
klipperLBNamespace string,
enabled, rootless bool) error {
h := &handler{
rootless: rootless,
enabled: enabled,
klipperLBNamespace: klipperLBNamespace,
nodeCache: nodes.Cache(),
podCache: pods.Cache(),
deploymentCache: deployments.Cache(),
processor: apply.WithSetID(ControllerName).WithCacheTypes(daemonSetController),
serviceCache: services.Cache(),
services: kubernetes.CoreV1(),
daemonsets: kubernetes.AppsV1(),
deployments: kubernetes.AppsV1(),
recorder: util.BuildControllerEventRecorder(kubernetes, ControllerName, meta.NamespaceAll),
}
services.OnChange(ctx, ControllerName, h.onChangeService)
nodes.OnChange(ctx, ControllerName, h.onChangeNode)
relatedresource.Watch(ctx, ControllerName+"-watcher",
h.onResourceChange,
services,
pods,
endpoints)
if enabled {
if err := createServiceLBNamespace(ctx, h.klipperLBNamespace, kubernetes); err != nil {
return err
}
}
return nil
}
type handler struct {
rootless bool
klipperLBNamespace string
enabled bool
nodeCache coreclient.NodeCache
podCache coreclient.PodCache
deploymentCache appclient.DeploymentCache
processor apply.Apply
serviceCache coreclient.ServiceCache
services coregetter.ServicesGetter
daemonsets v1getter.DaemonSetsGetter
deployments v1getter.DeploymentsGetter
recorder record.EventRecorder
}
func createServiceLBNamespace(ctx context.Context, ns string, k8s kubernetes.Interface) error {
_, err := k8s.CoreV1().Namespaces().Get(ctx, ns, meta.GetOptions{})
if apierrors.IsNotFound(err) {
_, err := k8s.CoreV1().Namespaces().Create(ctx, &core.Namespace{
ObjectMeta: meta.ObjectMeta{
Name: ns,
},
}, meta.CreateOptions{})
return err
}
return err
}
func (h *handler) onResourceChange(name, namespace string, obj runtime.Object) ([]relatedresource.Key, error) {
if ep, ok := obj.(*core.Endpoints); ok {
return []relatedresource.Key{
{
Name: ep.Name,
Namespace: ep.Namespace,
},
}, nil
}
pod, ok := obj.(*core.Pod)
if !ok {
return nil, nil
}
serviceName := pod.Labels[svcNameLabel]
if serviceName == "" {
return nil, nil
}
serviceNamespace := pod.Labels[svcNamespaceLabel]
if serviceNamespace == "" {
return nil, nil
}
if pod.Status.PodIP == "" {
return nil, nil
}
return []relatedresource.Key{
{
Name: serviceName,
Namespace: serviceNamespace,
},
}, nil
}
// onChangeService handles changes to Services.
func (h *handler) onChangeService(key string, svc *core.Service) (*core.Service, error) {
if svc == nil {
return nil, nil
}
if err := h.deployPod(svc); err != nil {
return svc, err
}
// Don't return service because we don't want another update
_, err := h.updateService(svc)
return nil, err
}
// onChangeNode handles changes to Nodes. We need to handle this as we may need to kick the DaemonSet
// to add or remove pods from nodes if labels have changed.
func (h *handler) onChangeNode(key string, node *core.Node) (*core.Node, error) {
if node == nil {
return nil, nil
}
if _, ok := node.Labels[daemonsetNodeLabel]; !ok {
return node, nil
}
if err := h.updateDaemonSets(); err != nil {
return node, err
}
return node, nil
}
// updateService ensures that the Service ingress IP address list is in sync
// with the Nodes actually running pods for this service.
func (h *handler) updateService(svc *core.Service) (runtime.Object, error) {
if !h.enabled || svc.DeletionTimestamp != nil || svc.Spec.Type != core.ServiceTypeLoadBalancer {
return h.removeFinalizer(svc)
}
pods, err := h.podCache.List(h.klipperLBNamespace, labels.SelectorFromSet(map[string]string{
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
}))
if err != nil {
return svc, err
}
existingIPs := serviceIPs(svc)
expectedIPs, err := h.podIPs(pods, svc)
if err != nil {
return svc, err
}
sort.Strings(expectedIPs)
sort.Strings(existingIPs)
if slice.StringsEqual(expectedIPs, existingIPs) {
return svc, nil
}
svc = svc.DeepCopy()
svc, err = h.addFinalizer(svc)
if err != nil {
return svc, err
}
svc.Status.LoadBalancer.Ingress = nil
for _, ip := range expectedIPs {
svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, core.LoadBalancerIngress{
IP: ip,
})
}
defer h.recorder.Eventf(svc, core.EventTypeNormal, "UpdatedIngressIP", "LoadBalancer Ingress IP addresses updated: %s", strings.Join(expectedIPs, ", "))
return h.services.Services(svc.Namespace).UpdateStatus(context.TODO(), svc, meta.UpdateOptions{})
}
// serviceIPs returns the list of ingress IP addresses from the Service
func serviceIPs(svc *core.Service) []string {
var ips []string
for _, ingress := range svc.Status.LoadBalancer.Ingress {
if ingress.IP != "" {
ips = append(ips, ingress.IP)
}
}
return ips
}
// podIPs returns a list of IPs for Nodes hosting ServiceLB Pods.
// If at least one node has External IPs available, only external IPs are returned.
// If no nodes have External IPs set, the Internal IPs of all nodes running pods are returned.
func (h *handler) podIPs(pods []*core.Pod, svc *core.Service) ([]string, error) {
// Go doesn't have sets so we stuff things into a map of bools and then get lists of keys
// to determine the unique set of IPs in use by pods.
extIPs := map[string]bool{}
intIPs := map[string]bool{}
for _, pod := range pods {
if pod.Spec.NodeName == "" || pod.Status.PodIP == "" {
continue
}
if !Ready.IsTrue(pod) {
continue
}
node, err := h.nodeCache.Get(pod.Spec.NodeName)
if apierrors.IsNotFound(err) {
continue
} else if err != nil {
return nil, err
}
for _, addr := range node.Status.Addresses {
if addr.Type == core.NodeExternalIP {
extIPs[addr.Address] = true
} else if addr.Type == core.NodeInternalIP {
intIPs[addr.Address] = true
}
}
}
keys := func(addrs map[string]bool) (ips []string) {
for k := range addrs {
ips = append(ips, k)
}
return ips
}
var ips []string
if len(extIPs) > 0 {
ips = keys(extIPs)
} else {
ips = keys(intIPs)
}
ips, err := filterByIPFamily(ips, svc)
if err != nil {
return nil, err
}
if len(ips) > 0 && h.rootless {
return []string{"127.0.0.1"}, nil
}
return ips, nil
}
// filterByIPFamily filters ips based on dual-stack parameters of the service
func filterByIPFamily(ips []string, svc *core.Service) ([]string, error) {
var ipFamilyPolicy core.IPFamilyPolicyType
var ipv4Addresses []string
var ipv6Addresses []string
for _, ip := range ips {
if utilsnet.IsIPv4String(ip) {
ipv4Addresses = append(ipv4Addresses, ip)
}
if utilsnet.IsIPv6String(ip) {
ipv6Addresses = append(ipv6Addresses, ip)
}
}
if svc.Spec.IPFamilyPolicy != nil {
ipFamilyPolicy = *svc.Spec.IPFamilyPolicy
}
switch ipFamilyPolicy {
case core.IPFamilyPolicySingleStack:
if svc.Spec.IPFamilies[0] == core.IPv4Protocol {
return ipv4Addresses, nil
}
if svc.Spec.IPFamilies[0] == core.IPv6Protocol {
return ipv6Addresses, nil
}
case core.IPFamilyPolicyPreferDualStack:
if svc.Spec.IPFamilies[0] == core.IPv4Protocol {
ipAddresses := append(ipv4Addresses, ipv6Addresses...)
return ipAddresses, nil
}
if svc.Spec.IPFamilies[0] == core.IPv6Protocol {
ipAddresses := append(ipv6Addresses, ipv4Addresses...)
return ipAddresses, nil
}
case core.IPFamilyPolicyRequireDualStack:
if (len(ipv4Addresses) == 0) || (len(ipv6Addresses) == 0) {
return nil, errors.New("one or more IP families did not have addresses available for service with ipFamilyPolicy=RequireDualStack")
}
if svc.Spec.IPFamilies[0] == core.IPv4Protocol {
ipAddresses := append(ipv4Addresses, ipv6Addresses...)
return ipAddresses, nil
}
if svc.Spec.IPFamilies[0] == core.IPv6Protocol {
ipAddresses := append(ipv6Addresses, ipv4Addresses...)
return ipAddresses, nil
}
}
return nil, errors.New("unhandled ipFamilyPolicy")
}
// deployPod ensures that there is a DaemonSet for the service.
// It also ensures that any legacy Deployments from older versions of ServiceLB are deleted.
func (h *handler) deployPod(svc *core.Service) error {
if err := h.deleteOldDeployments(svc); err != nil {
return err
}
if !h.enabled || svc.DeletionTimestamp != nil || svc.Spec.Type != core.ServiceTypeLoadBalancer || svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" {
return h.deletePod(svc)
}
ds, err := h.newDaemonSet(svc)
if err != nil {
return err
}
objs := objectset.NewObjectSet()
if ds != nil {
objs.Add(ds)
defer h.recorder.Eventf(svc, core.EventTypeNormal, "AppliedDaemonSet", "Applied LoadBalancer DaemonSet %s/%s", ds.Namespace, ds.Name)
}
return h.processor.WithOwner(svc).Apply(objs)
}
// deletePod ensures that there are no DaemonSets for the given service.
func (h *handler) deletePod(svc *core.Service) error {
name := generateName(svc)
if err := h.daemonsets.DaemonSets(h.klipperLBNamespace).Delete(context.TODO(), name, meta.DeleteOptions{}); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
defer h.recorder.Eventf(svc, core.EventTypeNormal, "DeletedDaemonSet", "Deleted LoadBalancer DaemonSet %s/%s", h.klipperLBNamespace, name)
return nil
}
// newDaemonSet creates a DaemonSet to ensure that ServiceLB pods are run on
// each eligible node.
func (h *handler) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
name := generateName(svc)
oneInt := intstr.FromInt(1)
// If ipv6 is present, we must enable ipv6 forwarding in the manifest
var ipv6Switch bool
for _, ipFamily := range svc.Spec.IPFamilies {
if ipFamily == core.IPv6Protocol {
ipv6Switch = true
}
}
ds := &apps.DaemonSet{
ObjectMeta: meta.ObjectMeta{
Name: name,
Namespace: h.klipperLBNamespace,
Labels: map[string]string{
nodeSelectorLabel: "false",
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
},
},
TypeMeta: meta.TypeMeta{
Kind: "DaemonSet",
APIVersion: "apps/v1",
},
Spec: apps.DaemonSetSpec{
Selector: &meta.LabelSelector{
MatchLabels: map[string]string{
"app": name,
},
},
Template: core.PodTemplateSpec{
ObjectMeta: meta.ObjectMeta{
Labels: map[string]string{
"app": name,
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
},
},
Spec: core.PodSpec{
AutomountServiceAccountToken: utilpointer.Bool(false),
},
},
UpdateStrategy: apps.DaemonSetUpdateStrategy{
Type: apps.RollingUpdateDaemonSetStrategyType,
RollingUpdate: &apps.RollingUpdateDaemonSet{
MaxUnavailable: &oneInt,
},
},
},
}
if ipv6Switch {
// Add security context to enable ipv6 forwarding
securityContext := &core.PodSecurityContext{
Sysctls: []core.Sysctl{
{
Name: "net.ipv6.conf.all.forwarding",
Value: "1",
},
},
}
ds.Spec.Template.Spec.SecurityContext = securityContext
}
for _, port := range svc.Spec.Ports {
portName := fmt.Sprintf("lb-%s-%d", strings.ToLower(string(port.Protocol)), port.Port)
container := core.Container{
Name: portName,
Image: DefaultLBImage,
ImagePullPolicy: core.PullIfNotPresent,
Ports: []core.ContainerPort{
{
Name: portName,
ContainerPort: port.Port,
HostPort: port.Port,
Protocol: port.Protocol,
},
},
Env: []core.EnvVar{
{
Name: "SRC_PORT",
Value: strconv.Itoa(int(port.Port)),
},
{
Name: "DEST_PROTO",
Value: string(port.Protocol),
},
{
Name: "DEST_PORT",
Value: strconv.Itoa(int(port.Port)),
},
{
Name: "DEST_IPS",
Value: strings.Join(svc.Spec.ClusterIPs, " "),
},
},
SecurityContext: &core.SecurityContext{
Capabilities: &core.Capabilities{
Add: []core.Capability{
"NET_ADMIN",
},
},
},
}
ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container)
}
// Add toleration to node-role.kubernetes.io/master=*:NoSchedule
masterToleration := core.Toleration{
Key: "node-role.kubernetes.io/master",
Operator: "Exists",
Effect: "NoSchedule",
}
ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, masterToleration)
// Add toleration to node-role.kubernetes.io/control-plane=*:NoSchedule
controlPlaneToleration := core.Toleration{
Key: "node-role.kubernetes.io/control-plane",
Operator: "Exists",
Effect: "NoSchedule",
}
ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, controlPlaneToleration)
// Add toleration to CriticalAddonsOnly
criticalAddonsOnlyToleration := core.Toleration{
Key: "CriticalAddonsOnly",
Operator: "Exists",
}
ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, criticalAddonsOnlyToleration)
// Add node selector only if label "svccontroller.k3s.cattle.io/enablelb" exists on the nodes
selector, err := labels.Parse(daemonsetNodeLabel)
if err != nil {
return nil, err
}
nodesWithLabel, err := h.nodeCache.List(selector)
if err != nil {
return nil, err
}
if len(nodesWithLabel) > 0 {
ds.Spec.Template.Spec.NodeSelector = map[string]string{
daemonsetNodeLabel: "true",
}
// Add node selector for "svccontroller.k3s.cattle.io/lbpool=<pool>" if service has lbpool label
if svc.Labels[daemonsetNodePoolLabel] != "" {
ds.Spec.Template.Spec.NodeSelector[daemonsetNodePoolLabel] = svc.Labels[daemonsetNodePoolLabel]
}
ds.Labels[nodeSelectorLabel] = "true"
}
return ds, nil
}
func (h *handler) updateDaemonSets() error {
daemonsets, err := h.daemonsets.DaemonSets("").List(context.TODO(), meta.ListOptions{
LabelSelector: nodeSelectorLabel + "=false",
})
if err != nil {
return err
}
for _, ds := range daemonsets.Items {
ds.Spec.Template.Spec.NodeSelector = map[string]string{
daemonsetNodeLabel: "true",
}
ds.Labels[nodeSelectorLabel] = "true"
if _, err := h.daemonsets.DaemonSets(ds.Namespace).Update(context.TODO(), &ds, meta.UpdateOptions{}); err != nil {
return err
}
}
return nil
}
// deleteOldDeployments ensures that there are no legacy Deployments for ServiceLB pods.
// ServiceLB used to use Deployments before switching to DaemonSets in 875ba28
func (h *handler) deleteOldDeployments(svc *core.Service) error {
name := fmt.Sprintf("svclb-%s", svc.Name)
if _, err := h.deploymentCache.Get(svc.Namespace, name); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
return h.deployments.Deployments(svc.Namespace).Delete(context.TODO(), name, meta.DeleteOptions{})
}
// addFinalizer ensures that there is a finalizer for this controller on the Service
func (h *handler) addFinalizer(svc *core.Service) (*core.Service, error) {
if !h.hasFinalizer(svc) {
svc.Finalizers = append(svc.Finalizers, finalizerName)
return h.services.Services(svc.Namespace).Update(context.TODO(), svc, meta.UpdateOptions{})
}
return svc, nil
}
// removeFinalizer ensures that there is not a finalizer for this controller on the Service
func (h *handler) removeFinalizer(svc *core.Service) (*core.Service, error) {
if !h.hasFinalizer(svc) {
return svc, nil
}
for k, v := range svc.Finalizers {
if v != finalizerName {
continue
}
svc.Finalizers = append(svc.Finalizers[:k], svc.Finalizers[k+1:]...)
}
return h.services.Services(svc.Namespace).Update(context.TODO(), svc, meta.UpdateOptions{})
}
// hasFinalizer returns a boolean indicating whether or not there is a finalizer for this controller on the Service
func (h *handler) hasFinalizer(svc *core.Service) bool {
for _, finalizer := range svc.Finalizers {
if finalizer == finalizerName {
return true
}
}
return false
}
// generateName generates a distinct name for the DaemonSet based on the service name and UID
func generateName(svc *core.Service) string {
return fmt.Sprintf("svclb-%s-%s", svc.Name, svc.UID[:8])
}