[Backport 1.21] Update embedded kube-router (#3557) (#3595)

* Update embedded kube-router

Signed-off-by: dereknola <derek.nola@suse.com>
Derek Nola 2021-07-07 15:04:23 -07:00 committed by GitHub
parent 0c2d8376d0
commit 0c5577a8ec
14 changed files with 992 additions and 621 deletions

go.mod

@ -125,6 +125,7 @@ require (
k8s.io/controller-manager v0.21.2 k8s.io/controller-manager v0.21.2
k8s.io/cri-api v0.21.2 k8s.io/cri-api v0.21.2
k8s.io/klog v1.0.0 k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.8.0
k8s.io/kubectl v0.21.2 k8s.io/kubectl v0.21.2
k8s.io/kubernetes v1.21.2 k8s.io/kubernetes v1.21.2
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 k8s.io/utils v0.0.0-20201110183641-67b214c5f920

namespace.go

@ -1,5 +1,5 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) // Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/namespace.go // - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol/namespace.go
// +build !windows // +build !windows
@ -10,7 +10,7 @@ import (
api "k8s.io/api/core/v1" api "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
glog "k8s.io/klog" "k8s.io/klog/v2"
) )
func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEventHandler { func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEventHandler {
@ -32,7 +32,7 @@ func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEve
return return
} }
default: default:
glog.Errorf("unexpected object type: %v", obj) klog.Errorf("unexpected object type: %v", obj)
} }
}, },
} }
@ -42,7 +42,7 @@ func (npc *NetworkPolicyController) handleNamespaceAdd(obj *api.Namespace) {
if obj.Labels == nil { if obj.Labels == nil {
return return
} }
glog.V(2).Infof("Received update for namespace: %s", obj.Name) klog.V(2).Infof("Received update for namespace: %s", obj.Name)
npc.RequestFullSync() npc.RequestFullSync()
} }
@ -51,7 +51,7 @@ func (npc *NetworkPolicyController) handleNamespaceUpdate(oldObj, newObj *api.Na
if reflect.DeepEqual(oldObj.Labels, newObj.Labels) { if reflect.DeepEqual(oldObj.Labels, newObj.Labels) {
return return
} }
glog.V(2).Infof("Received update for namespace: %s", newObj.Name) klog.V(2).Infof("Received update for namespace: %s", newObj.Name)
npc.RequestFullSync() npc.RequestFullSync()
} }
@ -60,7 +60,7 @@ func (npc *NetworkPolicyController) handleNamespaceDelete(obj *api.Namespace) {
if obj.Labels == nil { if obj.Labels == nil {
return return
} }
glog.V(2).Infof("Received namespace: %s delete event", obj.Name) klog.V(2).Infof("Received namespace: %s delete event", obj.Name)
npc.RequestFullSync() npc.RequestFullSync()
} }
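The only functional change in this handler file is the logging migration from the legacy glog-compatible k8s.io/klog to k8s.io/klog/v2; call sites keep the same glog-style API. A minimal, self-contained sketch of that pattern (the log messages are illustrative, not taken from the diff):

package main

import (
	"flag"

	"k8s.io/klog/v2"
)

func main() {
	// klog/v2 keeps the familiar glog-style surface, so the migration is mostly
	// an import swap: glog.V(2).Infof(...) becomes klog.V(2).Infof(...).
	klog.InitFlags(flag.CommandLine) // registers -v, -logtostderr, etc.
	flag.Parse()
	defer klog.Flush()

	klog.Info("Starting network policy controller")
	klog.V(2).Infof("Received update for namespace: %s", "nsA")
	klog.Errorf("unexpected object type: %v", struct{}{})
}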

netpol.go

@ -1,9 +1,13 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol.go
// +build !windows // +build !windows
package netpol package netpol
import ( import (
"context" "context"
"sync"
"github.com/rancher/k3s/pkg/agent/netpol/utils" "github.com/rancher/k3s/pkg/agent/netpol/utils"
"github.com/rancher/k3s/pkg/daemons/config" "github.com/rancher/k3s/pkg/daemons/config"
@ -48,7 +52,7 @@ func Run(ctx context.Context, nodeConfig *config.Node) error {
informerFactory.Start(stopCh) informerFactory.Start(stopCh)
informerFactory.WaitForCacheSync(stopCh) informerFactory.WaitForCacheSync(stopCh)
npc, err := NewNetworkPolicyController(client, nodeConfig, podInformer, npInformer, nsInformer) npc, err := NewNetworkPolicyController(client, nodeConfig, podInformer, npInformer, nsInformer, &sync.Mutex{})
if err != nil { if err != nil {
return err return err
} }

network_policy_controller.go

@ -1,11 +1,12 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) // Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/network_policy_controller.go // - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol/network_policy_controller.go
// +build !windows // +build !windows
package netpol package netpol
import ( import (
"bytes"
"crypto/sha256" "crypto/sha256"
"encoding/base32" "encoding/base32"
"fmt" "fmt"
@ -22,7 +23,7 @@ import (
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
glog "k8s.io/klog" "k8s.io/klog/v2"
) )
const ( const (
@ -33,6 +34,7 @@ const (
kubeInputChainName = "KUBE-ROUTER-INPUT" kubeInputChainName = "KUBE-ROUTER-INPUT"
kubeForwardChainName = "KUBE-ROUTER-FORWARD" kubeForwardChainName = "KUBE-ROUTER-FORWARD"
kubeOutputChainName = "KUBE-ROUTER-OUTPUT" kubeOutputChainName = "KUBE-ROUTER-OUTPUT"
kubeDefaultNetpolChain = "KUBE-NWPLCY-DEFAULT"
defaultSyncPeriod = 5 * time.Minute defaultSyncPeriod = 5 * time.Minute
) )
@ -47,7 +49,7 @@ const (
// followed by one or more network policy chains, till there is a match which will accept the packet, or gets // followed by one or more network policy chains, till there is a match which will accept the packet, or gets
// dropped by the rule in the pod chain, if there is no match. // dropped by the rule in the pod chain, if there is no match.
// NetworkPolicyController strcut to hold information required by NetworkPolicyController // NetworkPolicyController struct to hold information required by NetworkPolicyController
type NetworkPolicyController struct { type NetworkPolicyController struct {
nodeIP net.IP nodeIP net.IP
nodeHostName string nodeHostName string
@ -57,6 +59,7 @@ type NetworkPolicyController struct {
mu sync.Mutex mu sync.Mutex
syncPeriod time.Duration syncPeriod time.Duration
fullSyncRequestChan chan struct{} fullSyncRequestChan chan struct{}
ipsetMutex *sync.Mutex
ipSetHandler *utils.IPSet ipSetHandler *utils.IPSet
@ -67,6 +70,8 @@ type NetworkPolicyController struct {
PodEventHandler cache.ResourceEventHandler PodEventHandler cache.ResourceEventHandler
NamespaceEventHandler cache.ResourceEventHandler NamespaceEventHandler cache.ResourceEventHandler
NetworkPolicyEventHandler cache.ResourceEventHandler NetworkPolicyEventHandler cache.ResourceEventHandler
filterTableRules bytes.Buffer
} }
// internal structure to represent a network policy // internal structure to represent a network policy
@ -119,6 +124,7 @@ type egressRule struct {
type protocolAndPort struct { type protocolAndPort struct {
protocol string protocol string
port string port string
endport string
} }
type endPoints struct { type endPoints struct {
@ -135,29 +141,32 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}) {
t := time.NewTicker(npc.syncPeriod) t := time.NewTicker(npc.syncPeriod)
defer t.Stop() defer t.Stop()
glog.Info("Starting network policy controller") klog.Info("Starting network policy controller")
// setup kube-router specific top level cutoms chains // setup kube-router specific top level custom chains (KUBE-ROUTER-INPUT, KUBE-ROUTER-FORWARD, KUBE-ROUTER-OUTPUT)
npc.ensureTopLevelChains() npc.ensureTopLevelChains()
// setup default network policy chain that is applied to traffic from/to the pods that does not match any network policy
npc.ensureDefaultNetworkPolicyChain()
// Full syncs of the network policy controller take a lot of time and can only be processed one at a time, // Full syncs of the network policy controller take a lot of time and can only be processed one at a time,
// therefore, we start it in it's own goroutine and request a sync through a single item channel // therefore, we start it in it's own goroutine and request a sync through a single item channel
glog.Info("Starting network policy controller full sync goroutine") klog.Info("Starting network policy controller full sync goroutine")
go func(fullSyncRequest <-chan struct{}, stopCh <-chan struct{}) { go func(fullSyncRequest <-chan struct{}, stopCh <-chan struct{}) {
for { for {
// Add an additional non-blocking select to ensure that if the stopCh channel is closed it is handled first // Add an additional non-blocking select to ensure that if the stopCh channel is closed it is handled first
select { select {
case <-stopCh: case <-stopCh:
glog.Info("Shutting down network policies full sync goroutine") klog.Info("Shutting down network policies full sync goroutine")
return return
default: default:
} }
select { select {
case <-stopCh: case <-stopCh:
glog.Info("Shutting down network policies full sync goroutine") klog.Info("Shutting down network policies full sync goroutine")
return return
case <-fullSyncRequest: case <-fullSyncRequest:
glog.V(3).Info("Received request for a full sync, processing") klog.V(3).Info("Received request for a full sync, processing")
npc.fullPolicySync() // fullPolicySync() is a blocking request here npc.fullPolicySync() // fullPolicySync() is a blocking request here
} }
} }
@ -165,11 +174,11 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}) {
// loop forever till notified to stop on stopCh // loop forever till notified to stop on stopCh
for { for {
glog.V(1).Info("Requesting periodic sync of iptables to reflect network policies") klog.V(1).Info("Requesting periodic sync of iptables to reflect network policies")
npc.RequestFullSync() npc.RequestFullSync()
select { select {
case <-stopCh: case <-stopCh:
glog.Infof("Shutting down network policies controller") klog.Infof("Shutting down network policies controller")
return return
case <-t.C: case <-t.C:
} }
@ -180,9 +189,9 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}) {
func (npc *NetworkPolicyController) RequestFullSync() { func (npc *NetworkPolicyController) RequestFullSync() {
select { select {
case npc.fullSyncRequestChan <- struct{}{}: case npc.fullSyncRequestChan <- struct{}{}:
glog.V(3).Info("Full sync request queue was empty so a full sync request was successfully sent") klog.V(3).Info("Full sync request queue was empty so a full sync request was successfully sent")
default: // Don't block if the buffered channel is full, return quickly so that we don't block callee execution default: // Don't block if the buffered channel is full, return quickly so that we don't block callee execution
glog.V(1).Info("Full sync request queue was full, skipping...") klog.V(1).Info("Full sync request queue was full, skipping...")
} }
} }
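RequestFullSync relies on a single-item buffered channel so that at most one full sync is ever queued; additional requests are dropped without blocking the caller. A small standalone sketch of that coalescing pattern (timings and messages are illustrative):

package main

import (
	"fmt"
	"time"
)

func main() {
	// A single-item buffered channel coalesces bursts of sync requests: if a
	// full sync is already pending, further requests are dropped immediately.
	fullSyncRequestChan := make(chan struct{}, 1)

	requestFullSync := func() {
		select {
		case fullSyncRequestChan <- struct{}{}:
			fmt.Println("full sync request queued")
		default:
			fmt.Println("full sync already pending, skipping")
		}
	}

	go func() {
		for range fullSyncRequestChan {
			time.Sleep(50 * time.Millisecond) // stand-in for fullPolicySync()
			fmt.Println("processed full sync")
		}
	}()

	for i := 0; i < 3; i++ {
		requestFullSync()
	}
	time.Sleep(200 * time.Millisecond)
}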
@ -198,35 +207,55 @@ func (npc *NetworkPolicyController) fullPolicySync() {
syncVersion := strconv.FormatInt(start.UnixNano(), 10) syncVersion := strconv.FormatInt(start.UnixNano(), 10)
defer func() { defer func() {
endTime := time.Since(start) endTime := time.Since(start)
glog.V(1).Infof("sync iptables took %v", endTime) klog.V(1).Infof("sync iptables took %v", endTime)
}() }()
glog.V(1).Infof("Starting sync of iptables with version: %s", syncVersion) klog.V(1).Infof("Starting sync of iptables with version: %s", syncVersion)
// ensure kube-router specific top level chains and corresponding rules exist // ensure kube-router specific top level chains and corresponding rules exist
npc.ensureTopLevelChains() npc.ensureTopLevelChains()
// ensure default network policy chain that is applied to traffic from/to the pods that does not match any network policy
npc.ensureDefaultNetworkPolicyChain()
networkPoliciesInfo, err = npc.buildNetworkPoliciesInfo() networkPoliciesInfo, err = npc.buildNetworkPoliciesInfo()
if err != nil { if err != nil {
glog.Errorf("Aborting sync. Failed to build network policies: %v", err.Error()) klog.Errorf("Aborting sync. Failed to build network policies: %v", err.Error())
return
}
npc.filterTableRules.Reset()
if err := utils.SaveInto("filter", &npc.filterTableRules); err != nil {
klog.Errorf("Aborting sync. Failed to run iptables-save: %v" + err.Error())
return return
} }
activePolicyChains, activePolicyIPSets, err := npc.syncNetworkPolicyChains(networkPoliciesInfo, syncVersion) activePolicyChains, activePolicyIPSets, err := npc.syncNetworkPolicyChains(networkPoliciesInfo, syncVersion)
if err != nil { if err != nil {
glog.Errorf("Aborting sync. Failed to sync network policy chains: %v" + err.Error()) klog.Errorf("Aborting sync. Failed to sync network policy chains: %v" + err.Error())
return return
} }
activePodFwChains, err := npc.syncPodFirewallChains(networkPoliciesInfo, syncVersion) activePodFwChains, err := npc.syncPodFirewallChains(networkPoliciesInfo, syncVersion)
if err != nil { if err != nil {
glog.Errorf("Aborting sync. Failed to sync pod firewalls: %v", err.Error()) klog.Errorf("Aborting sync. Failed to sync pod firewalls: %v", err.Error())
return return
} }
err = cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets) err = npc.cleanupStaleRules(activePolicyChains, activePodFwChains)
if err != nil { if err != nil {
glog.Errorf("Aborting sync. Failed to cleanup stale iptables rules: %v", err.Error()) klog.Errorf("Aborting sync. Failed to cleanup stale iptables rules: %v", err.Error())
return
}
if err := utils.Restore("filter", npc.filterTableRules.Bytes()); err != nil {
klog.Errorf("Aborting sync. Failed to run iptables-restore: %v\n%s", err.Error(), npc.filterTableRules.String())
return
}
err = npc.cleanupStaleIPSets(activePolicyIPSets)
if err != nil {
klog.Errorf("Failed to cleanup stale ipsets: %v", err.Error())
return return
} }
} }
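The reworked fullPolicySync no longer issues individual iptables commands; it snapshots the filter table into an in-memory buffer with utils.SaveInto, rewrites the rules there, and applies the result atomically with utils.Restore. A hedged sketch of the underlying save/modify/restore idea using the iptables CLI directly (the helper names below are assumptions, not the k3s utils API, and running it requires root):

package main

import (
	"bytes"
	"log"
	"os/exec"
)

// saveFilterTable captures the current filter table as iptables-restore input,
// roughly what utils.SaveInto("filter", &buf) provides to the controller.
func saveFilterTable(buf *bytes.Buffer) error {
	cmd := exec.Command("iptables-save", "-t", "filter")
	cmd.Stdout = buf
	return cmd.Run()
}

// restoreFilterTable feeds the (possibly rewritten) ruleset back in one shot,
// analogous to utils.Restore("filter", buf.Bytes()).
func restoreFilterTable(rules []byte) error {
	cmd := exec.Command("iptables-restore")
	cmd.Stdin = bytes.NewReader(rules)
	return cmd.Run()
}

func main() {
	var filterTableRules bytes.Buffer
	if err := saveFilterTable(&filterTableRules); err != nil {
		log.Fatalf("Aborting sync. Failed to run iptables-save: %v", err)
	}
	// ... rewrite KUBE-NWPLCY-* / KUBE-POD-FW-* entries in filterTableRules here ...
	if err := restoreFilterTable(filterTableRules.Bytes()); err != nil {
		log.Fatalf("Aborting sync. Failed to run iptables-restore: %v", err)
	}
}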
@ -240,7 +269,7 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
iptablesCmdHandler, err := iptables.New() iptablesCmdHandler, err := iptables.New()
if err != nil { if err != nil {
glog.Fatalf("Failed to initialize iptables executor due to %s", err.Error()) klog.Fatalf("Failed to initialize iptables executor due to %s", err.Error())
} }
addUUIDForRuleSpec := func(chain string, ruleSpec *[]string) (string, error) { addUUIDForRuleSpec := func(chain string, ruleSpec *[]string) (string, error) {
@ -258,18 +287,18 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
ensureRuleAtPosition := func(chain string, ruleSpec []string, uuid string, position int) { ensureRuleAtPosition := func(chain string, ruleSpec []string, uuid string, position int) {
exists, err := iptablesCmdHandler.Exists("filter", chain, ruleSpec...) exists, err := iptablesCmdHandler.Exists("filter", chain, ruleSpec...)
if err != nil { if err != nil {
glog.Fatalf("Failed to verify rule exists in %s chain due to %s", chain, err.Error()) klog.Fatalf("Failed to verify rule exists in %s chain due to %s", chain, err.Error())
} }
if !exists { if !exists {
err := iptablesCmdHandler.Insert("filter", chain, position, ruleSpec...) err := iptablesCmdHandler.Insert("filter", chain, position, ruleSpec...)
if err != nil { if err != nil {
glog.Fatalf("Failed to run iptables command to insert in %s chain %s", chain, err.Error()) klog.Fatalf("Failed to run iptables command to insert in %s chain %s", chain, err.Error())
} }
return return
} }
rules, err := iptablesCmdHandler.List("filter", chain) rules, err := iptablesCmdHandler.List("filter", chain)
if err != nil { if err != nil {
glog.Fatalf("failed to list rules in filter table %s chain due to %s", chain, err.Error()) klog.Fatalf("failed to list rules in filter table %s chain due to %s", chain, err.Error())
} }
var ruleNo, ruleIndexOffset int var ruleNo, ruleIndexOffset int
@ -290,11 +319,11 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
if ruleNo != position { if ruleNo != position {
err = iptablesCmdHandler.Insert("filter", chain, position, ruleSpec...) err = iptablesCmdHandler.Insert("filter", chain, position, ruleSpec...)
if err != nil { if err != nil {
glog.Fatalf("Failed to run iptables command to insert in %s chain %s", chain, err.Error()) klog.Fatalf("Failed to run iptables command to insert in %s chain %s", chain, err.Error())
} }
err = iptablesCmdHandler.Delete("filter", chain, strconv.Itoa(ruleNo+1)) err = iptablesCmdHandler.Delete("filter", chain, strconv.Itoa(ruleNo+1))
if err != nil { if err != nil {
glog.Fatalf("Failed to delete incorrect rule in %s chain due to %s", chain, err.Error()) klog.Fatalf("Failed to delete incorrect rule in %s chain due to %s", chain, err.Error())
} }
} }
} }
@ -304,12 +333,12 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
for builtinChain, customChain := range chains { for builtinChain, customChain := range chains {
err = iptablesCmdHandler.NewChain("filter", customChain) err = iptablesCmdHandler.NewChain("filter", customChain)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 { if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
glog.Fatalf("Failed to run iptables command to create %s chain due to %s", customChain, err.Error()) klog.Fatalf("Failed to run iptables command to create %s chain due to %s", customChain, err.Error())
} }
args := []string{"-m", "comment", "--comment", "kube-router netpol", "-j", customChain} args := []string{"-m", "comment", "--comment", "kube-router netpol", "-j", customChain}
uuid, err := addUUIDForRuleSpec(builtinChain, &args) uuid, err := addUUIDForRuleSpec(builtinChain, &args)
if err != nil { if err != nil {
glog.Fatalf("Failed to get uuid for rule: %s", err.Error()) klog.Fatalf("Failed to get uuid for rule: %s", err.Error())
} }
ensureRuleAtPosition(builtinChain, args, uuid, 1) ensureRuleAtPosition(builtinChain, args, uuid, 1)
} }
@ -317,7 +346,7 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
whitelistServiceVips := []string{"-m", "comment", "--comment", "allow traffic to cluster IP", "-d", npc.serviceClusterIPRange.String(), "-j", "RETURN"} whitelistServiceVips := []string{"-m", "comment", "--comment", "allow traffic to cluster IP", "-d", npc.serviceClusterIPRange.String(), "-j", "RETURN"}
uuid, err := addUUIDForRuleSpec(kubeInputChainName, &whitelistServiceVips) uuid, err := addUUIDForRuleSpec(kubeInputChainName, &whitelistServiceVips)
if err != nil { if err != nil {
glog.Fatalf("Failed to get uuid for rule: %s", err.Error()) klog.Fatalf("Failed to get uuid for rule: %s", err.Error())
} }
ensureRuleAtPosition(kubeInputChainName, whitelistServiceVips, uuid, 1) ensureRuleAtPosition(kubeInputChainName, whitelistServiceVips, uuid, 1)
@ -325,7 +354,7 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
"-m", "multiport", "--dports", npc.serviceNodePortRange, "-j", "RETURN"} "-m", "multiport", "--dports", npc.serviceNodePortRange, "-j", "RETURN"}
uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistTCPNodeports) uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistTCPNodeports)
if err != nil { if err != nil {
glog.Fatalf("Failed to get uuid for rule: %s", err.Error()) klog.Fatalf("Failed to get uuid for rule: %s", err.Error())
} }
ensureRuleAtPosition(kubeInputChainName, whitelistTCPNodeports, uuid, 2) ensureRuleAtPosition(kubeInputChainName, whitelistTCPNodeports, uuid, 2)
@ -333,7 +362,7 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
"-m", "multiport", "--dports", npc.serviceNodePortRange, "-j", "RETURN"} "-m", "multiport", "--dports", npc.serviceNodePortRange, "-j", "RETURN"}
uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistUDPNodeports) uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistUDPNodeports)
if err != nil { if err != nil {
glog.Fatalf("Failed to get uuid for rule: %s", err.Error()) klog.Fatalf("Failed to get uuid for rule: %s", err.Error())
} }
ensureRuleAtPosition(kubeInputChainName, whitelistUDPNodeports, uuid, 3) ensureRuleAtPosition(kubeInputChainName, whitelistUDPNodeports, uuid, 3)
@ -341,40 +370,66 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
whitelistServiceVips := []string{"-m", "comment", "--comment", "allow traffic to external IP range: " + externalIPRange.String(), "-d", externalIPRange.String(), "-j", "RETURN"} whitelistServiceVips := []string{"-m", "comment", "--comment", "allow traffic to external IP range: " + externalIPRange.String(), "-d", externalIPRange.String(), "-j", "RETURN"}
uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistServiceVips) uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistServiceVips)
if err != nil { if err != nil {
glog.Fatalf("Failed to get uuid for rule: %s", err.Error()) klog.Fatalf("Failed to get uuid for rule: %s", err.Error())
} }
ensureRuleAtPosition(kubeInputChainName, whitelistServiceVips, uuid, externalIPIndex+4) ensureRuleAtPosition(kubeInputChainName, whitelistServiceVips, uuid, externalIPIndex+4)
} }
// for the traffic to/from the local pod's let network policy controller be
// authoritative entity to ACCEPT the traffic if it complies to network policies
for _, chain := range chains {
comment := "rule to explicitly ACCEPT traffic that comply to network policies"
args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "--mark", "0x20000/0x20000", "-j", "ACCEPT"}
err = iptablesCmdHandler.AppendUnique("filter", chain, args...)
if err != nil {
klog.Fatalf("Failed to run iptables command: %s", err.Error())
}
}
} }
func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets map[string]bool) error { // Creates custom chains KUBE-NWPLCY-DEFAULT
func (npc *NetworkPolicyController) ensureDefaultNetworkPolicyChain() {
iptablesCmdHandler, err := iptables.New()
if err != nil {
klog.Fatalf("Failed to initialize iptables executor due to %s", err.Error())
}
markArgs := make([]string, 0)
markComment := "rule to mark traffic matching a network policy"
markArgs = append(markArgs, "-j", "MARK", "-m", "comment", "--comment", markComment, "--set-xmark", "0x10000/0x10000")
err = iptablesCmdHandler.NewChain("filter", kubeDefaultNetpolChain)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
klog.Fatalf("Failed to run iptables command to create %s chain due to %s", kubeDefaultNetpolChain, err.Error())
}
err = iptablesCmdHandler.AppendUnique("filter", kubeDefaultNetpolChain, markArgs...)
if err != nil {
klog.Fatalf("Failed to run iptables command: %s", err.Error())
}
}
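ensureDefaultNetworkPolicyChain is new in this sync path: it creates the KUBE-NWPLCY-DEFAULT chain and appends a single rule that marks matching traffic with 0x10000/0x10000, which the per-pod chains later test before accepting. A minimal sketch of the same go-iptables calls (chain name and mark taken from the diff; the surrounding program is illustrative and needs root to actually run):

package main

import (
	"log"

	"github.com/coreos/go-iptables/iptables"
)

func main() {
	ipt, err := iptables.New()
	if err != nil {
		log.Fatalf("Failed to initialize iptables executor due to %s", err.Error())
	}
	// NewChain fails with exit status 1 when the chain already exists,
	// which the controller treats as success.
	if err := ipt.NewChain("filter", "KUBE-NWPLCY-DEFAULT"); err != nil {
		if e, ok := err.(*iptables.Error); !ok || e.ExitStatus() != 1 {
			log.Fatalf("Failed to create KUBE-NWPLCY-DEFAULT chain: %s", err.Error())
		}
	}
	markArgs := []string{"-j", "MARK", "-m", "comment", "--comment",
		"rule to mark traffic matching a network policy",
		"--set-xmark", "0x10000/0x10000"}
	if err := ipt.AppendUnique("filter", "KUBE-NWPLCY-DEFAULT", markArgs...); err != nil {
		log.Fatalf("Failed to run iptables command: %s", err.Error())
	}
}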
func (npc *NetworkPolicyController) cleanupStaleRules(activePolicyChains, activePodFwChains map[string]bool) error {
cleanupPodFwChains := make([]string, 0) cleanupPodFwChains := make([]string, 0)
cleanupPolicyChains := make([]string, 0) cleanupPolicyChains := make([]string, 0)
cleanupPolicyIPSets := make([]*utils.Set, 0)
// initialize tool sets for working with iptables and ipset // initialize tool sets for working with iptables and ipset
iptablesCmdHandler, err := iptables.New() iptablesCmdHandler, err := iptables.New()
if err != nil { if err != nil {
glog.Fatalf("failed to initialize iptables command executor due to %s", err.Error()) return fmt.Errorf("failed to initialize iptables command executor due to %s", err.Error())
}
ipsets, err := utils.NewIPSet(false)
if err != nil {
glog.Fatalf("failed to create ipsets command executor due to %s", err.Error())
}
err = ipsets.Save()
if err != nil {
glog.Fatalf("failed to initialize ipsets command executor due to %s", err.Error())
} }
// find iptables chains and ipsets that are no longer used by comparing current to the active maps we were passed // find iptables chains and ipsets that are no longer used by comparing current to the active maps we were passed
chains, err := iptablesCmdHandler.ListChains("filter") chains, err := iptablesCmdHandler.ListChains("filter")
if err != nil { if err != nil {
return fmt.Errorf("Unable to list chains: %s", err) return fmt.Errorf("unable to list chains: %s", err)
} }
for _, chain := range chains { for _, chain := range chains {
if strings.HasPrefix(chain, kubeNetworkPolicyChainPrefix) { if strings.HasPrefix(chain, kubeNetworkPolicyChainPrefix) {
if chain == kubeDefaultNetpolChain {
continue
}
if _, ok := activePolicyChains[chain]; !ok { if _, ok := activePolicyChains[chain]; !ok {
cleanupPolicyChains = append(cleanupPolicyChains, chain) cleanupPolicyChains = append(cleanupPolicyChains, chain)
} }
@ -385,6 +440,58 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets
} }
} }
} }
var newChains, newRules, desiredFilterTable bytes.Buffer
rules := strings.Split(npc.filterTableRules.String(), "\n")
if len(rules) > 0 && rules[len(rules)-1] == "" {
rules = rules[:len(rules)-1]
}
for _, rule := range rules {
skipRule := false
for _, podFWChainName := range cleanupPodFwChains {
if strings.Contains(rule, podFWChainName) {
skipRule = true
break
}
}
for _, policyChainName := range cleanupPolicyChains {
if strings.Contains(rule, policyChainName) {
skipRule = true
break
}
}
if strings.Contains(rule, "COMMIT") || strings.HasPrefix(rule, "# ") {
skipRule = true
}
if skipRule {
continue
}
if strings.HasPrefix(rule, ":") {
newChains.WriteString(rule + " - [0:0]\n")
}
if strings.HasPrefix(rule, "-") {
newRules.WriteString(rule + "\n")
}
}
desiredFilterTable.WriteString("*filter" + "\n")
desiredFilterTable.Write(newChains.Bytes())
desiredFilterTable.Write(newRules.Bytes())
desiredFilterTable.WriteString("COMMIT" + "\n")
npc.filterTableRules = desiredFilterTable
return nil
}
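cleanupStaleRules now works entirely on the saved ruleset: it walks the buffered iptables-save output line by line, drops anything that references a stale chain, and rebuilds a fresh *filter ... COMMIT payload for iptables-restore. A self-contained sketch of that line-filtering approach (the sample input and chain names are made up for illustration):

package main

import (
	"bytes"
	"fmt"
	"strings"
)

// rebuildFilterTable drops any chain declaration or rule that references one of
// the stale chain names and re-emits a complete iptables-restore payload,
// mirroring the shape of cleanupStaleRules above.
func rebuildFilterTable(saved string, staleChains []string) string {
	var chains, rules, out bytes.Buffer
	for _, line := range strings.Split(saved, "\n") {
		if line == "" || strings.HasPrefix(line, "# ") || strings.Contains(line, "COMMIT") {
			continue
		}
		stale := false
		for _, chain := range staleChains {
			if strings.Contains(line, chain) {
				stale = true
				break
			}
		}
		if stale {
			continue
		}
		switch {
		case strings.HasPrefix(line, ":"): // chain declarations
			chains.WriteString(line + " - [0:0]\n")
		case strings.HasPrefix(line, "-"): // rules
			rules.WriteString(line + "\n")
		}
	}
	out.WriteString("*filter\n")
	out.Write(chains.Bytes())
	out.Write(rules.Bytes())
	out.WriteString("COMMIT\n")
	return out.String()
}

func main() {
	saved := "# Generated\n*filter\n:KUBE-POD-FW-AAAA\n:KUBE-POD-FW-BBBB\n" +
		"-A KUBE-POD-FW-AAAA -j ACCEPT\n-A KUBE-POD-FW-BBBB -j ACCEPT\nCOMMIT\n"
	fmt.Print(rebuildFilterTable(saved, []string{"KUBE-POD-FW-BBBB"}))
}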
func (npc *NetworkPolicyController) cleanupStaleIPSets(activePolicyIPSets map[string]bool) error {
cleanupPolicyIPSets := make([]*utils.Set, 0)
ipsets, err := utils.NewIPSet(false)
if err != nil {
return fmt.Errorf("failed to create ipsets command executor due to %s", err.Error())
}
err = ipsets.Save()
if err != nil {
klog.Fatalf("failed to initialize ipsets command executor due to %s", err.Error())
}
for _, set := range ipsets.Sets { for _, set := range ipsets.Sets {
if strings.HasPrefix(set.Name, kubeSourceIPSetPrefix) || if strings.HasPrefix(set.Name, kubeSourceIPSetPrefix) ||
strings.HasPrefix(set.Name, kubeDestinationIPSetPrefix) { strings.HasPrefix(set.Name, kubeDestinationIPSetPrefix) {
@ -393,83 +500,11 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets
} }
} }
} }
// remove stale iptables podFwChain references from the filter table chains
for _, podFwChain := range cleanupPodFwChains {
primaryChains := []string{kubeInputChainName, kubeForwardChainName, kubeOutputChainName}
for _, egressChain := range primaryChains {
forwardChainRules, err := iptablesCmdHandler.List("filter", egressChain)
if err != nil {
return fmt.Errorf("failed to list rules in filter table, %s podFwChain due to %s", egressChain, err.Error())
}
// TODO delete rule by spec, than rule number to avoid extra loop
var realRuleNo int
for i, rule := range forwardChainRules {
if strings.Contains(rule, podFwChain) {
err = iptablesCmdHandler.Delete("filter", egressChain, strconv.Itoa(i-realRuleNo))
if err != nil {
return fmt.Errorf("failed to delete rule: %s from the %s podFwChain of filter table due to %s", rule, egressChain, err.Error())
}
realRuleNo++
}
}
}
}
// cleanup pod firewall chain
for _, chain := range cleanupPodFwChains {
glog.V(2).Infof("Found pod fw chain to cleanup: %s", chain)
err = iptablesCmdHandler.ClearChain("filter", chain)
if err != nil {
return fmt.Errorf("Failed to flush the rules in chain %s due to %s", chain, err.Error())
}
err = iptablesCmdHandler.DeleteChain("filter", chain)
if err != nil {
return fmt.Errorf("Failed to delete the chain %s due to %s", chain, err.Error())
}
glog.V(2).Infof("Deleted pod specific firewall chain: %s from the filter table", chain)
}
// cleanup network policy chains
for _, policyChain := range cleanupPolicyChains {
glog.V(2).Infof("Found policy chain to cleanup %s", policyChain)
// first clean up any references from active pod firewall chains
for podFwChain := range activePodFwChains {
podFwChainRules, err := iptablesCmdHandler.List("filter", podFwChain)
if err != nil {
return fmt.Errorf("Unable to list rules from the chain %s: %s", podFwChain, err)
}
for i, rule := range podFwChainRules {
if strings.Contains(rule, policyChain) {
err = iptablesCmdHandler.Delete("filter", podFwChain, strconv.Itoa(i))
if err != nil {
return fmt.Errorf("Failed to delete rule %s from the chain %s", rule, podFwChain)
}
break
}
}
}
// now that all stale and active references to the network policy chain have been removed, delete the chain
err = iptablesCmdHandler.ClearChain("filter", policyChain)
if err != nil {
return fmt.Errorf("Failed to flush the rules in chain %s due to %s", policyChain, err)
}
err = iptablesCmdHandler.DeleteChain("filter", policyChain)
if err != nil {
return fmt.Errorf("Failed to flush the rules in chain %s due to %s", policyChain, err)
}
glog.V(2).Infof("Deleted network policy chain: %s from the filter table", policyChain)
}
// cleanup network policy ipsets // cleanup network policy ipsets
for _, set := range cleanupPolicyIPSets { for _, set := range cleanupPolicyIPSets {
err = set.Destroy() err = set.Destroy()
if err != nil { if err != nil {
return fmt.Errorf("Failed to delete ipset %s due to %s", set.Name, err) return fmt.Errorf("failed to delete ipset %s due to %s", set.Name, err)
} }
} }
return nil return nil
@ -478,17 +513,18 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets
// Cleanup cleanup configurations done // Cleanup cleanup configurations done
func (npc *NetworkPolicyController) Cleanup() { func (npc *NetworkPolicyController) Cleanup() {
glog.Info("Cleaning up iptables configuration permanently done by kube-router") klog.Info("Cleaning up iptables configuration permanently done by kube-router")
iptablesCmdHandler, err := iptables.New() iptablesCmdHandler, err := iptables.New()
if err != nil { if err != nil {
glog.Errorf("Failed to initialize iptables executor: %s", err.Error()) klog.Errorf("Failed to initialize iptables executor: %s", err.Error())
return
} }
// delete jump rules in FORWARD chain to pod specific firewall chain // delete jump rules in FORWARD chain to pod specific firewall chain
forwardChainRules, err := iptablesCmdHandler.List("filter", kubeForwardChainName) forwardChainRules, err := iptablesCmdHandler.List("filter", kubeForwardChainName)
if err != nil { if err != nil {
glog.Errorf("Failed to delete iptables rules as part of cleanup") klog.Errorf("Failed to delete iptables rules as part of cleanup")
return return
} }
@ -498,7 +534,7 @@ func (npc *NetworkPolicyController) Cleanup() {
if strings.Contains(rule, kubePodFirewallChainPrefix) { if strings.Contains(rule, kubePodFirewallChainPrefix) {
err = iptablesCmdHandler.Delete("filter", kubeForwardChainName, strconv.Itoa(i-realRuleNo)) err = iptablesCmdHandler.Delete("filter", kubeForwardChainName, strconv.Itoa(i-realRuleNo))
if err != nil { if err != nil {
glog.Errorf("Failed to delete iptables rule as part of cleanup: %s", err) klog.Errorf("Failed to delete iptables rule as part of cleanup: %s", err)
} }
realRuleNo++ realRuleNo++
} }
@ -507,7 +543,7 @@ func (npc *NetworkPolicyController) Cleanup() {
// delete jump rules in OUTPUT chain to pod specific firewall chain // delete jump rules in OUTPUT chain to pod specific firewall chain
forwardChainRules, err = iptablesCmdHandler.List("filter", kubeOutputChainName) forwardChainRules, err = iptablesCmdHandler.List("filter", kubeOutputChainName)
if err != nil { if err != nil {
glog.Errorf("Failed to delete iptables rules as part of cleanup") klog.Errorf("Failed to delete iptables rules as part of cleanup")
return return
} }
@ -517,7 +553,7 @@ func (npc *NetworkPolicyController) Cleanup() {
if strings.Contains(rule, kubePodFirewallChainPrefix) { if strings.Contains(rule, kubePodFirewallChainPrefix) {
err = iptablesCmdHandler.Delete("filter", kubeOutputChainName, strconv.Itoa(i-realRuleNo)) err = iptablesCmdHandler.Delete("filter", kubeOutputChainName, strconv.Itoa(i-realRuleNo))
if err != nil { if err != nil {
glog.Errorf("Failed to delete iptables rule as part of cleanup: %s", err) klog.Errorf("Failed to delete iptables rule as part of cleanup: %s", err)
} }
realRuleNo++ realRuleNo++
} }
@ -526,19 +562,19 @@ func (npc *NetworkPolicyController) Cleanup() {
// flush and delete pod specific firewall chain // flush and delete pod specific firewall chain
chains, err := iptablesCmdHandler.ListChains("filter") chains, err := iptablesCmdHandler.ListChains("filter")
if err != nil { if err != nil {
glog.Errorf("Unable to list chains: %s", err) klog.Errorf("Unable to list chains: %s", err)
return return
} }
for _, chain := range chains { for _, chain := range chains {
if strings.HasPrefix(chain, kubePodFirewallChainPrefix) { if strings.HasPrefix(chain, kubePodFirewallChainPrefix) {
err = iptablesCmdHandler.ClearChain("filter", chain) err = iptablesCmdHandler.ClearChain("filter", chain)
if err != nil { if err != nil {
glog.Errorf("Failed to cleanup iptables rules: " + err.Error()) klog.Errorf("Failed to cleanup iptables rules: " + err.Error())
return return
} }
err = iptablesCmdHandler.DeleteChain("filter", chain) err = iptablesCmdHandler.DeleteChain("filter", chain)
if err != nil { if err != nil {
glog.Errorf("Failed to cleanup iptables rules: " + err.Error()) klog.Errorf("Failed to cleanup iptables rules: " + err.Error())
return return
} }
} }
@ -547,45 +583,53 @@ func (npc *NetworkPolicyController) Cleanup() {
// flush and delete per network policy specific chain // flush and delete per network policy specific chain
chains, err = iptablesCmdHandler.ListChains("filter") chains, err = iptablesCmdHandler.ListChains("filter")
if err != nil { if err != nil {
glog.Errorf("Unable to list chains: %s", err) klog.Errorf("Unable to list chains: %s", err)
return return
} }
for _, chain := range chains { for _, chain := range chains {
if strings.HasPrefix(chain, kubeNetworkPolicyChainPrefix) { if strings.HasPrefix(chain, kubeNetworkPolicyChainPrefix) {
err = iptablesCmdHandler.ClearChain("filter", chain) err = iptablesCmdHandler.ClearChain("filter", chain)
if err != nil { if err != nil {
glog.Errorf("Failed to cleanup iptables rules: " + err.Error()) klog.Errorf("Failed to cleanup iptables rules: " + err.Error())
return return
} }
err = iptablesCmdHandler.DeleteChain("filter", chain) err = iptablesCmdHandler.DeleteChain("filter", chain)
if err != nil { if err != nil {
glog.Errorf("Failed to cleanup iptables rules: " + err.Error()) klog.Errorf("Failed to cleanup iptables rules: " + err.Error())
return return
} }
} }
} }
// delete all ipsets // delete all ipsets
klog.V(1).Infof("Attempting to attain ipset mutex lock")
npc.ipsetMutex.Lock()
klog.V(1).Infof("Attained ipset mutex lock, continuing...")
defer func() {
npc.ipsetMutex.Unlock()
klog.V(1).Infof("Returned ipset mutex lock")
}()
ipset, err := utils.NewIPSet(false) ipset, err := utils.NewIPSet(false)
if err != nil { if err != nil {
glog.Errorf("Failed to clean up ipsets: " + err.Error()) klog.Errorf("Failed to clean up ipsets: " + err.Error())
return
} }
err = ipset.Save() err = ipset.Save()
if err != nil { if err != nil {
glog.Errorf("Failed to clean up ipsets: " + err.Error()) klog.Errorf("Failed to clean up ipsets: " + err.Error())
} }
err = ipset.DestroyAllWithin() err = ipset.DestroyAllWithin()
if err != nil { if err != nil {
glog.Errorf("Failed to clean up ipsets: " + err.Error()) klog.Errorf("Failed to clean up ipsets: " + err.Error())
} }
glog.Infof("Successfully cleaned the iptables configuration done by kube-router") klog.Infof("Successfully cleaned the iptables configuration done by kube-router")
} }
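Cleanup now takes the shared ipsetMutex before destroying ipsets, so it cannot race with a concurrent sync that is also saving and restoring ipset state. A sketch of that locking pattern around the k3s netpol utils helpers referenced in the diff (error handling and wiring are illustrative):

package main

import (
	"log"
	"sync"

	"github.com/rancher/k3s/pkg/agent/netpol/utils"
)

// cleanupIPSets destroys kube-router managed ipsets while holding the shared
// mutex, so no other goroutine can interleave an ipset save/restore cycle.
func cleanupIPSets(ipsetMutex *sync.Mutex) {
	ipsetMutex.Lock()
	defer ipsetMutex.Unlock()

	ipset, err := utils.NewIPSet(false)
	if err != nil {
		log.Printf("Failed to clean up ipsets: %s", err)
		return
	}
	if err := ipset.Save(); err != nil {
		log.Printf("Failed to clean up ipsets: %s", err)
		return
	}
	if err := ipset.DestroyAllWithin(); err != nil {
		log.Printf("Failed to clean up ipsets: %s", err)
	}
}

func main() {
	cleanupIPSets(&sync.Mutex{})
}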
// NewNetworkPolicyController returns new NetworkPolicyController object // NewNetworkPolicyController returns new NetworkPolicyController object
func NewNetworkPolicyController(clientset kubernetes.Interface, func NewNetworkPolicyController(clientset kubernetes.Interface,
config *config.Node, podInformer cache.SharedIndexInformer, config *config.Node, podInformer cache.SharedIndexInformer,
npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer) (*NetworkPolicyController, error) { npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer, ipsetMutex *sync.Mutex) (*NetworkPolicyController, error) {
npc := NetworkPolicyController{} npc := NetworkPolicyController{ipsetMutex: ipsetMutex}
// Creating a single-item buffered channel to ensure that we only keep a single full sync request at a time, // Creating a single-item buffered channel to ensure that we only keep a single full sync request at a time,
// additional requests would be pointless to queue since after the first one was processed the system would already // additional requests would be pointless to queue since after the first one was processed the system would already
@ -601,24 +645,12 @@ func NewNetworkPolicyController(clientset kubernetes.Interface,
return nil, err return nil, err
} }
npc.nodeHostName = node.Name
nodeIP, err := utils.GetNodeIP(node) nodeIP, err := utils.GetNodeIP(node)
if err != nil { if err != nil {
return nil, err return nil, err
} }
npc.nodeIP = nodeIP npc.nodeIP = nodeIP
ipset, err := utils.NewIPSet(false)
if err != nil {
return nil, err
}
err = ipset.Save()
if err != nil {
return nil, err
}
npc.ipSetHandler = ipset
npc.podLister = podInformer.GetIndexer() npc.podLister = podInformer.GetIndexer()
npc.PodEventHandler = npc.newPodEventHandler() npc.PodEventHandler = npc.newPodEventHandler()

network_policy_controller_test.go

@ -1,19 +1,24 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) // Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/network_policy_controller_test.go // - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/network_policy_controller_test.go
// +build !windows // +build !windows
package netpol package netpol
import ( import (
"bytes"
"context" "context"
"fmt"
"net" "net"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
"github.com/rancher/k3s/pkg/daemons/config"
netv1 "k8s.io/api/networking/v1" netv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
@ -23,8 +28,6 @@ import (
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"github.com/rancher/k3s/pkg/daemons/config"
) )
// newFakeInformersFromClient creates the different informers used in the uneventful network policy controller // newFakeInformersFromClient creates the different informers used in the uneventful network policy controller
@ -213,6 +216,7 @@ type tNetpolTestCase struct {
targetPods tPodNamespaceMap targetPods tPodNamespaceMap
inSourcePods tPodNamespaceMap inSourcePods tPodNamespaceMap
outDestPods tPodNamespaceMap outDestPods tPodNamespaceMap
expectedRule string
} }
// tGetNotTargetedPods finds set of pods that should not be targeted by netpol selectors // tGetNotTargetedPods finds set of pods that should not be targeted by netpol selectors
@ -417,6 +421,182 @@ func TestNewNetworkPolicySelectors(t *testing.T) {
} }
} }
func TestNetworkPolicyBuilder(t *testing.T) {
port, port1 := intstr.FromInt(30000), intstr.FromInt(34000)
ingressPort := intstr.FromInt(37000)
endPort, endPort1 := int32(31000), int32(35000)
testCases := []tNetpolTestCase{
{
name: "Simple Egress Destination Port",
netpol: tNetpol{name: "simple-egress", namespace: "nsA",
podSelector: metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "app",
Operator: "In",
Values: []string{"a"},
},
},
},
egress: []netv1.NetworkPolicyEgressRule{
{
Ports: []netv1.NetworkPolicyPort{
{
Port: &port,
},
},
},
},
},
expectedRule: "-A KUBE-NWPLCY-QHFGOTFJZFXUJVTH -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress namespace nsA\" --dport 30000 -j MARK --set-xmark 0x10000/0x10000 \n" +
"-A KUBE-NWPLCY-QHFGOTFJZFXUJVTH -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress namespace nsA\" --dport 30000 -m mark --mark 0x10000/0x10000 -j RETURN \n",
},
{
name: "Simple Ingress/Egress Destination Port",
netpol: tNetpol{name: "simple-ingress-egress", namespace: "nsA",
podSelector: metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "app",
Operator: "In",
Values: []string{"a"},
},
},
},
egress: []netv1.NetworkPolicyEgressRule{
{
Ports: []netv1.NetworkPolicyPort{
{
Port: &port,
},
},
},
},
ingress: []netv1.NetworkPolicyIngressRule{
{
Ports: []netv1.NetworkPolicyPort{
{
Port: &ingressPort,
},
},
},
},
},
expectedRule: "-A KUBE-NWPLCY-KO52PWL34ABMMBI7 -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-ingress-egress namespace nsA\" --dport 30000 -j MARK --set-xmark 0x10000/0x10000 \n" +
"-A KUBE-NWPLCY-KO52PWL34ABMMBI7 -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-ingress-egress namespace nsA\" --dport 30000 -m mark --mark 0x10000/0x10000 -j RETURN \n" +
"-A KUBE-NWPLCY-KO52PWL34ABMMBI7 -m comment --comment \"rule to ACCEPT traffic from all sources to dest pods selected by policy name: simple-ingress-egress namespace nsA\" --dport 37000 -j MARK --set-xmark 0x10000/0x10000 \n" +
"-A KUBE-NWPLCY-KO52PWL34ABMMBI7 -m comment --comment \"rule to ACCEPT traffic from all sources to dest pods selected by policy name: simple-ingress-egress namespace nsA\" --dport 37000 -m mark --mark 0x10000/0x10000 -j RETURN \n",
},
{
name: "Simple Egress Destination Port Range",
netpol: tNetpol{name: "simple-egress-pr", namespace: "nsA",
podSelector: metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "app",
Operator: "In",
Values: []string{"a"},
},
},
},
egress: []netv1.NetworkPolicyEgressRule{
{
Ports: []netv1.NetworkPolicyPort{
{
Port: &port,
EndPort: &endPort,
},
{
Port: &port1,
EndPort: &endPort1,
},
},
},
},
},
expectedRule: "-A KUBE-NWPLCY-SQYQ7PVNG6A6Q3DU -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 30000:31000 -j MARK --set-xmark 0x10000/0x10000 \n" +
"-A KUBE-NWPLCY-SQYQ7PVNG6A6Q3DU -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 30000:31000 -m mark --mark 0x10000/0x10000 -j RETURN \n" +
"-A KUBE-NWPLCY-SQYQ7PVNG6A6Q3DU -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 34000:35000 -j MARK --set-xmark 0x10000/0x10000 \n" +
"-A KUBE-NWPLCY-SQYQ7PVNG6A6Q3DU -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 34000:35000 -m mark --mark 0x10000/0x10000 -j RETURN \n",
},
{
name: "Port > EndPort (invalid condition, should drop endport)",
netpol: tNetpol{name: "invalid-endport", namespace: "nsA",
podSelector: metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "app",
Operator: "In",
Values: []string{"a"},
},
},
},
egress: []netv1.NetworkPolicyEgressRule{
{
Ports: []netv1.NetworkPolicyPort{
{
Port: &port1,
EndPort: &endPort,
},
},
},
},
},
expectedRule: "-A KUBE-NWPLCY-2A4DPWPR5REBS66I -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: invalid-endport namespace nsA\" --dport 34000 -j MARK --set-xmark 0x10000/0x10000 \n" +
"-A KUBE-NWPLCY-2A4DPWPR5REBS66I -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: invalid-endport namespace nsA\" --dport 34000 -m mark --mark 0x10000/0x10000 -j RETURN \n",
},
}
client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*newFakeNode("node", "10.10.10.10")}})
informerFactory, podInformer, nsInformer, netpolInformer := newFakeInformersFromClient(client)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informerFactory.Start(ctx.Done())
cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced)
krNetPol, _ := newUneventfulNetworkPolicyController(podInformer, netpolInformer, nsInformer)
tCreateFakePods(t, podInformer, nsInformer)
for _, test := range testCases {
test.netpol.createFakeNetpol(t, netpolInformer)
netpols, err := krNetPol.buildNetworkPoliciesInfo()
if err != nil {
t.Errorf("Problems building policies: %s", err)
}
for _, np := range netpols {
fmt.Printf(np.policyType)
if np.policyType == "egress" || np.policyType == "both" {
err = krNetPol.processEgressRules(np, "", nil, "1")
if err != nil {
t.Errorf("Error syncing the rules: %s", err)
}
}
if np.policyType == "ingress" || np.policyType == "both" {
err = krNetPol.processIngressRules(np, "", nil, "1")
if err != nil {
t.Errorf("Error syncing the rules: %s", err)
}
}
}
if !bytes.Equal([]byte(test.expectedRule), krNetPol.filterTableRules.Bytes()) {
t.Errorf("Invalid rule %s created:\nExpected:\n%s \nGot:\n%s", test.name, test.expectedRule, krNetPol.filterTableRules.String())
}
key := fmt.Sprintf("%s/%s", test.netpol.namespace, test.netpol.name)
obj, exists, err := krNetPol.npLister.GetByKey(key)
if err != nil {
t.Errorf("Failed to get Netpol from store: %s", err)
}
if exists {
err = krNetPol.npLister.Delete(obj)
if err != nil {
t.Errorf("Failed to remove Netpol from store: %s", err)
}
}
krNetPol.filterTableRules.Reset()
}
}
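The expected rules in TestNetworkPolicyBuilder reference chain names such as KUBE-NWPLCY-QHFGOTFJZFXUJVTH, which are derived by hashing the policy's namespace, name, and sync version (the controller imports crypto/sha256 and encoding/base32 for this). A hedged sketch of that kind of derivation; the exact hash inputs and truncation length are assumptions, not read from the diff:

package main

import (
	"crypto/sha256"
	"encoding/base32"
	"fmt"
)

// networkPolicyChainNameSketch shows how kube-router-style chain names are
// typically built: hash the namespace/name/version and keep a short,
// base32-encoded prefix under the KUBE-NWPLCY- namespace.
func networkPolicyChainNameSketch(namespace, policyName, version string) string {
	hash := sha256.Sum256([]byte(namespace + policyName + version))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	return "KUBE-NWPLCY-" + encoded[:16]
}

func main() {
	fmt.Println(networkPolicyChainNameSketch("nsA", "simple-egress", "1"))
}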
func TestNetworkPolicyController(t *testing.T) { func TestNetworkPolicyController(t *testing.T) {
testCases := []tNetPolConfigTestCase{ testCases := []tNetPolConfigTestCase{
{ {
@ -429,7 +609,7 @@ func TestNetworkPolicyController(t *testing.T) {
"Missing nodename fails appropriately", "Missing nodename fails appropriately",
newMinimalNodeConfig("", "", "", nil), newMinimalNodeConfig("", "", "", nil),
true, true,
"Failed to identify the node by NODE_NAME, hostname or --hostname-override", "failed to identify the node by NODE_NAME, hostname or --hostname-override",
}, },
{ {
"Test good cluster CIDR (using single IP with a /32)", "Test good cluster CIDR (using single IP with a /32)",
@ -466,7 +646,7 @@ func TestNetworkPolicyController(t *testing.T) {
_, podInformer, nsInformer, netpolInformer := newFakeInformersFromClient(client) _, podInformer, nsInformer, netpolInformer := newFakeInformersFromClient(client)
for _, test := range testCases { for _, test := range testCases {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
_, err := NewNetworkPolicyController(client, test.config, podInformer, netpolInformer, nsInformer) _, err := NewNetworkPolicyController(client, test.config, podInformer, netpolInformer, nsInformer, &sync.Mutex{})
if err == nil && test.expectError { if err == nil && test.expectError {
t.Error("This config should have failed, but it was successful instead") t.Error("This config should have failed, but it was successful instead")
} else if err != nil { } else if err != nil {

pod.go

@ -1,5 +1,5 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) // Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/pod.go // - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol/pod.go
// +build !windows // +build !windows
@ -8,26 +8,40 @@ package netpol
import ( import (
"crypto/sha256" "crypto/sha256"
"encoding/base32" "encoding/base32"
"fmt"
"strings" "strings"
"github.com/coreos/go-iptables/iptables"
api "k8s.io/api/core/v1" api "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
glog "k8s.io/klog" "k8s.io/klog/v2"
) )
func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHandler { func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{ return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
npc.OnPodUpdate(obj) if podObj, ok := obj.(*api.Pod); ok {
// If the pod isn't yet actionable there is no action to take here anyway, so skip it. When it becomes
// actionable, we'll get an update below.
if isNetPolActionable(podObj) {
npc.OnPodUpdate(obj)
}
}
}, },
UpdateFunc: func(oldObj, newObj interface{}) { UpdateFunc: func(oldObj, newObj interface{}) {
newPoObj := newObj.(*api.Pod) var newPodObj, oldPodObj *api.Pod
oldPoObj := oldObj.(*api.Pod) var ok bool
if newPoObj.Status.Phase != oldPoObj.Status.Phase || newPoObj.Status.PodIP != oldPoObj.Status.PodIP {
// for the network policies, we are only interested in pod status phase change or IP change // If either of these objects are not pods, quit now
if newPodObj, ok = newObj.(*api.Pod); !ok {
return
}
if oldPodObj, ok = oldObj.(*api.Pod); !ok {
return
}
// We don't check isNetPolActionable here, because if it is transitioning in or out of the actionable state
// we want to run the full sync so that it can be added or removed from the existing network policy of the host
// For the network policies, we are only interested in some changes, most pod changes aren't relevant to network policy
if isPodUpdateNetPolRelevant(oldPodObj, newPodObj) {
npc.OnPodUpdate(newObj) npc.OnPodUpdate(newObj)
} }
}, },
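The pod event handlers now filter events before requesting a full sync: adds are ignored until the pod is actionable, and updates only trigger a sync when something network-policy-relevant changed. A sketch of what such a relevance check typically compares; the helper name and exact field set below are assumptions standing in for the isPodUpdateNetPolRelevant referenced above:

package main

import (
	"fmt"
	"reflect"

	api "k8s.io/api/core/v1"
)

// isPodUpdateNetPolRelevantSketch returns true only for changes that can affect
// network policy enforcement: phase, pod IPs, host networking, or labels.
func isPodUpdateNetPolRelevantSketch(oldPod, newPod *api.Pod) bool {
	return newPod.Status.Phase != oldPod.Status.Phase ||
		newPod.Status.PodIP != oldPod.Status.PodIP ||
		!reflect.DeepEqual(newPod.Status.PodIPs, oldPod.Status.PodIPs) ||
		newPod.Spec.HostNetwork != oldPod.Spec.HostNetwork ||
		!reflect.DeepEqual(newPod.Labels, oldPod.Labels)
}

func main() {
	oldPod := &api.Pod{Status: api.PodStatus{Phase: api.PodRunning, PodIP: "10.42.0.5"}}
	newPod := oldPod.DeepCopy()
	newPod.Status.PodIP = "10.42.0.6"
	fmt.Println("relevant update:", isPodUpdateNetPolRelevantSketch(oldPod, newPod))
}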
@ -40,7 +54,7 @@ func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHand
// OnPodUpdate handles updates to pods from the Kubernetes api server // OnPodUpdate handles updates to pods from the Kubernetes api server
func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) { func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) {
pod := obj.(*api.Pod) pod := obj.(*api.Pod)
glog.V(2).Infof("Received update to pod: %s/%s", pod.Namespace, pod.Name) klog.V(2).Infof("Received update to pod: %s/%s", pod.Namespace, pod.Name)
npc.RequestFullSync() npc.RequestFullSync()
} }
@ -50,15 +64,15 @@ func (npc *NetworkPolicyController) handlePodDelete(obj interface{}) {
if !ok { if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown) tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok { if !ok {
glog.Errorf("unexpected object type: %v", obj) klog.Errorf("unexpected object type: %v", obj)
return return
} }
if pod, ok = tombstone.Obj.(*api.Pod); !ok { if pod, ok = tombstone.Obj.(*api.Pod); !ok {
glog.Errorf("unexpected object type: %v", obj) klog.Errorf("unexpected object type: %v", obj)
return return
} }
} }
glog.V(2).Infof("Received pod: %s/%s delete event", pod.Namespace, pod.Name) klog.V(2).Infof("Received pod: %s/%s delete event", pod.Namespace, pod.Name)
npc.RequestFullSync() npc.RequestFullSync()
} }
@ -67,315 +81,184 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo []
activePodFwChains := make(map[string]bool) activePodFwChains := make(map[string]bool)
iptablesCmdHandler, err := iptables.New()
if err != nil {
glog.Fatalf("Failed to initialize iptables executor: %s", err.Error())
}
dropUnmarkedTrafficRules := func(podName, podNamespace, podFwChainName string) error { dropUnmarkedTrafficRules := func(podName, podNamespace, podFwChainName string) error {
// add rule to log the packets that will be dropped due to network policy enforcement // add rule to log the packets that will be dropped due to network policy enforcement
comment := "rule to log dropped traffic POD name:" + podName + " namespace: " + podNamespace comment := "\"rule to log dropped traffic POD name:" + podName + " namespace: " + podNamespace + "\""
args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "NFLOG", "--nflog-group", "100", "-m", "limit", "--limit", "10/minute", "--limit-burst", "10"} args := []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "NFLOG", "--nflog-group", "100", "-m", "limit", "--limit", "10/minute", "--limit-burst", "10", "\n"}
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...) // This used to be AppendUnique when we were using iptables directly, this checks to make sure we didn't drop unmarked for this chain already
if err != nil { if strings.Contains(npc.filterTableRules.String(), strings.Join(args, " ")) {
return fmt.Errorf("Failed to run iptables command: %s", err.Error()) return nil
} }
npc.filterTableRules.WriteString(strings.Join(args, " "))
// add rule to DROP if no applicable network policy permits the traffic // add rule to DROP if no applicable network policy permits the traffic
comment = "rule to REJECT traffic destined for POD name:" + podName + " namespace: " + podNamespace comment = "\"rule to REJECT traffic destined for POD name:" + podName + " namespace: " + podNamespace + "\""
args = []string{"-m", "comment", "--comment", comment, "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "REJECT"} args = []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "REJECT", "\n"}
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...) npc.filterTableRules.WriteString(strings.Join(args, " "))
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
// reset mark to let traffic pass through rest of the chains // reset mark to let traffic pass through rest of the chains
args = []string{"-j", "MARK", "--set-mark", "0/0x10000"} args = []string{"-A", podFwChainName, "-j", "MARK", "--set-mark", "0/0x10000", "\n"}
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...) npc.filterTableRules.WriteString(strings.Join(args, " "))
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
return nil return nil
} }
// loop through the pods running on the node which to which ingress network policies to be applied // loop through the pods running on the node
ingressNetworkPolicyEnabledPods, err := npc.getIngressNetworkPolicyEnabledPods(networkPoliciesInfo, npc.nodeIP.String()) allLocalPods, err := npc.getLocalPods(npc.nodeIP.String())
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, pod := range *ingressNetworkPolicyEnabledPods { for _, pod := range *allLocalPods {
// below condition occurs when we get trasient update while removing or adding pod
// subsequent update will do the correct action
if len(pod.ip) == 0 || pod.ip == "" {
continue
}
// ensure pod specific firewall chain exist for all the pods that need ingress firewall // ensure pod specific firewall chain exist for all the pods that need ingress firewall
podFwChainName := podFirewallChainName(pod.namespace, pod.name, version) podFwChainName := podFirewallChainName(pod.namespace, pod.name, version)
err = iptablesCmdHandler.NewChain("filter", podFwChainName) npc.filterTableRules.WriteString(":" + podFwChainName + "\n")
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
activePodFwChains[podFwChainName] = true activePodFwChains[podFwChainName] = true
// add entries in pod firewall to run through required network policies // setup rules to run through applicable ingress/egress network policies for the pod
for _, policy := range networkPoliciesInfo { npc.setupPodNetpolRules(&pod, podFwChainName, networkPoliciesInfo, version)
if _, ok := policy.targetPods[pod.ip]; ok {
comment := "run through nw policy " + policy.name
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName}
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
}
}
comment := "rule to permit the traffic traffic to pods when source is the pod's local node" // setup rules to intercept inbound traffic to the pods
args := []string{"-m", "comment", "--comment", comment, "-m", "addrtype", "--src-type", "LOCAL", "-d", pod.ip, "-j", "ACCEPT"} npc.interceptPodInboundTraffic(&pod, podFwChainName)
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// ensure stateful firewall that permits return traffic for the traffic originated by the pod // setup rules to intercept outbound traffic from the pods
comment = "rule for stateful firewall for pod" npc.interceptPodOutboundTraffic(&pod, podFwChainName)
args = []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"}
exists, err = iptablesCmdHandler.Exists("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
// this rule applies to the traffic getting routed (coming for other node pods)
comment = "rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName
args = []string{"-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName}
exists, err = iptablesCmdHandler.Exists("filter", kubeForwardChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", kubeForwardChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// ensure there is rule in filter table and OUTPUT chain to jump to pod specific firewall chain
// this rule applies to the traffic from a pod getting routed back to another pod on same node by service proxy
exists, err = iptablesCmdHandler.Exists("filter", kubeOutputChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", kubeOutputChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
// this rule applies to the traffic getting switched (coming for same node pods)
comment = "rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName
args = []string{"-m", "physdev", "--physdev-is-bridged",
"-m", "comment", "--comment", comment,
"-d", pod.ip,
"-j", podFwChainName}
exists, err = iptablesCmdHandler.Exists("filter", kubeForwardChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err = iptablesCmdHandler.Insert("filter", kubeForwardChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
err = dropUnmarkedTrafficRules(pod.name, pod.namespace, podFwChainName) err = dropUnmarkedTrafficRules(pod.name, pod.namespace, podFwChainName)
if err != nil { if err != nil {
return nil, err return nil, err
} }
}
// loop through the pods running on the node which egress network policies to be applied // set mark to indicate traffic from/to the pod passed network policies.
egressNetworkPolicyEnabledPods, err := npc.getEgressNetworkPolicyEnabledPods(networkPoliciesInfo, npc.nodeIP.String()) // Mark will be checked to explicitly ACCEPT the traffic
if err != nil { comment := "\"set mark to ACCEPT traffic that comply to network policies\""
return nil, err args := []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, "-j", "MARK", "--set-mark", "0x20000/0x20000", "\n"}
} npc.filterTableRules.WriteString(strings.Join(args, " "))
for _, pod := range *egressNetworkPolicyEnabledPods {
// below condition occurs when we get transient update while removing or adding pod
// subsequent update will do the correct action
if len(pod.ip) == 0 || pod.ip == "" {
continue
}
// ensure pod specific firewall chain exist for all the pods that need egress firewall
podFwChainName := podFirewallChainName(pod.namespace, pod.name, version)
err = iptablesCmdHandler.NewChain("filter", podFwChainName)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
activePodFwChains[podFwChainName] = true
// add entries in pod firewall to run through required network policies
for _, policy := range networkPoliciesInfo {
if _, ok := policy.targetPods[pod.ip]; ok {
comment := "run through nw policy " + policy.name
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName}
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
}
}
// ensure stateful firewall that permits return traffic for the traffic originated by the pod
comment := "rule for stateful firewall for pod"
args := []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"}
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
egressFilterChains := []string{kubeInputChainName, kubeForwardChainName, kubeOutputChainName}
for _, chain := range egressFilterChains {
// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
// this rule applies to the traffic getting forwarded/routed (traffic from the pod destined
// to pod on a different node)
comment = "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName
args = []string{"-m", "comment", "--comment", comment, "-s", pod.ip, "-j", podFwChainName}
exists, err = iptablesCmdHandler.Exists("filter", chain, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.AppendUnique("filter", chain, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
}
// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
// this rule applies to the traffic getting switched (coming for same node pods)
comment = "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName
args = []string{"-m", "physdev", "--physdev-is-bridged",
"-m", "comment", "--comment", comment,
"-s", pod.ip,
"-j", podFwChainName}
exists, err = iptablesCmdHandler.Exists("filter", kubeForwardChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err = iptablesCmdHandler.Insert("filter", kubeForwardChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
err = dropUnmarkedTrafficRules(pod.name, pod.namespace, podFwChainName)
if err != nil {
return nil, err
}
} }
return activePodFwChains, nil return activePodFwChains, nil
} }
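syncPodFirewallChains returns the set of pod firewall chains that are still in use; chains belonging to pods that no longer run on the node presumably get flushed and deleted in a separate cleanup pass. A hedged sketch of what such a cleanup could look like with github.com/coreos/go-iptables; the chain prefix and function name are assumptions, not necessarily what k3s does.

```go
package main

import (
	"strings"

	"github.com/coreos/go-iptables/iptables"
)

// cleanupStaleChains flushes and deletes pod firewall chains in the filter
// table whose names carry the given prefix but are no longer in activeChains.
// It assumes the jump rules referencing those chains were removed beforehand.
func cleanupStaleChains(activeChains map[string]bool, prefix string) error {
	ipt, err := iptables.New()
	if err != nil {
		return err
	}
	chains, err := ipt.ListChains("filter")
	if err != nil {
		return err
	}
	for _, chain := range chains {
		if !strings.HasPrefix(chain, prefix) || activeChains[chain] {
			continue
		}
		if err := ipt.ClearChain("filter", chain); err != nil { // flush first
			return err
		}
		if err := ipt.DeleteChain("filter", chain); err != nil { // then delete
			return err
		}
	}
	return nil
}

func main() {
	// With an empty active set, every chain carrying the prefix would be removed.
	_ = cleanupStaleChains(map[string]bool{}, "KUBE-POD-FW-")
}
```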
func (npc *NetworkPolicyController) getIngressNetworkPolicyEnabledPods(networkPoliciesInfo []networkPolicyInfo, nodeIP string) (*map[string]podInfo, error) { // setup rules to jump to applicable network policy chains for the traffic from/to the pod
nodePods := make(map[string]podInfo) func (npc *NetworkPolicyController) setupPodNetpolRules(pod *podInfo, podFwChainName string, networkPoliciesInfo []networkPolicyInfo, version string) {
for _, obj := range npc.podLister.List() { hasIngressPolicy := false
pod := obj.(*api.Pod) hasEgressPolicy := false
if strings.Compare(pod.Status.HostIP, nodeIP) != 0 { // add entries in pod firewall to run through applicable network policies
for _, policy := range networkPoliciesInfo {
if _, ok := policy.targetPods[pod.ip]; !ok {
continue continue
} }
for _, policy := range networkPoliciesInfo { comment := "\"run through nw policy " + policy.name + "\""
if policy.namespace != pod.ObjectMeta.Namespace { policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
continue var args []string
} switch policy.policyType {
_, ok := policy.targetPods[pod.Status.PodIP] case "both":
if ok && (policy.policyType == "both" || policy.policyType == "ingress") { hasIngressPolicy = true
glog.V(2).Infof("Found pod name: " + pod.ObjectMeta.Name + " namespace: " + pod.ObjectMeta.Namespace + " for which network policies need to be applied.") hasEgressPolicy = true
nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP, args = []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-j", policyChainName, "\n"}
name: pod.ObjectMeta.Name, case "ingress":
namespace: pod.ObjectMeta.Namespace, hasIngressPolicy = true
labels: pod.ObjectMeta.Labels} args = []string{"-I", podFwChainName, "1", "-d", pod.ip, "-m", "comment", "--comment", comment, "-j", policyChainName, "\n"}
break case "egress":
} hasEgressPolicy = true
args = []string{"-I", podFwChainName, "1", "-s", pod.ip, "-m", "comment", "--comment", comment, "-j", policyChainName, "\n"}
} }
npc.filterTableRules.WriteString(strings.Join(args, " "))
} }
return &nodePods, nil
// if pod does not have any network policy which applies rules for pod's ingress traffic
// then apply default network policy
if !hasIngressPolicy {
comment := "\"run through default ingress network policy chain\""
args := []string{"-I", podFwChainName, "1", "-d", pod.ip, "-m", "comment", "--comment", comment, "-j", kubeDefaultNetpolChain, "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
}
// if pod does not have any network policy which applies rules for pod's egress traffic
// then apply default network policy
if !hasEgressPolicy {
comment := "\"run through default egress network policy chain\""
args := []string{"-I", podFwChainName, "1", "-s", pod.ip, "-m", "comment", "--comment", comment, "-j", kubeDefaultNetpolChain, "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
}
comment := "\"rule to permit the traffic traffic to pods when source is the pod's local node\""
args := []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-m", "addrtype", "--src-type", "LOCAL", "-d", pod.ip, "-j", "ACCEPT", "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
// ensure stateful firewall that permits RELATED,ESTABLISHED traffic from/to the pod
comment = "\"rule for stateful firewall for pod\""
args = []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT", "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
} }
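Because setupPodNetpolRules inserts every rule at position 1 ("-I <chain> 1"), the rules written last are evaluated first: the stateful and local-node ACCEPTs end up at the top of the pod chain, followed by the policy (or default) chain jumps, with the REJECT for unmarked traffic appended at the bottom via "-A". The toy program below, using a hypothetical pod IP and chain name, only simulates that prepend ordering.

```go
package main

import (
	"fmt"
	"strings"
)

// prepend emulates "-I <chain> 1": a rule inserted later lands above the
// rules that were inserted before it.
func prepend(rules []string, rule string) []string {
	return append([]string{rule}, rules...)
}

func main() {
	podIP := "10.42.0.10"                // hypothetical pod IP
	policyChain := "KUBE-NWPLCY-EXAMPLE" // hypothetical policy chain
	var rules []string

	// Written in the same order as setupPodNetpolRules:
	rules = prepend(rules, "-d "+podIP+" -j "+policyChain)                         // policy jump
	rules = prepend(rules, "-m addrtype --src-type LOCAL -d "+podIP+" -j ACCEPT")  // local-node traffic
	rules = prepend(rules, "-m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT") // stateful firewall

	// Evaluation order is the reverse of the write order: the stateful and
	// local-node ACCEPTs sit above the policy jump.
	fmt.Println(strings.Join(rules, "\n"))
}
```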
func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(networkPoliciesInfo []networkPolicyInfo, nodeIP string) (*map[string]podInfo, error) { func (npc *NetworkPolicyController) interceptPodInboundTraffic(pod *podInfo, podFwChainName string) {
// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
// this rule applies to the traffic getting routed (coming for other node pods)
comment := "\"rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName + "\""
args := []string{"-I", kubeForwardChainName, "1", "-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName + "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
nodePods := make(map[string]podInfo) // ensure there is rule in filter table and OUTPUT chain to jump to pod specific firewall chain
// this rule applies to the traffic from a pod getting routed back to another pod on same node by service proxy
args = []string{"-I", kubeOutputChainName, "1", "-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName + "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
// this rule applies to the traffic getting switched (coming for same node pods)
comment = "\"rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName + "\""
args = []string{"-I", kubeForwardChainName, "1", "-m", "physdev", "--physdev-is-bridged",
"-m", "comment", "--comment", comment,
"-d", pod.ip,
"-j", podFwChainName, "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
}
// setup iptable rules to intercept outbound traffic from pods and run it across the
// firewall chain corresponding to the pod so that egress network policies are enforced
func (npc *NetworkPolicyController) interceptPodOutboundTraffic(pod *podInfo, podFwChainName string) {
egressFilterChains := []string{kubeInputChainName, kubeForwardChainName, kubeOutputChainName}
for _, chain := range egressFilterChains {
// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
// this rule applies to the traffic getting forwarded/routed (traffic from the pod destined
// to pod on a different node)
comment := "\"rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName + "\""
args := []string{"-I", chain, "1", "-m", "comment", "--comment", comment, "-s", pod.ip, "-j", podFwChainName, "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
}
// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
// this rule applies to the traffic getting switched (coming for same node pods)
comment := "\"rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName + "\""
args := []string{"-I", kubeForwardChainName, "1", "-m", "physdev", "--physdev-is-bridged",
"-m", "comment", "--comment", comment,
"-s", pod.ip,
"-j", podFwChainName, "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
}
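Both interception helpers add each jump rule twice: once for routed traffic, matched purely on the pod IP, and once with "-m physdev --physdev-is-bridged" so traffic switched on the node's local bridge (pod-to-pod on the same node) also reaches the pod firewall chain. A small sketch that emits the two variants for either direction; the forward chain name is a placeholder, not necessarily the constant k3s uses.

```go
package main

import "fmt"

// podJumpRules returns the two jump rules added for one pod and one direction:
// the first matches routed traffic, the second matches traffic switched on the
// local bridge (physdev). "KUBE-ROUTER-FORWARD" is a placeholder chain name.
func podJumpRules(direction, podIP, podFwChain string) []string {
	flag := "-d" // inbound traffic is matched on the destination address
	if direction == "outbound" {
		flag = "-s" // outbound traffic is matched on the source address
	}
	return []string{
		fmt.Sprintf("-I KUBE-ROUTER-FORWARD 1 %s %s -j %s", flag, podIP, podFwChain),
		fmt.Sprintf("-I KUBE-ROUTER-FORWARD 1 -m physdev --physdev-is-bridged %s %s -j %s", flag, podIP, podFwChain),
	}
}

func main() {
	for _, dir := range []string{"inbound", "outbound"} {
		for _, rule := range podJumpRules(dir, "10.42.0.10", "KUBE-POD-FW-EXAMPLE") {
			fmt.Println(dir, rule)
		}
	}
}
```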
func (npc *NetworkPolicyController) getLocalPods(nodeIP string) (*map[string]podInfo, error) {
localPods := make(map[string]podInfo)
for _, obj := range npc.podLister.List() { for _, obj := range npc.podLister.List() {
pod := obj.(*api.Pod) pod := obj.(*api.Pod)
// ignore the pods running on the different node and pods that are not actionable
if strings.Compare(pod.Status.HostIP, nodeIP) != 0 { if strings.Compare(pod.Status.HostIP, nodeIP) != 0 || !isNetPolActionable(pod) {
continue continue
} }
for _, policy := range networkPoliciesInfo { localPods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP,
if policy.namespace != pod.ObjectMeta.Namespace { name: pod.ObjectMeta.Name,
continue namespace: pod.ObjectMeta.Namespace,
} labels: pod.ObjectMeta.Labels}
_, ok := policy.targetPods[pod.Status.PodIP]
if ok && (policy.policyType == "both" || policy.policyType == "egress") {
glog.V(2).Infof("Found pod name: " + pod.ObjectMeta.Name + " namespace: " + pod.ObjectMeta.Namespace + " for which network policies need to be applied.")
nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP,
name: pod.ObjectMeta.Name,
namespace: pod.ObjectMeta.Namespace,
labels: pod.ObjectMeta.Labels}
break
}
}
} }
return &nodePods, nil return &localPods, nil
} }
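getLocalPods replaces the two per-direction pod listings with a single pass that keeps only actionable pods on this node: pods that already have an IP, are not finished, and do not use the host network. A rough equivalent of that filter over a plain pod slice, using the upstream k8s.io/api types; the helper names are illustrative.

```go
package main

import (
	"fmt"

	api "k8s.io/api/core/v1"
)

// actionable mirrors the isNetPolActionable check: skip pods that are finished,
// have no IP yet, or share the host network namespace.
func actionable(pod *api.Pod) bool {
	switch pod.Status.Phase {
	case api.PodFailed, api.PodSucceeded, api.PodPhase("Completed"):
		return false
	}
	return pod.Status.PodIP != "" && !pod.Spec.HostNetwork
}

// localPodIPs returns the IPs of actionable pods scheduled on the node nodeIP.
func localPodIPs(pods []*api.Pod, nodeIP string) []string {
	var ips []string
	for _, pod := range pods {
		if pod.Status.HostIP != nodeIP || !actionable(pod) {
			continue
		}
		ips = append(ips, pod.Status.PodIP)
	}
	return ips
}

func main() {
	fmt.Println(localPodIPs(nil, "192.168.1.10")) // no pods listed: prints []
}
```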
func podFirewallChainName(namespace, podName string, version string) string { func podFirewallChainName(namespace, podName string, version string) string {


@ -1,5 +1,5 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) // Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/pod.go // - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol/policy.go
// +build !windows // +build !windows
@ -14,7 +14,6 @@ import (
"strings" "strings"
"time" "time"
"github.com/coreos/go-iptables/iptables"
"github.com/rancher/k3s/pkg/agent/netpol/utils" "github.com/rancher/k3s/pkg/agent/netpol/utils"
api "k8s.io/api/core/v1" api "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1" networking "k8s.io/api/networking/v1"
@ -23,7 +22,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
listers "k8s.io/client-go/listers/core/v1" listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
glog "k8s.io/klog" "k8s.io/klog/v2"
) )
func (npc *NetworkPolicyController) newNetworkPolicyEventHandler() cache.ResourceEventHandler { func (npc *NetworkPolicyController) newNetworkPolicyEventHandler() cache.ResourceEventHandler {
@ -45,7 +44,7 @@ func (npc *NetworkPolicyController) newNetworkPolicyEventHandler() cache.Resourc
// OnNetworkPolicyUpdate handles updates to network policy from the kubernetes api server // OnNetworkPolicyUpdate handles updates to network policy from the kubernetes api server
func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(obj interface{}) { func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(obj interface{}) {
netpol := obj.(*networking.NetworkPolicy) netpol := obj.(*networking.NetworkPolicy)
glog.V(2).Infof("Received update for network policy: %s/%s", netpol.Namespace, netpol.Name) klog.V(2).Infof("Received update for network policy: %s/%s", netpol.Namespace, netpol.Name)
npc.RequestFullSync() npc.RequestFullSync()
} }
@ -55,15 +54,15 @@ func (npc *NetworkPolicyController) handleNetworkPolicyDelete(obj interface{}) {
if !ok { if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown) tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok { if !ok {
glog.Errorf("unexpected object type: %v", obj) klog.Errorf("unexpected object type: %v", obj)
return return
} }
if netpol, ok = tombstone.Obj.(*networking.NetworkPolicy); !ok { if netpol, ok = tombstone.Obj.(*networking.NetworkPolicy); !ok {
glog.Errorf("unexpected object type: %v", obj) klog.Errorf("unexpected object type: %v", obj)
return return
} }
} }
glog.V(2).Infof("Received network policy: %s/%s delete event", netpol.Namespace, netpol.Name) klog.V(2).Infof("Received network policy: %s/%s delete event", netpol.Namespace, netpol.Name)
npc.RequestFullSync() npc.RequestFullSync()
} }
@ -77,25 +76,36 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo
start := time.Now() start := time.Now()
defer func() { defer func() {
endTime := time.Since(start) endTime := time.Since(start)
glog.V(2).Infof("Syncing network policy chains took %v", endTime) klog.V(2).Infof("Syncing network policy chains took %v", endTime)
}() }()
klog.V(1).Infof("Attempting to attain ipset mutex lock")
npc.ipsetMutex.Lock()
klog.V(1).Infof("Attained ipset mutex lock, continuing...")
defer func() {
npc.ipsetMutex.Unlock()
klog.V(1).Infof("Returned ipset mutex lock")
}()
ipset, err := utils.NewIPSet(false)
if err != nil {
return nil, nil, err
}
err = ipset.Save()
if err != nil {
return nil, nil, err
}
npc.ipSetHandler = ipset
activePolicyChains := make(map[string]bool) activePolicyChains := make(map[string]bool)
activePolicyIPSets := make(map[string]bool) activePolicyIPSets := make(map[string]bool)
iptablesCmdHandler, err := iptables.New()
if err != nil {
glog.Fatalf("Failed to initialize iptables executor due to: %s", err.Error())
}
// run through all network policies // run through all network policies
for _, policy := range networkPoliciesInfo { for _, policy := range networkPoliciesInfo {
// ensure there is a unique chain per network policy in filter table // ensure there is a unique chain per network policy in filter table
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
err := iptablesCmdHandler.NewChain("filter", policyChainName) npc.filterTableRules.WriteString(":" + policyChainName + "\n")
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
activePolicyChains[policyChainName] = true activePolicyChains[policyChainName] = true
@ -107,42 +117,39 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo
if policy.policyType == "both" || policy.policyType == "ingress" { if policy.policyType == "both" || policy.policyType == "ingress" {
// create a ipset for all destination pod ip's matched by the policy spec PodSelector // create a ipset for all destination pod ip's matched by the policy spec PodSelector
targetDestPodIPSetName := policyDestinationPodIPSetName(policy.namespace, policy.name) targetDestPodIPSetName := policyDestinationPodIPSetName(policy.namespace, policy.name)
targetDestPodIPSet, err := npc.ipSetHandler.Create(targetDestPodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") setEntries := make([][]string, 0)
if err != nil { for _, podIP := range currnetPodIps {
return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error()) setEntries = append(setEntries, []string{podIP, utils.OptionTimeout, "0"})
}
err = targetDestPodIPSet.Refresh(currnetPodIps)
if err != nil {
glog.Errorf("failed to refresh targetDestPodIPSet,: " + err.Error())
} }
npc.ipSetHandler.RefreshSet(targetDestPodIPSetName, setEntries, utils.TypeHashIP)
err = npc.processIngressRules(policy, targetDestPodIPSetName, activePolicyIPSets, version) err = npc.processIngressRules(policy, targetDestPodIPSetName, activePolicyIPSets, version)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
activePolicyIPSets[targetDestPodIPSet.Name] = true activePolicyIPSets[targetDestPodIPSetName] = true
} }
if policy.policyType == "both" || policy.policyType == "egress" { if policy.policyType == "both" || policy.policyType == "egress" {
// create a ipset for all source pod ip's matched by the policy spec PodSelector // create a ipset for all source pod ip's matched by the policy spec PodSelector
targetSourcePodIPSetName := policySourcePodIPSetName(policy.namespace, policy.name) targetSourcePodIPSetName := policySourcePodIPSetName(policy.namespace, policy.name)
targetSourcePodIPSet, err := npc.ipSetHandler.Create(targetSourcePodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") setEntries := make([][]string, 0)
if err != nil { for _, podIP := range currnetPodIps {
return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error()) setEntries = append(setEntries, []string{podIP, utils.OptionTimeout, "0"})
}
err = targetSourcePodIPSet.Refresh(currnetPodIps)
if err != nil {
glog.Errorf("failed to refresh targetSourcePodIPSet: " + err.Error())
} }
npc.ipSetHandler.RefreshSet(targetSourcePodIPSetName, setEntries, utils.TypeHashIP)
err = npc.processEgressRules(policy, targetSourcePodIPSetName, activePolicyIPSets, version) err = npc.processEgressRules(policy, targetSourcePodIPSetName, activePolicyIPSets, version)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
activePolicyIPSets[targetSourcePodIPSet.Name] = true activePolicyIPSets[targetSourcePodIPSetName] = true
} }
} }
glog.V(2).Infof("Iptables chains in the filter table are synchronized with the network policies.") err = npc.ipSetHandler.Restore()
if err != nil {
return nil, nil, fmt.Errorf("failed to perform ipset restore: %s", err.Error())
}
klog.V(2).Infof("Iptables chains in the filter table are synchronized with the network policies.")
return activePolicyChains, activePolicyIPSets, nil return activePolicyChains, activePolicyIPSets, nil
} }
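ipset maintenance is batched the same way as iptables: RefreshSet only records the desired members and Restore later applies everything through `ipset restore`. A simplified sketch of driving `ipset restore` directly for one hash:ip set; unlike the vendored helper it flushes the set in place rather than swapping in a temporary set, and the set name is illustrative.

```go
package main

import (
	"bytes"
	"fmt"
	"os/exec"
)

// restoreIPSet rebuilds one hash:ip set with the given members through a
// single `ipset restore` call; -exist makes the create idempotent.
func restoreIPSet(name string, members []string) error {
	var payload bytes.Buffer
	payload.WriteString(fmt.Sprintf("create %s hash:ip timeout 0\n", name))
	payload.WriteString(fmt.Sprintf("flush %s\n", name))
	for _, m := range members {
		payload.WriteString(fmt.Sprintf("add %s %s timeout 0\n", name, m))
	}
	cmd := exec.Command("ipset", "restore", "-exist")
	cmd.Stdin = &payload
	if out, err := cmd.CombinedOutput(); err != nil {
		return fmt.Errorf("ipset restore failed: %v: %s", err, out)
	}
	return nil
}

func main() {
	_ = restoreIPSet("KUBE-DST-EXAMPLE", []string{"10.42.0.10", "10.42.0.11"})
}
```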
@ -156,11 +163,6 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
return nil return nil
} }
iptablesCmdHandler, err := iptables.New()
if err != nil {
return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error())
}
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
// run through all the ingress rules in the spec and create iptables rules // run through all the ingress rules in the spec and create iptables rules
@ -169,21 +171,12 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
if len(ingressRule.srcPods) != 0 { if len(ingressRule.srcPods) != 0 {
srcPodIPSetName := policyIndexedSourcePodIPSetName(policy.namespace, policy.name, i) srcPodIPSetName := policyIndexedSourcePodIPSetName(policy.namespace, policy.name, i)
srcPodIPSet, err := npc.ipSetHandler.Create(srcPodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") activePolicyIPSets[srcPodIPSetName] = true
if err != nil { setEntries := make([][]string, 0)
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIPSets[srcPodIPSet.Name] = true
ingressRuleSrcPodIPs := make([]string, 0, len(ingressRule.srcPods))
for _, pod := range ingressRule.srcPods { for _, pod := range ingressRule.srcPods {
ingressRuleSrcPodIPs = append(ingressRuleSrcPodIPs, pod.ip) setEntries = append(setEntries, []string{pod.ip, utils.OptionTimeout, "0"})
}
err = srcPodIPSet.Refresh(ingressRuleSrcPodIPs)
if err != nil {
glog.Errorf("failed to refresh srcPodIPSet: " + err.Error())
} }
npc.ipSetHandler.RefreshSet(srcPodIPSetName, setEntries, utils.TypeHashIP)
if len(ingressRule.ports) != 0 { if len(ingressRule.ports) != 0 {
// case where 'ports' details and 'from' details specified in the ingress rule // case where 'ports' details and 'from' details specified in the ingress rule
@ -191,7 +184,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
for _, portProtocol := range ingressRule.ports { for _, portProtocol := range ingressRule.ports {
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil {
return err return err
} }
} }
@ -200,18 +193,16 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
if len(ingressRule.namedPorts) != 0 { if len(ingressRule.namedPorts) != 0 {
for j, endPoints := range ingressRule.namedPorts { for j, endPoints := range ingressRule.namedPorts {
namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j) namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j)
namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") activePolicyIPSets[namedPortIPSetName] = true
if err != nil { setEntries := make([][]string, 0)
return fmt.Errorf("failed to create ipset: %s", err.Error()) for _, ip := range endPoints.ips {
} setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"})
activePolicyIPSets[namedPortIPSet.Name] = true
err = namedPortIPSet.Refresh(endPoints.ips)
if err != nil {
glog.Errorf("failed to refresh namedPortIPSet: " + err.Error())
} }
npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashIP)
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil {
return err return err
} }
} }
@ -222,7 +213,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
// so match on specified source and destination ip with all port and protocol // so match on specified source and destination ip with all port and protocol
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, "", ""); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, "", "", ""); err != nil {
return err return err
} }
} }
@ -234,27 +225,23 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
for _, portProtocol := range ingressRule.ports { for _, portProtocol := range ingressRule.ports {
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", targetDestPodIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil {
return err return err
} }
} }
for j, endPoints := range ingressRule.namedPorts { for j, endPoints := range ingressRule.namedPorts {
namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j) namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j)
namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") activePolicyIPSets[namedPortIPSetName] = true
if err != nil { setEntries := make([][]string, 0)
return fmt.Errorf("failed to create ipset: %s", err.Error()) for _, ip := range endPoints.ips {
setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"})
} }
npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashIP)
activePolicyIPSets[namedPortIPSet.Name] = true
err = namedPortIPSet.Refresh(endPoints.ips)
if err != nil {
glog.Errorf("failed to refresh namedPortIPSet: " + err.Error())
}
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil {
return err return err
} }
} }
@ -265,47 +252,36 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
if ingressRule.matchAllSource && ingressRule.matchAllPorts { if ingressRule.matchAllSource && ingressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIPSetName, "", ""); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", targetDestPodIPSetName, "", "", ""); err != nil {
return err return err
} }
} }
if len(ingressRule.srcIPBlocks) != 0 { if len(ingressRule.srcIPBlocks) != 0 {
srcIPBlockIPSetName := policyIndexedSourceIPBlockIPSetName(policy.namespace, policy.name, i) srcIPBlockIPSetName := policyIndexedSourceIPBlockIPSetName(policy.namespace, policy.name, i)
srcIPBlockIPSet, err := npc.ipSetHandler.Create(srcIPBlockIPSetName, utils.TypeHashNet, utils.OptionTimeout, "0") activePolicyIPSets[srcIPBlockIPSetName] = true
if err != nil { npc.ipSetHandler.RefreshSet(srcIPBlockIPSetName, ingressRule.srcIPBlocks, utils.TypeHashNet)
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIPSets[srcIPBlockIPSet.Name] = true
err = srcIPBlockIPSet.RefreshWithBuiltinOptions(ingressRule.srcIPBlocks)
if err != nil {
glog.Errorf("failed to refresh srcIPBlockIPSet: " + err.Error())
}
if !ingressRule.matchAllPorts { if !ingressRule.matchAllPorts {
for _, portProtocol := range ingressRule.ports { for _, portProtocol := range ingressRule.ports {
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil {
return err return err
} }
} }
for j, endPoints := range ingressRule.namedPorts { for j, endPoints := range ingressRule.namedPorts {
namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j) namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j)
namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") activePolicyIPSets[namedPortIPSetName] = true
if err != nil { setEntries := make([][]string, 0)
return fmt.Errorf("failed to create ipset: %s", err.Error()) for _, ip := range endPoints.ips {
} setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"})
activePolicyIPSets[namedPortIPSet.Name] = true
err = namedPortIPSet.Refresh(endPoints.ips)
if err != nil {
glog.Errorf("failed to refresh namedPortIPSet: " + err.Error())
} }
npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashNet)
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil {
return err return err
} }
} }
@ -313,7 +289,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
if ingressRule.matchAllPorts { if ingressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, "", ""); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, "", "", ""); err != nil {
return err return err
} }
} }
@ -332,11 +308,6 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
return nil return nil
} }
iptablesCmdHandler, err := iptables.New()
if err != nil {
return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error())
}
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
// run through all the egress rules in the spec and create iptables rules // run through all the egress rules in the spec and create iptables rules
@ -345,28 +316,19 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
if len(egressRule.dstPods) != 0 { if len(egressRule.dstPods) != 0 {
dstPodIPSetName := policyIndexedDestinationPodIPSetName(policy.namespace, policy.name, i) dstPodIPSetName := policyIndexedDestinationPodIPSetName(policy.namespace, policy.name, i)
dstPodIPSet, err := npc.ipSetHandler.Create(dstPodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") activePolicyIPSets[dstPodIPSetName] = true
if err != nil { setEntries := make([][]string, 0)
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIPSets[dstPodIPSet.Name] = true
egressRuleDstPodIps := make([]string, 0, len(egressRule.dstPods))
for _, pod := range egressRule.dstPods { for _, pod := range egressRule.dstPods {
egressRuleDstPodIps = append(egressRuleDstPodIps, pod.ip) setEntries = append(setEntries, []string{pod.ip, utils.OptionTimeout, "0"})
}
err = dstPodIPSet.Refresh(egressRuleDstPodIps)
if err != nil {
glog.Errorf("failed to refresh dstPodIPSet: " + err.Error())
} }
npc.ipSetHandler.RefreshSet(dstPodIPSetName, setEntries, utils.TypeHashIP)
if len(egressRule.ports) != 0 { if len(egressRule.ports) != 0 {
// case where 'ports' details and 'from' details specified in the egress rule // case where 'ports' details and 'from' details specified in the egress rule
// so match on specified source and destination ip's and specified port (if any) and protocol // so match on specified source and destination ip's and specified port (if any) and protocol
for _, portProtocol := range egressRule.ports { for _, portProtocol := range egressRule.ports {
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil {
return err return err
} }
} }
@ -375,20 +337,15 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
if len(egressRule.namedPorts) != 0 { if len(egressRule.namedPorts) != 0 {
for j, endPoints := range egressRule.namedPorts { for j, endPoints := range egressRule.namedPorts {
namedPortIPSetName := policyIndexedEgressNamedPortIPSetName(policy.namespace, policy.name, i, j) namedPortIPSetName := policyIndexedEgressNamedPortIPSetName(policy.namespace, policy.name, i, j)
namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") activePolicyIPSets[namedPortIPSetName] = true
if err != nil { setEntries := make([][]string, 0)
return fmt.Errorf("failed to create ipset: %s", err.Error()) for _, ip := range endPoints.ips {
} setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"})
activePolicyIPSets[namedPortIPSet.Name] = true
err = namedPortIPSet.Refresh(endPoints.ips)
if err != nil {
glog.Errorf("failed to refresh namedPortIPSet: " + err.Error())
} }
npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashIP)
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil {
return err return err
} }
} }
@ -400,7 +357,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
// so match on specified source and destination ip with all port and protocol // so match on specified source and destination ip with all port and protocol
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, "", ""); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, "", "", ""); err != nil {
return err return err
} }
} }
@ -412,7 +369,14 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
for _, portProtocol := range egressRule.ports { for _, portProtocol := range egressRule.ports {
comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, "", portProtocol.protocol, portProtocol.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, "", portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil {
return err
}
}
for _, portProtocol := range egressRule.namedPorts {
comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, "", portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil {
return err return err
} }
} }
@ -423,26 +387,19 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
if egressRule.matchAllDestinations && egressRule.matchAllPorts { if egressRule.matchAllDestinations && egressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, "", "", ""); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, "", "", "", ""); err != nil {
return err return err
} }
} }
if len(egressRule.dstIPBlocks) != 0 { if len(egressRule.dstIPBlocks) != 0 {
dstIPBlockIPSetName := policyIndexedDestinationIPBlockIPSetName(policy.namespace, policy.name, i) dstIPBlockIPSetName := policyIndexedDestinationIPBlockIPSetName(policy.namespace, policy.name, i)
dstIPBlockIPSet, err := npc.ipSetHandler.Create(dstIPBlockIPSetName, utils.TypeHashNet, utils.OptionTimeout, "0") activePolicyIPSets[dstIPBlockIPSetName] = true
if err != nil { npc.ipSetHandler.RefreshSet(dstIPBlockIPSetName, egressRule.dstIPBlocks, utils.TypeHashNet)
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIPSets[dstIPBlockIPSet.Name] = true
err = dstIPBlockIPSet.RefreshWithBuiltinOptions(egressRule.dstIPBlocks)
if err != nil {
glog.Errorf("failed to refresh dstIPBlockIPSet: " + err.Error())
}
if !egressRule.matchAllPorts { if !egressRule.matchAllPorts {
for _, portProtocol := range egressRule.ports { for _, portProtocol := range egressRule.ports {
comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " + comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, portProtocol.protocol, portProtocol.port); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil {
return err return err
} }
} }
@ -450,7 +407,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
if egressRule.matchAllPorts { if egressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " + comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " +
policy.name + " namespace " + policy.namespace policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, "", ""); err != nil { if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, "", "", ""); err != nil {
return err return err
} }
} }
@ -459,13 +416,13 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
return nil return nil
} }
func (npc *NetworkPolicyController) appendRuleToPolicyChain(iptablesCmdHandler *iptables.IPTables, policyChainName, comment, srcIPSetName, dstIPSetName, protocol, dPort string) error { func (npc *NetworkPolicyController) appendRuleToPolicyChain(policyChainName, comment, srcIPSetName, dstIPSetName, protocol, dPort, endDport string) error {
if iptablesCmdHandler == nil {
return fmt.Errorf("Failed to run iptables command: iptablesCmdHandler is nil")
}
args := make([]string, 0) args := make([]string, 0)
args = append(args, "-A", policyChainName)
if comment != "" { if comment != "" {
args = append(args, "-m", "comment", "--comment", comment) args = append(args, "-m", "comment", "--comment", "\""+comment+"\"")
} }
if srcIPSetName != "" { if srcIPSetName != "" {
args = append(args, "-m", "set", "--match-set", srcIPSetName, "src") args = append(args, "-m", "set", "--match-set", srcIPSetName, "src")
@ -477,22 +434,19 @@ func (npc *NetworkPolicyController) appendRuleToPolicyChain(iptablesCmdHandler *
args = append(args, "-p", protocol) args = append(args, "-p", protocol)
} }
if dPort != "" { if dPort != "" {
args = append(args, "--dport", dPort) if endDport != "" {
multiport := fmt.Sprintf("%s:%s", dPort, endDport)
args = append(args, "--dport", multiport)
} else {
args = append(args, "--dport", dPort)
}
} }
markComment := "rule to mark traffic matching a network policy" markArgs := append(args, "-j", "MARK", "--set-xmark", "0x10000/0x10000", "\n")
markArgs := append(args, "-j", "MARK", "-m", "comment", "--comment", markComment, "--set-xmark", "0x10000/0x10000") npc.filterTableRules.WriteString(strings.Join(markArgs, " "))
err := iptablesCmdHandler.AppendUnique("filter", policyChainName, markArgs...)
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
returnComment := "rule to RETURN traffic matching a network policy" returnArgs := append(args, "-m", "mark", "--mark", "0x10000/0x10000", "-j", "RETURN", "\n")
returnArgs := append(args, "-m", "comment", "--comment", returnComment, "-m", "mark", "--mark", "0x10000/0x10000", "-j", "RETURN") npc.filterTableRules.WriteString(strings.Join(returnArgs, " "))
err = iptablesCmdHandler.AppendUnique("filter", policyChainName, returnArgs...)
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
return nil return nil
} }
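Each matcher handed to appendRuleToPolicyChain turns into a pair of rules: one that MARKs matching traffic with 0x10000 and one that RETURNs traffic already carrying that mark, and a non-empty endport widens --dport into a range. A sketch of that pair in iptables-save syntax, with placeholder chain and ipset names.

```go
package main

import (
	"fmt"
	"strings"
)

// markReturnRules builds the MARK + RETURN pair appended to a policy chain for
// one source-set/destination-set/port matcher, in iptables-save syntax.
func markReturnRules(chain, srcSet, dstSet, proto, port, endPort string) []string {
	match := []string{"-A", chain}
	if srcSet != "" {
		match = append(match, "-m", "set", "--match-set", srcSet, "src")
	}
	if dstSet != "" {
		match = append(match, "-m", "set", "--match-set", dstSet, "dst")
	}
	if proto != "" {
		match = append(match, "-p", proto)
	}
	if port != "" {
		dport := port
		if endPort != "" {
			dport = port + ":" + endPort // endport turns the match into a range
		}
		match = append(match, "--dport", dport)
	}
	mark := append(append([]string{}, match...), "-j", "MARK", "--set-xmark", "0x10000/0x10000")
	ret := append(append([]string{}, match...), "-m", "mark", "--mark", "0x10000/0x10000", "-j", "RETURN")
	return []string{strings.Join(mark, " "), strings.Join(ret, " ")}
}

func main() {
	for _, r := range markReturnRules("KUBE-NWPLCY-EXAMPLE", "KUBE-SRC-EXAMPLE", "KUBE-DST-EXAMPLE", "tcp", "8080", "8090") {
		fmt.Println(r)
	}
}
```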
@ -506,7 +460,7 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI
policy, ok := policyObj.(*networking.NetworkPolicy) policy, ok := policyObj.(*networking.NetworkPolicy)
podSelector, _ := v1.LabelSelectorAsSelector(&policy.Spec.PodSelector) podSelector, _ := v1.LabelSelectorAsSelector(&policy.Spec.PodSelector)
if !ok { if !ok {
return nil, fmt.Errorf("Failed to convert") return nil, fmt.Errorf("failed to convert")
} }
newPolicy := networkPolicyInfo{ newPolicy := networkPolicyInfo{
name: policy.Name, name: policy.Name,
@ -537,7 +491,7 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI
namedPort2IngressEps := make(namedPort2eps) namedPort2IngressEps := make(namedPort2eps)
if err == nil { if err == nil {
for _, matchingPod := range matchingPods { for _, matchingPod := range matchingPods {
if matchingPod.Status.PodIP == "" { if !isNetPolActionable(matchingPod) {
continue continue
} }
newPolicy.targetPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP, newPolicy.targetPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP,
@ -573,7 +527,7 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI
for _, peer := range specIngressRule.From { for _, peer := range specIngressRule.From {
if peerPods, err := npc.evalPodPeer(policy, peer); err == nil { if peerPods, err := npc.evalPodPeer(policy, peer); err == nil {
for _, peerPod := range peerPods { for _, peerPod := range peerPods {
if peerPod.Status.PodIP == "" { if !isNetPolActionable(peerPod) {
continue continue
} }
ingressRule.srcPods = append(ingressRule.srcPods, ingressRule.srcPods = append(ingressRule.srcPods,
@ -609,12 +563,23 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI
// If this field is empty or missing in the spec, this rule matches all sources // If this field is empty or missing in the spec, this rule matches all sources
if len(specEgressRule.To) == 0 { if len(specEgressRule.To) == 0 {
egressRule.matchAllDestinations = true egressRule.matchAllDestinations = true
// if rule.To is empty but rule.Ports not, we must try to grab NamedPort from pods that in same namespace,
// so that we can design iptables rule to describe "match all dst but match some named dst-port" egress rule
if policyRulePortsHasNamedPort(specEgressRule.Ports) {
matchingPeerPods, _ := npc.ListPodsByNamespaceAndLabels(policy.Namespace, labels.Everything())
for _, peerPod := range matchingPeerPods {
if !isNetPolActionable(peerPod) {
continue
}
npc.grabNamedPortFromPod(peerPod, &namedPort2EgressEps)
}
}
} else { } else {
egressRule.matchAllDestinations = false egressRule.matchAllDestinations = false
for _, peer := range specEgressRule.To { for _, peer := range specEgressRule.To {
if peerPods, err := npc.evalPodPeer(policy, peer); err == nil { if peerPods, err := npc.evalPodPeer(policy, peer); err == nil {
for _, peerPod := range peerPods { for _, peerPod := range peerPods {
if peerPod.Status.PodIP == "" { if !isNetPolActionable(peerPod) {
continue continue
} }
egressRule.dstPods = append(egressRule.dstPods, egressRule.dstPods = append(egressRule.dstPods,
@ -683,13 +648,24 @@ func (npc *NetworkPolicyController) evalPodPeer(policy *networking.NetworkPolicy
func (npc *NetworkPolicyController) processNetworkPolicyPorts(npPorts []networking.NetworkPolicyPort, namedPort2eps namedPort2eps) (numericPorts []protocolAndPort, namedPorts []endPoints) { func (npc *NetworkPolicyController) processNetworkPolicyPorts(npPorts []networking.NetworkPolicyPort, namedPort2eps namedPort2eps) (numericPorts []protocolAndPort, namedPorts []endPoints) {
numericPorts, namedPorts = make([]protocolAndPort, 0), make([]endPoints, 0) numericPorts, namedPorts = make([]protocolAndPort, 0), make([]endPoints, 0)
for _, npPort := range npPorts { for _, npPort := range npPorts {
var protocol string
if npPort.Protocol != nil {
protocol = string(*npPort.Protocol)
}
if npPort.Port == nil { if npPort.Port == nil {
numericPorts = append(numericPorts, protocolAndPort{port: "", protocol: string(*npPort.Protocol)}) numericPorts = append(numericPorts, protocolAndPort{port: "", protocol: protocol})
} else if npPort.Port.Type == intstr.Int { } else if npPort.Port.Type == intstr.Int {
numericPorts = append(numericPorts, protocolAndPort{port: npPort.Port.String(), protocol: string(*npPort.Protocol)}) var portproto protocolAndPort
if npPort.EndPort != nil {
if *npPort.EndPort >= npPort.Port.IntVal {
portproto.endport = strconv.Itoa(int(*npPort.EndPort))
}
}
portproto.protocol, portproto.port = protocol, npPort.Port.String()
numericPorts = append(numericPorts, portproto)
} else { } else {
if protocol2eps, ok := namedPort2eps[npPort.Port.String()]; ok { if protocol2eps, ok := namedPort2eps[npPort.Port.String()]; ok {
if numericPort2eps, ok := protocol2eps[string(*npPort.Protocol)]; ok { if numericPort2eps, ok := protocol2eps[protocol]; ok {
for _, eps := range numericPort2eps { for _, eps := range numericPort2eps {
namedPorts = append(namedPorts, *eps) namedPorts = append(namedPorts, *eps)
} }
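The EndPort field was added to networking.NetworkPolicyPort in Kubernetes 1.21, and processNetworkPolicyPorts now carries it through so a single rule can cover a port range. A small sketch of resolving a numeric NetworkPolicyPort into the port/endport strings used above; the helper name is made up, but the API types are the real ones from k8s.io/api/networking/v1.

```go
package main

import (
	"fmt"
	"strconv"

	networking "k8s.io/api/networking/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// toPortRange resolves a numeric NetworkPolicyPort into "port" and optional
// "endport" strings; named (string) ports need endpoint lookup and are skipped.
func toPortRange(p networking.NetworkPolicyPort) (port, endPort string, ok bool) {
	if p.Port == nil || p.Port.Type != intstr.Int {
		return "", "", false
	}
	port = p.Port.String()
	if p.EndPort != nil && *p.EndPort >= p.Port.IntVal {
		endPort = strconv.Itoa(int(*p.EndPort))
	}
	return port, endPort, true
}

func main() {
	start := intstr.FromInt(8080)
	end := int32(8090)
	p := networking.NetworkPolicyPort{Port: &start, EndPort: &end}
	fmt.Println(toPortRange(p)) // 8080 8090 true
}
```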
@ -818,3 +794,12 @@ func policyIndexedEgressNamedPortIPSetName(namespace, policyName string, egressR
encoded := base32.StdEncoding.EncodeToString(hash[:]) encoded := base32.StdEncoding.EncodeToString(hash[:])
return kubeDestinationIPSetPrefix + encoded[:16] return kubeDestinationIPSetPrefix + encoded[:16]
} }
func policyRulePortsHasNamedPort(npPorts []networking.NetworkPolicyPort) bool {
for _, npPort := range npPorts {
if npPort.Port != nil && npPort.Port.Type == intstr.String {
return true
}
}
return false
}

pkg/agent/netpol/utils.go

@ -0,0 +1,66 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol/utils.go
package netpol
import (
"fmt"
"reflect"
"regexp"
"strconv"
api "k8s.io/api/core/v1"
)
const (
PodCompleted api.PodPhase = "Completed"
)
// isPodUpdateNetPolRelevant checks the attributes that we care about for building NetworkPolicies on the host and if it
// finds a relevant change, it returns true otherwise it returns false. The things we care about for NetworkPolicies:
// 1) Is the phase of the pod changing? (matters for catching completed, succeeded, or failed jobs)
// 2) Is the pod IP changing? (changes how the network policy is applied to the host)
// 3) Is the pod's host IP changing? (should be caught in the above, with the CNI kube-router runs with but we check this as well for sanity)
// 4) Is a pod's label changing? (potentially changes which NetworkPolicies select this pod)
func isPodUpdateNetPolRelevant(oldPod, newPod *api.Pod) bool {
return newPod.Status.Phase != oldPod.Status.Phase ||
newPod.Status.PodIP != oldPod.Status.PodIP ||
!reflect.DeepEqual(newPod.Status.PodIPs, oldPod.Status.PodIPs) ||
newPod.Status.HostIP != oldPod.Status.HostIP ||
!reflect.DeepEqual(newPod.Labels, oldPod.Labels)
}
func isNetPolActionable(pod *api.Pod) bool {
return !isFinished(pod) && pod.Status.PodIP != "" && !pod.Spec.HostNetwork
}
func isFinished(pod *api.Pod) bool {
switch pod.Status.Phase {
case api.PodFailed, api.PodSucceeded, PodCompleted:
return true
}
return false
}
func validateNodePortRange(nodePortOption string) (string, error) {
nodePortValidator := regexp.MustCompile(`^([0-9]+)[:-]([0-9]+)$`)
if matched := nodePortValidator.MatchString(nodePortOption); !matched {
return "", fmt.Errorf("failed to parse node port range given: '%s' please see specification in help text", nodePortOption)
}
matches := nodePortValidator.FindStringSubmatch(nodePortOption)
if len(matches) != 3 {
return "", fmt.Errorf("could not parse port number from range given: '%s'", nodePortOption)
}
port1, err := strconv.ParseUint(matches[1], 10, 16)
if err != nil {
return "", fmt.Errorf("could not parse first port number from range given: '%s'", nodePortOption)
}
port2, err := strconv.ParseUint(matches[2], 10, 16)
if err != nil {
return "", fmt.Errorf("could not parse second port number from range given: '%s'", nodePortOption)
}
if port1 >= port2 {
return "", fmt.Errorf("port 1 is greater than or equal to port 2 in range given: '%s'", nodePortOption)
}
return fmt.Sprintf("%d:%d", port1, port2), nil
}
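Usage sketch for validateNodePortRange, reusing this file's imports. Both "-" and ":" separators are accepted, each number must fit in 16 bits, and the result is normalized to a colon-separated range; the port values are examples only.

// Illustrative only; lives in the same package as validateNodePortRange.
func exampleNodePortRange() {
	for _, in := range []string{"30000-32767", "30000:32767", "32767-30000"} {
		out, err := validateNodePortRange(in)
		if err != nil {
			// "32767-30000" fails: the first port must be strictly smaller.
			fmt.Println(in, "->", err)
			continue
		}
		fmt.Println(in, "->", out) // e.g. "30000-32767" -> "30000:32767"
	}
}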


@ -1,5 +1,5 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) // Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/utils/ipset.go // - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/utils/ipset.go
// +build !windows // +build !windows
@ -7,15 +7,18 @@ package utils
import ( import (
"bytes" "bytes"
"crypto/sha1"
"encoding/base32"
"errors" "errors"
"fmt" "fmt"
"os/exec" "os/exec"
"sort"
"strings" "strings"
) )
var ( var (
// Error returned when ipset binary is not found. // Error returned when ipset binary is not found.
errIpsetNotFound = errors.New("Ipset utility not found") errIpsetNotFound = errors.New("ipset utility not found")
) )
const ( const (
@ -82,6 +85,9 @@ const (
OptionNoMatch = "nomatch" OptionNoMatch = "nomatch"
// OptionForceAdd All hash set types support the optional forceadd parameter when creating a set. When sets created with this option become full the next addition to the set may succeed and evict a random entry from the set. // OptionForceAdd All hash set types support the optional forceadd parameter when creating a set. When sets created with this option become full the next addition to the set may succeed and evict a random entry from the set.
OptionForceAdd = "forceadd" OptionForceAdd = "forceadd"
// tmpIPSetPrefix is the prefix added to temporary ipset names used in the atomic swap operations during ipset restore. You should never see these on your system because they only exist during the restore.
tmpIPSetPrefix = "TMP-"
) )
// IPSet represent ipset sets managed by. // IPSet represent ipset sets managed by.
@ -181,7 +187,7 @@ func (ipset *IPSet) Create(setName string, createOptions ...string) (*Set, error
// Determine if set with the same name is already active on the system // Determine if set with the same name is already active on the system
setIsActive, err := ipset.Sets[setName].IsActive() setIsActive, err := ipset.Sets[setName].IsActive()
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to determine if ipset set %s exists: %s", return nil, fmt.Errorf("failed to determine if ipset set %s exists: %s",
setName, err) setName, err)
} }
@ -193,20 +199,20 @@ func (ipset *IPSet) Create(setName string, createOptions ...string) (*Set, error
args = append(args, createOptions...) args = append(args, createOptions...)
args = append(args, "family", "inet6") args = append(args, "family", "inet6")
if _, err := ipset.run(args...); err != nil { if _, err := ipset.run(args...); err != nil {
return nil, fmt.Errorf("Failed to create ipset set on system: %s", err) return nil, fmt.Errorf("failed to create ipset set on system: %s", err)
} }
} else { } else {
_, err := ipset.run(append([]string{"create", "-exist", setName}, _, err := ipset.run(append([]string{"create", "-exist", setName},
createOptions...)...) createOptions...)...)
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to create ipset set on system: %s", err) return nil, fmt.Errorf("failed to create ipset set on system: %s", err)
} }
} }
} }
return ipset.Sets[setName], nil return ipset.Sets[setName], nil
} }
// Adds a given Set to an IPSet // Add a given Set to an IPSet
func (ipset *IPSet) Add(set *Set) error { func (ipset *IPSet) Add(set *Set) error {
_, err := ipset.Create(set.Name, set.Options...) _, err := ipset.Create(set.Name, set.Options...)
if err != nil { if err != nil {
@ -226,6 +232,22 @@ func (ipset *IPSet) Add(set *Set) error {
return nil return nil
} }
// RefreshSet adds/updates the internal Sets with a set of entries but does not run the restore command
func (ipset *IPSet) RefreshSet(setName string, entriesWithOptions [][]string, setType string) {
if ipset.Get(setName) == nil {
ipset.Sets[setName] = &Set{
Name: setName,
Options: []string{setType, OptionTimeout, "0"},
Parent: ipset,
}
}
entries := make([]*Entry, len(entriesWithOptions))
for i, entry := range entriesWithOptions {
entries[i] = &Entry{Set: ipset.Sets[setName], Options: entry}
}
ipset.Get(setName).Entries = entries
}
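A sketch of staging desired state with RefreshSet before a restore. The set name and addresses are placeholders, and the IPSet handler is filled in directly (as the unit test below does) because the package constructor is not part of this hunk.

// Illustrative only; same package as RefreshSet and buildIPSetRestore.
func exampleRefreshSet() string {
	ipset := &IPSet{Sets: map[string]*Set{}}

	// Stage the desired members of a set without touching the system yet.
	ipset.RefreshSet("KUBE-EXAMPLE-ALLOW", [][]string{
		{"10.42.0.10"},
		{"10.42.0.11"},
	}, "hash:ip")

	// buildIPSetRestore (below) turns the staged sets into an `ipset restore`
	// script that swaps the new contents in atomically.
	return buildIPSetRestore(ipset)
}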
// Add a given entry to the set. If the -exist option is specified, ipset // Add a given entry to the set. If the -exist option is specified, ipset
// ignores if the entry already added to the set. // ignores if the entry already added to the set.
// Note: if you need to add multiple entries (e.g., in a loop), use BatchAdd instead, // Note: if you need to add multiple entries (e.g., in a loop), use BatchAdd instead,
@ -243,7 +265,7 @@ func (set *Set) Add(addOptions ...string) (*Entry, error) {
return entry, nil return entry, nil
} }
// Adds given entries (with their options) to the set. // BatchAdd given entries (with their options) to the set.
// For multiple items, this is much faster than Add(). // For multiple items, this is much faster than Add().
func (set *Set) BatchAdd(addOptions [][]string) error { func (set *Set) BatchAdd(addOptions [][]string) error {
newEntries := make([]*Entry, len(addOptions)) newEntries := make([]*Entry, len(addOptions))
@ -389,14 +411,59 @@ func parseIPSetSave(ipset *IPSet, result string) map[string]*Set {
// create KUBE-DST-3YNVZWWGX3UQQ4VQ hash:ip family inet hashsize 1024 maxelem 65536 timeout 0 // create KUBE-DST-3YNVZWWGX3UQQ4VQ hash:ip family inet hashsize 1024 maxelem 65536 timeout 0
// add KUBE-DST-3YNVZWWGX3UQQ4VQ 100.96.1.6 timeout 0 // add KUBE-DST-3YNVZWWGX3UQQ4VQ 100.96.1.6 timeout 0
func buildIPSetRestore(ipset *IPSet) string { func buildIPSetRestore(ipset *IPSet) string {
ipSetRestore := "" setNames := make([]string, 0, len(ipset.Sets))
for _, set := range ipset.Sets { for setName := range ipset.Sets {
ipSetRestore += fmt.Sprintf("create %s %s\n", set.Name, strings.Join(set.Options[:], " ")) // we need setNames in some consistent order so that we can unit-test this method has a predictable output:
for _, entry := range set.Entries { setNames = append(setNames, setName)
ipSetRestore += fmt.Sprintf("add %s %s\n", set.Name, strings.Join(entry.Options[:], " "))
}
} }
return ipSetRestore
sort.Strings(setNames)
tmpSets := map[string]string{}
ipSetRestore := &strings.Builder{}
for _, setName := range setNames {
set := ipset.Sets[setName]
setOptions := strings.Join(set.Options, " ")
tmpSetName := tmpSets[setOptions]
if tmpSetName == "" {
// create a temporary set per unique set-options:
hash := sha1.Sum([]byte("tmp:" + setOptions))
tmpSetName = tmpIPSetPrefix + base32.StdEncoding.EncodeToString(hash[:10])
ipSetRestore.WriteString(fmt.Sprintf("create %s %s\n", tmpSetName, setOptions))
// just in case we are starting up after a crash, we should flush the TMP ipset to be safe if it
// already existed, so we do not pollute other ipsets:
ipSetRestore.WriteString(fmt.Sprintf("flush %s\n", tmpSetName))
tmpSets[setOptions] = tmpSetName
}
for _, entry := range set.Entries {
// add entries to the tmp set:
ipSetRestore.WriteString(fmt.Sprintf("add %s %s\n", tmpSetName, strings.Join(entry.Options, " ")))
}
// now create the actual IPSet (this is a noop if it already exists, because we run with -exist):
ipSetRestore.WriteString(fmt.Sprintf("create %s %s\n", set.Name, setOptions))
// now that both exist, we can swap them:
ipSetRestore.WriteString(fmt.Sprintf("swap %s %s\n", tmpSetName, set.Name))
// empty the tmp set (which is actually the old one now):
ipSetRestore.WriteString(fmt.Sprintf("flush %s\n", tmpSetName))
}
setsToDestroy := make([]string, 0, len(tmpSets))
for _, tmpSetName := range tmpSets {
setsToDestroy = append(setsToDestroy, tmpSetName)
}
// need to destroy the sets in a predictable order for unit test!
sort.Strings(setsToDestroy)
for _, tmpSetName := range setsToDestroy {
// finally, destroy the tmp sets.
ipSetRestore.WriteString(fmt.Sprintf("destroy %s\n", tmpSetName))
}
return ipSetRestore.String()
} }
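The generated script is meant to be piped into ipset restore. The package's own restore entry point is not part of this hunk, so the sketch below shells out directly; note that ipset takes -exist before the command, which is what makes the repeated create lines above harmless no-ops.

// Sketch only (same package, reusing its imports); not the code path kube-router uses.
func applyRestoreScript(script string) error {
	cmd := exec.Command("ipset", "-exist", "restore")
	cmd.Stdin = strings.NewReader(script)
	if out, err := cmd.CombinedOutput(); err != nil {
		return fmt.Errorf("ipset restore failed: %v (%s)", err, out)
	}
	return nil
}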
// Save the given set, or all sets if none is given to stdout in a format that // Save the given set, or all sets if none is given to stdout in a format that
@ -489,7 +556,7 @@ func (set *Set) Refresh(entries []string, extraOptions ...string) error {
return set.RefreshWithBuiltinOptions(entriesWithOptions) return set.RefreshWithBuiltinOptions(entriesWithOptions)
} }
// Refresh a Set with new entries with built-in options. // RefreshWithBuiltinOptions refresh a Set with new entries with built-in options.
func (set *Set) RefreshWithBuiltinOptions(entries [][]string) error { func (set *Set) RefreshWithBuiltinOptions(entries [][]string) error {
var err error var err error


@ -0,0 +1,77 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/utils/ipset_test.go
package utils
import "testing"
func Test_buildIPSetRestore(t *testing.T) {
type args struct {
ipset *IPSet
}
tests := []struct {
name string
args args
want string
}{
{
name: "simple-restore",
args: args{
ipset: &IPSet{Sets: map[string]*Set{
"foo": {
Name: "foo",
Options: []string{"hash:ip", "yolo", "things", "12345"},
Entries: []*Entry{
{Options: []string{"1.2.3.4"}},
},
},
"google-dns-servers": {
Name: "google-dns-servers",
Options: []string{"hash:ip", "lol"},
Entries: []*Entry{
{Options: []string{"4.4.4.4"}},
{Options: []string{"8.8.8.8"}},
},
},
// this one and the one above share the same exact options -- and therefore will reuse the same
// tmp ipset:
"more-ip-addresses": {
Name: "google-dns-servers",
Options: []string{"hash:ip", "lol"},
Entries: []*Entry{
{Options: []string{"5.5.5.5"}},
{Options: []string{"6.6.6.6"}},
},
},
}},
},
want: "create TMP-7NOTZDOMLXBX6DAJ hash:ip yolo things 12345\n" +
"flush TMP-7NOTZDOMLXBX6DAJ\n" +
"add TMP-7NOTZDOMLXBX6DAJ 1.2.3.4\n" +
"create foo hash:ip yolo things 12345\n" +
"swap TMP-7NOTZDOMLXBX6DAJ foo\n" +
"flush TMP-7NOTZDOMLXBX6DAJ\n" +
"create TMP-XD7BSSQZELS7TP35 hash:ip lol\n" +
"flush TMP-XD7BSSQZELS7TP35\n" +
"add TMP-XD7BSSQZELS7TP35 4.4.4.4\n" +
"add TMP-XD7BSSQZELS7TP35 8.8.8.8\n" +
"create google-dns-servers hash:ip lol\n" +
"swap TMP-XD7BSSQZELS7TP35 google-dns-servers\n" +
"flush TMP-XD7BSSQZELS7TP35\n" +
"add TMP-XD7BSSQZELS7TP35 5.5.5.5\n" +
"add TMP-XD7BSSQZELS7TP35 6.6.6.6\n" +
"create google-dns-servers hash:ip lol\n" +
"swap TMP-XD7BSSQZELS7TP35 google-dns-servers\n" +
"flush TMP-XD7BSSQZELS7TP35\n" +
"destroy TMP-7NOTZDOMLXBX6DAJ\n" +
"destroy TMP-XD7BSSQZELS7TP35\n",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := buildIPSetRestore(tt.args.ipset); got != tt.want {
t.Errorf("buildIPSetRestore() = %v, want %v", got, tt.want)
}
})
}
}


@ -0,0 +1,75 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/utils/iptables.go
package utils
import (
"bytes"
"fmt"
"os/exec"
"strings"
)
var hasWait bool
func init() {
path, err := exec.LookPath("iptables-restore")
if err != nil {
return
}
args := []string{"iptables-restore", "--help"}
cmd := exec.Cmd{
Path: path,
Args: args,
}
cmdOutput, err := cmd.CombinedOutput()
if err != nil {
return
}
hasWait = strings.Contains(string(cmdOutput), "wait")
}
// SaveInto calls `iptables-save` for given table and stores result in a given buffer.
func SaveInto(table string, buffer *bytes.Buffer) error {
path, err := exec.LookPath("iptables-save")
if err != nil {
return err
}
stderrBuffer := bytes.NewBuffer(nil)
args := []string{"iptables-save", "-t", table}
cmd := exec.Cmd{
Path: path,
Args: args,
Stdout: buffer,
Stderr: stderrBuffer,
}
if err := cmd.Run(); err != nil {
return fmt.Errorf("%v (%s)", err, stderrBuffer)
}
return nil
}
// Restore runs `iptables-restore` passing data through []byte.
func Restore(table string, data []byte) error {
path, err := exec.LookPath("iptables-restore")
if err != nil {
return err
}
var args []string
if hasWait {
args = []string{"iptables-restore", "--wait", "-T", table}
} else {
args = []string{"iptables-restore", "-T", table}
}
cmd := exec.Cmd{
Path: path,
Args: args,
Stdin: bytes.NewBuffer(data),
}
b, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("%v (%s)", err, b)
}
return nil
}
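Usage sketch combining the two helpers: snapshot a table, optionally edit the dump, and write it back. The function name and the choice of the filter table are illustrative only.

// Illustrative only; same package as SaveInto and Restore.
func syncFilterTable() error {
	var buf bytes.Buffer
	if err := SaveInto("filter", &buf); err != nil {
		return err
	}
	// ... edit buf here, e.g. append custom chains and rules before COMMIT ...
	return Restore("filter", buf.Bytes())
}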


@ -1,5 +1,5 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) // Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/utils/node.go // - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/utils/node.go
// +build !windows // +build !windows
@ -43,7 +43,7 @@ func GetNodeObject(clientset kubernetes.Interface, hostnameOverride string) (*ap
} }
} }
return nil, fmt.Errorf("Failed to identify the node by NODE_NAME, hostname or --hostname-override") return nil, fmt.Errorf("failed to identify the node by NODE_NAME, hostname or --hostname-override")
} }
// GetNodeIP returns the most valid external facing IP address for a node. // GetNodeIP returns the most valid external facing IP address for a node.


@ -30,7 +30,7 @@ func keyHash(passphrase string) string {
} }
// encrypt encrypts a byte slice using aes+gcm with a pbkdf2 key derived from the passphrase and a random salt. // encrypt encrypts a byte slice using aes+gcm with a pbkdf2 key derived from the passphrase and a random salt.
// It returns a byte slice containing the salt and base64-encoded cyphertext. // It returns a byte slice containing the salt and base64-encoded ciphertext.
func encrypt(passphrase string, plaintext []byte) ([]byte, error) { func encrypt(passphrase string, plaintext []byte) ([]byte, error) {
salt, err := token.Random(8) salt, err := token.Random(8)
if err != nil { if err != nil {
@ -59,7 +59,7 @@ func encrypt(passphrase string, plaintext []byte) ([]byte, error) {
} }
// decrypt attempts to decrypt the byte slice using the supplied passphrase. // decrypt attempts to decrypt the byte slice using the supplied passphrase.
// The input byte slice should be the cyphertext output from the encrypt function. // The input byte slice should be the ciphertext output from the encrypt function.
func decrypt(passphrase string, ciphertext []byte) ([]byte, error) { func decrypt(passphrase string, ciphertext []byte) ([]byte, error) {
parts := strings.SplitN(string(ciphertext), ":", 2) parts := strings.SplitN(string(ciphertext), ":", 2)
if len(parts) != 2 { if len(parts) != 2 {
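The comments above describe the scheme: an AES-GCM key derived from the passphrase with PBKDF2 and a random salt, with the salt and the base64-encoded ciphertext joined by ":" (matching the SplitN in decrypt). The sketch below reconstructs that shape under assumed parameters (iteration count, key length, hex-encoded salt, golang.org/x/crypto/pbkdf2 for derivation); it is not the k3s implementation, whose body is not shown in this hunk.

// Minimal sketch of the described construction, with assumed parameters.
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"fmt"

	"golang.org/x/crypto/pbkdf2"
)

func encryptSketch(passphrase string, plaintext []byte) ([]byte, error) {
	salt := make([]byte, 8)
	if _, err := rand.Read(salt); err != nil {
		return nil, err
	}
	key := pbkdf2.Key([]byte(passphrase), salt, 4096, 32, sha256.New) // assumed iteration count and key size
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	sealed := gcm.Seal(nonce, nonce, plaintext, nil) // nonce is prepended to the ciphertext
	// salt and ciphertext joined with ":", matching the SplitN in decrypt above
	return []byte(hex.EncodeToString(salt) + ":" + base64.StdEncoding.EncodeToString(sealed)), nil
}

func main() {
	out, err := encryptSketch("passphrase", []byte("hello"))
	fmt.Println(string(out), err)
}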

vendor/modules.txt (vendored, 1 line changed)

@ -2140,6 +2140,7 @@ k8s.io/heapster/metrics/api/v1/types
## explicit ## explicit
k8s.io/klog k8s.io/klog
# k8s.io/klog/v2 v2.8.0 # k8s.io/klog/v2 v2.8.0
## explicit
k8s.io/klog/v2 k8s.io/klog/v2
# k8s.io/kube-aggregator v0.18.0 => github.com/k3s-io/kubernetes/staging/src/k8s.io/kube-aggregator v1.21.2-k3s1 # k8s.io/kube-aggregator v0.18.0 => github.com/k3s-io/kubernetes/staging/src/k8s.io/kube-aggregator v1.21.2-k3s1
k8s.io/kube-aggregator/pkg/apis/apiregistration k8s.io/kube-aggregator/pkg/apis/apiregistration