diff --git a/pkg/servicelb/controller.go b/pkg/servicelb/controller.go index 901ce2e3ca..6c245688b3 100644 --- a/pkg/servicelb/controller.go +++ b/pkg/servicelb/controller.go @@ -2,9 +2,11 @@ package servicelb import ( "context" + "errors" "fmt" "sort" "strconv" + "strings" "github.com/rancher/k3s/pkg/version" "github.com/rancher/wrangler/pkg/apply" @@ -17,7 +19,7 @@ import ( "github.com/sirupsen/logrus" apps "k8s.io/api/apps/v1" core "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -25,6 +27,7 @@ import ( "k8s.io/client-go/kubernetes" v1getter "k8s.io/client-go/kubernetes/typed/apps/v1" coregetter "k8s.io/client-go/kubernetes/typed/core/v1" + utilsnet "k8s.io/utils/net" utilpointer "k8s.io/utils/pointer" ) @@ -32,7 +35,7 @@ var ( svcNameLabel = "svccontroller." + version.Program + ".cattle.io/svcname" daemonsetNodeLabel = "svccontroller." + version.Program + ".cattle.io/enablelb" nodeSelectorLabel = "svccontroller." + version.Program + ".cattle.io/nodeselector" - DefaultLBImage = "rancher/klipper-lb:v0.3.0" + DefaultLBImage = "rancher/klipper-lb:v0.3.2" ) const ( @@ -176,7 +179,7 @@ func (h *handler) updateService(svc *core.Service) (runtime.Object, error) { } existingIPs := serviceIPs(svc) - expectedIPs, err := h.podIPs(pods) + expectedIPs, err := h.podIPs(pods, svc) if err != nil { return svc, err } @@ -216,7 +219,7 @@ func serviceIPs(svc *core.Service) []string { // podIPs returns a list of IPs for Nodes hosting ServiceLB Pods. // If at least one node has External IPs available, only external IPs are returned. // If no nodes have External IPs set, the Internal IPs of all nodes running pods are returned. -func (h *handler) podIPs(pods []*core.Pod) ([]string, error) { +func (h *handler) podIPs(pods []*core.Pod, svc *core.Service) ([]string, error) { // Go doesn't have sets so we stuff things into a map of bools and then get lists of keys // to determine the unique set of IPs in use by pods. extIPs := map[string]bool{} @@ -231,7 +234,7 @@ func (h *handler) podIPs(pods []*core.Pod) ([]string, error) { } node, err := h.nodeCache.Get(pod.Spec.NodeName) - if errors.IsNotFound(err) { + if apierrors.IsNotFound(err) { continue } else if err != nil { return nil, err @@ -260,6 +263,11 @@ func (h *handler) podIPs(pods []*core.Pod) ([]string, error) { ips = keys(intIPs) } + ips, err := filterByIPFamily(ips, svc) + if err != nil { + return nil, err + } + if len(ips) > 0 && h.rootless { return []string{"127.0.0.1"}, nil } @@ -267,6 +275,55 @@ func (h *handler) podIPs(pods []*core.Pod) ([]string, error) { return ips, nil } +// filterByIPFamily filters ips based on dual-stack parameters of the service +func filterByIPFamily(ips []string, svc *core.Service) ([]string, error) { + + var ipv4Addresses []string + var ipv6Addresses []string + + for _, ip := range ips { + if utilsnet.IsIPv4String(ip) { + ipv4Addresses = append(ipv4Addresses, ip) + } + if utilsnet.IsIPv6String(ip) { + ipv6Addresses = append(ipv6Addresses, ip) + } + } + + switch *svc.Spec.IPFamilyPolicy { + case core.IPFamilyPolicySingleStack: + if svc.Spec.IPFamilies[0] == core.IPv4Protocol { + return ipv4Addresses, nil + } + if svc.Spec.IPFamilies[0] == core.IPv6Protocol { + return ipv6Addresses, nil + } + case core.IPFamilyPolicyPreferDualStack: + if svc.Spec.IPFamilies[0] == core.IPv4Protocol { + ipAddresses := append(ipv4Addresses, ipv6Addresses...) + return ipAddresses, nil + } + if svc.Spec.IPFamilies[0] == core.IPv6Protocol { + ipAddresses := append(ipv6Addresses, ipv4Addresses...) + return ipAddresses, nil + } + case core.IPFamilyPolicyRequireDualStack: + if (len(ipv4Addresses) == 0) || (len(ipv6Addresses) == 0) { + return nil, errors.New("one or more IP families did not have addresses available for service with ipFamilyPolicy=RequireDualStack") + } + if svc.Spec.IPFamilies[0] == core.IPv4Protocol { + ipAddresses := append(ipv4Addresses, ipv6Addresses...) + return ipAddresses, nil + } + if svc.Spec.IPFamilies[0] == core.IPv6Protocol { + ipAddresses := append(ipv6Addresses, ipv4Addresses...) + return ipAddresses, nil + } + } + + return nil, errors.New("unhandled ipFamilyPolicy") +} + // deployPod ensures that there is a DaemonSet for each service. // It also ensures that any legacy Deployments from older versions of ServiceLB are deleted. func (h *handler) deployPod(svc *core.Service) error { @@ -369,8 +426,8 @@ func (h *handler) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { Value: strconv.Itoa(int(port.Port)), }, { - Name: "DEST_IP", - Value: svc.Spec.ClusterIP, + Name: "DEST_IPS", + Value: strings.Join(svc.Spec.ClusterIPs, " "), }, }, SecurityContext: &core.SecurityContext{ @@ -453,7 +510,7 @@ func (h *handler) updateDaemonSets() error { func (h *handler) deleteOldDeployments(svc *core.Service) error { name := fmt.Sprintf("svclb-%s", svc.Name) if _, err := h.deploymentCache.Get(svc.Namespace, name); err != nil { - if errors.IsNotFound(err) { + if apierrors.IsNotFound(err) { return nil } return err diff --git a/scripts/airgap/image-list.txt b/scripts/airgap/image-list.txt index d3369d51f7..df80f1c977 100644 --- a/scripts/airgap/image-list.txt +++ b/scripts/airgap/image-list.txt @@ -1,5 +1,5 @@ docker.io/rancher/klipper-helm:v0.6.5-build20210915 -docker.io/rancher/klipper-lb:v0.3.0 +docker.io/rancher/klipper-lb:v0.3.2 docker.io/rancher/local-path-provisioner:v0.0.20 docker.io/rancher/mirrored-coredns-coredns:1.8.4 docker.io/rancher/mirrored-library-busybox:1.32.1