diff --git a/go.mod b/go.mod
index 51bc55041e..d93b33bc8b 100644
--- a/go.mod
+++ b/go.mod
@@ -126,6 +126,7 @@ require (
 	k8s.io/controller-manager v0.21.2
 	k8s.io/cri-api v0.21.2
 	k8s.io/klog v1.0.0
+	k8s.io/klog/v2 v2.8.0
 	k8s.io/kubectl v0.21.2
 	k8s.io/kubernetes v1.21.2
 	k8s.io/utils v0.0.0-20201110183641-67b214c5f920
diff --git a/pkg/agent/netpol/namespace.go b/pkg/agent/netpol/namespace.go
index 90fb95cc15..073e7a3db6 100644
--- a/pkg/agent/netpol/namespace.go
+++ b/pkg/agent/netpol/namespace.go
@@ -1,5 +1,5 @@
 // Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
-// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/namespace.go
+// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol/namespace.go
 
 // +build !windows
 
@@ -10,7 +10,7 @@ import (
 
 	api "k8s.io/api/core/v1"
 	"k8s.io/client-go/tools/cache"
-	glog "k8s.io/klog"
+	"k8s.io/klog/v2"
 )
 
 func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEventHandler {
@@ -32,7 +32,7 @@ func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEve
 			return
 		}
 		default:
-			glog.Errorf("unexpected object type: %v", obj)
+			klog.Errorf("unexpected object type: %v", obj)
 		}
 	},
 }
@@ -42,7 +42,7 @@ func (npc *NetworkPolicyController) handleNamespaceAdd(obj *api.Namespace) {
 	if obj.Labels == nil {
 		return
 	}
-	glog.V(2).Infof("Received update for namespace: %s", obj.Name)
+	klog.V(2).Infof("Received update for namespace: %s", obj.Name)
 	npc.RequestFullSync()
 }
 
@@ -51,7 +51,7 @@ func (npc *NetworkPolicyController) handleNamespaceUpdate(oldObj, newObj *api.Na
 	if reflect.DeepEqual(oldObj.Labels, newObj.Labels) {
 		return
 	}
-	glog.V(2).Infof("Received update for namespace: %s", newObj.Name)
+	klog.V(2).Infof("Received update for namespace: %s", newObj.Name)
 	npc.RequestFullSync()
 }
 
@@ -60,7 +60,7 @@ func (npc *NetworkPolicyController) handleNamespaceDelete(obj *api.Namespace) {
 	if obj.Labels == nil {
 		return
 	}
-	glog.V(2).Infof("Received namespace: %s delete event", obj.Name)
+	klog.V(2).Infof("Received namespace: %s delete event", obj.Name)
 	npc.RequestFullSync()
 }
 
diff --git a/pkg/agent/netpol/netpol.go b/pkg/agent/netpol/netpol.go
index bac05a6ee6..757ff5b857 100644
--- a/pkg/agent/netpol/netpol.go
+++ b/pkg/agent/netpol/netpol.go
@@ -1,9 +1,13 @@
+// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
+// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol.go
+
 // +build !windows
 
 package netpol
 
 import (
 	"context"
+	"sync"
 
 	"github.com/rancher/k3s/pkg/agent/netpol/utils"
 	"github.com/rancher/k3s/pkg/daemons/config"
@@ -48,7 +52,7 @@ func Run(ctx context.Context, nodeConfig *config.Node) error {
 	informerFactory.Start(stopCh)
 	informerFactory.WaitForCacheSync(stopCh)
 
-	npc, err := NewNetworkPolicyController(client, nodeConfig, podInformer, npInformer, nsInformer)
+	npc, err := NewNetworkPolicyController(client, nodeConfig, podInformer, npInformer, nsInformer, &sync.Mutex{})
 	if err != nil {
 		return err
 	}
diff --git a/pkg/agent/netpol/network_policy_controller.go b/pkg/agent/netpol/network_policy_controller.go
index 9cdb57ab0b..23b0317633 100644
--- a/pkg/agent/netpol/network_policy_controller.go
+++ b/pkg/agent/netpol/network_policy_controller.go
@@ -1,11 +1,12 @@
 // Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
-// - modified from 
https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/network_policy_controller.go +// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol/network_policy_controller.go // +build !windows package netpol import ( + "bytes" "crypto/sha256" "encoding/base32" "fmt" @@ -22,7 +23,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" - glog "k8s.io/klog" + "k8s.io/klog/v2" ) const ( @@ -33,6 +34,7 @@ const ( kubeInputChainName = "KUBE-ROUTER-INPUT" kubeForwardChainName = "KUBE-ROUTER-FORWARD" kubeOutputChainName = "KUBE-ROUTER-OUTPUT" + kubeDefaultNetpolChain = "KUBE-NWPLCY-DEFAULT" defaultSyncPeriod = 5 * time.Minute ) @@ -47,7 +49,7 @@ const ( // followed by one or more network policy chains, till there is a match which will accept the packet, or gets // dropped by the rule in the pod chain, if there is no match. -// NetworkPolicyController strcut to hold information required by NetworkPolicyController +// NetworkPolicyController struct to hold information required by NetworkPolicyController type NetworkPolicyController struct { nodeIP net.IP nodeHostName string @@ -57,6 +59,7 @@ type NetworkPolicyController struct { mu sync.Mutex syncPeriod time.Duration fullSyncRequestChan chan struct{} + ipsetMutex *sync.Mutex ipSetHandler *utils.IPSet @@ -67,6 +70,8 @@ type NetworkPolicyController struct { PodEventHandler cache.ResourceEventHandler NamespaceEventHandler cache.ResourceEventHandler NetworkPolicyEventHandler cache.ResourceEventHandler + + filterTableRules bytes.Buffer } // internal structure to represent a network policy @@ -119,6 +124,7 @@ type egressRule struct { type protocolAndPort struct { protocol string port string + endport string } type endPoints struct { @@ -135,29 +141,32 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}) { t := time.NewTicker(npc.syncPeriod) defer t.Stop() - glog.Info("Starting network policy controller") + klog.Info("Starting network policy controller") - // setup kube-router specific top level cutoms chains + // setup kube-router specific top level custom chains (KUBE-ROUTER-INPUT, KUBE-ROUTER-FORWARD, KUBE-ROUTER-OUTPUT) npc.ensureTopLevelChains() + // setup default network policy chain that is applied to traffic from/to the pods that does not match any network policy + npc.ensureDefaultNetworkPolicyChain() + // Full syncs of the network policy controller take a lot of time and can only be processed one at a time, // therefore, we start it in it's own goroutine and request a sync through a single item channel - glog.Info("Starting network policy controller full sync goroutine") + klog.Info("Starting network policy controller full sync goroutine") go func(fullSyncRequest <-chan struct{}, stopCh <-chan struct{}) { for { // Add an additional non-blocking select to ensure that if the stopCh channel is closed it is handled first select { case <-stopCh: - glog.Info("Shutting down network policies full sync goroutine") + klog.Info("Shutting down network policies full sync goroutine") return default: } select { case <-stopCh: - glog.Info("Shutting down network policies full sync goroutine") + klog.Info("Shutting down network policies full sync goroutine") return case <-fullSyncRequest: - glog.V(3).Info("Received request for a full sync, processing") + klog.V(3).Info("Received request for a full sync, processing") npc.fullPolicySync() // fullPolicySync() is a 
blocking request here } } @@ -165,11 +174,11 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}) { // loop forever till notified to stop on stopCh for { - glog.V(1).Info("Requesting periodic sync of iptables to reflect network policies") + klog.V(1).Info("Requesting periodic sync of iptables to reflect network policies") npc.RequestFullSync() select { case <-stopCh: - glog.Infof("Shutting down network policies controller") + klog.Infof("Shutting down network policies controller") return case <-t.C: } @@ -180,9 +189,9 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}) { func (npc *NetworkPolicyController) RequestFullSync() { select { case npc.fullSyncRequestChan <- struct{}{}: - glog.V(3).Info("Full sync request queue was empty so a full sync request was successfully sent") + klog.V(3).Info("Full sync request queue was empty so a full sync request was successfully sent") default: // Don't block if the buffered channel is full, return quickly so that we don't block callee execution - glog.V(1).Info("Full sync request queue was full, skipping...") + klog.V(1).Info("Full sync request queue was full, skipping...") } } @@ -198,35 +207,55 @@ func (npc *NetworkPolicyController) fullPolicySync() { syncVersion := strconv.FormatInt(start.UnixNano(), 10) defer func() { endTime := time.Since(start) - glog.V(1).Infof("sync iptables took %v", endTime) + klog.V(1).Infof("sync iptables took %v", endTime) }() - glog.V(1).Infof("Starting sync of iptables with version: %s", syncVersion) + klog.V(1).Infof("Starting sync of iptables with version: %s", syncVersion) // ensure kube-router specific top level chains and corresponding rules exist npc.ensureTopLevelChains() + // ensure default network policy chain that is applied to traffic from/to the pods that does not match any network policy + npc.ensureDefaultNetworkPolicyChain() + networkPoliciesInfo, err = npc.buildNetworkPoliciesInfo() if err != nil { - glog.Errorf("Aborting sync. Failed to build network policies: %v", err.Error()) + klog.Errorf("Aborting sync. Failed to build network policies: %v", err.Error()) + return + } + + npc.filterTableRules.Reset() + if err := utils.SaveInto("filter", &npc.filterTableRules); err != nil { + klog.Errorf("Aborting sync. Failed to run iptables-save: %v" + err.Error()) return } activePolicyChains, activePolicyIPSets, err := npc.syncNetworkPolicyChains(networkPoliciesInfo, syncVersion) if err != nil { - glog.Errorf("Aborting sync. Failed to sync network policy chains: %v" + err.Error()) + klog.Errorf("Aborting sync. Failed to sync network policy chains: %v" + err.Error()) return } activePodFwChains, err := npc.syncPodFirewallChains(networkPoliciesInfo, syncVersion) if err != nil { - glog.Errorf("Aborting sync. Failed to sync pod firewalls: %v", err.Error()) + klog.Errorf("Aborting sync. Failed to sync pod firewalls: %v", err.Error()) return } - err = cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets) + err = npc.cleanupStaleRules(activePolicyChains, activePodFwChains) if err != nil { - glog.Errorf("Aborting sync. Failed to cleanup stale iptables rules: %v", err.Error()) + klog.Errorf("Aborting sync. Failed to cleanup stale iptables rules: %v", err.Error()) + return + } + + if err := utils.Restore("filter", npc.filterTableRules.Bytes()); err != nil { + klog.Errorf("Aborting sync. 
Failed to run iptables-restore: %v\n%s", err.Error(), npc.filterTableRules.String()) + return + } + + err = npc.cleanupStaleIPSets(activePolicyIPSets) + if err != nil { + klog.Errorf("Failed to cleanup stale ipsets: %v", err.Error()) return } } @@ -240,7 +269,7 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() { iptablesCmdHandler, err := iptables.New() if err != nil { - glog.Fatalf("Failed to initialize iptables executor due to %s", err.Error()) + klog.Fatalf("Failed to initialize iptables executor due to %s", err.Error()) } addUUIDForRuleSpec := func(chain string, ruleSpec *[]string) (string, error) { @@ -258,18 +287,18 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() { ensureRuleAtPosition := func(chain string, ruleSpec []string, uuid string, position int) { exists, err := iptablesCmdHandler.Exists("filter", chain, ruleSpec...) if err != nil { - glog.Fatalf("Failed to verify rule exists in %s chain due to %s", chain, err.Error()) + klog.Fatalf("Failed to verify rule exists in %s chain due to %s", chain, err.Error()) } if !exists { err := iptablesCmdHandler.Insert("filter", chain, position, ruleSpec...) if err != nil { - glog.Fatalf("Failed to run iptables command to insert in %s chain %s", chain, err.Error()) + klog.Fatalf("Failed to run iptables command to insert in %s chain %s", chain, err.Error()) } return } rules, err := iptablesCmdHandler.List("filter", chain) if err != nil { - glog.Fatalf("failed to list rules in filter table %s chain due to %s", chain, err.Error()) + klog.Fatalf("failed to list rules in filter table %s chain due to %s", chain, err.Error()) } var ruleNo, ruleIndexOffset int @@ -290,11 +319,11 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() { if ruleNo != position { err = iptablesCmdHandler.Insert("filter", chain, position, ruleSpec...) 
if err != nil { - glog.Fatalf("Failed to run iptables command to insert in %s chain %s", chain, err.Error()) + klog.Fatalf("Failed to run iptables command to insert in %s chain %s", chain, err.Error()) } err = iptablesCmdHandler.Delete("filter", chain, strconv.Itoa(ruleNo+1)) if err != nil { - glog.Fatalf("Failed to delete incorrect rule in %s chain due to %s", chain, err.Error()) + klog.Fatalf("Failed to delete incorrect rule in %s chain due to %s", chain, err.Error()) } } } @@ -304,12 +333,12 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() { for builtinChain, customChain := range chains { err = iptablesCmdHandler.NewChain("filter", customChain) if err != nil && err.(*iptables.Error).ExitStatus() != 1 { - glog.Fatalf("Failed to run iptables command to create %s chain due to %s", customChain, err.Error()) + klog.Fatalf("Failed to run iptables command to create %s chain due to %s", customChain, err.Error()) } args := []string{"-m", "comment", "--comment", "kube-router netpol", "-j", customChain} uuid, err := addUUIDForRuleSpec(builtinChain, &args) if err != nil { - glog.Fatalf("Failed to get uuid for rule: %s", err.Error()) + klog.Fatalf("Failed to get uuid for rule: %s", err.Error()) } ensureRuleAtPosition(builtinChain, args, uuid, 1) } @@ -317,7 +346,7 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() { whitelistServiceVips := []string{"-m", "comment", "--comment", "allow traffic to cluster IP", "-d", npc.serviceClusterIPRange.String(), "-j", "RETURN"} uuid, err := addUUIDForRuleSpec(kubeInputChainName, &whitelistServiceVips) if err != nil { - glog.Fatalf("Failed to get uuid for rule: %s", err.Error()) + klog.Fatalf("Failed to get uuid for rule: %s", err.Error()) } ensureRuleAtPosition(kubeInputChainName, whitelistServiceVips, uuid, 1) @@ -325,7 +354,7 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() { "-m", "multiport", "--dports", npc.serviceNodePortRange, "-j", "RETURN"} uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistTCPNodeports) if err != nil { - glog.Fatalf("Failed to get uuid for rule: %s", err.Error()) + klog.Fatalf("Failed to get uuid for rule: %s", err.Error()) } ensureRuleAtPosition(kubeInputChainName, whitelistTCPNodeports, uuid, 2) @@ -333,7 +362,7 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() { "-m", "multiport", "--dports", npc.serviceNodePortRange, "-j", "RETURN"} uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistUDPNodeports) if err != nil { - glog.Fatalf("Failed to get uuid for rule: %s", err.Error()) + klog.Fatalf("Failed to get uuid for rule: %s", err.Error()) } ensureRuleAtPosition(kubeInputChainName, whitelistUDPNodeports, uuid, 3) @@ -341,40 +370,66 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() { whitelistServiceVips := []string{"-m", "comment", "--comment", "allow traffic to external IP range: " + externalIPRange.String(), "-d", externalIPRange.String(), "-j", "RETURN"} uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistServiceVips) if err != nil { - glog.Fatalf("Failed to get uuid for rule: %s", err.Error()) + klog.Fatalf("Failed to get uuid for rule: %s", err.Error()) } ensureRuleAtPosition(kubeInputChainName, whitelistServiceVips, uuid, externalIPIndex+4) } + // for the traffic to/from the local pod's let network policy controller be + // authoritative entity to ACCEPT the traffic if it complies to network policies + for _, chain := range chains { + comment := "rule to explicitly ACCEPT traffic that comply to network policies" + args := []string{"-m", 
"comment", "--comment", comment, "-m", "mark", "--mark", "0x20000/0x20000", "-j", "ACCEPT"} + err = iptablesCmdHandler.AppendUnique("filter", chain, args...) + if err != nil { + klog.Fatalf("Failed to run iptables command: %s", err.Error()) + } + } } -func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets map[string]bool) error { +// Creates custom chains KUBE-NWPLCY-DEFAULT +func (npc *NetworkPolicyController) ensureDefaultNetworkPolicyChain() { + + iptablesCmdHandler, err := iptables.New() + if err != nil { + klog.Fatalf("Failed to initialize iptables executor due to %s", err.Error()) + } + + markArgs := make([]string, 0) + markComment := "rule to mark traffic matching a network policy" + markArgs = append(markArgs, "-j", "MARK", "-m", "comment", "--comment", markComment, "--set-xmark", "0x10000/0x10000") + + err = iptablesCmdHandler.NewChain("filter", kubeDefaultNetpolChain) + if err != nil && err.(*iptables.Error).ExitStatus() != 1 { + klog.Fatalf("Failed to run iptables command to create %s chain due to %s", kubeDefaultNetpolChain, err.Error()) + } + err = iptablesCmdHandler.AppendUnique("filter", kubeDefaultNetpolChain, markArgs...) + if err != nil { + klog.Fatalf("Failed to run iptables command: %s", err.Error()) + } +} + +func (npc *NetworkPolicyController) cleanupStaleRules(activePolicyChains, activePodFwChains map[string]bool) error { cleanupPodFwChains := make([]string, 0) cleanupPolicyChains := make([]string, 0) - cleanupPolicyIPSets := make([]*utils.Set, 0) // initialize tool sets for working with iptables and ipset iptablesCmdHandler, err := iptables.New() if err != nil { - glog.Fatalf("failed to initialize iptables command executor due to %s", err.Error()) - } - ipsets, err := utils.NewIPSet(false) - if err != nil { - glog.Fatalf("failed to create ipsets command executor due to %s", err.Error()) - } - err = ipsets.Save() - if err != nil { - glog.Fatalf("failed to initialize ipsets command executor due to %s", err.Error()) + return fmt.Errorf("failed to initialize iptables command executor due to %s", err.Error()) } // find iptables chains and ipsets that are no longer used by comparing current to the active maps we were passed chains, err := iptablesCmdHandler.ListChains("filter") if err != nil { - return fmt.Errorf("Unable to list chains: %s", err) + return fmt.Errorf("unable to list chains: %s", err) } for _, chain := range chains { if strings.HasPrefix(chain, kubeNetworkPolicyChainPrefix) { + if chain == kubeDefaultNetpolChain { + continue + } if _, ok := activePolicyChains[chain]; !ok { cleanupPolicyChains = append(cleanupPolicyChains, chain) } @@ -385,6 +440,58 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets } } } + + var newChains, newRules, desiredFilterTable bytes.Buffer + rules := strings.Split(npc.filterTableRules.String(), "\n") + if len(rules) > 0 && rules[len(rules)-1] == "" { + rules = rules[:len(rules)-1] + } + for _, rule := range rules { + skipRule := false + for _, podFWChainName := range cleanupPodFwChains { + if strings.Contains(rule, podFWChainName) { + skipRule = true + break + } + } + for _, policyChainName := range cleanupPolicyChains { + if strings.Contains(rule, policyChainName) { + skipRule = true + break + } + } + if strings.Contains(rule, "COMMIT") || strings.HasPrefix(rule, "# ") { + skipRule = true + } + if skipRule { + continue + } + if strings.HasPrefix(rule, ":") { + newChains.WriteString(rule + " - [0:0]\n") + } + if strings.HasPrefix(rule, "-") { + newRules.WriteString(rule + 
"\n") + } + } + desiredFilterTable.WriteString("*filter" + "\n") + desiredFilterTable.Write(newChains.Bytes()) + desiredFilterTable.Write(newRules.Bytes()) + desiredFilterTable.WriteString("COMMIT" + "\n") + npc.filterTableRules = desiredFilterTable + + return nil +} + +func (npc *NetworkPolicyController) cleanupStaleIPSets(activePolicyIPSets map[string]bool) error { + cleanupPolicyIPSets := make([]*utils.Set, 0) + ipsets, err := utils.NewIPSet(false) + if err != nil { + return fmt.Errorf("failed to create ipsets command executor due to %s", err.Error()) + } + err = ipsets.Save() + if err != nil { + klog.Fatalf("failed to initialize ipsets command executor due to %s", err.Error()) + } for _, set := range ipsets.Sets { if strings.HasPrefix(set.Name, kubeSourceIPSetPrefix) || strings.HasPrefix(set.Name, kubeDestinationIPSetPrefix) { @@ -393,83 +500,11 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets } } } - - // remove stale iptables podFwChain references from the filter table chains - for _, podFwChain := range cleanupPodFwChains { - - primaryChains := []string{kubeInputChainName, kubeForwardChainName, kubeOutputChainName} - for _, egressChain := range primaryChains { - forwardChainRules, err := iptablesCmdHandler.List("filter", egressChain) - if err != nil { - return fmt.Errorf("failed to list rules in filter table, %s podFwChain due to %s", egressChain, err.Error()) - } - - // TODO delete rule by spec, than rule number to avoid extra loop - var realRuleNo int - for i, rule := range forwardChainRules { - if strings.Contains(rule, podFwChain) { - err = iptablesCmdHandler.Delete("filter", egressChain, strconv.Itoa(i-realRuleNo)) - if err != nil { - return fmt.Errorf("failed to delete rule: %s from the %s podFwChain of filter table due to %s", rule, egressChain, err.Error()) - } - realRuleNo++ - } - } - } - } - - // cleanup pod firewall chain - for _, chain := range cleanupPodFwChains { - glog.V(2).Infof("Found pod fw chain to cleanup: %s", chain) - err = iptablesCmdHandler.ClearChain("filter", chain) - if err != nil { - return fmt.Errorf("Failed to flush the rules in chain %s due to %s", chain, err.Error()) - } - err = iptablesCmdHandler.DeleteChain("filter", chain) - if err != nil { - return fmt.Errorf("Failed to delete the chain %s due to %s", chain, err.Error()) - } - glog.V(2).Infof("Deleted pod specific firewall chain: %s from the filter table", chain) - } - - // cleanup network policy chains - for _, policyChain := range cleanupPolicyChains { - glog.V(2).Infof("Found policy chain to cleanup %s", policyChain) - - // first clean up any references from active pod firewall chains - for podFwChain := range activePodFwChains { - podFwChainRules, err := iptablesCmdHandler.List("filter", podFwChain) - if err != nil { - return fmt.Errorf("Unable to list rules from the chain %s: %s", podFwChain, err) - } - for i, rule := range podFwChainRules { - if strings.Contains(rule, policyChain) { - err = iptablesCmdHandler.Delete("filter", podFwChain, strconv.Itoa(i)) - if err != nil { - return fmt.Errorf("Failed to delete rule %s from the chain %s", rule, podFwChain) - } - break - } - } - } - - // now that all stale and active references to the network policy chain have been removed, delete the chain - err = iptablesCmdHandler.ClearChain("filter", policyChain) - if err != nil { - return fmt.Errorf("Failed to flush the rules in chain %s due to %s", policyChain, err) - } - err = iptablesCmdHandler.DeleteChain("filter", policyChain) - if err != nil { - return 
fmt.Errorf("Failed to flush the rules in chain %s due to %s", policyChain, err) - } - glog.V(2).Infof("Deleted network policy chain: %s from the filter table", policyChain) - } - // cleanup network policy ipsets for _, set := range cleanupPolicyIPSets { err = set.Destroy() if err != nil { - return fmt.Errorf("Failed to delete ipset %s due to %s", set.Name, err) + return fmt.Errorf("failed to delete ipset %s due to %s", set.Name, err) } } return nil @@ -478,17 +513,18 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets // Cleanup cleanup configurations done func (npc *NetworkPolicyController) Cleanup() { - glog.Info("Cleaning up iptables configuration permanently done by kube-router") + klog.Info("Cleaning up iptables configuration permanently done by kube-router") iptablesCmdHandler, err := iptables.New() if err != nil { - glog.Errorf("Failed to initialize iptables executor: %s", err.Error()) + klog.Errorf("Failed to initialize iptables executor: %s", err.Error()) + return } // delete jump rules in FORWARD chain to pod specific firewall chain forwardChainRules, err := iptablesCmdHandler.List("filter", kubeForwardChainName) if err != nil { - glog.Errorf("Failed to delete iptables rules as part of cleanup") + klog.Errorf("Failed to delete iptables rules as part of cleanup") return } @@ -498,7 +534,7 @@ func (npc *NetworkPolicyController) Cleanup() { if strings.Contains(rule, kubePodFirewallChainPrefix) { err = iptablesCmdHandler.Delete("filter", kubeForwardChainName, strconv.Itoa(i-realRuleNo)) if err != nil { - glog.Errorf("Failed to delete iptables rule as part of cleanup: %s", err) + klog.Errorf("Failed to delete iptables rule as part of cleanup: %s", err) } realRuleNo++ } @@ -507,7 +543,7 @@ func (npc *NetworkPolicyController) Cleanup() { // delete jump rules in OUTPUT chain to pod specific firewall chain forwardChainRules, err = iptablesCmdHandler.List("filter", kubeOutputChainName) if err != nil { - glog.Errorf("Failed to delete iptables rules as part of cleanup") + klog.Errorf("Failed to delete iptables rules as part of cleanup") return } @@ -517,7 +553,7 @@ func (npc *NetworkPolicyController) Cleanup() { if strings.Contains(rule, kubePodFirewallChainPrefix) { err = iptablesCmdHandler.Delete("filter", kubeOutputChainName, strconv.Itoa(i-realRuleNo)) if err != nil { - glog.Errorf("Failed to delete iptables rule as part of cleanup: %s", err) + klog.Errorf("Failed to delete iptables rule as part of cleanup: %s", err) } realRuleNo++ } @@ -526,19 +562,19 @@ func (npc *NetworkPolicyController) Cleanup() { // flush and delete pod specific firewall chain chains, err := iptablesCmdHandler.ListChains("filter") if err != nil { - glog.Errorf("Unable to list chains: %s", err) + klog.Errorf("Unable to list chains: %s", err) return } for _, chain := range chains { if strings.HasPrefix(chain, kubePodFirewallChainPrefix) { err = iptablesCmdHandler.ClearChain("filter", chain) if err != nil { - glog.Errorf("Failed to cleanup iptables rules: " + err.Error()) + klog.Errorf("Failed to cleanup iptables rules: " + err.Error()) return } err = iptablesCmdHandler.DeleteChain("filter", chain) if err != nil { - glog.Errorf("Failed to cleanup iptables rules: " + err.Error()) + klog.Errorf("Failed to cleanup iptables rules: " + err.Error()) return } } @@ -547,45 +583,53 @@ func (npc *NetworkPolicyController) Cleanup() { // flush and delete per network policy specific chain chains, err = iptablesCmdHandler.ListChains("filter") if err != nil { - glog.Errorf("Unable to list chains: %s", 
err) + klog.Errorf("Unable to list chains: %s", err) return } for _, chain := range chains { if strings.HasPrefix(chain, kubeNetworkPolicyChainPrefix) { err = iptablesCmdHandler.ClearChain("filter", chain) if err != nil { - glog.Errorf("Failed to cleanup iptables rules: " + err.Error()) + klog.Errorf("Failed to cleanup iptables rules: " + err.Error()) return } err = iptablesCmdHandler.DeleteChain("filter", chain) if err != nil { - glog.Errorf("Failed to cleanup iptables rules: " + err.Error()) + klog.Errorf("Failed to cleanup iptables rules: " + err.Error()) return } } } // delete all ipsets + klog.V(1).Infof("Attempting to attain ipset mutex lock") + npc.ipsetMutex.Lock() + klog.V(1).Infof("Attained ipset mutex lock, continuing...") + defer func() { + npc.ipsetMutex.Unlock() + klog.V(1).Infof("Returned ipset mutex lock") + }() ipset, err := utils.NewIPSet(false) if err != nil { - glog.Errorf("Failed to clean up ipsets: " + err.Error()) + klog.Errorf("Failed to clean up ipsets: " + err.Error()) + return } err = ipset.Save() if err != nil { - glog.Errorf("Failed to clean up ipsets: " + err.Error()) + klog.Errorf("Failed to clean up ipsets: " + err.Error()) } err = ipset.DestroyAllWithin() if err != nil { - glog.Errorf("Failed to clean up ipsets: " + err.Error()) + klog.Errorf("Failed to clean up ipsets: " + err.Error()) } - glog.Infof("Successfully cleaned the iptables configuration done by kube-router") + klog.Infof("Successfully cleaned the iptables configuration done by kube-router") } // NewNetworkPolicyController returns new NetworkPolicyController object func NewNetworkPolicyController(clientset kubernetes.Interface, config *config.Node, podInformer cache.SharedIndexInformer, - npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer) (*NetworkPolicyController, error) { - npc := NetworkPolicyController{} + npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer, ipsetMutex *sync.Mutex) (*NetworkPolicyController, error) { + npc := NetworkPolicyController{ipsetMutex: ipsetMutex} // Creating a single-item buffered channel to ensure that we only keep a single full sync request at a time, // additional requests would be pointless to queue since after the first one was processed the system would already @@ -601,24 +645,12 @@ func NewNetworkPolicyController(clientset kubernetes.Interface, return nil, err } - npc.nodeHostName = node.Name - nodeIP, err := utils.GetNodeIP(node) if err != nil { return nil, err } npc.nodeIP = nodeIP - ipset, err := utils.NewIPSet(false) - if err != nil { - return nil, err - } - err = ipset.Save() - if err != nil { - return nil, err - } - npc.ipSetHandler = ipset - npc.podLister = podInformer.GetIndexer() npc.PodEventHandler = npc.newPodEventHandler() diff --git a/pkg/agent/netpol/network_policy_controller_test.go b/pkg/agent/netpol/network_policy_controller_test.go index d6bde79b38..b58ac39d3a 100644 --- a/pkg/agent/netpol/network_policy_controller_test.go +++ b/pkg/agent/netpol/network_policy_controller_test.go @@ -1,19 +1,24 @@ // Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) -// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/network_policy_controller_test.go +// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/network_policy_controller_test.go // +build !windows package netpol import ( + "bytes" "context" + "fmt" "net" "strings" + "sync" "testing" "time" 
+ "github.com/rancher/k3s/pkg/daemons/config" netv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/tools/cache" v1 "k8s.io/api/core/v1" @@ -23,8 +28,6 @@ import ( "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" - - "github.com/rancher/k3s/pkg/daemons/config" ) // newFakeInformersFromClient creates the different informers used in the uneventful network policy controller @@ -213,6 +216,7 @@ type tNetpolTestCase struct { targetPods tPodNamespaceMap inSourcePods tPodNamespaceMap outDestPods tPodNamespaceMap + expectedRule string } // tGetNotTargetedPods finds set of pods that should not be targeted by netpol selectors @@ -417,6 +421,182 @@ func TestNewNetworkPolicySelectors(t *testing.T) { } } +func TestNetworkPolicyBuilder(t *testing.T) { + port, port1 := intstr.FromInt(30000), intstr.FromInt(34000) + ingressPort := intstr.FromInt(37000) + endPort, endPort1 := int32(31000), int32(35000) + testCases := []tNetpolTestCase{ + { + name: "Simple Egress Destination Port", + netpol: tNetpol{name: "simple-egress", namespace: "nsA", + podSelector: metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "app", + Operator: "In", + Values: []string{"a"}, + }, + }, + }, + egress: []netv1.NetworkPolicyEgressRule{ + { + Ports: []netv1.NetworkPolicyPort{ + { + Port: &port, + }, + }, + }, + }, + }, + expectedRule: "-A KUBE-NWPLCY-QHFGOTFJZFXUJVTH -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress namespace nsA\" --dport 30000 -j MARK --set-xmark 0x10000/0x10000 \n" + + "-A KUBE-NWPLCY-QHFGOTFJZFXUJVTH -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress namespace nsA\" --dport 30000 -m mark --mark 0x10000/0x10000 -j RETURN \n", + }, + { + name: "Simple Ingress/Egress Destination Port", + netpol: tNetpol{name: "simple-ingress-egress", namespace: "nsA", + podSelector: metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "app", + Operator: "In", + Values: []string{"a"}, + }, + }, + }, + egress: []netv1.NetworkPolicyEgressRule{ + { + Ports: []netv1.NetworkPolicyPort{ + { + Port: &port, + }, + }, + }, + }, + ingress: []netv1.NetworkPolicyIngressRule{ + { + Ports: []netv1.NetworkPolicyPort{ + { + Port: &ingressPort, + }, + }, + }, + }, + }, + expectedRule: "-A KUBE-NWPLCY-KO52PWL34ABMMBI7 -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-ingress-egress namespace nsA\" --dport 30000 -j MARK --set-xmark 0x10000/0x10000 \n" + + "-A KUBE-NWPLCY-KO52PWL34ABMMBI7 -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-ingress-egress namespace nsA\" --dport 30000 -m mark --mark 0x10000/0x10000 -j RETURN \n" + + "-A KUBE-NWPLCY-KO52PWL34ABMMBI7 -m comment --comment \"rule to ACCEPT traffic from all sources to dest pods selected by policy name: simple-ingress-egress namespace nsA\" --dport 37000 -j MARK --set-xmark 0x10000/0x10000 \n" + + "-A KUBE-NWPLCY-KO52PWL34ABMMBI7 -m comment --comment \"rule to ACCEPT traffic from all sources to dest pods selected by policy name: simple-ingress-egress namespace nsA\" --dport 37000 -m mark --mark 0x10000/0x10000 -j RETURN \n", + }, + { + name: "Simple Egress Destination Port Range", + netpol: tNetpol{name: "simple-egress-pr", 
namespace: "nsA", + podSelector: metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "app", + Operator: "In", + Values: []string{"a"}, + }, + }, + }, + egress: []netv1.NetworkPolicyEgressRule{ + { + Ports: []netv1.NetworkPolicyPort{ + { + Port: &port, + EndPort: &endPort, + }, + { + Port: &port1, + EndPort: &endPort1, + }, + }, + }, + }, + }, + expectedRule: "-A KUBE-NWPLCY-SQYQ7PVNG6A6Q3DU -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 30000:31000 -j MARK --set-xmark 0x10000/0x10000 \n" + + "-A KUBE-NWPLCY-SQYQ7PVNG6A6Q3DU -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 30000:31000 -m mark --mark 0x10000/0x10000 -j RETURN \n" + + "-A KUBE-NWPLCY-SQYQ7PVNG6A6Q3DU -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 34000:35000 -j MARK --set-xmark 0x10000/0x10000 \n" + + "-A KUBE-NWPLCY-SQYQ7PVNG6A6Q3DU -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 34000:35000 -m mark --mark 0x10000/0x10000 -j RETURN \n", + }, + { + name: "Port > EndPort (invalid condition, should drop endport)", + netpol: tNetpol{name: "invalid-endport", namespace: "nsA", + podSelector: metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "app", + Operator: "In", + Values: []string{"a"}, + }, + }, + }, + egress: []netv1.NetworkPolicyEgressRule{ + { + Ports: []netv1.NetworkPolicyPort{ + { + Port: &port1, + EndPort: &endPort, + }, + }, + }, + }, + }, + expectedRule: "-A KUBE-NWPLCY-2A4DPWPR5REBS66I -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: invalid-endport namespace nsA\" --dport 34000 -j MARK --set-xmark 0x10000/0x10000 \n" + + "-A KUBE-NWPLCY-2A4DPWPR5REBS66I -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: invalid-endport namespace nsA\" --dport 34000 -m mark --mark 0x10000/0x10000 -j RETURN \n", + }, + } + + client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*newFakeNode("node", "10.10.10.10")}}) + informerFactory, podInformer, nsInformer, netpolInformer := newFakeInformersFromClient(client) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + informerFactory.Start(ctx.Done()) + cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced) + krNetPol, _ := newUneventfulNetworkPolicyController(podInformer, netpolInformer, nsInformer) + tCreateFakePods(t, podInformer, nsInformer) + for _, test := range testCases { + test.netpol.createFakeNetpol(t, netpolInformer) + netpols, err := krNetPol.buildNetworkPoliciesInfo() + if err != nil { + t.Errorf("Problems building policies: %s", err) + } + for _, np := range netpols { + fmt.Printf(np.policyType) + if np.policyType == "egress" || np.policyType == "both" { + err = krNetPol.processEgressRules(np, "", nil, "1") + if err != nil { + t.Errorf("Error syncing the rules: %s", err) + } + } + if np.policyType == "ingress" || np.policyType == "both" { + err = krNetPol.processIngressRules(np, "", nil, "1") + if err != nil { + t.Errorf("Error syncing the rules: %s", err) + } + } + } + + if !bytes.Equal([]byte(test.expectedRule), krNetPol.filterTableRules.Bytes()) { + 
t.Errorf("Invalid rule %s created:\nExpected:\n%s \nGot:\n%s", test.name, test.expectedRule, krNetPol.filterTableRules.String()) + } + key := fmt.Sprintf("%s/%s", test.netpol.namespace, test.netpol.name) + obj, exists, err := krNetPol.npLister.GetByKey(key) + if err != nil { + t.Errorf("Failed to get Netpol from store: %s", err) + } + if exists { + err = krNetPol.npLister.Delete(obj) + if err != nil { + t.Errorf("Failed to remove Netpol from store: %s", err) + } + } + krNetPol.filterTableRules.Reset() + + } + +} + func TestNetworkPolicyController(t *testing.T) { testCases := []tNetPolConfigTestCase{ { @@ -429,7 +609,7 @@ func TestNetworkPolicyController(t *testing.T) { "Missing nodename fails appropriately", newMinimalNodeConfig("", "", "", nil), true, - "Failed to identify the node by NODE_NAME, hostname or --hostname-override", + "failed to identify the node by NODE_NAME, hostname or --hostname-override", }, { "Test good cluster CIDR (using single IP with a /32)", @@ -466,7 +646,7 @@ func TestNetworkPolicyController(t *testing.T) { _, podInformer, nsInformer, netpolInformer := newFakeInformersFromClient(client) for _, test := range testCases { t.Run(test.name, func(t *testing.T) { - _, err := NewNetworkPolicyController(client, test.config, podInformer, netpolInformer, nsInformer) + _, err := NewNetworkPolicyController(client, test.config, podInformer, netpolInformer, nsInformer, &sync.Mutex{}) if err == nil && test.expectError { t.Error("This config should have failed, but it was successful instead") } else if err != nil { diff --git a/pkg/agent/netpol/pod.go b/pkg/agent/netpol/pod.go index f0ca85716d..ff22387e96 100644 --- a/pkg/agent/netpol/pod.go +++ b/pkg/agent/netpol/pod.go @@ -1,5 +1,5 @@ // Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) -// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/pod.go +// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol/pod.go // +build !windows @@ -8,26 +8,40 @@ package netpol import ( "crypto/sha256" "encoding/base32" - "fmt" "strings" - "github.com/coreos/go-iptables/iptables" api "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" - glog "k8s.io/klog" + "k8s.io/klog/v2" ) func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHandler { return cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - npc.OnPodUpdate(obj) - + if podObj, ok := obj.(*api.Pod); ok { + // If the pod isn't yet actionable there is no action to take here anyway, so skip it. When it becomes + // actionable, we'll get an update below. 
+ if isNetPolActionable(podObj) { + npc.OnPodUpdate(obj) + } + } }, UpdateFunc: func(oldObj, newObj interface{}) { - newPoObj := newObj.(*api.Pod) - oldPoObj := oldObj.(*api.Pod) - if newPoObj.Status.Phase != oldPoObj.Status.Phase || newPoObj.Status.PodIP != oldPoObj.Status.PodIP { - // for the network policies, we are only interested in pod status phase change or IP change + var newPodObj, oldPodObj *api.Pod + var ok bool + + // If either of these objects are not pods, quit now + if newPodObj, ok = newObj.(*api.Pod); !ok { + return + } + if oldPodObj, ok = oldObj.(*api.Pod); !ok { + return + } + + // We don't check isNetPolActionable here, because if it is transitioning in or out of the actionable state + // we want to run the full sync so that it can be added or removed from the existing network policy of the host + // For the network policies, we are only interested in some changes, most pod changes aren't relevant to network policy + if isPodUpdateNetPolRelevant(oldPodObj, newPodObj) { npc.OnPodUpdate(newObj) } }, @@ -40,7 +54,7 @@ func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHand // OnPodUpdate handles updates to pods from the Kubernetes api server func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) { pod := obj.(*api.Pod) - glog.V(2).Infof("Received update to pod: %s/%s", pod.Namespace, pod.Name) + klog.V(2).Infof("Received update to pod: %s/%s", pod.Namespace, pod.Name) npc.RequestFullSync() } @@ -50,15 +64,15 @@ func (npc *NetworkPolicyController) handlePodDelete(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - glog.Errorf("unexpected object type: %v", obj) + klog.Errorf("unexpected object type: %v", obj) return } if pod, ok = tombstone.Obj.(*api.Pod); !ok { - glog.Errorf("unexpected object type: %v", obj) + klog.Errorf("unexpected object type: %v", obj) return } } - glog.V(2).Infof("Received pod: %s/%s delete event", pod.Namespace, pod.Name) + klog.V(2).Infof("Received pod: %s/%s delete event", pod.Namespace, pod.Name) npc.RequestFullSync() } @@ -67,315 +81,184 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo [] activePodFwChains := make(map[string]bool) - iptablesCmdHandler, err := iptables.New() - if err != nil { - glog.Fatalf("Failed to initialize iptables executor: %s", err.Error()) - } - dropUnmarkedTrafficRules := func(podName, podNamespace, podFwChainName string) error { // add rule to log the packets that will be dropped due to network policy enforcement - comment := "rule to log dropped traffic POD name:" + podName + " namespace: " + podNamespace - args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "NFLOG", "--nflog-group", "100", "-m", "limit", "--limit", "10/minute", "--limit-burst", "10"} - err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...) 
- if err != nil { - return fmt.Errorf("Failed to run iptables command: %s", err.Error()) + comment := "\"rule to log dropped traffic POD name:" + podName + " namespace: " + podNamespace + "\"" + args := []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "NFLOG", "--nflog-group", "100", "-m", "limit", "--limit", "10/minute", "--limit-burst", "10", "\n"} + // This used to be AppendUnique when we were using iptables directly, this checks to make sure we didn't drop unmarked for this chain already + if strings.Contains(npc.filterTableRules.String(), strings.Join(args, " ")) { + return nil } + npc.filterTableRules.WriteString(strings.Join(args, " ")) // add rule to DROP if no applicable network policy permits the traffic - comment = "rule to REJECT traffic destined for POD name:" + podName + " namespace: " + podNamespace - args = []string{"-m", "comment", "--comment", comment, "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "REJECT"} - err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...) - if err != nil { - return fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } + comment = "\"rule to REJECT traffic destined for POD name:" + podName + " namespace: " + podNamespace + "\"" + args = []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "REJECT", "\n"} + npc.filterTableRules.WriteString(strings.Join(args, " ")) // reset mark to let traffic pass through rest of the chains - args = []string{"-j", "MARK", "--set-mark", "0/0x10000"} - err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...) - if err != nil { - return fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } + args = []string{"-A", podFwChainName, "-j", "MARK", "--set-mark", "0/0x10000", "\n"} + npc.filterTableRules.WriteString(strings.Join(args, " ")) return nil } - // loop through the pods running on the node which to which ingress network policies to be applied - ingressNetworkPolicyEnabledPods, err := npc.getIngressNetworkPolicyEnabledPods(networkPoliciesInfo, npc.nodeIP.String()) + // loop through the pods running on the node + allLocalPods, err := npc.getLocalPods(npc.nodeIP.String()) if err != nil { return nil, err } - for _, pod := range *ingressNetworkPolicyEnabledPods { - - // below condition occurs when we get trasient update while removing or adding pod - // subsequent update will do the correct action - if len(pod.ip) == 0 || pod.ip == "" { - continue - } + for _, pod := range *allLocalPods { // ensure pod specific firewall chain exist for all the pods that need ingress firewall podFwChainName := podFirewallChainName(pod.namespace, pod.name, version) - err = iptablesCmdHandler.NewChain("filter", podFwChainName) - if err != nil && err.(*iptables.Error).ExitStatus() != 1 { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } + npc.filterTableRules.WriteString(":" + podFwChainName + "\n") + activePodFwChains[podFwChainName] = true - // add entries in pod firewall to run through required network policies - for _, policy := range networkPoliciesInfo { - if _, ok := policy.targetPods[pod.ip]; ok { - comment := "run through nw policy " + policy.name - policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) - args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName} - exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...) 
- if err != nil { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - if !exists { - err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...) - if err != nil && err.(*iptables.Error).ExitStatus() != 1 { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - } - } - } + // setup rules to run through applicable ingress/egress network policies for the pod + npc.setupPodNetpolRules(&pod, podFwChainName, networkPoliciesInfo, version) - comment := "rule to permit the traffic traffic to pods when source is the pod's local node" - args := []string{"-m", "comment", "--comment", comment, "-m", "addrtype", "--src-type", "LOCAL", "-d", pod.ip, "-j", "ACCEPT"} - exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...) - if err != nil { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - if !exists { - err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...) - if err != nil { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - } + // setup rules to intercept inbound traffic to the pods + npc.interceptPodInboundTraffic(&pod, podFwChainName) - // ensure statefull firewall, that permits return traffic for the traffic originated by the pod - comment = "rule for stateful firewall for pod" - args = []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"} - exists, err = iptablesCmdHandler.Exists("filter", podFwChainName, args...) - if err != nil { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - if !exists { - err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...) - if err != nil { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - } - - // ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain - // this rule applies to the traffic getting routed (coming for other node pods) - comment = "rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace + - " to chain " + podFwChainName - args = []string{"-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName} - exists, err = iptablesCmdHandler.Exists("filter", kubeForwardChainName, args...) - if err != nil { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - if !exists { - err := iptablesCmdHandler.Insert("filter", kubeForwardChainName, 1, args...) - if err != nil { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - } - - // ensure there is rule in filter table and OUTPUT chain to jump to pod specific firewall chain - // this rule applies to the traffic from a pod getting routed back to another pod on same node by service proxy - exists, err = iptablesCmdHandler.Exists("filter", kubeOutputChainName, args...) - if err != nil { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - if !exists { - err := iptablesCmdHandler.Insert("filter", kubeOutputChainName, 1, args...) 
- if err != nil { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - } - - // ensure there is rule in filter table and forward chain to jump to pod specific firewall chain - // this rule applies to the traffic getting switched (coming for same node pods) - comment = "rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace + - " to chain " + podFwChainName - args = []string{"-m", "physdev", "--physdev-is-bridged", - "-m", "comment", "--comment", comment, - "-d", pod.ip, - "-j", podFwChainName} - exists, err = iptablesCmdHandler.Exists("filter", kubeForwardChainName, args...) - if err != nil { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - if !exists { - err = iptablesCmdHandler.Insert("filter", kubeForwardChainName, 1, args...) - if err != nil { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - } + // setup rules to intercept inbound traffic to the pods + npc.interceptPodOutboundTraffic(&pod, podFwChainName) err = dropUnmarkedTrafficRules(pod.name, pod.namespace, podFwChainName) if err != nil { return nil, err } - } - // loop through the pods running on the node which egress network policies to be applied - egressNetworkPolicyEnabledPods, err := npc.getEgressNetworkPolicyEnabledPods(networkPoliciesInfo, npc.nodeIP.String()) - if err != nil { - return nil, err - } - for _, pod := range *egressNetworkPolicyEnabledPods { - - // below condition occurs when we get trasient update while removing or adding pod - // subsequent update will do the correct action - if len(pod.ip) == 0 || pod.ip == "" { - continue - } - - // ensure pod specific firewall chain exist for all the pods that need egress firewall - podFwChainName := podFirewallChainName(pod.namespace, pod.name, version) - err = iptablesCmdHandler.NewChain("filter", podFwChainName) - if err != nil && err.(*iptables.Error).ExitStatus() != 1 { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - activePodFwChains[podFwChainName] = true - - // add entries in pod firewall to run through required network policies - for _, policy := range networkPoliciesInfo { - if _, ok := policy.targetPods[pod.ip]; ok { - comment := "run through nw policy " + policy.name - policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) - args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName} - exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...) - if err != nil { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - if !exists { - err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...) - if err != nil && err.(*iptables.Error).ExitStatus() != 1 { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - } - } - } - - // ensure statefull firewall, that permits return traffic for the traffic originated by the pod - comment := "rule for stateful firewall for pod" - args := []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"} - exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...) - if err != nil { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - if !exists { - err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...) 
- if err != nil { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - } - - egressFilterChains := []string{kubeInputChainName, kubeForwardChainName, kubeOutputChainName} - for _, chain := range egressFilterChains { - // ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain - // this rule applies to the traffic getting forwarded/routed (traffic from the pod destinted - // to pod on a different node) - comment = "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace + - " to chain " + podFwChainName - args = []string{"-m", "comment", "--comment", comment, "-s", pod.ip, "-j", podFwChainName} - exists, err = iptablesCmdHandler.Exists("filter", chain, args...) - if err != nil { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - if !exists { - err := iptablesCmdHandler.AppendUnique("filter", chain, args...) - if err != nil { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - } - } - - // ensure there is rule in filter table and forward chain to jump to pod specific firewall chain - // this rule applies to the traffic getting switched (coming for same node pods) - comment = "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace + - " to chain " + podFwChainName - args = []string{"-m", "physdev", "--physdev-is-bridged", - "-m", "comment", "--comment", comment, - "-s", pod.ip, - "-j", podFwChainName} - exists, err = iptablesCmdHandler.Exists("filter", kubeForwardChainName, args...) - if err != nil { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - if !exists { - err = iptablesCmdHandler.Insert("filter", kubeForwardChainName, 1, args...) - if err != nil { - return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } - } - - err = dropUnmarkedTrafficRules(pod.name, pod.namespace, podFwChainName) - if err != nil { - return nil, err - } + // set mark to indicate traffic from/to the pod passed network policies. 
+ // Mark will be checked to explicitly ACCEPT the traffic + comment := "\"set mark to ACCEPT traffic that comply to network policies\"" + args := []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, "-j", "MARK", "--set-mark", "0x20000/0x20000", "\n"} + npc.filterTableRules.WriteString(strings.Join(args, " ")) } return activePodFwChains, nil } -func (npc *NetworkPolicyController) getIngressNetworkPolicyEnabledPods(networkPoliciesInfo []networkPolicyInfo, nodeIP string) (*map[string]podInfo, error) { - nodePods := make(map[string]podInfo) +// setup rules to jump to applicable network policy chains for the traffic from/to the pod +func (npc *NetworkPolicyController) setupPodNetpolRules(pod *podInfo, podFwChainName string, networkPoliciesInfo []networkPolicyInfo, version string) { - for _, obj := range npc.podLister.List() { - pod := obj.(*api.Pod) + hasIngressPolicy := false + hasEgressPolicy := false - if strings.Compare(pod.Status.HostIP, nodeIP) != 0 { + // add entries in pod firewall to run through applicable network policies + for _, policy := range networkPoliciesInfo { + if _, ok := policy.targetPods[pod.ip]; !ok { continue } - for _, policy := range networkPoliciesInfo { - if policy.namespace != pod.ObjectMeta.Namespace { - continue - } - _, ok := policy.targetPods[pod.Status.PodIP] - if ok && (policy.policyType == "both" || policy.policyType == "ingress") { - glog.V(2).Infof("Found pod name: " + pod.ObjectMeta.Name + " namespace: " + pod.ObjectMeta.Namespace + " for which network policies need to be applied.") - nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP, - name: pod.ObjectMeta.Name, - namespace: pod.ObjectMeta.Namespace, - labels: pod.ObjectMeta.Labels} - break - } + comment := "\"run through nw policy " + policy.name + "\"" + policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) + var args []string + switch policy.policyType { + case "both": + hasIngressPolicy = true + hasEgressPolicy = true + args = []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-j", policyChainName, "\n"} + case "ingress": + hasIngressPolicy = true + args = []string{"-I", podFwChainName, "1", "-d", pod.ip, "-m", "comment", "--comment", comment, "-j", policyChainName, "\n"} + case "egress": + hasEgressPolicy = true + args = []string{"-I", podFwChainName, "1", "-s", pod.ip, "-m", "comment", "--comment", comment, "-j", policyChainName, "\n"} } + npc.filterTableRules.WriteString(strings.Join(args, " ")) } - return &nodePods, nil + + // if pod does not have any network policy which applies rules for pod's ingress traffic + // then apply default network policy + if !hasIngressPolicy { + comment := "\"run through default ingress network policy chain\"" + args := []string{"-I", podFwChainName, "1", "-d", pod.ip, "-m", "comment", "--comment", comment, "-j", kubeDefaultNetpolChain, "\n"} + npc.filterTableRules.WriteString(strings.Join(args, " ")) + } + + // if pod does not have any network policy which applies rules for pod's egress traffic + // then apply default network policy + if !hasEgressPolicy { + comment := "\"run through default egress network policy chain\"" + args := []string{"-I", podFwChainName, "1", "-s", pod.ip, "-m", "comment", "--comment", comment, "-j", kubeDefaultNetpolChain, "\n"} + npc.filterTableRules.WriteString(strings.Join(args, " ")) + } + + comment := "\"rule to permit the traffic traffic to pods when source is the pod's local node\"" + args := []string{"-I", podFwChainName, "1", "-m", "comment", 
"--comment", comment, "-m", "addrtype", "--src-type", "LOCAL", "-d", pod.ip, "-j", "ACCEPT", "\n"} + npc.filterTableRules.WriteString(strings.Join(args, " ")) + + // ensure statefull firewall that permits RELATED,ESTABLISHED traffic from/to the pod + comment = "\"rule for stateful firewall for pod\"" + args = []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT", "\n"} + npc.filterTableRules.WriteString(strings.Join(args, " ")) } -func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(networkPoliciesInfo []networkPolicyInfo, nodeIP string) (*map[string]podInfo, error) { +func (npc *NetworkPolicyController) interceptPodInboundTraffic(pod *podInfo, podFwChainName string) { + // ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain + // this rule applies to the traffic getting routed (coming for other node pods) + comment := "\"rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace + + " to chain " + podFwChainName + "\"" + args := []string{"-I", kubeForwardChainName, "1", "-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName + "\n"} + npc.filterTableRules.WriteString(strings.Join(args, " ")) - nodePods := make(map[string]podInfo) + // ensure there is rule in filter table and OUTPUT chain to jump to pod specific firewall chain + // this rule applies to the traffic from a pod getting routed back to another pod on same node by service proxy + args = []string{"-I", kubeOutputChainName, "1", "-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName + "\n"} + npc.filterTableRules.WriteString(strings.Join(args, " ")) + // ensure there is rule in filter table and forward chain to jump to pod specific firewall chain + // this rule applies to the traffic getting switched (coming for same node pods) + comment = "\"rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace + + " to chain " + podFwChainName + "\"" + args = []string{"-I", kubeForwardChainName, "1", "-m", "physdev", "--physdev-is-bridged", + "-m", "comment", "--comment", comment, + "-d", pod.ip, + "-j", podFwChainName, "\n"} + npc.filterTableRules.WriteString(strings.Join(args, " ")) +} + +// setup iptable rules to intercept outbound traffic from pods and run it across the +// firewall chain corresponding to the pod so that egress network policies are enforced +func (npc *NetworkPolicyController) interceptPodOutboundTraffic(pod *podInfo, podFwChainName string) { + egressFilterChains := []string{kubeInputChainName, kubeForwardChainName, kubeOutputChainName} + for _, chain := range egressFilterChains { + // ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain + // this rule applies to the traffic getting forwarded/routed (traffic from the pod destinted + // to pod on a different node) + comment := "\"rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace + + " to chain " + podFwChainName + "\"" + args := []string{"-I", chain, "1", "-m", "comment", "--comment", comment, "-s", pod.ip, "-j", podFwChainName, "\n"} + npc.filterTableRules.WriteString(strings.Join(args, " ")) + } + + // ensure there is rule in filter table and forward chain to jump to pod specific firewall chain + // this rule applies to the traffic getting switched (coming for same node pods) + comment := "\"rule to jump traffic from POD name:" + pod.name + " namespace: " + 
pod.namespace + + " to chain " + podFwChainName + "\"" + args := []string{"-I", kubeForwardChainName, "1", "-m", "physdev", "--physdev-is-bridged", + "-m", "comment", "--comment", comment, + "-s", pod.ip, + "-j", podFwChainName, "\n"} + npc.filterTableRules.WriteString(strings.Join(args, " ")) +} + +func (npc *NetworkPolicyController) getLocalPods(nodeIP string) (*map[string]podInfo, error) { + localPods := make(map[string]podInfo) for _, obj := range npc.podLister.List() { pod := obj.(*api.Pod) - - if strings.Compare(pod.Status.HostIP, nodeIP) != 0 { + // ignore the pods running on the different node and pods that are not actionable + if strings.Compare(pod.Status.HostIP, nodeIP) != 0 || !isNetPolActionable(pod) { continue } - for _, policy := range networkPoliciesInfo { - if policy.namespace != pod.ObjectMeta.Namespace { - continue - } - _, ok := policy.targetPods[pod.Status.PodIP] - if ok && (policy.policyType == "both" || policy.policyType == "egress") { - glog.V(2).Infof("Found pod name: " + pod.ObjectMeta.Name + " namespace: " + pod.ObjectMeta.Namespace + " for which network policies need to be applied.") - nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP, - name: pod.ObjectMeta.Name, - namespace: pod.ObjectMeta.Namespace, - labels: pod.ObjectMeta.Labels} - break - } - } + localPods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP, + name: pod.ObjectMeta.Name, + namespace: pod.ObjectMeta.Namespace, + labels: pod.ObjectMeta.Labels} } - return &nodePods, nil + return &localPods, nil } func podFirewallChainName(namespace, podName string, version string) string { diff --git a/pkg/agent/netpol/policy.go b/pkg/agent/netpol/policy.go index 3590bf731b..7b3342bdf8 100644 --- a/pkg/agent/netpol/policy.go +++ b/pkg/agent/netpol/policy.go @@ -1,5 +1,5 @@ // Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) -// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/pod.go +// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol/policy.go // +build !windows @@ -14,7 +14,6 @@ import ( "strings" "time" - "github.com/coreos/go-iptables/iptables" "github.com/rancher/k3s/pkg/agent/netpol/utils" api "k8s.io/api/core/v1" networking "k8s.io/api/networking/v1" @@ -23,7 +22,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" - glog "k8s.io/klog" + "k8s.io/klog/v2" ) func (npc *NetworkPolicyController) newNetworkPolicyEventHandler() cache.ResourceEventHandler { @@ -45,7 +44,7 @@ func (npc *NetworkPolicyController) newNetworkPolicyEventHandler() cache.Resourc // OnNetworkPolicyUpdate handles updates to network policy from the kubernetes api server func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(obj interface{}) { netpol := obj.(*networking.NetworkPolicy) - glog.V(2).Infof("Received update for network policy: %s/%s", netpol.Namespace, netpol.Name) + klog.V(2).Infof("Received update for network policy: %s/%s", netpol.Namespace, netpol.Name) npc.RequestFullSync() } @@ -55,15 +54,15 @@ func (npc *NetworkPolicyController) handleNetworkPolicyDelete(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - glog.Errorf("unexpected object type: %v", obj) + klog.Errorf("unexpected object type: %v", obj) return } if netpol, ok = tombstone.Obj.(*networking.NetworkPolicy); !ok { - glog.Errorf("unexpected object type: %v", obj) + 
klog.Errorf("unexpected object type: %v", obj) return } } - glog.V(2).Infof("Received network policy: %s/%s delete event", netpol.Namespace, netpol.Name) + klog.V(2).Infof("Received network policy: %s/%s delete event", netpol.Namespace, netpol.Name) npc.RequestFullSync() } @@ -77,25 +76,36 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo start := time.Now() defer func() { endTime := time.Since(start) - glog.V(2).Infof("Syncing network policy chains took %v", endTime) + klog.V(2).Infof("Syncing network policy chains took %v", endTime) }() + + klog.V(1).Infof("Attempting to attain ipset mutex lock") + npc.ipsetMutex.Lock() + klog.V(1).Infof("Attained ipset mutex lock, continuing...") + defer func() { + npc.ipsetMutex.Unlock() + klog.V(1).Infof("Returned ipset mutex lock") + }() + + ipset, err := utils.NewIPSet(false) + if err != nil { + return nil, nil, err + } + err = ipset.Save() + if err != nil { + return nil, nil, err + } + npc.ipSetHandler = ipset + activePolicyChains := make(map[string]bool) activePolicyIPSets := make(map[string]bool) - iptablesCmdHandler, err := iptables.New() - if err != nil { - glog.Fatalf("Failed to initialize iptables executor due to: %s", err.Error()) - } - // run through all network policies for _, policy := range networkPoliciesInfo { // ensure there is a unique chain per network policy in filter table policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) - err := iptablesCmdHandler.NewChain("filter", policyChainName) - if err != nil && err.(*iptables.Error).ExitStatus() != 1 { - return nil, nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } + npc.filterTableRules.WriteString(":" + policyChainName + "\n") activePolicyChains[policyChainName] = true @@ -107,42 +117,39 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo if policy.policyType == "both" || policy.policyType == "ingress" { // create a ipset for all destination pod ip's matched by the policy spec PodSelector targetDestPodIPSetName := policyDestinationPodIPSetName(policy.namespace, policy.name) - targetDestPodIPSet, err := npc.ipSetHandler.Create(targetDestPodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") - if err != nil { - return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error()) - } - err = targetDestPodIPSet.Refresh(currnetPodIps) - if err != nil { - glog.Errorf("failed to refresh targetDestPodIPSet,: " + err.Error()) + setEntries := make([][]string, 0) + for _, podIP := range currnetPodIps { + setEntries = append(setEntries, []string{podIP, utils.OptionTimeout, "0"}) } + npc.ipSetHandler.RefreshSet(targetDestPodIPSetName, setEntries, utils.TypeHashIP) err = npc.processIngressRules(policy, targetDestPodIPSetName, activePolicyIPSets, version) if err != nil { return nil, nil, err } - activePolicyIPSets[targetDestPodIPSet.Name] = true + activePolicyIPSets[targetDestPodIPSetName] = true } - if policy.policyType == "both" || policy.policyType == "egress" { // create a ipset for all source pod ip's matched by the policy spec PodSelector targetSourcePodIPSetName := policySourcePodIPSetName(policy.namespace, policy.name) - targetSourcePodIPSet, err := npc.ipSetHandler.Create(targetSourcePodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") - if err != nil { - return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error()) - } - err = targetSourcePodIPSet.Refresh(currnetPodIps) - if err != nil { - glog.Errorf("failed to refresh targetSourcePodIPSet: " + err.Error()) 
+ setEntries := make([][]string, 0) + for _, podIP := range currnetPodIps { + setEntries = append(setEntries, []string{podIP, utils.OptionTimeout, "0"}) } + npc.ipSetHandler.RefreshSet(targetSourcePodIPSetName, setEntries, utils.TypeHashIP) err = npc.processEgressRules(policy, targetSourcePodIPSetName, activePolicyIPSets, version) if err != nil { return nil, nil, err } - activePolicyIPSets[targetSourcePodIPSet.Name] = true + activePolicyIPSets[targetSourcePodIPSetName] = true } - } - glog.V(2).Infof("Iptables chains in the filter table are synchronized with the network policies.") + err = npc.ipSetHandler.Restore() + if err != nil { + return nil, nil, fmt.Errorf("failed to perform ipset restore: %s", err.Error()) + } + + klog.V(2).Infof("Iptables chains in the filter table are synchronized with the network policies.") return activePolicyChains, activePolicyIPSets, nil } @@ -156,11 +163,6 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo return nil } - iptablesCmdHandler, err := iptables.New() - if err != nil { - return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error()) - } - policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) // run through all the ingress rules in the spec and create iptables rules @@ -169,21 +171,12 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo if len(ingressRule.srcPods) != 0 { srcPodIPSetName := policyIndexedSourcePodIPSetName(policy.namespace, policy.name, i) - srcPodIPSet, err := npc.ipSetHandler.Create(srcPodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") - if err != nil { - return fmt.Errorf("failed to create ipset: %s", err.Error()) - } - - activePolicyIPSets[srcPodIPSet.Name] = true - - ingressRuleSrcPodIPs := make([]string, 0, len(ingressRule.srcPods)) + activePolicyIPSets[srcPodIPSetName] = true + setEntries := make([][]string, 0) for _, pod := range ingressRule.srcPods { - ingressRuleSrcPodIPs = append(ingressRuleSrcPodIPs, pod.ip) - } - err = srcPodIPSet.Refresh(ingressRuleSrcPodIPs) - if err != nil { - glog.Errorf("failed to refresh srcPodIPSet: " + err.Error()) + setEntries = append(setEntries, []string{pod.ip, utils.OptionTimeout, "0"}) } + npc.ipSetHandler.RefreshSet(srcPodIPSetName, setEntries, utils.TypeHashIP) if len(ingressRule.ports) != 0 { // case where 'ports' details and 'from' details specified in the ingress rule @@ -191,7 +184,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo for _, portProtocol := range ingressRule.ports { comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { return err } } @@ -200,18 +193,16 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo if len(ingressRule.namedPorts) != 0 { for j, endPoints := range ingressRule.namedPorts { namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j) - namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") - if err != nil { - return fmt.Errorf("failed 
to create ipset: %s", err.Error()) - } - activePolicyIPSets[namedPortIPSet.Name] = true - err = namedPortIPSet.Refresh(endPoints.ips) - if err != nil { - glog.Errorf("failed to refresh namedPortIPSet: " + err.Error()) + activePolicyIPSets[namedPortIPSetName] = true + setEntries := make([][]string, 0) + for _, ip := range endPoints.ips { + setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"}) } + npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashIP) + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil { return err } } @@ -222,7 +213,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo // so match on specified source and destination ip with all port and protocol comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, "", ""); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, "", "", ""); err != nil { return err } } @@ -234,27 +225,23 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo for _, portProtocol := range ingressRule.ports { comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", targetDestPodIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { return err } } for j, endPoints := range ingressRule.namedPorts { namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j) - namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") - if err != nil { - return fmt.Errorf("failed to create ipset: %s", err.Error()) + activePolicyIPSets[namedPortIPSetName] = true + setEntries := make([][]string, 0) + for _, ip := range endPoints.ips { + setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"}) } + npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashIP) - activePolicyIPSets[namedPortIPSet.Name] = true - - err = namedPortIPSet.Refresh(endPoints.ips) - if err != nil { - glog.Errorf("failed to refresh namedPortIPSet: " + err.Error()) - } comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil { return err } } @@ -265,47 
+252,36 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo if ingressRule.matchAllSource && ingressRule.matchAllPorts { comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIPSetName, "", ""); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", targetDestPodIPSetName, "", "", ""); err != nil { return err } } if len(ingressRule.srcIPBlocks) != 0 { srcIPBlockIPSetName := policyIndexedSourceIPBlockIPSetName(policy.namespace, policy.name, i) - srcIPBlockIPSet, err := npc.ipSetHandler.Create(srcIPBlockIPSetName, utils.TypeHashNet, utils.OptionTimeout, "0") - if err != nil { - return fmt.Errorf("failed to create ipset: %s", err.Error()) - } - activePolicyIPSets[srcIPBlockIPSet.Name] = true - err = srcIPBlockIPSet.RefreshWithBuiltinOptions(ingressRule.srcIPBlocks) - if err != nil { - glog.Errorf("failed to refresh srcIPBlockIPSet: " + err.Error()) - } + activePolicyIPSets[srcIPBlockIPSetName] = true + npc.ipSetHandler.RefreshSet(srcIPBlockIPSetName, ingressRule.srcIPBlocks, utils.TypeHashNet) + if !ingressRule.matchAllPorts { for _, portProtocol := range ingressRule.ports { comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { return err } } for j, endPoints := range ingressRule.namedPorts { namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j) - namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") - if err != nil { - return fmt.Errorf("failed to create ipset: %s", err.Error()) - } - - activePolicyIPSets[namedPortIPSet.Name] = true - - err = namedPortIPSet.Refresh(endPoints.ips) - if err != nil { - glog.Errorf("failed to refresh namedPortIPSet: " + err.Error()) + activePolicyIPSets[namedPortIPSetName] = true + setEntries := make([][]string, 0) + for _, ip := range endPoints.ips { + setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"}) } + npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashNet) comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil { return err } } @@ -313,7 +289,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo if ingressRule.matchAllPorts { comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, 
policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, "", ""); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, "", "", ""); err != nil { return err } } @@ -332,11 +308,6 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, return nil } - iptablesCmdHandler, err := iptables.New() - if err != nil { - return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error()) - } - policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) // run through all the egress rules in the spec and create iptables rules @@ -345,28 +316,19 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, if len(egressRule.dstPods) != 0 { dstPodIPSetName := policyIndexedDestinationPodIPSetName(policy.namespace, policy.name, i) - dstPodIPSet, err := npc.ipSetHandler.Create(dstPodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") - if err != nil { - return fmt.Errorf("failed to create ipset: %s", err.Error()) - } - - activePolicyIPSets[dstPodIPSet.Name] = true - - egressRuleDstPodIps := make([]string, 0, len(egressRule.dstPods)) + activePolicyIPSets[dstPodIPSetName] = true + setEntries := make([][]string, 0) for _, pod := range egressRule.dstPods { - egressRuleDstPodIps = append(egressRuleDstPodIps, pod.ip) - } - err = dstPodIPSet.Refresh(egressRuleDstPodIps) - if err != nil { - glog.Errorf("failed to refresh dstPodIPSet: " + err.Error()) + setEntries = append(setEntries, []string{pod.ip, utils.OptionTimeout, "0"}) } + npc.ipSetHandler.RefreshSet(dstPodIPSetName, setEntries, utils.TypeHashIP) if len(egressRule.ports) != 0 { // case where 'ports' details and 'from' details specified in the egress rule // so match on specified source and destination ip's and specified port (if any) and protocol for _, portProtocol := range egressRule.ports { comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { return err } } @@ -375,20 +337,15 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, if len(egressRule.namedPorts) != 0 { for j, endPoints := range egressRule.namedPorts { namedPortIPSetName := policyIndexedEgressNamedPortIPSetName(policy.namespace, policy.name, i, j) - namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") - if err != nil { - return fmt.Errorf("failed to create ipset: %s", err.Error()) - } - - activePolicyIPSets[namedPortIPSet.Name] = true - - err = namedPortIPSet.Refresh(endPoints.ips) - if err != nil { - glog.Errorf("failed to refresh namedPortIPSet: " + err.Error()) + activePolicyIPSets[namedPortIPSetName] = true + setEntries := make([][]string, 0) + for _, ip := range endPoints.ips { + setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"}) } + npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashIP) comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace - if err 
:= npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil { return err } } @@ -400,7 +357,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, // so match on specified source and destination ip with all port and protocol comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, "", ""); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, "", "", ""); err != nil { return err } } @@ -412,7 +369,14 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, for _, portProtocol := range egressRule.ports { comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, "", portProtocol.protocol, portProtocol.port); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, "", portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { + return err + } + } + for _, portProtocol := range egressRule.namedPorts { + comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + + policy.name + " namespace " + policy.namespace + if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, "", portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { return err } } @@ -423,26 +387,19 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, if egressRule.matchAllDestinations && egressRule.matchAllPorts { comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, "", "", ""); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, "", "", "", ""); err != nil { return err } } if len(egressRule.dstIPBlocks) != 0 { dstIPBlockIPSetName := policyIndexedDestinationIPBlockIPSetName(policy.namespace, policy.name, i) - dstIPBlockIPSet, err := npc.ipSetHandler.Create(dstIPBlockIPSetName, utils.TypeHashNet, utils.OptionTimeout, "0") - if err != nil { - return fmt.Errorf("failed to create ipset: %s", err.Error()) - } - activePolicyIPSets[dstIPBlockIPSet.Name] = true - err = dstIPBlockIPSet.RefreshWithBuiltinOptions(egressRule.dstIPBlocks) - if err != nil { - glog.Errorf("failed to refresh dstIPBlockIPSet: " + err.Error()) - } + activePolicyIPSets[dstIPBlockIPSetName] = true + npc.ipSetHandler.RefreshSet(dstIPBlockIPSetName, egressRule.dstIPBlocks, utils.TypeHashNet) if !egressRule.matchAllPorts { for _, portProtocol := range egressRule.ports { comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " + policy.name + " namespace " + 
policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, portProtocol.protocol, portProtocol.port); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { return err } } @@ -450,7 +407,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, if egressRule.matchAllPorts { comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, "", ""); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, "", "", ""); err != nil { return err } } @@ -459,13 +416,13 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, return nil } -func (npc *NetworkPolicyController) appendRuleToPolicyChain(iptablesCmdHandler *iptables.IPTables, policyChainName, comment, srcIPSetName, dstIPSetName, protocol, dPort string) error { - if iptablesCmdHandler == nil { - return fmt.Errorf("Failed to run iptables command: iptablesCmdHandler is nil") - } +func (npc *NetworkPolicyController) appendRuleToPolicyChain(policyChainName, comment, srcIPSetName, dstIPSetName, protocol, dPort, endDport string) error { + args := make([]string, 0) + args = append(args, "-A", policyChainName) + if comment != "" { - args = append(args, "-m", "comment", "--comment", comment) + args = append(args, "-m", "comment", "--comment", "\""+comment+"\"") } if srcIPSetName != "" { args = append(args, "-m", "set", "--match-set", srcIPSetName, "src") @@ -477,22 +434,19 @@ func (npc *NetworkPolicyController) appendRuleToPolicyChain(iptablesCmdHandler * args = append(args, "-p", protocol) } if dPort != "" { - args = append(args, "--dport", dPort) + if endDport != "" { + multiport := fmt.Sprintf("%s:%s", dPort, endDport) + args = append(args, "--dport", multiport) + } else { + args = append(args, "--dport", dPort) + } } - markComment := "rule to mark traffic matching a network policy" - markArgs := append(args, "-j", "MARK", "-m", "comment", "--comment", markComment, "--set-xmark", "0x10000/0x10000") - err := iptablesCmdHandler.AppendUnique("filter", policyChainName, markArgs...) - if err != nil { - return fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } + markArgs := append(args, "-j", "MARK", "--set-xmark", "0x10000/0x10000", "\n") + npc.filterTableRules.WriteString(strings.Join(markArgs, " ")) - returnComment := "rule to RETURN traffic matching a network policy" - returnArgs := append(args, "-m", "comment", "--comment", returnComment, "-m", "mark", "--mark", "0x10000/0x10000", "-j", "RETURN") - err = iptablesCmdHandler.AppendUnique("filter", policyChainName, returnArgs...) 
- if err != nil { - return fmt.Errorf("Failed to run iptables command: %s", err.Error()) - } + returnArgs := append(args, "-m", "mark", "--mark", "0x10000/0x10000", "-j", "RETURN", "\n") + npc.filterTableRules.WriteString(strings.Join(returnArgs, " ")) return nil } @@ -506,7 +460,7 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI policy, ok := policyObj.(*networking.NetworkPolicy) podSelector, _ := v1.LabelSelectorAsSelector(&policy.Spec.PodSelector) if !ok { - return nil, fmt.Errorf("Failed to convert") + return nil, fmt.Errorf("failed to convert") } newPolicy := networkPolicyInfo{ name: policy.Name, @@ -537,7 +491,7 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI namedPort2IngressEps := make(namedPort2eps) if err == nil { for _, matchingPod := range matchingPods { - if matchingPod.Status.PodIP == "" { + if !isNetPolActionable(matchingPod) { continue } newPolicy.targetPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP, @@ -573,7 +527,7 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI for _, peer := range specIngressRule.From { if peerPods, err := npc.evalPodPeer(policy, peer); err == nil { for _, peerPod := range peerPods { - if peerPod.Status.PodIP == "" { + if !isNetPolActionable(peerPod) { continue } ingressRule.srcPods = append(ingressRule.srcPods, @@ -609,12 +563,23 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI // If this field is empty or missing in the spec, this rule matches all sources if len(specEgressRule.To) == 0 { egressRule.matchAllDestinations = true + // if rule.To is empty but rule.Ports not, we must try to grab NamedPort from pods that in same namespace, + // so that we can design iptables rule to describe "match all dst but match some named dst-port" egress rule + if policyRulePortsHasNamedPort(specEgressRule.Ports) { + matchingPeerPods, _ := npc.ListPodsByNamespaceAndLabels(policy.Namespace, labels.Everything()) + for _, peerPod := range matchingPeerPods { + if !isNetPolActionable(peerPod) { + continue + } + npc.grabNamedPortFromPod(peerPod, &namedPort2EgressEps) + } + } } else { egressRule.matchAllDestinations = false for _, peer := range specEgressRule.To { if peerPods, err := npc.evalPodPeer(policy, peer); err == nil { for _, peerPod := range peerPods { - if peerPod.Status.PodIP == "" { + if !isNetPolActionable(peerPod) { continue } egressRule.dstPods = append(egressRule.dstPods, @@ -683,13 +648,24 @@ func (npc *NetworkPolicyController) evalPodPeer(policy *networking.NetworkPolicy func (npc *NetworkPolicyController) processNetworkPolicyPorts(npPorts []networking.NetworkPolicyPort, namedPort2eps namedPort2eps) (numericPorts []protocolAndPort, namedPorts []endPoints) { numericPorts, namedPorts = make([]protocolAndPort, 0), make([]endPoints, 0) for _, npPort := range npPorts { + var protocol string + if npPort.Protocol != nil { + protocol = string(*npPort.Protocol) + } if npPort.Port == nil { - numericPorts = append(numericPorts, protocolAndPort{port: "", protocol: string(*npPort.Protocol)}) + numericPorts = append(numericPorts, protocolAndPort{port: "", protocol: protocol}) } else if npPort.Port.Type == intstr.Int { - numericPorts = append(numericPorts, protocolAndPort{port: npPort.Port.String(), protocol: string(*npPort.Protocol)}) + var portproto protocolAndPort + if npPort.EndPort != nil { + if *npPort.EndPort >= npPort.Port.IntVal { + portproto.endport = strconv.Itoa(int(*npPort.EndPort)) + } + } + 
portproto.protocol, portproto.port = protocol, npPort.Port.String() + numericPorts = append(numericPorts, portproto) } else { if protocol2eps, ok := namedPort2eps[npPort.Port.String()]; ok { - if numericPort2eps, ok := protocol2eps[string(*npPort.Protocol)]; ok { + if numericPort2eps, ok := protocol2eps[protocol]; ok { for _, eps := range numericPort2eps { namedPorts = append(namedPorts, *eps) } @@ -818,3 +794,12 @@ func policyIndexedEgressNamedPortIPSetName(namespace, policyName string, egressR encoded := base32.StdEncoding.EncodeToString(hash[:]) return kubeDestinationIPSetPrefix + encoded[:16] } + +func policyRulePortsHasNamedPort(npPorts []networking.NetworkPolicyPort) bool { + for _, npPort := range npPorts { + if npPort.Port != nil && npPort.Port.Type == intstr.String { + return true + } + } + return false +} diff --git a/pkg/agent/netpol/utils.go b/pkg/agent/netpol/utils.go new file mode 100644 index 0000000000..1b046285ae --- /dev/null +++ b/pkg/agent/netpol/utils.go @@ -0,0 +1,66 @@ +// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) +// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol/utils.go + +package netpol + +import ( + "fmt" + "reflect" + "regexp" + "strconv" + + api "k8s.io/api/core/v1" +) + +const ( + PodCompleted api.PodPhase = "Completed" +) + +// isPodUpdateNetPolRelevant checks the attributes that we care about for building NetworkPolicies on the host and if it +// finds a relevant change, it returns true otherwise it returns false. The things we care about for NetworkPolicies: +// 1) Is the phase of the pod changing? (matters for catching completed, succeeded, or failed jobs) +// 2) Is the pod IP changing? (changes how the network policy is applied to the host) +// 3) Is the pod's host IP changing? (should be caught in the above, with the CNI kube-router runs with but we check this as well for sanity) +// 4) Is a pod's label changing? 
(potentially changes which NetworkPolicies select this pod) +func isPodUpdateNetPolRelevant(oldPod, newPod *api.Pod) bool { + return newPod.Status.Phase != oldPod.Status.Phase || + newPod.Status.PodIP != oldPod.Status.PodIP || + !reflect.DeepEqual(newPod.Status.PodIPs, oldPod.Status.PodIPs) || + newPod.Status.HostIP != oldPod.Status.HostIP || + !reflect.DeepEqual(newPod.Labels, oldPod.Labels) +} + +func isNetPolActionable(pod *api.Pod) bool { + return !isFinished(pod) && pod.Status.PodIP != "" && !pod.Spec.HostNetwork +} + +func isFinished(pod *api.Pod) bool { + switch pod.Status.Phase { + case api.PodFailed, api.PodSucceeded, PodCompleted: + return true + } + return false +} + +func validateNodePortRange(nodePortOption string) (string, error) { + nodePortValidator := regexp.MustCompile(`^([0-9]+)[:-]([0-9]+)$`) + if matched := nodePortValidator.MatchString(nodePortOption); !matched { + return "", fmt.Errorf("failed to parse node port range given: '%s' please see specification in help text", nodePortOption) + } + matches := nodePortValidator.FindStringSubmatch(nodePortOption) + if len(matches) != 3 { + return "", fmt.Errorf("could not parse port number from range given: '%s'", nodePortOption) + } + port1, err := strconv.ParseUint(matches[1], 10, 16) + if err != nil { + return "", fmt.Errorf("could not parse first port number from range given: '%s'", nodePortOption) + } + port2, err := strconv.ParseUint(matches[2], 10, 16) + if err != nil { + return "", fmt.Errorf("could not parse second port number from range given: '%s'", nodePortOption) + } + if port1 >= port2 { + return "", fmt.Errorf("port 1 is greater than or equal to port 2 in range given: '%s'", nodePortOption) + } + return fmt.Sprintf("%d:%d", port1, port2), nil +} diff --git a/pkg/agent/netpol/utils/ipset.go b/pkg/agent/netpol/utils/ipset.go index 3f6afda64d..44bd5890f0 100644 --- a/pkg/agent/netpol/utils/ipset.go +++ b/pkg/agent/netpol/utils/ipset.go @@ -1,5 +1,5 @@ // Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) -// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/utils/ipset.go +// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/utils/ipset.go // +build !windows @@ -7,15 +7,18 @@ package utils import ( "bytes" + "crypto/sha1" + "encoding/base32" "errors" "fmt" "os/exec" + "sort" "strings" ) var ( // Error returned when ipset binary is not found. - errIpsetNotFound = errors.New("Ipset utility not found") + errIpsetNotFound = errors.New("ipset utility not found") ) const ( @@ -82,6 +85,9 @@ const ( OptionNoMatch = "nomatch" // OptionForceAdd All hash set types support the optional forceadd parameter when creating a set. When sets created with this option become full the next addition to the set may succeed and evict a random entry from the set. OptionForceAdd = "forceadd" + + // tmpIPSetPrefix Is the prefix added to temporary ipset names used in the atomic swap operations during ipset restore. You should never see these on your system because they only exist during the restore. + tmpIPSetPrefix = "TMP-" ) // IPSet represent ipset sets managed by. 
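For reference, isNetPolActionable (added in pkg/agent/netpol/utils.go above) is the filter that lets the controller skip host-network pods, pods without an IP, and finished pods wherever it builds rules (getLocalPods, buildNetworkPoliciesInfo). A minimal sketch of its behaviour, written as a hypothetical _test.go in the same package; the test name and pod values are illustrative and assume the package's existing api "k8s.io/api/core/v1" import plus "testing":

func TestIsNetPolActionable_Example(t *testing.T) {
	// A running pod with a PodIP and no host networking is actionable.
	pod := &api.Pod{
		Spec:   api.PodSpec{HostNetwork: false},
		Status: api.PodStatus{Phase: api.PodRunning, PodIP: "10.42.0.5"},
	}
	if !isNetPolActionable(pod) {
		t.Fatal("expected running pod with a PodIP to be actionable")
	}
	// Host-network pods are never selected for network policy rules.
	pod.Spec.HostNetwork = true
	if isNetPolActionable(pod) {
		t.Fatal("expected host-network pod to be skipped")
	}
	// Finished pods (Failed, Succeeded, Completed) are skipped as well.
	pod.Spec.HostNetwork = false
	pod.Status.Phase = api.PodSucceeded
	if isNetPolActionable(pod) {
		t.Fatal("expected finished pod to be skipped")
	}
}
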
@@ -181,7 +187,7 @@ func (ipset *IPSet) Create(setName string, createOptions ...string) (*Set, error // Determine if set with the same name is already active on the system setIsActive, err := ipset.Sets[setName].IsActive() if err != nil { - return nil, fmt.Errorf("Failed to determine if ipset set %s exists: %s", + return nil, fmt.Errorf("failed to determine if ipset set %s exists: %s", setName, err) } @@ -193,20 +199,20 @@ func (ipset *IPSet) Create(setName string, createOptions ...string) (*Set, error args = append(args, createOptions...) args = append(args, "family", "inet6") if _, err := ipset.run(args...); err != nil { - return nil, fmt.Errorf("Failed to create ipset set on system: %s", err) + return nil, fmt.Errorf("failed to create ipset set on system: %s", err) } } else { _, err := ipset.run(append([]string{"create", "-exist", setName}, createOptions...)...) if err != nil { - return nil, fmt.Errorf("Failed to create ipset set on system: %s", err) + return nil, fmt.Errorf("failed to create ipset set on system: %s", err) } } } return ipset.Sets[setName], nil } -// Adds a given Set to an IPSet +// Add a given Set to an IPSet func (ipset *IPSet) Add(set *Set) error { _, err := ipset.Create(set.Name, set.Options...) if err != nil { @@ -226,6 +232,22 @@ func (ipset *IPSet) Add(set *Set) error { return nil } +// RefreshSet add/update internal Sets with a Set of entries but does not run restore command +func (ipset *IPSet) RefreshSet(setName string, entriesWithOptions [][]string, setType string) { + if ipset.Get(setName) == nil { + ipset.Sets[setName] = &Set{ + Name: setName, + Options: []string{setType, OptionTimeout, "0"}, + Parent: ipset, + } + } + entries := make([]*Entry, len(entriesWithOptions)) + for i, entry := range entriesWithOptions { + entries[i] = &Entry{Set: ipset.Sets[setName], Options: entry} + } + ipset.Get(setName).Entries = entries +} + // Add a given entry to the set. If the -exist option is specified, ipset // ignores if the entry already added to the set. // Note: if you need to add multiple entries (e.g., in a loop), use BatchAdd instead, @@ -243,7 +265,7 @@ func (set *Set) Add(addOptions ...string) (*Entry, error) { return entry, nil } -// Adds given entries (with their options) to the set. +// BatchAdd given entries (with their options) to the set. // For multiple items, this is much faster than Add(). 
func (set *Set) BatchAdd(addOptions [][]string) error { newEntries := make([]*Entry, len(addOptions)) @@ -389,14 +411,59 @@ func parseIPSetSave(ipset *IPSet, result string) map[string]*Set { // create KUBE-DST-3YNVZWWGX3UQQ4VQ hash:ip family inet hashsize 1024 maxelem 65536 timeout 0 // add KUBE-DST-3YNVZWWGX3UQQ4VQ 100.96.1.6 timeout 0 func buildIPSetRestore(ipset *IPSet) string { - ipSetRestore := "" - for _, set := range ipset.Sets { - ipSetRestore += fmt.Sprintf("create %s %s\n", set.Name, strings.Join(set.Options[:], " ")) - for _, entry := range set.Entries { - ipSetRestore += fmt.Sprintf("add %s %s\n", set.Name, strings.Join(entry.Options[:], " ")) - } + setNames := make([]string, 0, len(ipset.Sets)) + for setName := range ipset.Sets { + // we need setNames in some consistent order so that we can unit-test this method has a predictable output: + setNames = append(setNames, setName) } - return ipSetRestore + + sort.Strings(setNames) + + tmpSets := map[string]string{} + ipSetRestore := &strings.Builder{} + for _, setName := range setNames { + set := ipset.Sets[setName] + setOptions := strings.Join(set.Options, " ") + + tmpSetName := tmpSets[setOptions] + if tmpSetName == "" { + // create a temporary set per unique set-options: + hash := sha1.Sum([]byte("tmp:" + setOptions)) + tmpSetName = tmpIPSetPrefix + base32.StdEncoding.EncodeToString(hash[:10]) + ipSetRestore.WriteString(fmt.Sprintf("create %s %s\n", tmpSetName, setOptions)) + // just in case we are starting up after a crash, we should flush the TMP ipset to be safe if it + // already existed, so we do not pollute other ipsets: + ipSetRestore.WriteString(fmt.Sprintf("flush %s\n", tmpSetName)) + tmpSets[setOptions] = tmpSetName + } + + for _, entry := range set.Entries { + // add entries to the tmp set: + ipSetRestore.WriteString(fmt.Sprintf("add %s %s\n", tmpSetName, strings.Join(entry.Options, " "))) + } + + // now create the actual IPSet (this is a noop if it already exists, because we run with -exists): + ipSetRestore.WriteString(fmt.Sprintf("create %s %s\n", set.Name, setOptions)) + + // now that both exist, we can swap them: + ipSetRestore.WriteString(fmt.Sprintf("swap %s %s\n", tmpSetName, set.Name)) + + // empty the tmp set (which is actually the old one now): + ipSetRestore.WriteString(fmt.Sprintf("flush %s\n", tmpSetName)) + } + + setsToDestroy := make([]string, 0, len(tmpSets)) + for _, tmpSetName := range tmpSets { + setsToDestroy = append(setsToDestroy, tmpSetName) + } + // need to destroy the sets in a predictable order for unit test! + sort.Strings(setsToDestroy) + for _, tmpSetName := range setsToDestroy { + // finally, destroy the tmp sets. + ipSetRestore.WriteString(fmt.Sprintf("destroy %s\n", tmpSetName)) + } + + return ipSetRestore.String() } // Save the given set, or all sets if none is given to stdout in a format that @@ -489,7 +556,7 @@ func (set *Set) Refresh(entries []string, extraOptions ...string) error { return set.RefreshWithBuiltinOptions(entriesWithOptions) } -// Refresh a Set with new entries with built-in options. +// RefreshWithBuiltinOptions refresh a Set with new entries with built-in options. 
func (set *Set) RefreshWithBuiltinOptions(entries [][]string) error { var err error diff --git a/pkg/agent/netpol/utils/ipset_test.go b/pkg/agent/netpol/utils/ipset_test.go new file mode 100644 index 0000000000..fd3a7974c4 --- /dev/null +++ b/pkg/agent/netpol/utils/ipset_test.go @@ -0,0 +1,77 @@ +// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) +// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/utils/ipset_test.go + +package utils + +import "testing" + +func Test_buildIPSetRestore(t *testing.T) { + type args struct { + ipset *IPSet + } + tests := []struct { + name string + args args + want string + }{ + { + name: "simple-restore", + args: args{ + ipset: &IPSet{Sets: map[string]*Set{ + "foo": { + Name: "foo", + Options: []string{"hash:ip", "yolo", "things", "12345"}, + Entries: []*Entry{ + {Options: []string{"1.2.3.4"}}, + }, + }, + "google-dns-servers": { + Name: "google-dns-servers", + Options: []string{"hash:ip", "lol"}, + Entries: []*Entry{ + {Options: []string{"4.4.4.4"}}, + {Options: []string{"8.8.8.8"}}, + }, + }, + // this one and the one above share the same exact options -- and therefore will reuse the same + // tmp ipset: + "more-ip-addresses": { + Name: "google-dns-servers", + Options: []string{"hash:ip", "lol"}, + Entries: []*Entry{ + {Options: []string{"5.5.5.5"}}, + {Options: []string{"6.6.6.6"}}, + }, + }, + }}, + }, + want: "create TMP-7NOTZDOMLXBX6DAJ hash:ip yolo things 12345\n" + + "flush TMP-7NOTZDOMLXBX6DAJ\n" + + "add TMP-7NOTZDOMLXBX6DAJ 1.2.3.4\n" + + "create foo hash:ip yolo things 12345\n" + + "swap TMP-7NOTZDOMLXBX6DAJ foo\n" + + "flush TMP-7NOTZDOMLXBX6DAJ\n" + + "create TMP-XD7BSSQZELS7TP35 hash:ip lol\n" + + "flush TMP-XD7BSSQZELS7TP35\n" + + "add TMP-XD7BSSQZELS7TP35 4.4.4.4\n" + + "add TMP-XD7BSSQZELS7TP35 8.8.8.8\n" + + "create google-dns-servers hash:ip lol\n" + + "swap TMP-XD7BSSQZELS7TP35 google-dns-servers\n" + + "flush TMP-XD7BSSQZELS7TP35\n" + + "add TMP-XD7BSSQZELS7TP35 5.5.5.5\n" + + "add TMP-XD7BSSQZELS7TP35 6.6.6.6\n" + + "create google-dns-servers hash:ip lol\n" + + "swap TMP-XD7BSSQZELS7TP35 google-dns-servers\n" + + "flush TMP-XD7BSSQZELS7TP35\n" + + "destroy TMP-7NOTZDOMLXBX6DAJ\n" + + "destroy TMP-XD7BSSQZELS7TP35\n", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := buildIPSetRestore(tt.args.ipset); got != tt.want { + t.Errorf("buildIPSetRestore() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/agent/netpol/utils/iptables.go b/pkg/agent/netpol/utils/iptables.go new file mode 100644 index 0000000000..c77fdbdec6 --- /dev/null +++ b/pkg/agent/netpol/utils/iptables.go @@ -0,0 +1,75 @@ +// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) +// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/utils/iptables.go + +package utils + +import ( + "bytes" + "fmt" + "os/exec" + "strings" +) + +var hasWait bool + +func init() { + path, err := exec.LookPath("iptables-restore") + if err != nil { + return + } + args := []string{"iptables-restore", "--help"} + cmd := exec.Cmd{ + Path: path, + Args: args, + } + cmdOutput, err := cmd.CombinedOutput() + if err != nil { + return + } + hasWait = strings.Contains(string(cmdOutput), "wait") +} + +// SaveInto calls `iptables-save` for given table and stores result in a given buffer. 
+func SaveInto(table string, buffer *bytes.Buffer) error { + path, err := exec.LookPath("iptables-save") + if err != nil { + return err + } + stderrBuffer := bytes.NewBuffer(nil) + args := []string{"iptables-save", "-t", table} + cmd := exec.Cmd{ + Path: path, + Args: args, + Stdout: buffer, + Stderr: stderrBuffer, + } + if err := cmd.Run(); err != nil { + return fmt.Errorf("%v (%s)", err, stderrBuffer) + } + return nil +} + +// Restore runs `iptables-restore` passing data through []byte. +func Restore(table string, data []byte) error { + path, err := exec.LookPath("iptables-restore") + if err != nil { + return err + } + var args []string + if hasWait { + args = []string{"iptables-restore", "--wait", "-T", table} + } else { + args = []string{"iptables-restore", "-T", table} + } + cmd := exec.Cmd{ + Path: path, + Args: args, + Stdin: bytes.NewBuffer(data), + } + b, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("%v (%s)", err, b) + } + + return nil +} diff --git a/pkg/agent/netpol/utils/node.go b/pkg/agent/netpol/utils/node.go index 7283586f27..571e1031da 100644 --- a/pkg/agent/netpol/utils/node.go +++ b/pkg/agent/netpol/utils/node.go @@ -1,5 +1,5 @@ // Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) -// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/utils/node.go +// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/utils/node.go // +build !windows @@ -43,7 +43,7 @@ func GetNodeObject(clientset kubernetes.Interface, hostnameOverride string) (*ap } } - return nil, fmt.Errorf("Failed to identify the node by NODE_NAME, hostname or --hostname-override") + return nil, fmt.Errorf("failed to identify the node by NODE_NAME, hostname or --hostname-override") } // GetNodeIP returns the most valid external facing IP address for a node. diff --git a/vendor/modules.txt b/vendor/modules.txt index 68f7c51398..e39bb4852b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -2142,6 +2142,7 @@ k8s.io/heapster/metrics/api/v1/types ## explicit k8s.io/klog # k8s.io/klog/v2 v2.8.0 +## explicit k8s.io/klog/v2 # k8s.io/kube-aggregator v0.18.0 => github.com/k3s-io/kubernetes/staging/src/k8s.io/kube-aggregator v1.21.2-k3s1 k8s.io/kube-aggregator/pkg/apis/apiregistration
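
Taken together, these helpers are what let the controller stop shelling out to iptables once per rule: rule text is accumulated in a buffer (the npc.filterTableRules.WriteString calls earlier in this patch) so that it can be applied in bulk. Below is a minimal, self-contained round trip through SaveInto and Restore from pkg/agent/netpol/utils/iptables.go; it assumes root privileges and iptables-save/iptables-restore on PATH, and the program is illustrative rather than part of the patch:

package main

import (
	"bytes"
	"log"

	"github.com/rancher/k3s/pkg/agent/netpol/utils"
)

func main() {
	// Snapshot the current filter table in iptables-save format.
	var buf bytes.Buffer
	if err := utils.SaveInto("filter", &buf); err != nil {
		log.Fatalf("iptables-save failed: %v", err)
	}

	// A real caller (the network policy controller) would add or rewrite
	// rules in the buffer at this point; this sketch replays it unchanged.

	// Apply the whole table in a single call; Restore passes --wait when
	// the local iptables-restore advertises support for it.
	if err := utils.Restore("filter", buf.Bytes()); err != nil {
		log.Fatalf("iptables-restore failed: %v", err)
	}
}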