diff --git a/pkg/agent/netpol/namespace.go b/pkg/agent/netpol/namespace.go index 4c342092f2..90fb95cc15 100644 --- a/pkg/agent/netpol/namespace.go +++ b/pkg/agent/netpol/namespace.go @@ -1,11 +1,16 @@ +// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) +// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/namespace.go + +// +build !windows + package netpol import ( "reflect" - "github.com/golang/glog" api "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" + glog "k8s.io/klog" ) func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEventHandler { diff --git a/pkg/agent/netpol/netpol.go b/pkg/agent/netpol/netpol.go new file mode 100644 index 0000000000..bac05a6ee6 --- /dev/null +++ b/pkg/agent/netpol/netpol.go @@ -0,0 +1,63 @@ +// +build !windows + +package netpol + +import ( + "context" + + "github.com/rancher/k3s/pkg/agent/netpol/utils" + "github.com/rancher/k3s/pkg/daemons/config" + "github.com/sirupsen/logrus" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" +) + +// Run creates and starts a new instance of the kube-router network policy controller +// The code in this function is cribbed from the upstream controller at: +// https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/cmd/kube-router.go#L78 +// The NewNetworkPolicyController function has also been modified to use the k3s config.Node struct instead of KubeRouter's +// CLI configuration, eliminate use of a WaitGroup for shutdown sequencing, and drop Prometheus metrics support. 
+func Run(ctx context.Context, nodeConfig *config.Node) error { + set, err := utils.NewIPSet(false) + if err != nil { + logrus.Warnf("Skipping network policy controller start, ipset unavailable: %v", err) + return nil + } + + if err := set.Save(); err != nil { + logrus.Warnf("Skipping network policy controller start, ipset save failed: %v", err) + return nil + } + + restConfig, err := clientcmd.BuildConfigFromFlags("", nodeConfig.AgentConfig.KubeConfigK3sController) + if err != nil { + return err + } + + client, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return err + } + + stopCh := ctx.Done() + informerFactory := informers.NewSharedInformerFactory(client, 0) + podInformer := informerFactory.Core().V1().Pods().Informer() + nsInformer := informerFactory.Core().V1().Namespaces().Informer() + npInformer := informerFactory.Networking().V1().NetworkPolicies().Informer() + informerFactory.Start(stopCh) + informerFactory.WaitForCacheSync(stopCh) + + npc, err := NewNetworkPolicyController(client, nodeConfig, podInformer, npInformer, nsInformer) + if err != nil { + return err + } + + podInformer.AddEventHandler(npc.PodEventHandler) + nsInformer.AddEventHandler(npc.NamespaceEventHandler) + npInformer.AddEventHandler(npc.NetworkPolicyEventHandler) + + go npc.Run(stopCh) + + return nil +} diff --git a/pkg/agent/netpol/network_policy.go b/pkg/agent/netpol/network_policy.go deleted file mode 100644 index 9615dcce4e..0000000000 --- a/pkg/agent/netpol/network_policy.go +++ /dev/null @@ -1,59 +0,0 @@ -// +build !windows - -package netpol - -import ( - "context" - "time" - - "github.com/rancher/k3s/pkg/daemons/config" - "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/util/retry" -) - -func Run(ctx context.Context, nodeConfig *config.Node) error { - if _, err := 
NewSavedIPSet(false); err != nil { - logrus.Warnf("Skipping network policy controller start, ipset unavailable: %v", err) - return nil - } - - restConfig, err := clientcmd.BuildConfigFromFlags("", nodeConfig.AgentConfig.KubeConfigK3sController) - if err != nil { - return err - } - - client, err := kubernetes.NewForConfig(restConfig) - if err != nil { - return err - } - - // retry backoff to wait for the clusterrolebinding for the k3s tunnel controller (system:k3s-controller or equivalent) - // which has to occur before it can bring up the connection to the API server. - retryBackoff := wait.Backoff{ - Steps: 6, - Duration: 100 * time.Millisecond, - Factor: 3.0, - Cap: 30 * time.Second, - } - retryErr := retry.OnError(retryBackoff, errors.IsForbidden, func() error { - _, err := client.NetworkingV1().NetworkPolicies("").List(ctx, metav1.ListOptions{}) - return err - }) - if retryErr != nil { - return retryErr - } - - npc, err := NewNetworkPolicyController(ctx.Done(), client, time.Minute, nodeConfig.AgentConfig.NodeName) - if err != nil { - return err - } - - go npc.Run(ctx.Done()) - - return nil -} diff --git a/pkg/agent/netpol/network_policy_controller.go b/pkg/agent/netpol/network_policy_controller.go index f3d54b604f..1859f41f65 100644 --- a/pkg/agent/netpol/network_policy_controller.go +++ b/pkg/agent/netpol/network_policy_controller.go @@ -1,3 +1,8 @@ +// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) +// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/network_policy_controller.go + +// +build !windows + package netpol import ( @@ -5,23 +10,19 @@ import ( "encoding/base32" "fmt" "net" - "regexp" "strconv" "strings" "sync" "time" - "github.com/cloudnativelabs/kube-router/pkg/healthcheck" - "github.com/cloudnativelabs/kube-router/pkg/metrics" - "github.com/cloudnativelabs/kube-router/pkg/options" - "github.com/cloudnativelabs/kube-router/pkg/utils" 
"github.com/coreos/go-iptables/iptables" - "github.com/golang/glog" - "github.com/prometheus/client_golang/prometheus" + "github.com/rancher/k3s/pkg/agent/netpol/utils" + "github.com/rancher/k3s/pkg/daemons/config" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" + glog "k8s.io/klog" ) const ( @@ -32,6 +33,7 @@ const ( kubeInputChainName = "KUBE-ROUTER-INPUT" kubeForwardChainName = "KUBE-ROUTER-FORWARD" kubeOutputChainName = "KUBE-ROUTER-OUTPUT" + defaultSyncPeriod = 5 * time.Minute ) // Network policy controller provides both ingress and egress filtering for the pods as per the defined network @@ -54,8 +56,6 @@ type NetworkPolicyController struct { serviceNodePortRange string mu sync.Mutex syncPeriod time.Duration - MetricsEnabled bool - healthChan chan<- *healthcheck.ControllerHeartbeat fullSyncRequestChan chan struct{} ipSetHandler *utils.IPSet @@ -131,13 +131,11 @@ type protocol2eps map[string]numericPort2eps type namedPort2eps map[string]protocol2eps // Run runs forever till we receive notification on stopCh -func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) { +func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}) { t := time.NewTicker(npc.syncPeriod) defer t.Stop() - defer wg.Done() glog.Info("Starting network policy controller") - npc.healthChan = healthChan // setup kube-router specific top level cutoms chains npc.ensureTopLevelChains() @@ -145,9 +143,7 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.Controlle // Full syncs of the network policy controller take a lot of time and can only be processed one at a time, // therefore, we start it in it's own goroutine and request a sync through a single item channel glog.Info("Starting network policy controller full sync goroutine") - wg.Add(1) - go func(fullSyncRequest <-chan struct{}, stopCh <-chan struct{}, wg *sync.WaitGroup) { - defer wg.Done() 
+ go func(fullSyncRequest <-chan struct{}, stopCh <-chan struct{}) { for { // Add an additional non-blocking select to ensure that if the stopCh channel is closed it is handled first select { @@ -165,7 +161,7 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.Controlle npc.fullPolicySync() // fullPolicySync() is a blocking request here } } - }(npc.fullSyncRequestChan, stopCh, wg) + }(npc.fullSyncRequestChan, stopCh) // loop forever till notified to stop on stopCh for { @@ -198,14 +194,10 @@ func (npc *NetworkPolicyController) fullPolicySync() { npc.mu.Lock() defer npc.mu.Unlock() - healthcheck.SendHeartBeat(npc.healthChan, "NPC") start := time.Now() syncVersion := strconv.FormatInt(start.UnixNano(), 10) defer func() { endTime := time.Since(start) - if npc.MetricsEnabled { - metrics.ControllerIptablesSyncTime.Observe(endTime.Seconds()) - } glog.V(1).Infof("sync iptables took %v", endTime) }() @@ -591,7 +583,7 @@ func (npc *NetworkPolicyController) Cleanup() { // NewNetworkPolicyController returns new NetworkPolicyController object func NewNetworkPolicyController(clientset kubernetes.Interface, - config *options.KubeRouterConfig, podInformer cache.SharedIndexInformer, + config *config.Node, podInformer cache.SharedIndexInformer, npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer) (*NetworkPolicyController, error) { npc := NetworkPolicyController{} @@ -600,54 +592,11 @@ func NewNetworkPolicyController(clientset kubernetes.Interface, // be up to date with all of the policy changes from any enqueued request after that npc.fullSyncRequestChan = make(chan struct{}, 1) - // Validate and parse ClusterIP service range - _, ipnet, err := net.ParseCIDR(config.ClusterIPCIDR) - if err != nil { - return nil, fmt.Errorf("failed to get parse --service-cluster-ip-range parameter: %s", err.Error()) - } - npc.serviceClusterIPRange = *ipnet + npc.serviceClusterIPRange = config.AgentConfig.ServiceCIDR + npc.serviceNodePortRange = 
strings.ReplaceAll(config.AgentConfig.ServiceNodePortRange.String(), "-", ":") + npc.syncPeriod = defaultSyncPeriod - // Validate and parse NodePort range - nodePortValidator := regexp.MustCompile(`^([0-9]+)[:-]{1}([0-9]+)$`) - if matched := nodePortValidator.MatchString(config.NodePortRange); !matched { - return nil, fmt.Errorf("failed to parse node port range given: '%s' please see specification in help text", config.NodePortRange) - } - matches := nodePortValidator.FindStringSubmatch(config.NodePortRange) - if len(matches) != 3 { - return nil, fmt.Errorf("could not parse port number from range given: '%s'", config.NodePortRange) - } - port1, err := strconv.ParseInt(matches[1], 10, 16) - if err != nil { - return nil, fmt.Errorf("could not parse first port number from range given: '%s'", config.NodePortRange) - } - port2, err := strconv.ParseInt(matches[2], 10, 16) - if err != nil { - return nil, fmt.Errorf("could not parse second port number from range given: '%s'", config.NodePortRange) - } - if port1 >= port2 { - return nil, fmt.Errorf("port 1 is greater than or equal to port 2 in range given: '%s'", config.NodePortRange) - } - npc.serviceNodePortRange = fmt.Sprintf("%d:%d", port1, port2) - - // Validate and parse ExternalIP service range - for _, externalIPRange := range config.ExternalIPCIDRs { - _, ipnet, err := net.ParseCIDR(externalIPRange) - if err != nil { - return nil, fmt.Errorf("failed to get parse --service-external-ip-range parameter: '%s'. 
Error: %s", externalIPRange, err.Error()) - } - npc.serviceExternalIPRanges = append(npc.serviceExternalIPRanges, *ipnet) - } - - if config.MetricsEnabled { - //Register the metrics for this controller - prometheus.MustRegister(metrics.ControllerIptablesSyncTime) - prometheus.MustRegister(metrics.ControllerPolicyChainsSyncTime) - npc.MetricsEnabled = true - } - - npc.syncPeriod = config.IPTablesSyncPeriod - - node, err := utils.GetNodeObject(clientset, config.HostnameOverride) + node, err := utils.GetNodeObject(clientset, config.AgentConfig.NodeName) if err != nil { return nil, err } diff --git a/pkg/agent/netpol/network_policy_controller_test.go b/pkg/agent/netpol/network_policy_controller_test.go index d74d87f1ab..d141e3c80b 100644 --- a/pkg/agent/netpol/network_policy_controller_test.go +++ b/pkg/agent/netpol/network_policy_controller_test.go @@ -1,3 +1,8 @@ +// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) +// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/network_policy_controller_test.go + +// +build !windows + package netpol import ( @@ -14,11 +19,12 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" - "github.com/cloudnativelabs/kube-router/pkg/options" + "github.com/rancher/k3s/pkg/daemons/config" ) // newFakeInformersFromClient creates the different informers used in the uneventful network policy controller @@ -247,26 +253,40 @@ func testForMissingOrUnwanted(t *testing.T, targetMsg string, got []podInfo, wan } } -func newMinimalKubeRouterConfig(clusterIPCIDR string, nodePortRange string, hostNameOverride string, externalIPs []string) *options.KubeRouterConfig { - kubeConfig := options.NewKubeRouterConfig() +func newMinimalNodeConfig(clusterIPCIDR
string, nodePortRange string, hostNameOverride string, externalIPs []string) *config.Node { + nodeConfig := &config.Node{AgentConfig: config.Agent{}} + if clusterIPCIDR != "" { - kubeConfig.ClusterIPCIDR = clusterIPCIDR + _, cidr, err := net.ParseCIDR(clusterIPCIDR) + if err != nil { + panic("failed to get parse --service-cluster-ip-range parameter: " + err.Error()) + } + nodeConfig.AgentConfig.ClusterCIDR = *cidr } if nodePortRange != "" { - kubeConfig.NodePortRange = nodePortRange + portRange, err := utilnet.ParsePortRange(nodePortRange) + if err != nil { + panic("failed to get parse --service-node-port-range:" + err.Error()) + } + nodeConfig.AgentConfig.ServiceNodePortRange = *portRange } if hostNameOverride != "" { - kubeConfig.HostnameOverride = hostNameOverride + nodeConfig.AgentConfig.NodeName = hostNameOverride } if externalIPs != nil { - kubeConfig.ExternalIPCIDRs = externalIPs + // TODO: We don't currently have a way to set these through the K3s CLI; if we ever do then test that here. 
+ for _, cidr := range externalIPs { + if _, _, err := net.ParseCIDR(cidr); err != nil { + panic("failed to get parse --service-external-ip-range parameter: " + err.Error()) + } + } } - return kubeConfig + return nodeConfig } type tNetPolConfigTestCase struct { name string - config *options.KubeRouterConfig + config *config.Node expectError bool errorText string } @@ -399,121 +419,43 @@ func TestNetworkPolicyController(t *testing.T) { testCases := []tNetPolConfigTestCase{ { "Default options are successful", - newMinimalKubeRouterConfig("", "", "node", nil), + newMinimalNodeConfig("", "", "node", nil), false, "", }, { "Missing nodename fails appropriately", - newMinimalKubeRouterConfig("", "", "", nil), + newMinimalNodeConfig("", "", "", nil), true, "Failed to identify the node by NODE_NAME, hostname or --hostname-override", }, - { - "Test bad cluster CIDR (not properly formatting ip address)", - newMinimalKubeRouterConfig("10.10.10", "", "node", nil), - true, - "failed to get parse --service-cluster-ip-range parameter: invalid CIDR address: 10.10.10", - }, - { - "Test bad cluster CIDR (not using an ip address)", - newMinimalKubeRouterConfig("foo", "", "node", nil), - true, - "failed to get parse --service-cluster-ip-range parameter: invalid CIDR address: foo", - }, - { - "Test bad cluster CIDR (using an ip address that is not a CIDR)", - newMinimalKubeRouterConfig("10.10.10.10", "", "node", nil), - true, - "failed to get parse --service-cluster-ip-range parameter: invalid CIDR address: 10.10.10.10", - }, { "Test good cluster CIDR (using single IP with a /32)", - newMinimalKubeRouterConfig("10.10.10.10/32", "", "node", nil), + newMinimalNodeConfig("10.10.10.10/32", "", "node", nil), false, "", }, { "Test good cluster CIDR (using normal range with /24)", - newMinimalKubeRouterConfig("10.10.10.0/24", "", "node", nil), - false, - "", - }, - { - "Test bad node port specification (using commas)", - newMinimalKubeRouterConfig("", "8080,8081", "node", nil), - true, - 
"failed to parse node port range given: '8080,8081' please see specification in help text", - }, - { - "Test bad node port specification (not using numbers)", - newMinimalKubeRouterConfig("", "foo:bar", "node", nil), - true, - "failed to parse node port range given: 'foo:bar' please see specification in help text", - }, - { - "Test bad node port specification (using anything in addition to range)", - newMinimalKubeRouterConfig("", "8080,8081-8090", "node", nil), - true, - "failed to parse node port range given: '8080,8081-8090' please see specification in help text", - }, - { - "Test bad node port specification (using reversed range)", - newMinimalKubeRouterConfig("", "8090-8080", "node", nil), - true, - "port 1 is greater than or equal to port 2 in range given: '8090-8080'", - }, - { - "Test bad node port specification (port out of available range)", - newMinimalKubeRouterConfig("", "132000-132001", "node", nil), - true, - "could not parse first port number from range given: '132000-132001'", - }, - { - "Test good node port specification (using colon separator)", - newMinimalKubeRouterConfig("", "8080:8090", "node", nil), + newMinimalNodeConfig("10.10.10.0/24", "", "node", nil), false, "", }, { "Test good node port specification (using hyphen separator)", - newMinimalKubeRouterConfig("", "8080-8090", "node", nil), + newMinimalNodeConfig("", "8080-8090", "node", nil), false, "", }, - { - "Test bad external IP CIDR (not properly formatting ip address)", - newMinimalKubeRouterConfig("", "", "node", []string{"199.10.10"}), - true, - "failed to get parse --service-external-ip-range parameter: '199.10.10'. Error: invalid CIDR address: 199.10.10", - }, - { - "Test bad external IP CIDR (not using an ip address)", - newMinimalKubeRouterConfig("", "", "node", []string{"foo"}), - true, - "failed to get parse --service-external-ip-range parameter: 'foo'. 
Error: invalid CIDR address: foo", - }, - { - "Test bad external IP CIDR (using an ip address that is not a CIDR)", - newMinimalKubeRouterConfig("", "", "node", []string{"199.10.10.10"}), - true, - "failed to get parse --service-external-ip-range parameter: '199.10.10.10'. Error: invalid CIDR address: 199.10.10.10", - }, - { - "Test bad external IP CIDR (making sure that it processes all items in the list)", - newMinimalKubeRouterConfig("", "", "node", []string{"199.10.10.10/32", "199.10.10.11"}), - true, - "failed to get parse --service-external-ip-range parameter: '199.10.10.11'. Error: invalid CIDR address: 199.10.10.11", - }, { "Test good external IP CIDR (using single IP with a /32)", - newMinimalKubeRouterConfig("", "", "node", []string{"199.10.10.10/32"}), + newMinimalNodeConfig("", "", "node", []string{"199.10.10.10/32"}), false, "", }, { "Test good external IP CIDR (using normal range with /24)", - newMinimalKubeRouterConfig("", "", "node", []string{"199.10.10.10/24"}), + newMinimalNodeConfig("", "", "node", []string{"199.10.10.10/24"}), false, "", }, diff --git a/pkg/agent/netpol/pod.go b/pkg/agent/netpol/pod.go index 42e59fc676..f0ca85716d 100644 --- a/pkg/agent/netpol/pod.go +++ b/pkg/agent/netpol/pod.go @@ -1,3 +1,8 @@ +// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) +// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/pod.go + +// +build !windows + package netpol import ( @@ -7,9 +12,9 @@ import ( "strings" "github.com/coreos/go-iptables/iptables" - "github.com/golang/glog" api "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" + glog "k8s.io/klog" ) func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHandler { diff --git a/pkg/agent/netpol/policy.go b/pkg/agent/netpol/policy.go index 627966200d..3590bf731b 100644 --- a/pkg/agent/netpol/policy.go +++ b/pkg/agent/netpol/policy.go @@ -1,3 +1,8 @@ +// Apache License v2.0 
(copyright Cloud Native Labs & Rancher Labs) +// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/policy.go + +// +build !windows + package netpol import ( @@ -9,10 +14,8 @@ import ( "strings" "time" - "github.com/cloudnativelabs/kube-router/pkg/metrics" - "github.com/cloudnativelabs/kube-router/pkg/utils" "github.com/coreos/go-iptables/iptables" - "github.com/golang/glog" + "github.com/rancher/k3s/pkg/agent/netpol/utils" api "k8s.io/api/core/v1" networking "k8s.io/api/networking/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -20,6 +23,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" + glog "k8s.io/klog" ) func (npc *NetworkPolicyController) newNetworkPolicyEventHandler() cache.ResourceEventHandler { @@ -73,7 +77,6 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo start := time.Now() defer func() { endTime := time.Since(start) - metrics.ControllerPolicyChainsSyncTime.Observe(endTime.Seconds()) glog.V(2).Infof("Syncing network policy chains took %v", endTime) }() activePolicyChains := make(map[string]bool) diff --git a/pkg/agent/netpol/utils/ipset.go b/pkg/agent/netpol/utils/ipset.go index 8e9a39db52..3f6afda64d 100644 --- a/pkg/agent/netpol/utils/ipset.go +++ b/pkg/agent/netpol/utils/ipset.go @@ -1,3 +1,8 @@ +// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) +// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/utils/ipset.go + +// +build !windows + package utils import ( diff --git a/pkg/agent/netpol/utils/node.go b/pkg/agent/netpol/utils/node.go index 6321437807..7283586f27 100644 --- a/pkg/agent/netpol/utils/node.go +++ b/pkg/agent/netpol/utils/node.go @@ -1,3 +1,8 @@ +// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) +// - modified from
https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/utils/node.go + +// +build !windows + package utils import (