Replace options.KubeRouterConfig with config.Node and remove metrics/waitgroup stuff

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
This commit is contained in:
Brad Davidson 2021-02-01 11:20:24 -08:00 committed by Brad Davidson
parent 07256cf7ab
commit 65c78cc397
9 changed files with 145 additions and 227 deletions

View File

@ -1,11 +1,16 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/namespace.go
// +build !windows
package netpol
import (
"reflect"
"github.com/golang/glog"
api "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
glog "k8s.io/klog"
)
func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEventHandler {

View File

@ -0,0 +1,63 @@
// +build !windows
package netpol
import (
"context"
"github.com/rancher/k3s/pkg/agent/netpol/utils"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/sirupsen/logrus"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
// Run builds a kube-router style network policy controller from the k3s node
// configuration and launches it in a background goroutine.
//
// The flow is adapted from the upstream kube-router entry point at:
// https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/cmd/kube-router.go#L78
// NewNetworkPolicyController has likewise been changed to accept the k3s config.Node struct rather than
// KubeRouter's CLI configuration, to stop using a WaitGroup for shutdown sequencing, and to drop Prometheus metrics.
//
// If the ipset handler cannot be created or saved, a warning is logged and nil is returned
// without starting the controller. Errors from building the client or the controller are returned as-is.
// The informers and the controller are handed ctx.Done() as their stop channel.
func Run(ctx context.Context, nodeConfig *config.Node) error {
	// Probe ipset first; a failure here is treated as "skip", not as an error.
	ipSet, ipSetErr := utils.NewIPSet(false)
	if ipSetErr != nil {
		logrus.Warnf("Skipping network policy controller start, ipset unavailable: %v", ipSetErr)
		return nil
	}
	if saveErr := ipSet.Save(); saveErr != nil {
		logrus.Warnf("Skipping network policy controller start, ipset save failed: %v", saveErr)
		return nil
	}

	// Build the API client from the k3s controller kubeconfig.
	kubeconfigPath := nodeConfig.AgentConfig.KubeConfigK3sController
	cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
	if err != nil {
		return err
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return err
	}

	done := ctx.Done()

	// The informers are obtained from the factory before it is started, then
	// we block until their caches report synced (or the context is done).
	factory := informers.NewSharedInformerFactory(clientset, 0)
	coreV1 := factory.Core().V1()
	pods := coreV1.Pods().Informer()
	namespaces := coreV1.Namespaces().Informer()
	policies := factory.Networking().V1().NetworkPolicies().Informer()
	factory.Start(done)
	factory.WaitForCacheSync(done)

	controller, err := NewNetworkPolicyController(clientset, nodeConfig, pods, policies, namespaces)
	if err != nil {
		return err
	}

	// Wire the controller's handlers to their informers, then run it in the background.
	pods.AddEventHandler(controller.PodEventHandler)
	namespaces.AddEventHandler(controller.NamespaceEventHandler)
	policies.AddEventHandler(controller.NetworkPolicyEventHandler)

	go controller.Run(done)
	return nil
}

View File

@ -1,59 +0,0 @@
// +build !windows
package netpol
import (
"context"
"time"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
)
// Run starts the network policy controller in a background goroutine.
// If NewSavedIPSet fails, a warning is logged and nil is returned without starting the controller.
// Errors from building the client, from the access probe below, or from constructing the
// controller are returned to the caller unchanged.
func Run(ctx context.Context, nodeConfig *config.Node) error {
// An ipset failure is treated as "skip the controller", not as an error.
if _, err := NewSavedIPSet(false); err != nil {
logrus.Warnf("Skipping network policy controller start, ipset unavailable: %v", err)
return nil
}
// Build the API client from the k3s controller kubeconfig.
restConfig, err := clientcmd.BuildConfigFromFlags("", nodeConfig.AgentConfig.KubeConfigK3sController)
if err != nil {
return err
}
client, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return err
}
// retry backoff to wait for the clusterrolebinding for the k3s tunnel controller (system:k3s-controller or equivalent)
// which has to occur before it can bring up the connection to the API server.
retryBackoff := wait.Backoff{
Steps: 6,
Duration: 100 * time.Millisecond,
Factor: 3.0,
Cap: 30 * time.Second,
}
// Probe access by listing network policies; only Forbidden errors are retried.
retryErr := retry.OnError(retryBackoff, errors.IsForbidden, func() error {
_, err := client.NetworkingV1().NetworkPolicies("").List(ctx, metav1.ListOptions{})
return err
})
if retryErr != nil {
return retryErr
}
// NOTE(review): time.Minute is presumably the controller's sync/resync period — confirm against NewNetworkPolicyController.
npc, err := NewNetworkPolicyController(ctx.Done(), client, time.Minute, nodeConfig.AgentConfig.NodeName)
if err != nil {
return err
}
// The controller is handed ctx.Done() as its stop channel.
go npc.Run(ctx.Done())
return nil
}

View File

@ -1,3 +1,8 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/network_policy_controller.go
// +build !windows
package netpol
import (
@ -5,23 +10,19 @@ import (
"encoding/base32"
"fmt"
"net"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/cloudnativelabs/kube-router/pkg/healthcheck"
"github.com/cloudnativelabs/kube-router/pkg/metrics"
"github.com/cloudnativelabs/kube-router/pkg/options"
"github.com/cloudnativelabs/kube-router/pkg/utils"
"github.com/coreos/go-iptables/iptables"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"github.com/rancher/k3s/pkg/agent/netpol/utils"
"github.com/rancher/k3s/pkg/daemons/config"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
glog "k8s.io/klog"
)
const (
@ -32,6 +33,7 @@ const (
kubeInputChainName = "KUBE-ROUTER-INPUT"
kubeForwardChainName = "KUBE-ROUTER-FORWARD"
kubeOutputChainName = "KUBE-ROUTER-OUTPUT"
defaultSyncPeriod = 5 * time.Minute
)
// Network policy controller provides both ingress and egress filtering for the pods as per the defined network
@ -54,8 +56,6 @@ type NetworkPolicyController struct {
serviceNodePortRange string
mu sync.Mutex
syncPeriod time.Duration
MetricsEnabled bool
healthChan chan<- *healthcheck.ControllerHeartbeat
fullSyncRequestChan chan struct{}
ipSetHandler *utils.IPSet
@ -131,13 +131,11 @@ type protocol2eps map[string]numericPort2eps
type namedPort2eps map[string]protocol2eps
// Run runs forever till we receive notification on stopCh
func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) {
func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}) {
t := time.NewTicker(npc.syncPeriod)
defer t.Stop()
defer wg.Done()
glog.Info("Starting network policy controller")
npc.healthChan = healthChan
// set up kube-router specific top-level custom chains
npc.ensureTopLevelChains()
@ -145,9 +143,7 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.Controlle
// Full syncs of the network policy controller take a lot of time and can only be processed one at a time,
// therefore, we start it in its own goroutine and request a sync through a single item channel
glog.Info("Starting network policy controller full sync goroutine")
wg.Add(1)
go func(fullSyncRequest <-chan struct{}, stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
go func(fullSyncRequest <-chan struct{}, stopCh <-chan struct{}) {
for {
// Add an additional non-blocking select to ensure that if the stopCh channel is closed it is handled first
select {
@ -165,7 +161,7 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.Controlle
npc.fullPolicySync() // fullPolicySync() is a blocking request here
}
}
}(npc.fullSyncRequestChan, stopCh, wg)
}(npc.fullSyncRequestChan, stopCh)
// loop forever till notified to stop on stopCh
for {
@ -198,14 +194,10 @@ func (npc *NetworkPolicyController) fullPolicySync() {
npc.mu.Lock()
defer npc.mu.Unlock()
healthcheck.SendHeartBeat(npc.healthChan, "NPC")
start := time.Now()
syncVersion := strconv.FormatInt(start.UnixNano(), 10)
defer func() {
endTime := time.Since(start)
if npc.MetricsEnabled {
metrics.ControllerIptablesSyncTime.Observe(endTime.Seconds())
}
glog.V(1).Infof("sync iptables took %v", endTime)
}()
@ -591,7 +583,7 @@ func (npc *NetworkPolicyController) Cleanup() {
// NewNetworkPolicyController returns new NetworkPolicyController object
func NewNetworkPolicyController(clientset kubernetes.Interface,
config *options.KubeRouterConfig, podInformer cache.SharedIndexInformer,
config *config.Node, podInformer cache.SharedIndexInformer,
npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer) (*NetworkPolicyController, error) {
npc := NetworkPolicyController{}
@ -600,54 +592,11 @@ func NewNetworkPolicyController(clientset kubernetes.Interface,
// be up to date with all of the policy changes from any enqueued request after that
npc.fullSyncRequestChan = make(chan struct{}, 1)
// Validate and parse ClusterIP service range
_, ipnet, err := net.ParseCIDR(config.ClusterIPCIDR)
if err != nil {
return nil, fmt.Errorf("failed to get parse --service-cluster-ip-range parameter: %s", err.Error())
}
npc.serviceClusterIPRange = *ipnet
npc.serviceClusterIPRange = config.AgentConfig.ServiceCIDR
npc.serviceNodePortRange = strings.ReplaceAll(config.AgentConfig.ServiceNodePortRange.String(), "-", ":")
npc.syncPeriod = defaultSyncPeriod
// Validate and parse NodePort range
nodePortValidator := regexp.MustCompile(`^([0-9]+)[:-]{1}([0-9]+)$`)
if matched := nodePortValidator.MatchString(config.NodePortRange); !matched {
return nil, fmt.Errorf("failed to parse node port range given: '%s' please see specification in help text", config.NodePortRange)
}
matches := nodePortValidator.FindStringSubmatch(config.NodePortRange)
if len(matches) != 3 {
return nil, fmt.Errorf("could not parse port number from range given: '%s'", config.NodePortRange)
}
port1, err := strconv.ParseInt(matches[1], 10, 16)
if err != nil {
return nil, fmt.Errorf("could not parse first port number from range given: '%s'", config.NodePortRange)
}
port2, err := strconv.ParseInt(matches[2], 10, 16)
if err != nil {
return nil, fmt.Errorf("could not parse second port number from range given: '%s'", config.NodePortRange)
}
if port1 >= port2 {
return nil, fmt.Errorf("port 1 is greater than or equal to port 2 in range given: '%s'", config.NodePortRange)
}
npc.serviceNodePortRange = fmt.Sprintf("%d:%d", port1, port2)
// Validate and parse ExternalIP service range
for _, externalIPRange := range config.ExternalIPCIDRs {
_, ipnet, err := net.ParseCIDR(externalIPRange)
if err != nil {
return nil, fmt.Errorf("failed to get parse --service-external-ip-range parameter: '%s'. Error: %s", externalIPRange, err.Error())
}
npc.serviceExternalIPRanges = append(npc.serviceExternalIPRanges, *ipnet)
}
if config.MetricsEnabled {
//Register the metrics for this controller
prometheus.MustRegister(metrics.ControllerIptablesSyncTime)
prometheus.MustRegister(metrics.ControllerPolicyChainsSyncTime)
npc.MetricsEnabled = true
}
npc.syncPeriod = config.IPTablesSyncPeriod
node, err := utils.GetNodeObject(clientset, config.HostnameOverride)
node, err := utils.GetNodeObject(clientset, config.AgentConfig.NodeName)
if err != nil {
return nil, err
}

View File

@ -1,3 +1,8 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/network_policy_controller_test.go
// +build !windows
package netpol
import (
@ -14,11 +19,12 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"github.com/cloudnativelabs/kube-router/pkg/options"
"github.com/rancher/k3s/pkg/daemons/config"
)
// newFakeInformersFromClient creates the different informers used in the uneventful network policy controller
@ -247,26 +253,40 @@ func testForMissingOrUnwanted(t *testing.T, targetMsg string, got []podInfo, wan
}
}
func newMinimalKubeRouterConfig(clusterIPCIDR string, nodePortRange string, hostNameOverride string, externalIPs []string) *options.KubeRouterConfig {
kubeConfig := options.NewKubeRouterConfig()
func newMinimalNodeConfig(clusterIPCIDR string, nodePortRange string, hostNameOverride string, externalIPs []string) *config.Node {
nodeConfig := &config.Node{AgentConfig: config.Agent{}}
if clusterIPCIDR != "" {
kubeConfig.ClusterIPCIDR = clusterIPCIDR
_, cidr, err := net.ParseCIDR(clusterIPCIDR)
if err != nil {
panic("failed to get parse --service-cluster-ip-range parameter: " + err.Error())
}
nodeConfig.AgentConfig.ClusterCIDR = *cidr
}
if nodePortRange != "" {
kubeConfig.NodePortRange = nodePortRange
portRange, err := utilnet.ParsePortRange(nodePortRange)
if err != nil {
panic("failed to get parse --service-node-port-range:" + err.Error())
}
nodeConfig.AgentConfig.ServiceNodePortRange = *portRange
}
if hostNameOverride != "" {
kubeConfig.HostnameOverride = hostNameOverride
nodeConfig.AgentConfig.NodeName = hostNameOverride
}
if externalIPs != nil {
kubeConfig.ExternalIPCIDRs = externalIPs
// TODO: We don't currently have a way to set these through the K3s CLI; if we ever do then test that here.
for _, cidr := range externalIPs {
if _, _, err := net.ParseCIDR(cidr); err != nil {
panic("failed to get parse --service-external-ip-range parameter: " + err.Error())
}
}
}
return kubeConfig
return nodeConfig
}
type tNetPolConfigTestCase struct {
name string
config *options.KubeRouterConfig
config *config.Node
expectError bool
errorText string
}
@ -399,121 +419,43 @@ func TestNetworkPolicyController(t *testing.T) {
testCases := []tNetPolConfigTestCase{
{
"Default options are successful",
newMinimalKubeRouterConfig("", "", "node", nil),
newMinimalNodeConfig("", "", "node", nil),
false,
"",
},
{
"Missing nodename fails appropriately",
newMinimalKubeRouterConfig("", "", "", nil),
newMinimalNodeConfig("", "", "", nil),
true,
"Failed to identify the node by NODE_NAME, hostname or --hostname-override",
},
{
"Test bad cluster CIDR (not properly formatting ip address)",
newMinimalKubeRouterConfig("10.10.10", "", "node", nil),
true,
"failed to get parse --service-cluster-ip-range parameter: invalid CIDR address: 10.10.10",
},
{
"Test bad cluster CIDR (not using an ip address)",
newMinimalKubeRouterConfig("foo", "", "node", nil),
true,
"failed to get parse --service-cluster-ip-range parameter: invalid CIDR address: foo",
},
{
"Test bad cluster CIDR (using an ip address that is not a CIDR)",
newMinimalKubeRouterConfig("10.10.10.10", "", "node", nil),
true,
"failed to get parse --service-cluster-ip-range parameter: invalid CIDR address: 10.10.10.10",
},
{
"Test good cluster CIDR (using single IP with a /32)",
newMinimalKubeRouterConfig("10.10.10.10/32", "", "node", nil),
newMinimalNodeConfig("10.10.10.10/32", "", "node", nil),
false,
"",
},
{
"Test good cluster CIDR (using normal range with /24)",
newMinimalKubeRouterConfig("10.10.10.0/24", "", "node", nil),
false,
"",
},
{
"Test bad node port specification (using commas)",
newMinimalKubeRouterConfig("", "8080,8081", "node", nil),
true,
"failed to parse node port range given: '8080,8081' please see specification in help text",
},
{
"Test bad node port specification (not using numbers)",
newMinimalKubeRouterConfig("", "foo:bar", "node", nil),
true,
"failed to parse node port range given: 'foo:bar' please see specification in help text",
},
{
"Test bad node port specification (using anything in addition to range)",
newMinimalKubeRouterConfig("", "8080,8081-8090", "node", nil),
true,
"failed to parse node port range given: '8080,8081-8090' please see specification in help text",
},
{
"Test bad node port specification (using reversed range)",
newMinimalKubeRouterConfig("", "8090-8080", "node", nil),
true,
"port 1 is greater than or equal to port 2 in range given: '8090-8080'",
},
{
"Test bad node port specification (port out of available range)",
newMinimalKubeRouterConfig("", "132000-132001", "node", nil),
true,
"could not parse first port number from range given: '132000-132001'",
},
{
"Test good node port specification (using colon separator)",
newMinimalKubeRouterConfig("", "8080:8090", "node", nil),
newMinimalNodeConfig("10.10.10.0/24", "", "node", nil),
false,
"",
},
{
"Test good node port specification (using hyphen separator)",
newMinimalKubeRouterConfig("", "8080-8090", "node", nil),
newMinimalNodeConfig("", "8080-8090", "node", nil),
false,
"",
},
{
"Test bad external IP CIDR (not properly formatting ip address)",
newMinimalKubeRouterConfig("", "", "node", []string{"199.10.10"}),
true,
"failed to get parse --service-external-ip-range parameter: '199.10.10'. Error: invalid CIDR address: 199.10.10",
},
{
"Test bad external IP CIDR (not using an ip address)",
newMinimalKubeRouterConfig("", "", "node", []string{"foo"}),
true,
"failed to get parse --service-external-ip-range parameter: 'foo'. Error: invalid CIDR address: foo",
},
{
"Test bad external IP CIDR (using an ip address that is not a CIDR)",
newMinimalKubeRouterConfig("", "", "node", []string{"199.10.10.10"}),
true,
"failed to get parse --service-external-ip-range parameter: '199.10.10.10'. Error: invalid CIDR address: 199.10.10.10",
},
{
"Test bad external IP CIDR (making sure that it processes all items in the list)",
newMinimalKubeRouterConfig("", "", "node", []string{"199.10.10.10/32", "199.10.10.11"}),
true,
"failed to get parse --service-external-ip-range parameter: '199.10.10.11'. Error: invalid CIDR address: 199.10.10.11",
},
{
"Test good external IP CIDR (using single IP with a /32)",
newMinimalKubeRouterConfig("", "", "node", []string{"199.10.10.10/32"}),
newMinimalNodeConfig("", "", "node", []string{"199.10.10.10/32"}),
false,
"",
},
{
"Test good external IP CIDR (using normal range with /24)",
newMinimalKubeRouterConfig("", "", "node", []string{"199.10.10.10/24"}),
newMinimalNodeConfig("", "", "node", []string{"199.10.10.10/24"}),
false,
"",
},

View File

@ -1,3 +1,8 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/pod.go
// +build !windows
package netpol
import (
@ -7,9 +12,9 @@ import (
"strings"
"github.com/coreos/go-iptables/iptables"
"github.com/golang/glog"
api "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
glog "k8s.io/klog"
)
func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHandler {

View File

@ -1,3 +1,8 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/policy.go
// +build !windows
package netpol
import (
@ -9,10 +14,8 @@ import (
"strings"
"time"
"github.com/cloudnativelabs/kube-router/pkg/metrics"
"github.com/cloudnativelabs/kube-router/pkg/utils"
"github.com/coreos/go-iptables/iptables"
"github.com/golang/glog"
"github.com/rancher/k3s/pkg/agent/netpol/utils"
api "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -20,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
glog "k8s.io/klog"
)
func (npc *NetworkPolicyController) newNetworkPolicyEventHandler() cache.ResourceEventHandler {
@ -73,7 +77,6 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo
start := time.Now()
defer func() {
endTime := time.Since(start)
metrics.ControllerPolicyChainsSyncTime.Observe(endTime.Seconds())
glog.V(2).Infof("Syncing network policy chains took %v", endTime)
}()
activePolicyChains := make(map[string]bool)

View File

@ -1,3 +1,8 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/utils/ipset.go
// +build !windows
package utils
import (

View File

@ -1,3 +1,8 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/utils/node.go
// +build !windows
package utils
import (