Collect IPs from all pods before deciding to use internal or external addresses (#2909)

* Collect IPs from all pods before deciding to use internal or external addresses

@Taloth correctly noted that the code that iterates over ServiceLB pods
to collect IP addresses was failing to add additional internal IPs once
the map contained ANY entry from a previous node. This may date back to
when ServiceLB used a Deployment instead of a DaemonSet, so there was
only ever a single pod.

The new behavior is to collect all internal and external IPs, and then
construct the address list of a single type - external if there are any,
otherwise internal.

https://github.com/k3s-io/k3s/issues/1652#issuecomment-774497788

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
Co-authored-by: Brian Downs <brian.downs@gmail.com>
This commit is contained in:
Brad Davidson 2021-02-09 16:26:57 -08:00 committed by GitHub
parent e06119729b
commit 374271e9a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -122,6 +122,7 @@ func (h *handler) onResourceChange(name, namespace string, obj runtime.Object) (
}, nil
}
// onChangeService handles changes to Services.
func (h *handler) onChangeService(key string, svc *core.Service) (*core.Service, error) {
if svc == nil {
return nil, nil
@ -141,6 +142,8 @@ func (h *handler) onChangeService(key string, svc *core.Service) (*core.Service,
return nil, err
}
// onChangeNode handles changes to Nodes. We need to handle this as we may need to kick the DaemonSet
// to add or remove pods from nodes if labels have changed.
func (h *handler) onChangeNode(key string, node *core.Node) (*core.Node, error) {
if node == nil {
return nil, nil
@ -156,6 +159,8 @@ func (h *handler) onChangeNode(key string, node *core.Node) (*core.Node, error)
return node, nil
}
// updateService ensures that the Service ingress IP address list is in sync
// with the Nodes actually running pods for this service.
func (h *handler) updateService(svc *core.Service) (runtime.Object, error) {
if !h.enabled {
return svc, nil
@ -194,6 +199,7 @@ func (h *handler) updateService(svc *core.Service) (runtime.Object, error) {
return h.services.Services(svc.Namespace).UpdateStatus(context.TODO(), svc, meta.UpdateOptions{})
}
// serviceIPs returns the list of ingress IP addresses from the Service
func serviceIPs(svc *core.Service) []string {
var ips []string
@ -206,8 +212,14 @@ func serviceIPs(svc *core.Service) []string {
return ips
}
// podIPs returns a list of IPs for Nodes hosting ServiceLB Pods.
// If at least one node has External IPs available, only external IPs are returned.
// If no nodes have External IPs set, the Internal IPs of all nodes running pods are returned.
func (h *handler) podIPs(pods []*core.Pod) ([]string, error) {
ips := map[string]bool{}
// Go doesn't have sets so we stuff things into a map of bools and then get lists of keys
// to determine the unique set of IPs in use by pods.
extIPs := map[string]bool{}
intIPs := map[string]bool{}
for _, pod := range pods {
if pod.Spec.NodeName == "" || pod.Status.PodIP == "" {
@ -226,30 +238,36 @@ func (h *handler) podIPs(pods []*core.Pod) ([]string, error) {
for _, addr := range node.Status.Addresses {
if addr.Type == core.NodeExternalIP {
ips[addr.Address] = true
}
}
if len(ips) == 0 {
for _, addr := range node.Status.Addresses {
if addr.Type == core.NodeInternalIP {
ips[addr.Address] = true
}
extIPs[addr.Address] = true
} else if addr.Type == core.NodeInternalIP {
intIPs[addr.Address] = true
}
}
}
var ipList []string
for k := range ips {
ipList = append(ipList, k)
keys := func(addrs map[string]bool) (ips []string) {
for k := range addrs {
ips = append(ips, k)
}
return ips
}
if len(ipList) > 0 && h.rootless {
var ips []string
if len(extIPs) > 0 {
ips = keys(extIPs)
} else {
ips = keys(intIPs)
}
if len(ips) > 0 && h.rootless {
return []string{"127.0.0.1"}, nil
}
return ipList, nil
return ips, nil
}
// deployPod ensures that there is a DaemonSet for each service.
// It also ensures that any legacy Deployments from older versions of ServiceLB are deleted.
func (h *handler) deployPod(svc *core.Service) error {
if err := h.deleteOldDeployments(svc); err != nil {
return err
@ -269,6 +287,8 @@ func (h *handler) deployPod(svc *core.Service) error {
return h.processor.WithOwner(svc).Apply(objs)
}
// newDaemonSet creates a DaemonSet to ensure that ServiceLB pods are run on
// each eligible node.
func (h *handler) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
name := fmt.Sprintf("svclb-%s", svc.Name)
oneInt := intstr.FromInt(1)
@ -424,6 +444,8 @@ func (h *handler) updateDaemonSets() error {
return nil
}
// deleteOldDeployments ensures that there are no legacy Deployments for ServiceLB pods.
// ServiceLB used to use Deployments before switching to DaemonSets in 875ba28
func (h *handler) deleteOldDeployments(svc *core.Service) error {
name := fmt.Sprintf("svclb-%s", svc.Name)
if _, err := h.deploymentCache.Get(svc.Namespace, name); err != nil {