k3s/pkg/servicelb/controller.go
Brad Davidson 167ed19d22 Fix deletion of svclb DaemonSet when Service is deleted
Commit 87e1806697 removed the OwnerReferences
field from the DaemonSet, which makes sense since the Service may now be
in a different namespace than the DaemonSet and cross-namespace owner
references are not supported.  Unfortunately, we were relying on
garbage collection to delete the DaemonSet, so this started leaving
orphaned DaemonSets when Services were deleted.

We don't want to add a Service OnRemove handler, since this would add
finalizers to all Services, not just LoadBalancer Services, causing
conformance tests to fail. Instead, manage our own finalizers, and
restore the DaemonSet removal Event that was removed by the same commit.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
2022-07-12 12:27:55 -07:00


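// Package servicelb runs the klipper-lb based load balancer controller that
// backs LoadBalancer Services with a per-service DaemonSet of host-port pods.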
package servicelb
import (
"context"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/condition"
appclient "github.com/rancher/wrangler/pkg/generated/controllers/apps/v1"
coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
"github.com/rancher/wrangler/pkg/objectset"
"github.com/rancher/wrangler/pkg/relatedresource"
"github.com/rancher/wrangler/pkg/slice"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
v1getter "k8s.io/client-go/kubernetes/typed/apps/v1"
coregetter "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
utilsnet "k8s.io/utils/net"
utilpointer "k8s.io/utils/pointer"
)
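// Label and finalizer names used to tie klipper-lb resources back to their Services.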
var (
finalizerName = "svccontroller." + version.Program + ".cattle.io/daemonset"
svcNameLabel = "svccontroller." + version.Program + ".cattle.io/svcname"
svcNamespaceLabel = "svccontroller." + version.Program + ".cattle.io/svcnamespace"
daemonsetNodeLabel = "svccontroller." + version.Program + ".cattle.io/enablelb"
daemonsetNodePoolLabel = "svccontroller." + version.Program + ".cattle.io/lbpool"
nodeSelectorLabel = "svccontroller." + version.Program + ".cattle.io/nodeselector"
DefaultLBImage = "rancher/klipper-lb:v0.3.5"
)
const (
Ready = condition.Cond("Ready")
ControllerName = "svccontroller"
)
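// Register wires up the ServiceLB controller: it registers Service and Node
// handlers, watches related Pods and Endpoints so that Service status can be
// re-resolved, and ensures the klipper-lb namespace exists when the
// controller is enabled.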
func Register(ctx context.Context,
kubernetes kubernetes.Interface,
apply apply.Apply,
daemonSetController appclient.DaemonSetController,
deployments appclient.DeploymentController,
nodes coreclient.NodeController,
pods coreclient.PodController,
services coreclient.ServiceController,
endpoints coreclient.EndpointsController,
klipperLBNamespace string,
enabled, rootless bool) error {
h := &handler{
rootless: rootless,
enabled: enabled,
klipperLBNamespace: klipperLBNamespace,
nodeCache: nodes.Cache(),
podCache: pods.Cache(),
deploymentCache: deployments.Cache(),
processor: apply.WithSetID(ControllerName).WithCacheTypes(daemonSetController),
serviceCache: services.Cache(),
services: kubernetes.CoreV1(),
daemonsets: kubernetes.AppsV1(),
deployments: kubernetes.AppsV1(),
recorder: util.BuildControllerEventRecorder(kubernetes, ControllerName, meta.NamespaceAll),
}
services.OnChange(ctx, ControllerName, h.onChangeService)
nodes.OnChange(ctx, ControllerName, h.onChangeNode)
relatedresource.Watch(ctx, ControllerName+"-watcher",
h.onResourceChange,
services,
pods,
endpoints)
if enabled {
if err := createServiceLBNamespace(ctx, h.klipperLBNamespace, kubernetes); err != nil {
return err
}
}
return nil
}
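// handler holds the caches, clients, and flags shared by the controller's event handlers.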
type handler struct {
rootless bool
klipperLBNamespace string
enabled bool
nodeCache coreclient.NodeCache
podCache coreclient.PodCache
deploymentCache appclient.DeploymentCache
processor apply.Apply
serviceCache coreclient.ServiceCache
services coregetter.ServicesGetter
daemonsets v1getter.DaemonSetsGetter
deployments v1getter.DeploymentsGetter
recorder record.EventRecorder
}
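// createServiceLBNamespace ensures that the namespace used for klipper-lb
// pods exists, creating it on first use.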
func createServiceLBNamespace(ctx context.Context, ns string, k8s kubernetes.Interface) error {
_, err := k8s.CoreV1().Namespaces().Get(ctx, ns, meta.GetOptions{})
if apierrors.IsNotFound(err) {
_, err := k8s.CoreV1().Namespaces().Create(ctx, &core.Namespace{
ObjectMeta: meta.ObjectMeta{
Name: ns,
},
}, meta.CreateOptions{})
return err
}
return err
}
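// onResourceChange maps watched Pods and Endpoints back to the Service that
// should be re-enqueued. Endpoints share the Service's name and namespace;
// Pods are matched via the svcname and svcnamespace labels stamped onto the
// DaemonSet's pod template.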
func (h *handler) onResourceChange(name, namespace string, obj runtime.Object) ([]relatedresource.Key, error) {
if ep, ok := obj.(*core.Endpoints); ok {
return []relatedresource.Key{
{
Name: ep.Name,
Namespace: ep.Namespace,
},
}, nil
}
pod, ok := obj.(*core.Pod)
if !ok {
return nil, nil
}
serviceName := pod.Labels[svcNameLabel]
if serviceName == "" {
return nil, nil
}
serviceNamespace := pod.Labels[svcNamespaceLabel]
if serviceNamespace == "" {
return nil, nil
}
if pod.Status.PodIP == "" {
return nil, nil
}
return []relatedresource.Key{
{
Name: serviceName,
Namespace: serviceNamespace,
},
}, nil
}
// onChangeService handles changes to Services.
func (h *handler) onChangeService(key string, svc *core.Service) (*core.Service, error) {
if svc == nil {
return nil, nil
}
if err := h.deployPod(svc); err != nil {
return svc, err
}
// Don't return service because we don't want another update
_, err := h.updateService(svc)
return nil, err
}
// onChangeNode handles changes to Nodes. We need to handle this as we may need to kick the DaemonSet
// to add or remove pods from nodes if labels have changed.
func (h *handler) onChangeNode(key string, node *core.Node) (*core.Node, error) {
if node == nil {
return nil, nil
}
if _, ok := node.Labels[daemonsetNodeLabel]; !ok {
return node, nil
}
if err := h.updateDaemonSets(); err != nil {
return node, err
}
return node, nil
}
// updateService ensures that the Service ingress IP address list is in sync
// with the Nodes actually running pods for this service.
func (h *handler) updateService(svc *core.Service) (runtime.Object, error) {
if !h.enabled || svc.DeletionTimestamp != nil || svc.Spec.Type != core.ServiceTypeLoadBalancer {
return h.removeFinalizer(svc)
}
pods, err := h.podCache.List(h.klipperLBNamespace, labels.SelectorFromSet(map[string]string{
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
}))
if err != nil {
return svc, err
}
existingIPs := serviceIPs(svc)
expectedIPs, err := h.podIPs(pods, svc)
if err != nil {
return svc, err
}
sort.Strings(expectedIPs)
sort.Strings(existingIPs)
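// Skip the status update if the ingress addresses already match.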
if slice.StringsEqual(expectedIPs, existingIPs) {
return svc, nil
}
svc = svc.DeepCopy()
svc, err = h.addFinalizer(svc)
if err != nil {
return svc, err
}
svc.Status.LoadBalancer.Ingress = nil
for _, ip := range expectedIPs {
svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, core.LoadBalancerIngress{
IP: ip,
})
}
defer h.recorder.Eventf(svc, core.EventTypeNormal, "UpdatedIngressIP", "LoadBalancer Ingress IP addresses updated: %s", strings.Join(expectedIPs, ", "))
return h.services.Services(svc.Namespace).UpdateStatus(context.TODO(), svc, meta.UpdateOptions{})
}
// serviceIPs returns the list of ingress IP addresses from the Service
func serviceIPs(svc *core.Service) []string {
var ips []string
for _, ingress := range svc.Status.LoadBalancer.Ingress {
if ingress.IP != "" {
ips = append(ips, ingress.IP)
}
}
return ips
}
// podIPs returns a list of IPs for Nodes hosting ServiceLB Pods.
// If at least one node has External IPs available, only external IPs are returned.
// If no nodes have External IPs set, the Internal IPs of all nodes running pods are returned.
func (h *handler) podIPs(pods []*core.Pod, svc *core.Service) ([]string, error) {
// Go doesn't have sets so we stuff things into a map of bools and then get lists of keys
// to determine the unique set of IPs in use by pods.
extIPs := map[string]bool{}
intIPs := map[string]bool{}
for _, pod := range pods {
if pod.Spec.NodeName == "" || pod.Status.PodIP == "" {
continue
}
if !Ready.IsTrue(pod) {
continue
}
node, err := h.nodeCache.Get(pod.Spec.NodeName)
if apierrors.IsNotFound(err) {
continue
} else if err != nil {
return nil, err
}
for _, addr := range node.Status.Addresses {
if addr.Type == core.NodeExternalIP {
extIPs[addr.Address] = true
} else if addr.Type == core.NodeInternalIP {
intIPs[addr.Address] = true
}
}
}
keys := func(addrs map[string]bool) (ips []string) {
for k := range addrs {
ips = append(ips, k)
}
return ips
}
var ips []string
if len(extIPs) > 0 {
ips = keys(extIPs)
} else {
ips = keys(intIPs)
}
ips, err := filterByIPFamily(ips, svc)
if err != nil {
return nil, err
}
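// In rootless mode the host ports are forwarded from the host's loopback
// address, so advertise 127.0.0.1 rather than the node addresses.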
if len(ips) > 0 && h.rootless {
return []string{"127.0.0.1"}, nil
}
return ips, nil
}
// filterByIPFamily filters ips based on dual-stack parameters of the service
func filterByIPFamily(ips []string, svc *core.Service) ([]string, error) {
var ipFamilyPolicy core.IPFamilyPolicyType
var ipv4Addresses []string
var ipv6Addresses []string
for _, ip := range ips {
if utilsnet.IsIPv4String(ip) {
ipv4Addresses = append(ipv4Addresses, ip)
}
if utilsnet.IsIPv6String(ip) {
ipv6Addresses = append(ipv6Addresses, ip)
}
}
if svc.Spec.IPFamilyPolicy != nil {
ipFamilyPolicy = *svc.Spec.IPFamilyPolicy
}
switch ipFamilyPolicy {
case core.IPFamilyPolicySingleStack:
if svc.Spec.IPFamilies[0] == core.IPv4Protocol {
return ipv4Addresses, nil
}
if svc.Spec.IPFamilies[0] == core.IPv6Protocol {
return ipv6Addresses, nil
}
case core.IPFamilyPolicyPreferDualStack:
if svc.Spec.IPFamilies[0] == core.IPv4Protocol {
ipAddresses := append(ipv4Addresses, ipv6Addresses...)
return ipAddresses, nil
}
if svc.Spec.IPFamilies[0] == core.IPv6Protocol {
ipAddresses := append(ipv6Addresses, ipv4Addresses...)
return ipAddresses, nil
}
case core.IPFamilyPolicyRequireDualStack:
if (len(ipv4Addresses) == 0) || (len(ipv6Addresses) == 0) {
return nil, errors.New("one or more IP families did not have addresses available for service with ipFamilyPolicy=RequireDualStack")
}
if svc.Spec.IPFamilies[0] == core.IPv4Protocol {
ipAddresses := append(ipv4Addresses, ipv6Addresses...)
return ipAddresses, nil
}
if svc.Spec.IPFamilies[0] == core.IPv6Protocol {
ipAddresses := append(ipv6Addresses, ipv4Addresses...)
return ipAddresses, nil
}
}
return nil, errors.New("unhandled ipFamilyPolicy")
}
// deployPod ensures that there is a DaemonSet for the service.
// It also ensures that any legacy Deployments from older versions of ServiceLB are deleted.
func (h *handler) deployPod(svc *core.Service) error {
if err := h.deleteOldDeployments(svc); err != nil {
return err
}
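// Only LoadBalancer Services with a ClusterIP get a DaemonSet; anything
// else (including headless Services) has any existing DaemonSet removed.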
if !h.enabled || svc.DeletionTimestamp != nil || svc.Spec.Type != core.ServiceTypeLoadBalancer || svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" {
return h.deletePod(svc)
}
ds, err := h.newDaemonSet(svc)
if err != nil {
return err
}
objs := objectset.NewObjectSet()
if ds != nil {
objs.Add(ds)
defer h.recorder.Eventf(svc, core.EventTypeNormal, "AppliedDaemonSet", "Applied LoadBalancer DaemonSet %s/%s", ds.Namespace, ds.Name)
}
return h.processor.WithOwner(svc).Apply(objs)
}
// deletePod ensures that there are no DaemonSets for the given service.
func (h *handler) deletePod(svc *core.Service) error {
name := generateName(svc)
if err := h.daemonsets.DaemonSets(h.klipperLBNamespace).Delete(context.TODO(), name, meta.DeleteOptions{}); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
defer h.recorder.Eventf(svc, core.EventTypeNormal, "DeletedDaemonSet", "Deleted LoadBalancer DaemonSet %s/%s", h.klipperLBNamespace, name)
return nil
}
// newDaemonSet creates a DaemonSet to ensure that ServiceLB pods are run on
// each eligible node.
func (h *handler) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
name := generateName(svc)
oneInt := intstr.FromInt(1)
// If ipv6 is present, we must enable ipv6 forwarding in the manifest
var ipv6Switch bool
for _, ipFamily := range svc.Spec.IPFamilies {
if ipFamily == core.IPv6Protocol {
ipv6Switch = true
}
}
ds := &apps.DaemonSet{
ObjectMeta: meta.ObjectMeta{
Name: name,
Namespace: h.klipperLBNamespace,
Labels: map[string]string{
nodeSelectorLabel: "false",
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
},
},
TypeMeta: meta.TypeMeta{
Kind: "DaemonSet",
APIVersion: "apps/v1",
},
Spec: apps.DaemonSetSpec{
Selector: &meta.LabelSelector{
MatchLabels: map[string]string{
"app": name,
},
},
Template: core.PodTemplateSpec{
ObjectMeta: meta.ObjectMeta{
Labels: map[string]string{
"app": name,
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
},
},
Spec: core.PodSpec{
AutomountServiceAccountToken: utilpointer.Bool(false),
},
},
UpdateStrategy: apps.DaemonSetUpdateStrategy{
Type: apps.RollingUpdateDaemonSetStrategyType,
RollingUpdate: &apps.RollingUpdateDaemonSet{
MaxUnavailable: &oneInt,
},
},
},
}
if ipv6Switch {
// Add security context to enable ipv6 forwarding
securityContext := &core.PodSecurityContext{
Sysctls: []core.Sysctl{
{
Name: "net.ipv6.conf.all.forwarding",
Value: "1",
},
},
}
ds.Spec.Template.Spec.SecurityContext = securityContext
}
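// Create one klipper-lb container per Service port, publishing the port on
// the host and forwarding traffic to the Service's cluster IPs.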
for _, port := range svc.Spec.Ports {
portName := fmt.Sprintf("lb-%s-%d", strings.ToLower(string(port.Protocol)), port.Port)
container := core.Container{
Name: portName,
Image: DefaultLBImage,
ImagePullPolicy: core.PullIfNotPresent,
Ports: []core.ContainerPort{
{
Name: portName,
ContainerPort: port.Port,
HostPort: port.Port,
Protocol: port.Protocol,
},
},
Env: []core.EnvVar{
{
Name: "SRC_PORT",
Value: strconv.Itoa(int(port.Port)),
},
{
Name: "DEST_PROTO",
Value: string(port.Protocol),
},
{
Name: "DEST_PORT",
Value: strconv.Itoa(int(port.Port)),
},
{
Name: "DEST_IPS",
Value: strings.Join(svc.Spec.ClusterIPs, " "),
},
},
SecurityContext: &core.SecurityContext{
Capabilities: &core.Capabilities{
Add: []core.Capability{
"NET_ADMIN",
},
},
},
}
ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container)
}
// Add toleration for the node-role.kubernetes.io/master=*:NoSchedule taint
masterToleration := core.Toleration{
Key: "node-role.kubernetes.io/master",
Operator: "Exists",
Effect: "NoSchedule",
}
ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, masterToleration)
// Add toleration for the node-role.kubernetes.io/control-plane=*:NoSchedule taint
controlPlaneToleration := core.Toleration{
Key: "node-role.kubernetes.io/control-plane",
Operator: "Exists",
Effect: "NoSchedule",
}
ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, controlPlaneToleration)
// Add toleration for the CriticalAddonsOnly taint
criticalAddonsOnlyToleration := core.Toleration{
Key: "CriticalAddonsOnly",
Operator: "Exists",
}
ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, criticalAddonsOnlyToleration)
// Add node selector only if label "svccontroller.k3s.cattle.io/enablelb" exists on the nodes
selector, err := labels.Parse(daemonsetNodeLabel)
if err != nil {
return nil, err
}
nodesWithLabel, err := h.nodeCache.List(selector)
if err != nil {
return nil, err
}
if len(nodesWithLabel) > 0 {
ds.Spec.Template.Spec.NodeSelector = map[string]string{
daemonsetNodeLabel: "true",
}
// Add node selector for "svccontroller.k3s.cattle.io/lbpool=<pool>" if service has lbpool label
if svc.Labels[daemonsetNodePoolLabel] != "" {
ds.Spec.Template.Spec.NodeSelector[daemonsetNodePoolLabel] = svc.Labels[daemonsetNodePoolLabel]
}
ds.Labels[nodeSelectorLabel] = "true"
}
return ds, nil
}
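// updateDaemonSets backfills the node selector onto any ServiceLB DaemonSets
// that were created before any node carried the enablelb label.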
func (h *handler) updateDaemonSets() error {
daemonsets, err := h.daemonsets.DaemonSets("").List(context.TODO(), meta.ListOptions{
LabelSelector: nodeSelectorLabel + "=false",
})
if err != nil {
return err
}
for _, ds := range daemonsets.Items {
ds.Spec.Template.Spec.NodeSelector = map[string]string{
daemonsetNodeLabel: "true",
}
ds.Labels[nodeSelectorLabel] = "true"
if _, err := h.daemonsets.DaemonSets(ds.Namespace).Update(context.TODO(), &ds, meta.UpdateOptions{}); err != nil {
return err
}
}
return nil
}
// deleteOldDeployments ensures that there are no legacy Deployments for ServiceLB pods.
// ServiceLB used to use Deployments before switching to DaemonSets in 875ba28
func (h *handler) deleteOldDeployments(svc *core.Service) error {
name := fmt.Sprintf("svclb-%s", svc.Name)
if _, err := h.deploymentCache.Get(svc.Namespace, name); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
return h.deployments.Deployments(svc.Namespace).Delete(context.TODO(), name, meta.DeleteOptions{})
}
// addFinalizer ensures that there is a finalizer for this controller on the Service
func (h *handler) addFinalizer(svc *core.Service) (*core.Service, error) {
if !h.hasFinalizer(svc) {
svc.Finalizers = append(svc.Finalizers, finalizerName)
return h.services.Services(svc.Namespace).Update(context.TODO(), svc, meta.UpdateOptions{})
}
return svc, nil
}
// removeFinalizer ensures that there is not a finalizer for this controller on the Service
func (h *handler) removeFinalizer(svc *core.Service) (*core.Service, error) {
if !h.hasFinalizer(svc) {
return svc, nil
}
// Remove the first occurrence and stop, rather than continuing to iterate
// over the slice we just mutated.
for i, v := range svc.Finalizers {
if v != finalizerName {
continue
}
svc.Finalizers = append(svc.Finalizers[:i], svc.Finalizers[i+1:]...)
break
}
return h.services.Services(svc.Namespace).Update(context.TODO(), svc, meta.UpdateOptions{})
}
// hasFinalizer returns true if this controller's finalizer is present on the Service
func (h *handler) hasFinalizer(svc *core.Service) bool {
for _, finalizer := range svc.Finalizers {
if finalizer == finalizerName {
return true
}
}
return false
}
// generateName generates a distinct name for the DaemonSet based on the service name and UID
func generateName(svc *core.Service) string {
return fmt.Sprintf("svclb-%s-%s", svc.Name, svc.UID[:8])
}