diff --git a/go.mod b/go.mod index 7136e3d899..f70f5dc2c1 100644 --- a/go.mod +++ b/go.mod @@ -74,6 +74,7 @@ require ( github.com/containerd/ttrpc v0.0.0-20190828172938-92c8520ef9f8 // indirect github.com/containernetworking/plugins v0.8.2 // indirect github.com/coreos/flannel v0.11.0 + github.com/coreos/go-iptables v0.4.2 github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e github.com/docker/docker v0.7.3-0.20190731001754-589f1dad8dad github.com/docker/go-metrics v0.0.1 // indirect diff --git a/pkg/agent/config/config.go b/pkg/agent/config/config.go index 8bfb95ebdb..81131242c5 100644 --- a/pkg/agent/config/config.go +++ b/pkg/agent/config/config.go @@ -399,6 +399,7 @@ func get(envInfo *cmds.Agent) (*config.Node, error) { nodeConfig.AgentConfig.NodeLabels = envInfo.Labels nodeConfig.AgentConfig.PrivateRegistry = envInfo.PrivateRegistry nodeConfig.AgentConfig.DisableCCM = controlConfig.DisableCCM + nodeConfig.AgentConfig.DisableNPC = controlConfig.DisableNPC return nodeConfig, nil } diff --git a/pkg/agent/netpol/network_policy.go b/pkg/agent/netpol/network_policy.go new file mode 100644 index 0000000000..97eac08a90 --- /dev/null +++ b/pkg/agent/netpol/network_policy.go @@ -0,0 +1,31 @@ +package netpol + +import ( + "context" + "time" + + "github.com/rancher/k3s/pkg/daemons/config" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" +) + +func Run(ctx context.Context, nodeConfig *config.Node) error { + restConfig, err := clientcmd.BuildConfigFromFlags("", nodeConfig.AgentConfig.KubeConfigNode) + if err != nil { + return err + } + + client, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return err + } + + npc, err := NewNetworkPolicyController(ctx.Done(), client, time.Minute, nodeConfig.AgentConfig.NodeName) + if err != nil { + return err + } + + go npc.Run(ctx.Done()) + + return nil +} diff --git a/pkg/agent/netpol/network_policy_controller.go b/pkg/agent/netpol/network_policy_controller.go new file mode 100644 index 0000000000..ae35bb1c58 --- /dev/null +++ b/pkg/agent/netpol/network_policy_controller.go @@ -0,0 +1,1742 @@ +// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs) +// - modified from https://github.com/cloudnativelabs/kube-router/blob/d6f9f31a7b/pkg/controllers/netpol/network_policy_controller.go + +package netpol + +import ( + "crypto/sha256" + "encoding/base32" + "errors" + "fmt" + "net" + "regexp" + "strconv" + "strings" + "sync" + "time" + + // "github.com/cloudnativelabs/kube-router/pkg/healthcheck" + // "github.com/cloudnativelabs/kube-router/pkg/metrics" + // "github.com/cloudnativelabs/kube-router/pkg/options" + // "github.com/cloudnativelabs/kube-router/pkg/utils" + + "github.com/coreos/go-iptables/iptables" + api "k8s.io/api/core/v1" + apiextensions "k8s.io/api/extensions/v1beta1" + networking "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + log "k8s.io/klog" +) + +const ( + networkPolicyAnnotation = "net.beta.kubernetes.io/network-policy" + kubePodFirewallChainPrefix = "KUBE-POD-FW-" + kubeNetworkPolicyChainPrefix = "KUBE-NWPLCY-" + kubeSourceIPSetPrefix = "KUBE-SRC-" + kubeDestinationIPSetPrefix = "KUBE-DST-" +) + +// Network policy controller provides both ingress and egress filtering for the pods as per the defined network +// policies. 
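The hunks above add the go-iptables dependency, thread a DisableNPC flag through the agent config, and introduce netpol.Run, which builds a clientset from the node kubeconfig and starts the controller in a goroutine. The caller that actually invokes Run is not part of this excerpt; the sketch below is a hypothetical wrapper showing how an agent might gate the controller on the new flag (only netpol.Run and AgentConfig.DisableNPC come from the diff itself).

package agent

import (
	"context"

	"github.com/rancher/k3s/pkg/agent/netpol"
	"github.com/rancher/k3s/pkg/daemons/config"
)

// startNetworkPolicyController is a hypothetical caller: it starts the embedded
// network policy controller unless the agent was configured to disable it.
func startNetworkPolicyController(ctx context.Context, nodeConfig *config.Node) error {
	if nodeConfig.AgentConfig.DisableNPC {
		// DisableNPC is propagated from the server config (see config.go above);
		// when set, the controller is simply never started.
		return nil
	}
	// netpol.Run returns immediately after launching npc.Run in a goroutine;
	// the controller stops when ctx is cancelled.
	return netpol.Run(ctx, nodeConfig)
}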
Two different types of iptables chains are used. Each pod running on the node which either +// requires ingress or egress filtering gets a pod specific chains. Each network policy has a iptables chain, which +// has rules expressed through ipsets matching source and destination pod ip's. In the FORWARD chain of the +// filter table a rule is added to jump the traffic originating (in case of egress network policy) from the pod +// or destined (in case of ingress network policy) to the pod specific iptables chain. Each +// pod specific iptables chain has rules to jump to the network polices chains, that pod matches. So packet +// originating/destined from/to pod goes through fitler table's, FORWARD chain, followed by pod specific chain, +// followed by one or more network policy chains, till there is a match which will accept the packet, or gets +// dropped by the rule in the pod chain, if there is no match. + +// NetworkPolicyController strcut to hold information required by NetworkPolicyController +type NetworkPolicyController struct { + nodeIP net.IP + nodeHostName string + mu sync.Mutex + syncPeriod time.Duration + MetricsEnabled bool + v1NetworkPolicy bool + readyForUpdates bool + // healthChan chan<- *healthcheck.ControllerHeartbeat + + // list of all active network policies expressed as networkPolicyInfo + networkPoliciesInfo *[]networkPolicyInfo + ipSetHandler *IPSet + + podLister cache.Indexer + npLister cache.Indexer + nsLister cache.Indexer + + PodEventHandler cache.ResourceEventHandler + NamespaceEventHandler cache.ResourceEventHandler + NetworkPolicyEventHandler cache.ResourceEventHandler +} + +// internal structure to represent a network policy +type networkPolicyInfo struct { + name string + namespace string + labels map[string]string + + // set of pods matching network policy spec podselector label selector + targetPods map[string]podInfo + + // whitelist ingress rules from the network policy spec + ingressRules []ingressRule + + // whitelist egress rules from the network policy spec + egressRules []egressRule + + // policy type "ingress" or "egress" or "both" as defined by PolicyType in the spec + policyType string +} + +// internal structure to represent Pod +type podInfo struct { + ip string + name string + namespace string + labels map[string]string +} + +// internal structure to represent NetworkPolicyIngressRule in the spec +type ingressRule struct { + matchAllPorts bool + ports []protocolAndPort + namedPorts []endPoints + matchAllSource bool + srcPods []podInfo + srcIPBlocks [][]string +} + +// internal structure to represent NetworkPolicyEgressRule in the spec +type egressRule struct { + matchAllPorts bool + ports []protocolAndPort + namedPorts []endPoints + matchAllDestinations bool + dstPods []podInfo + dstIPBlocks [][]string +} + +type protocolAndPort struct { + protocol string + port string +} + +type endPoints struct { + ips []string + protocolAndPort +} + +type numericPort2eps map[string]*endPoints +type protocol2eps map[string]numericPort2eps +type namedPort2eps map[string]protocol2eps + +// Run runs forver till we receive notification on stopCh +func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}) { + t := time.NewTicker(npc.syncPeriod) + defer t.Stop() + + log.Info("Starting network policy controller") + // npc.healthChan = healthChan + + // loop forever till notified to stop on stopCh + for { + select { + case <-stopCh: + log.Info("Shutting down network policies controller") + return + default: + } + + log.V(1).Info("Performing periodic sync of 
iptables to reflect network policies") + err := npc.Sync() + if err != nil { + log.Errorf("Error during periodic sync of network policies in network policy controller. Error: " + err.Error()) + log.Errorf("Skipping sending heartbeat from network policy controller as periodic sync failed.") + } + // else { + // healthcheck.SendHeartBeat(healthChan, "NPC") + // } + npc.readyForUpdates = true + select { + case <-stopCh: + log.Infof("Shutting down network policies controller") + return + case <-t.C: + } + } +} + +// OnPodUpdate handles updates to pods from the Kubernetes api server +func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) { + pod := obj.(*api.Pod) + log.V(2).Infof("Received update to pod: %s/%s", pod.Namespace, pod.Name) + + if !npc.readyForUpdates { + log.V(3).Infof("Skipping update to pod: %s/%s, controller still performing bootup full-sync", pod.Namespace, pod.Name) + return + } + + err := npc.Sync() + if err != nil { + log.Errorf("Error syncing network policy for the update to pod: %s/%s Error: %s", pod.Namespace, pod.Name, err) + } +} + +// OnNetworkPolicyUpdate handles updates to network policy from the kubernetes api server +func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(obj interface{}) { + netpol := obj.(*networking.NetworkPolicy) + log.V(2).Infof("Received update for network policy: %s/%s", netpol.Namespace, netpol.Name) + + if !npc.readyForUpdates { + log.V(3).Infof("Skipping update to network policy: %s/%s, controller still performing bootup full-sync", netpol.Namespace, netpol.Name) + return + } + + err := npc.Sync() + if err != nil { + log.Errorf("Error syncing network policy for the update to network policy: %s/%s Error: %s", netpol.Namespace, netpol.Name, err) + } +} + +// OnNamespaceUpdate handles updates to namespace from kubernetes api server +func (npc *NetworkPolicyController) OnNamespaceUpdate(obj interface{}) { + namespace := obj.(*api.Namespace) + // namespace (and annotations on it) has no significance in GA ver of network policy + if npc.v1NetworkPolicy { + return + } + log.V(2).Infof("Received update for namespace: %s", namespace.Name) + + err := npc.Sync() + if err != nil { + log.Errorf("Error syncing on namespace update: %s", err) + } +} + +// Sync synchronizes iptables to desired state of network policies +func (npc *NetworkPolicyController) Sync() error { + + var err error + npc.mu.Lock() + defer npc.mu.Unlock() + + // healthcheck.SendHeartBeat(npc.healthChan, "NPC") + start := time.Now() + syncVersion := strconv.FormatInt(start.UnixNano(), 10) + defer func() { + endTime := time.Since(start) + // if npc.MetricsEnabled { + // metrics.ControllerIPtablesSyncTime.Observe(endTime.Seconds()) + // } + log.V(1).Infof("sync iptables took %v", endTime) + }() + + log.V(1).Infof("Starting sync of iptables with version: %s", syncVersion) + if npc.v1NetworkPolicy { + npc.networkPoliciesInfo, err = npc.buildNetworkPoliciesInfo() + if err != nil { + return errors.New("Aborting sync. Failed to build network policies: " + err.Error()) + } + } else { + // TODO remove the Beta support + npc.networkPoliciesInfo, err = npc.buildBetaNetworkPoliciesInfo() + if err != nil { + return errors.New("Aborting sync. Failed to build network policies: " + err.Error()) + } + } + + activePolicyChains, activePolicyIPSets, err := npc.syncNetworkPolicyChains(syncVersion) + if err != nil { + return errors.New("Aborting sync. 
Failed to sync network policy chains: " + err.Error()) + } + + activePodFwChains, err := npc.syncPodFirewallChains(syncVersion) + if err != nil { + return errors.New("Aborting sync. Failed to sync pod firewalls: " + err.Error()) + } + + err = cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets) + if err != nil { + return errors.New("Aborting sync. Failed to cleanup stale iptables rules: " + err.Error()) + } + + return nil +} + +// Configure iptables rules representing each network policy. All pod's matched by +// network policy spec podselector labels are grouped together in one ipset which +// is used for matching destination ip address. Each ingress rule in the network +// policyspec is evaluated to set of matching pods, which are grouped in to a +// ipset used for source ip addr matching. +func (npc *NetworkPolicyController) syncNetworkPolicyChains(version string) (map[string]bool, map[string]bool, error) { + start := time.Now() + defer func() { + endTime := time.Since(start) + // metrics.ControllerPolicyChainsSyncTime.Observe(endTime.Seconds()) + log.V(2).Infof("Syncing network policy chains took %v", endTime) + }() + activePolicyChains := make(map[string]bool) + activePolicyIPSets := make(map[string]bool) + + iptablesCmdHandler, err := iptables.New() + if err != nil { + log.Fatalf("Failed to initialize iptables executor due to: %s", err.Error()) + } + + // run through all network policies + for _, policy := range *npc.networkPoliciesInfo { + + // ensure there is a unique chain per network policy in filter table + policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) + err := iptablesCmdHandler.NewChain("filter", policyChainName) + if err != nil && err.(*iptables.Error).ExitStatus() != 1 { + return nil, nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + + activePolicyChains[policyChainName] = true + + // create a ipset for all destination pod ip's matched by the policy spec PodSelector + targetDestPodIPSetName := policyDestinationPodIPSetName(policy.namespace, policy.name) + targetDestPodIPSet, err := npc.ipSetHandler.Create(targetDestPodIPSetName, TypeHashIP, OptionTimeout, "0") + if err != nil { + return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error()) + } + + // create a ipset for all source pod ip's matched by the policy spec PodSelector + targetSourcePodIPSetName := policySourcePodIPSetName(policy.namespace, policy.name) + targetSourcePodIPSet, err := npc.ipSetHandler.Create(targetSourcePodIPSetName, TypeHashIP, OptionTimeout, "0") + if err != nil { + return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error()) + } + + activePolicyIPSets[targetDestPodIPSet.Name] = true + activePolicyIPSets[targetSourcePodIPSet.Name] = true + + currnetPodIPs := make([]string, 0, len(policy.targetPods)) + for ip := range policy.targetPods { + currnetPodIPs = append(currnetPodIPs, ip) + } + + err = targetSourcePodIPSet.Refresh(currnetPodIPs, OptionTimeout, "0") + if err != nil { + log.Errorf("failed to refresh targetSourcePodIPSet: " + err.Error()) + } + err = targetDestPodIPSet.Refresh(currnetPodIPs, OptionTimeout, "0") + if err != nil { + log.Errorf("failed to refresh targetDestPodIPSet: " + err.Error()) + } + + err = npc.processIngressRules(policy, targetDestPodIPSetName, activePolicyIPSets, version) + if err != nil { + return nil, nil, err + } + + err = npc.processEgressRules(policy, targetSourcePodIPSetName, activePolicyIPSets, version) + if err != nil { + return nil, nil, err + } + } + + 
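syncNetworkPolicyChains above creates one filter-table chain per policy and relies on a go-iptables convention: NewChain returns an *iptables.Error with exit status 1 when the chain already exists, which the controller treats as success. A minimal standalone sketch of that ensure-chain pattern (the chain name is an arbitrary example; running it needs root and a working iptables):

package main

import (
	"log"

	"github.com/coreos/go-iptables/iptables"
)

// ensureChain creates the chain if it is missing and treats "already exists"
// (exit status 1 from iptables -N) as a no-op, the same convention used for
// the KUBE-NWPLCY-* and KUBE-POD-FW-* chains above.
func ensureChain(ipt *iptables.IPTables, table, chain string) error {
	err := ipt.NewChain(table, chain)
	if err != nil {
		if e, ok := err.(*iptables.Error); ok && e.ExitStatus() == 1 {
			return nil // chain already exists
		}
		return err
	}
	return nil
}

func main() {
	ipt, err := iptables.New()
	if err != nil {
		log.Fatalf("failed to initialize iptables: %v", err)
	}
	if err := ensureChain(ipt, "filter", "EXAMPLE-CHAIN"); err != nil {
		log.Fatalf("ensure chain: %v", err)
	}
}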
log.V(2).Infof("IPtables chains in the filter table are synchronized with the network policies.") + + return activePolicyChains, activePolicyIPSets, nil +} + +func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo, + targetDestPodIPSetName string, activePolicyIPSets map[string]bool, version string) error { + + // From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic " + // so no whitelist rules to be added to the network policy + if policy.ingressRules == nil { + return nil + } + + iptablesCmdHandler, err := iptables.New() + if err != nil { + return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error()) + } + + policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) + + // run through all the ingress rules in the spec and create iptables rules + // in the chain for the network policy + for i, ingressRule := range policy.ingressRules { + + if len(ingressRule.srcPods) != 0 { + srcPodIPSetName := policyIndexedSourcePodIPSetName(policy.namespace, policy.name, i) + srcPodIPSet, err := npc.ipSetHandler.Create(srcPodIPSetName, TypeHashIP, OptionTimeout, "0") + if err != nil { + return fmt.Errorf("failed to create ipset: %s", err.Error()) + } + + activePolicyIPSets[srcPodIPSet.Name] = true + + ingressRuleSrcPodIPs := make([]string, 0, len(ingressRule.srcPods)) + for _, pod := range ingressRule.srcPods { + ingressRuleSrcPodIPs = append(ingressRuleSrcPodIPs, pod.ip) + } + err = srcPodIPSet.Refresh(ingressRuleSrcPodIPs, OptionTimeout, "0") + if err != nil { + log.Errorf("failed to refresh srcPodIPSet: " + err.Error()) + } + + if len(ingressRule.ports) != 0 { + // case where 'ports' details and 'from' details specified in the ingress rule + // so match on specified source and destination ip's and specified port (if any) and protocol + for _, portProtocol := range ingressRule.ports { + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + + policy.name + " namespace " + policy.namespace + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { + return err + } + } + } + + if len(ingressRule.namedPorts) != 0 { + for j, endPoints := range ingressRule.namedPorts { + namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j) + namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, TypeHashIP, OptionTimeout, "0") + if err != nil { + return fmt.Errorf("failed to create ipset: %s", err.Error()) + } + activePolicyIPSets[namedPortIPSet.Name] = true + err = namedPortIPSet.Refresh(endPoints.ips, OptionTimeout, "0") + if err != nil { + log.Errorf("failed to refresh namedPortIPSet: " + err.Error()) + } + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + + policy.name + " namespace " + policy.namespace + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { + return err + } + } + } + + if len(ingressRule.ports) == 0 && len(ingressRule.namedPorts) == 0 { + // case where no 'ports' details specified in the ingress rule but 'from' details specified + // so match on specified source and destination ip with all port and protocol + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + + 
policy.name + " namespace " + policy.namespace + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, "", ""); err != nil { + return err + } + } + } + + // case where only 'ports' details specified but no 'from' details in the ingress rule + // so match on all sources, with specified port (if any) and protocol + if ingressRule.matchAllSource && !ingressRule.matchAllPorts { + for _, portProtocol := range ingressRule.ports { + comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + + policy.name + " namespace " + policy.namespace + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { + return err + } + } + + for j, endPoints := range ingressRule.namedPorts { + namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j) + namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, TypeHashIP, OptionTimeout, "0") + if err != nil { + return fmt.Errorf("failed to create ipset: %s", err.Error()) + } + + activePolicyIPSets[namedPortIPSet.Name] = true + + err = namedPortIPSet.Refresh(endPoints.ips, OptionTimeout, "0") + if err != nil { + log.Errorf("failed to refresh namedPortIPSet: " + err.Error()) + } + comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + + policy.name + " namespace " + policy.namespace + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { + return err + } + } + } + + // case where nether ports nor from details are speified in the ingress rule + // so match on all ports, protocol, source IP's + if ingressRule.matchAllSource && ingressRule.matchAllPorts { + comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + + policy.name + " namespace " + policy.namespace + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIPSetName, "", ""); err != nil { + return err + } + } + + if len(ingressRule.srcIPBlocks) != 0 { + srcIPBlockIPSetName := policyIndexedSourceIPBlockIPSetName(policy.namespace, policy.name, i) + srcIPBlockIPSet, err := npc.ipSetHandler.Create(srcIPBlockIPSetName, TypeHashNet, OptionTimeout, "0") + if err != nil { + return fmt.Errorf("failed to create ipset: %s", err.Error()) + } + activePolicyIPSets[srcIPBlockIPSet.Name] = true + err = srcIPBlockIPSet.RefreshWithBuiltinOptions(ingressRule.srcIPBlocks) + if err != nil { + log.Errorf("failed to refresh srcIPBlockIPSet: " + err.Error()) + } + if !ingressRule.matchAllPorts { + for _, portProtocol := range ingressRule.ports { + comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + + policy.name + " namespace " + policy.namespace + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { + return err + } + } + + for j, endPoints := range ingressRule.namedPorts { + namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j) + namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, TypeHashIP, OptionTimeout, "0") + if err != nil { + return fmt.Errorf("failed to create ipset: %s", err.Error()) + } + + 
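The ipBlock handling above hands RefreshWithBuiltinOptions a slice of string slices: each entry is an ipset member followed by its options, with "except" ranges flagged as non-matching entries (see evalIPBlockPeer further down in this file). The sketch below only illustrates the shape of that data for a hypothetical peer allowing 10.0.0.0/8 except 10.0.1.0/24; the literal "timeout"/"nomatch" keywords are assumed values of the OptionTimeout/OptionNoMatch constants, which are defined in the ipset helper outside this hunk.

package main

import "fmt"

func main() {
	// Shape of ingressRule.srcIPBlocks (and egressRule.dstIPBlocks): one entry
	// per ipset member, member first, ipset options after.
	srcIPBlocks := [][]string{
		{"10.0.0.0/8", "timeout", "0"},             // allowed CIDR
		{"10.0.1.0/24", "timeout", "0", "nomatch"}, // ipBlock "except" carve-out
	}
	for _, entry := range srcIPBlocks {
		fmt.Println(entry)
	}
}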
activePolicyIPSets[namedPortIPSet.Name] = true + + err = namedPortIPSet.Refresh(endPoints.ips, OptionTimeout, "0") + if err != nil { + log.Errorf("failed to refresh namedPortIPSet: " + err.Error()) + } + comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + + policy.name + " namespace " + policy.namespace + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { + return err + } + } + } + if ingressRule.matchAllPorts { + comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + + policy.name + " namespace " + policy.namespace + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, "", ""); err != nil { + return err + } + } + } + } + + return nil +} + +func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, + targetSourcePodIPSetName string, activePolicyIPSets map[string]bool, version string) error { + + // From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic " + // so no whitelist rules to be added to the network policy + if policy.egressRules == nil { + return nil + } + + iptablesCmdHandler, err := iptables.New() + if err != nil { + return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error()) + } + + policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) + + // run through all the egress rules in the spec and create iptables rules + // in the chain for the network policy + for i, egressRule := range policy.egressRules { + + if len(egressRule.dstPods) != 0 { + dstPodIPSetName := policyIndexedDestinationPodIPSetName(policy.namespace, policy.name, i) + dstPodIPSet, err := npc.ipSetHandler.Create(dstPodIPSetName, TypeHashIP, OptionTimeout, "0") + if err != nil { + return fmt.Errorf("failed to create ipset: %s", err.Error()) + } + + activePolicyIPSets[dstPodIPSet.Name] = true + + egressRuleDstPodIPs := make([]string, 0, len(egressRule.dstPods)) + for _, pod := range egressRule.dstPods { + egressRuleDstPodIPs = append(egressRuleDstPodIPs, pod.ip) + } + err = dstPodIPSet.Refresh(egressRuleDstPodIPs, OptionTimeout, "0") + if err != nil { + log.Errorf("failed to refresh dstPodIPSet: " + err.Error()) + } + if len(egressRule.ports) != 0 { + // case where 'ports' details and 'from' details specified in the egress rule + // so match on specified source and destination ip's and specified port (if any) and protocol + for _, portProtocol := range egressRule.ports { + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + + policy.name + " namespace " + policy.namespace + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { + return err + } + } + } + + if len(egressRule.namedPorts) != 0 { + for j, endPoints := range egressRule.namedPorts { + namedPortIPSetName := policyIndexedEgressNamedPortIPSetName(policy.namespace, policy.name, i, j) + namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, TypeHashIP, OptionTimeout, "0") + if err != nil { + return fmt.Errorf("failed to create ipset: %s", err.Error()) + } + + activePolicyIPSets[namedPortIPSet.Name] = true + + err = namedPortIPSet.Refresh(endPoints.ips, OptionTimeout, "0") 
+ if err != nil { + log.Errorf("failed to refresh namedPortIPSet: " + err.Error()) + } + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + + policy.name + " namespace " + policy.namespace + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { + return err + } + } + + } + + if len(egressRule.ports) == 0 && len(egressRule.namedPorts) == 0 { + // case where no 'ports' details specified in the ingress rule but 'from' details specified + // so match on specified source and destination ip with all port and protocol + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + + policy.name + " namespace " + policy.namespace + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, "", ""); err != nil { + return err + } + } + } + + // case where only 'ports' details specified but no 'to' details in the egress rule + // so match on all sources, with specified port (if any) and protocol + if egressRule.matchAllDestinations && !egressRule.matchAllPorts { + for _, portProtocol := range egressRule.ports { + comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + + policy.name + " namespace " + policy.namespace + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, "", portProtocol.protocol, portProtocol.port); err != nil { + return err + } + } + } + + // case where nether ports nor from details are speified in the egress rule + // so match on all ports, protocol, source IP's + if egressRule.matchAllDestinations && egressRule.matchAllPorts { + comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + + policy.name + " namespace " + policy.namespace + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, "", "", ""); err != nil { + return err + } + } + if len(egressRule.dstIPBlocks) != 0 { + dstIPBlockIPSetName := policyIndexedDestinationIPBlockIPSetName(policy.namespace, policy.name, i) + dstIPBlockIPSet, err := npc.ipSetHandler.Create(dstIPBlockIPSetName, TypeHashNet, OptionTimeout, "0") + if err != nil { + return fmt.Errorf("failed to create ipset: %s", err.Error()) + } + activePolicyIPSets[dstIPBlockIPSet.Name] = true + err = dstIPBlockIPSet.RefreshWithBuiltinOptions(egressRule.dstIPBlocks) + if err != nil { + log.Errorf("failed to refresh dstIPBlockIPSet: " + err.Error()) + } + if !egressRule.matchAllPorts { + for _, portProtocol := range egressRule.ports { + comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " + + policy.name + " namespace " + policy.namespace + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, portProtocol.protocol, portProtocol.port); err != nil { + return err + } + } + } + if egressRule.matchAllPorts { + comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " + + policy.name + " namespace " + policy.namespace + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, "", ""); err != nil { + return err + } + } + } + } + return nil +} + +func (npc 
*NetworkPolicyController) appendRuleToPolicyChain(iptablesCmdHandler *iptables.IPTables, policyChainName, comment, srcIPSetName, dstIPSetName, protocol, dPort string) error { + if iptablesCmdHandler == nil { + return fmt.Errorf("Failed to run iptables command: iptablesCmdHandler is nil") + } + args := make([]string, 0) + if comment != "" { + args = append(args, "-m", "comment", "--comment", comment) + } + if srcIPSetName != "" { + args = append(args, "-m", "set", "--set", srcIPSetName, "src") + } + if dstIPSetName != "" { + args = append(args, "-m", "set", "--set", dstIPSetName, "dst") + } + if protocol != "" { + args = append(args, "-p", protocol) + } + if dPort != "" { + args = append(args, "--dport", dPort) + } + args = append(args, "-j", "ACCEPT") + err := iptablesCmdHandler.AppendUnique("filter", policyChainName, args...) + if err != nil { + return fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + return nil +} + +func (npc *NetworkPolicyController) syncPodFirewallChains(version string) (map[string]bool, error) { + + activePodFwChains := make(map[string]bool) + + iptablesCmdHandler, err := iptables.New() + if err != nil { + log.Fatalf("Failed to initialize iptables executor: %s", err.Error()) + } + + // loop through the pods running on the node which to which ingress network policies to be applied + ingressNetworkPolicyEnabledPods, err := npc.getIngressNetworkPolicyEnabledPods(npc.nodeIP.String()) + if err != nil { + return nil, err + } + for _, pod := range *ingressNetworkPolicyEnabledPods { + + // below condition occurs when we get trasient update while removing or adding pod + // subseqent update will do the correct action + if len(pod.ip) == 0 || pod.ip == "" { + continue + } + + // ensure pod specific firewall chain exist for all the pods that need ingress firewall + podFwChainName := podFirewallChainName(pod.namespace, pod.name, version) + err = iptablesCmdHandler.NewChain("filter", podFwChainName) + if err != nil && err.(*iptables.Error).ExitStatus() != 1 { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + activePodFwChains[podFwChainName] = true + + // add entries in pod firewall to run through required network policies + for _, policy := range *npc.networkPoliciesInfo { + if _, ok := policy.targetPods[pod.ip]; ok { + comment := "run through nw policy " + policy.name + policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) + args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName} + exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + if !exists { + err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...) + if err != nil && err.(*iptables.Error).ExitStatus() != 1 { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + } + } + } + + comment := "rule to permit the traffic traffic to pods when source is the pod's local node" + args := []string{"-m", "comment", "--comment", comment, "-m", "addrtype", "--src-type", "LOCAL", "-d", pod.ip, "-j", "ACCEPT"} + exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + if !exists { + err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...) 
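appendRuleToPolicyChain above assembles one ACCEPT rule from an optional comment, optional source and destination ipset matches (using the older --set spelling of --match-set), an optional protocol and an optional destination port. The snippet below only formats the equivalent rule for a made-up pair of set names and tcp/80 so the generated rule shape is easy to see; it does not touch iptables.

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Mirrors the argument list appendRuleToPolicyChain builds for a rule that
	// allows traffic from one pod ipset to another on tcp/80. Set and chain
	// names are placeholders; the comment would need shell quoting on a real CLI.
	args := []string{
		"-m", "comment", "--comment", "rule to ACCEPT traffic from source pods to dest pods",
		"-m", "set", "--set", "KUBE-SRC-EXAMPLE", "src",
		"-m", "set", "--set", "KUBE-DST-EXAMPLE", "dst",
		"-p", "tcp",
		"--dport", "80",
		"-j", "ACCEPT",
	}
	fmt.Println("iptables -t filter -A KUBE-NWPLCY-EXAMPLE " + strings.Join(args, " "))
}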
+ if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + } + + // ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain + // this rule applies to the traffic getting routed (coming for other node pods) + comment = "rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace + + " to chain " + podFwChainName + args = []string{"-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName} + exists, err = iptablesCmdHandler.Exists("filter", "FORWARD", args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + if !exists { + err := iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + } + + // ensure there is rule in filter table and OUTPUT chain to jump to pod specific firewall chain + // this rule applies to the traffic from a pod getting routed back to another pod on same node by service proxy + exists, err = iptablesCmdHandler.Exists("filter", "OUTPUT", args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + if !exists { + err := iptablesCmdHandler.Insert("filter", "OUTPUT", 1, args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + } + + // ensure there is rule in filter table and forward chain to jump to pod specific firewall chain + // this rule applies to the traffic getting switched (coming for same node pods) + comment = "rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace + + " to chain " + podFwChainName + args = []string{"-m", "physdev", "--physdev-is-bridged", + "-m", "comment", "--comment", comment, + "-d", pod.ip, + "-j", podFwChainName} + exists, err = iptablesCmdHandler.Exists("filter", "FORWARD", args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + if !exists { + err = iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + } + + // add default DROP rule at the end of chain + comment = "default rule to REJECT traffic destined for POD name:" + pod.name + " namespace: " + pod.namespace + args = []string{"-m", "comment", "--comment", comment, "-j", "REJECT"} + err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + + // ensure statefull firewall, that permits return traffic for the traffic originated by the pod + comment = "rule for stateful firewall for pod" + args = []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"} + exists, err = iptablesCmdHandler.Exists("filter", podFwChainName, args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + if !exists { + err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...) 
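syncPodFirewallChains repeats the same Exists-then-Insert-at-position-1 sequence for every jump, accept and stateful rule it wants near the top of a chain. The controller inlines it each time; the hypothetical helper below just captures the pattern in one place.

package main

import (
	"log"

	"github.com/coreos/go-iptables/iptables"
)

// ensureRuleAtTop inserts rulespec at position 1 of table/chain unless an
// identical rule already exists anywhere in the chain, mirroring the
// Exists + Insert(..., 1, ...) pattern used throughout syncPodFirewallChains.
func ensureRuleAtTop(ipt *iptables.IPTables, table, chain string, rulespec ...string) error {
	exists, err := ipt.Exists(table, chain, rulespec...)
	if err != nil {
		return err
	}
	if exists {
		return nil
	}
	return ipt.Insert(table, chain, 1, rulespec...)
}

func main() {
	ipt, err := iptables.New()
	if err != nil {
		log.Fatalf("failed to initialize iptables: %v", err)
	}
	// Placeholder rule: jump traffic destined to 10.42.0.5 into an example chain.
	if err := ensureRuleAtTop(ipt, "filter", "FORWARD", "-d", "10.42.0.5", "-j", "EXAMPLE-CHAIN"); err != nil {
		log.Fatalf("ensure rule: %v", err)
	}
}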
+ if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + } + } + + // loop through the pods running on the node which egress network policies to be applied + egressNetworkPolicyEnabledPods, err := npc.getEgressNetworkPolicyEnabledPods(npc.nodeIP.String()) + if err != nil { + return nil, err + } + for _, pod := range *egressNetworkPolicyEnabledPods { + + // below condition occurs when we get trasient update while removing or adding pod + // subseqent update will do the correct action + if len(pod.ip) == 0 || pod.ip == "" { + continue + } + + // ensure pod specific firewall chain exist for all the pods that need egress firewall + podFwChainName := podFirewallChainName(pod.namespace, pod.name, version) + err = iptablesCmdHandler.NewChain("filter", podFwChainName) + if err != nil && err.(*iptables.Error).ExitStatus() != 1 { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + activePodFwChains[podFwChainName] = true + + // add entries in pod firewall to run through required network policies + for _, policy := range *npc.networkPoliciesInfo { + if _, ok := policy.targetPods[pod.ip]; ok { + comment := "run through nw policy " + policy.name + policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) + args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName} + exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + if !exists { + err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...) + if err != nil && err.(*iptables.Error).ExitStatus() != 1 { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + } + } + } + + // ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain + // this rule applies to the traffic getting routed (coming for other node pods) + comment := "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace + + " to chain " + podFwChainName + args := []string{"-m", "comment", "--comment", comment, "-s", pod.ip, "-j", podFwChainName} + exists, err := iptablesCmdHandler.Exists("filter", "FORWARD", args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + if !exists { + err := iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + } + + // ensure there is rule in filter table and forward chain to jump to pod specific firewall chain + // this rule applies to the traffic getting switched (coming for same node pods) + comment = "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace + + " to chain " + podFwChainName + args = []string{"-m", "physdev", "--physdev-is-bridged", + "-m", "comment", "--comment", comment, + "-s", pod.ip, + "-j", podFwChainName} + exists, err = iptablesCmdHandler.Exists("filter", "FORWARD", args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + if !exists { + err = iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...) 
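Taken together, the ingress and egress passes leave every selected pod with its own firewall chain: FORWARD (and OUTPUT, for ingress) jumps traffic to or from the pod IP into that chain, the stateful ACCEPT and per-policy jumps are inserted at the top, and the default REJECT is appended so it always evaluates last. The snippet below is an illustrative, not literal, view of that layout for a hypothetical pod 10.42.0.5; real chain names are hashed (see the naming helpers near the end of this file).

package main

import "fmt"

func main() {
	// Rough shape of the rules the controller converges on for one pod matched
	// by one policy. Placeholder chain names; the LOCAL-source and physdev
	// variants present in the real output are omitted for brevity.
	rules := []string{
		"-A FORWARD -d 10.42.0.5 -j KUBE-POD-FW-EXAMPLE",
		"-A FORWARD -s 10.42.0.5 -j KUBE-POD-FW-EXAMPLE",
		"-A OUTPUT -d 10.42.0.5 -j KUBE-POD-FW-EXAMPLE",
		"-A KUBE-POD-FW-EXAMPLE -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT",
		"-A KUBE-POD-FW-EXAMPLE -j KUBE-NWPLCY-EXAMPLE",
		"-A KUBE-POD-FW-EXAMPLE -j REJECT",
	}
	for _, r := range rules {
		fmt.Println(r)
	}
}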
+ if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + } + + // add default DROP rule at the end of chain + comment = "default rule to REJECT traffic destined for POD name:" + pod.name + " namespace: " + pod.namespace + args = []string{"-m", "comment", "--comment", comment, "-j", "REJECT"} + err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + + // ensure statefull firewall, that permits return traffic for the traffic originated by the pod + comment = "rule for stateful firewall for pod" + args = []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"} + exists, err = iptablesCmdHandler.Exists("filter", podFwChainName, args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + if !exists { + err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...) + if err != nil { + return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) + } + } + } + + return activePodFwChains, nil +} + +func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets map[string]bool) error { + + cleanupPodFwChains := make([]string, 0) + cleanupPolicyChains := make([]string, 0) + cleanupPolicyIPSets := make([]*Set, 0) + + iptablesCmdHandler, err := iptables.New() + if err != nil { + log.Fatalf("failed to initialize iptables command executor due to %s", err.Error()) + } + ipsets, err := NewIPSet(false) + if err != nil { + log.Fatalf("failed to create ipsets command executor due to %s", err.Error()) + } + err = ipsets.Save() + if err != nil { + log.Fatalf("failed to initialize ipsets command executor due to %s", err.Error()) + } + + // get the list of chains created for pod firewall and network policies + chains, err := iptablesCmdHandler.ListChains("filter") + for _, chain := range chains { + if strings.HasPrefix(chain, kubeNetworkPolicyChainPrefix) { + if _, ok := activePolicyChains[chain]; !ok { + cleanupPolicyChains = append(cleanupPolicyChains, chain) + } + } + if strings.HasPrefix(chain, kubePodFirewallChainPrefix) { + if _, ok := activePodFwChains[chain]; !ok { + cleanupPodFwChains = append(cleanupPodFwChains, chain) + } + } + } + for _, set := range ipsets.Sets { + if strings.HasPrefix(set.Name, kubeSourceIPSetPrefix) || + strings.HasPrefix(set.Name, kubeDestinationIPSetPrefix) { + if _, ok := activePolicyIPSets[set.Name]; !ok { + cleanupPolicyIPSets = append(cleanupPolicyIPSets, set) + } + } + } + + // cleanup FORWARD chain rules to jump to pod firewall + for _, chain := range cleanupPodFwChains { + + forwardChainRules, err := iptablesCmdHandler.List("filter", "FORWARD") + if err != nil { + return fmt.Errorf("failed to list rules in filter table, FORWARD chain due to %s", err.Error()) + } + outputChainRules, err := iptablesCmdHandler.List("filter", "OUTPUT") + if err != nil { + return fmt.Errorf("failed to list rules in filter table, OUTPUT chain due to %s", err.Error()) + } + + // TODO delete rule by spec, than rule number to avoid extra loop + var realRuleNo int + for i, rule := range forwardChainRules { + if strings.Contains(rule, chain) { + err = iptablesCmdHandler.Delete("filter", "FORWARD", strconv.Itoa(i-realRuleNo)) + if err != nil { + return fmt.Errorf("failed to delete rule: %s from the FORWARD chain of filter table due to %s", rule, err.Error()) + } + realRuleNo++ + } + } + realRuleNo 
= 0 + for i, rule := range outputChainRules { + if strings.Contains(rule, chain) { + err = iptablesCmdHandler.Delete("filter", "OUTPUT", strconv.Itoa(i-realRuleNo)) + if err != nil { + return fmt.Errorf("failed to delete rule: %s from the OUTPUT chain of filter table due to %s", rule, err.Error()) + } + realRuleNo++ + } + } + } + + // cleanup pod firewall chain + for _, chain := range cleanupPodFwChains { + log.V(2).Infof("Found pod fw chain to cleanup: %s", chain) + err = iptablesCmdHandler.ClearChain("filter", chain) + if err != nil { + return fmt.Errorf("Failed to flush the rules in chain %s due to %s", chain, err.Error()) + } + err = iptablesCmdHandler.DeleteChain("filter", chain) + if err != nil { + return fmt.Errorf("Failed to delete the chain %s due to %s", chain, err.Error()) + } + log.V(2).Infof("Deleted pod specific firewall chain: %s from the filter table", chain) + } + + // cleanup network policy chains + for _, policyChain := range cleanupPolicyChains { + log.V(2).Infof("Found policy chain to cleanup %s", policyChain) + + // first clean up any references from pod firewall chain + for podFwChain := range activePodFwChains { + podFwChainRules, err := iptablesCmdHandler.List("filter", podFwChain) + if err != nil { + + } + for i, rule := range podFwChainRules { + if strings.Contains(rule, policyChain) { + err = iptablesCmdHandler.Delete("filter", podFwChain, strconv.Itoa(i)) + if err != nil { + return fmt.Errorf("Failed to delete rule %s from the chain %s", rule, podFwChain) + } + break + } + } + } + + err = iptablesCmdHandler.ClearChain("filter", policyChain) + if err != nil { + return fmt.Errorf("Failed to flush the rules in chain %s due to %s", policyChain, err) + } + err = iptablesCmdHandler.DeleteChain("filter", policyChain) + if err != nil { + return fmt.Errorf("Failed to flush the rules in chain %s due to %s", policyChain, err) + } + log.V(2).Infof("Deleted network policy chain: %s from the filter table", policyChain) + } + + // cleanup network policy ipsets + for _, set := range cleanupPolicyIPSets { + err = set.Destroy() + if err != nil { + return fmt.Errorf("Failed to delete ipset %s due to %s", set.Name, err) + } + } + return nil +} + +func (npc *NetworkPolicyController) getIngressNetworkPolicyEnabledPods(nodeIP string) (*map[string]podInfo, error) { + nodePods := make(map[string]podInfo) + + for _, obj := range npc.podLister.List() { + pod := obj.(*api.Pod) + + if strings.Compare(pod.Status.HostIP, nodeIP) != 0 { + continue + } + for _, policy := range *npc.networkPoliciesInfo { + if policy.namespace != pod.ObjectMeta.Namespace { + continue + } + _, ok := policy.targetPods[pod.Status.PodIP] + if ok && (policy.policyType == "both" || policy.policyType == "ingress") { + log.V(2).Infof("Found pod name: " + pod.ObjectMeta.Name + " namespace: " + pod.ObjectMeta.Namespace + " for which network policies need to be applied.") + nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP, + name: pod.ObjectMeta.Name, + namespace: pod.ObjectMeta.Namespace, + labels: pod.ObjectMeta.Labels} + break + } + } + } + return &nodePods, nil + +} + +func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(nodeIP string) (*map[string]podInfo, error) { + + nodePods := make(map[string]podInfo) + + for _, obj := range npc.podLister.List() { + pod := obj.(*api.Pod) + + if strings.Compare(pod.Status.HostIP, nodeIP) != 0 { + continue + } + for _, policy := range *npc.networkPoliciesInfo { + if policy.namespace != pod.ObjectMeta.Namespace { + continue + } + _, ok := 
policy.targetPods[pod.Status.PodIP] + if ok && (policy.policyType == "both" || policy.policyType == "egress") { + log.V(2).Infof("Found pod name: " + pod.ObjectMeta.Name + " namespace: " + pod.ObjectMeta.Namespace + " for which network policies need to be applied.") + nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP, + name: pod.ObjectMeta.Name, + namespace: pod.ObjectMeta.Namespace, + labels: pod.ObjectMeta.Labels} + break + } + } + } + return &nodePods, nil +} + +func (npc *NetworkPolicyController) processNetworkPolicyPorts(npPorts []networking.NetworkPolicyPort, namedPort2eps namedPort2eps) (numericPorts []protocolAndPort, namedPorts []endPoints) { + numericPorts, namedPorts = make([]protocolAndPort, 0), make([]endPoints, 0) + for _, npPort := range npPorts { + if npPort.Port == nil { + numericPorts = append(numericPorts, protocolAndPort{port: "", protocol: string(*npPort.Protocol)}) + } else if npPort.Port.Type == intstr.Int { + numericPorts = append(numericPorts, protocolAndPort{port: npPort.Port.String(), protocol: string(*npPort.Protocol)}) + } else { + if protocol2eps, ok := namedPort2eps[npPort.Port.String()]; ok { + if numericPort2eps, ok := protocol2eps[string(*npPort.Protocol)]; ok { + for _, eps := range numericPort2eps { + namedPorts = append(namedPorts, *eps) + } + } + } + } + } + return +} + +func (npc *NetworkPolicyController) processBetaNetworkPolicyPorts(npPorts []apiextensions.NetworkPolicyPort, namedPort2eps namedPort2eps) (numericPorts []protocolAndPort, namedPorts []endPoints) { + numericPorts, namedPorts = make([]protocolAndPort, 0), make([]endPoints, 0) + for _, npPort := range npPorts { + if npPort.Port == nil { + numericPorts = append(numericPorts, protocolAndPort{port: "", protocol: string(*npPort.Protocol)}) + } else if npPort.Port.Type == intstr.Int { + numericPorts = append(numericPorts, protocolAndPort{port: npPort.Port.String(), protocol: string(*npPort.Protocol)}) + } else { + if protocol2eps, ok := namedPort2eps[npPort.Port.String()]; ok { + if numericPort2eps, ok := protocol2eps[string(*npPort.Protocol)]; ok { + for _, eps := range numericPort2eps { + namedPorts = append(namedPorts, *eps) + } + } + } + } + } + return +} + +func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { + + NetworkPolicies := make([]networkPolicyInfo, 0) + + for _, policyObj := range npc.npLister.List() { + + policy, ok := policyObj.(*networking.NetworkPolicy) + if !ok { + return nil, fmt.Errorf("Failed to convert") + } + newPolicy := networkPolicyInfo{ + name: policy.Name, + namespace: policy.Namespace, + labels: policy.Spec.PodSelector.MatchLabels, + policyType: "ingress", + } + + // check if there is explicitly specified PolicyTypes in the spec + if len(policy.Spec.PolicyTypes) > 0 { + ingressType, egressType := false, false + for _, policyType := range policy.Spec.PolicyTypes { + if policyType == networking.PolicyTypeIngress { + ingressType = true + } + if policyType == networking.PolicyTypeEgress { + egressType = true + } + } + if ingressType && egressType { + newPolicy.policyType = "both" + } else if egressType { + newPolicy.policyType = "egress" + } else if ingressType { + newPolicy.policyType = "ingress" + } + } else { + if policy.Spec.Egress != nil && policy.Spec.Ingress != nil { + newPolicy.policyType = "both" + } else if policy.Spec.Egress != nil { + newPolicy.policyType = "egress" + } else if policy.Spec.Ingress != nil { + newPolicy.policyType = "ingress" + } + } + + matchingPods, err := 
npc.ListPodsByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels) + newPolicy.targetPods = make(map[string]podInfo) + namedPort2IngressEps := make(namedPort2eps) + if err == nil { + for _, matchingPod := range matchingPods { + if matchingPod.Status.PodIP == "" { + continue + } + newPolicy.targetPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP, + name: matchingPod.ObjectMeta.Name, + namespace: matchingPod.ObjectMeta.Namespace, + labels: matchingPod.ObjectMeta.Labels} + npc.grabNamedPortFromPod(matchingPod, &namedPort2IngressEps) + } + } + + if policy.Spec.Ingress == nil { + newPolicy.ingressRules = nil + } else { + newPolicy.ingressRules = make([]ingressRule, 0) + } + + if policy.Spec.Egress == nil { + newPolicy.egressRules = nil + } else { + newPolicy.egressRules = make([]egressRule, 0) + } + + for _, specIngressRule := range policy.Spec.Ingress { + ingressRule := ingressRule{} + ingressRule.srcPods = make([]podInfo, 0) + ingressRule.srcIPBlocks = make([][]string, 0) + + // If this field is empty or missing in the spec, this rule matches all sources + if len(specIngressRule.From) == 0 { + ingressRule.matchAllSource = true + } else { + ingressRule.matchAllSource = false + for _, peer := range specIngressRule.From { + if peerPods, err := npc.evalPodPeer(policy, peer); err == nil { + for _, peerPod := range peerPods { + if peerPod.Status.PodIP == "" { + continue + } + ingressRule.srcPods = append(ingressRule.srcPods, + podInfo{ip: peerPod.Status.PodIP, + name: peerPod.ObjectMeta.Name, + namespace: peerPod.ObjectMeta.Namespace, + labels: peerPod.ObjectMeta.Labels}) + } + } + ingressRule.srcIPBlocks = append(ingressRule.srcIPBlocks, npc.evalIPBlockPeer(peer)...) + } + } + + ingressRule.ports = make([]protocolAndPort, 0) + ingressRule.namedPorts = make([]endPoints, 0) + // If this field is empty or missing in the spec, this rule matches all ports + if len(specIngressRule.Ports) == 0 { + ingressRule.matchAllPorts = true + } else { + ingressRule.matchAllPorts = false + ingressRule.ports, ingressRule.namedPorts = npc.processNetworkPolicyPorts(specIngressRule.Ports, namedPort2IngressEps) + } + + newPolicy.ingressRules = append(newPolicy.ingressRules, ingressRule) + } + + for _, specEgressRule := range policy.Spec.Egress { + egressRule := egressRule{} + egressRule.dstPods = make([]podInfo, 0) + egressRule.dstIPBlocks = make([][]string, 0) + namedPort2EgressEps := make(namedPort2eps) + + // If this field is empty or missing in the spec, this rule matches all sources + if len(specEgressRule.To) == 0 { + egressRule.matchAllDestinations = true + } else { + egressRule.matchAllDestinations = false + for _, peer := range specEgressRule.To { + if peerPods, err := npc.evalPodPeer(policy, peer); err == nil { + for _, peerPod := range peerPods { + if peerPod.Status.PodIP == "" { + continue + } + egressRule.dstPods = append(egressRule.dstPods, + podInfo{ip: peerPod.Status.PodIP, + name: peerPod.ObjectMeta.Name, + namespace: peerPod.ObjectMeta.Namespace, + labels: peerPod.ObjectMeta.Labels}) + npc.grabNamedPortFromPod(peerPod, &namedPort2EgressEps) + } + + } + egressRule.dstIPBlocks = append(egressRule.dstIPBlocks, npc.evalIPBlockPeer(peer)...) 
+ } + } + + egressRule.ports = make([]protocolAndPort, 0) + egressRule.namedPorts = make([]endPoints, 0) + // If this field is empty or missing in the spec, this rule matches all ports + if len(specEgressRule.Ports) == 0 { + egressRule.matchAllPorts = true + } else { + egressRule.matchAllPorts = false + egressRule.ports, egressRule.namedPorts = npc.processNetworkPolicyPorts(specEgressRule.Ports, namedPort2EgressEps) + } + + newPolicy.egressRules = append(newPolicy.egressRules, egressRule) + } + NetworkPolicies = append(NetworkPolicies, newPolicy) + } + + return &NetworkPolicies, nil +} + +func (npc *NetworkPolicyController) evalPodPeer(policy *networking.NetworkPolicy, peer networking.NetworkPolicyPeer) ([]*api.Pod, error) { + + var matchingPods []*api.Pod + matchingPods = make([]*api.Pod, 0) + var err error + // spec can have both PodSelector AND NamespaceSelector + if peer.NamespaceSelector != nil { + namespaces, err := npc.ListNamespaceByLabels(peer.NamespaceSelector.MatchLabels) + if err != nil { + return nil, errors.New("Failed to build network policies info due to " + err.Error()) + } + + var podSelectorLabels map[string]string + if peer.PodSelector != nil { + podSelectorLabels = peer.PodSelector.MatchLabels + } + for _, namespace := range namespaces { + namespacePods, err := npc.ListPodsByNamespaceAndLabels(namespace.Name, podSelectorLabels) + if err != nil { + return nil, errors.New("Failed to build network policies info due to " + err.Error()) + } + matchingPods = append(matchingPods, namespacePods...) + } + } else if peer.PodSelector != nil { + matchingPods, err = npc.ListPodsByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels) + } + + return matchingPods, err +} + +func (npc *NetworkPolicyController) ListPodsByNamespaceAndLabels(namespace string, labelsToMatch labels.Set) (ret []*api.Pod, err error) { + podLister := listers.NewPodLister(npc.podLister) + allMatchedNameSpacePods, err := podLister.Pods(namespace).List(labelsToMatch.AsSelector()) + if err != nil { + return nil, err + } + return allMatchedNameSpacePods, nil +} + +func (npc *NetworkPolicyController) ListNamespaceByLabels(set labels.Set) ([]*api.Namespace, error) { + namespaceLister := listers.NewNamespaceLister(npc.nsLister) + matchedNamespaces, err := namespaceLister.List(set.AsSelector()) + if err != nil { + return nil, err + } + return matchedNamespaces, nil +} + +func (npc *NetworkPolicyController) evalIPBlockPeer(peer networking.NetworkPolicyPeer) [][]string { + ipBlock := make([][]string, 0) + if peer.PodSelector == nil && peer.NamespaceSelector == nil && peer.IPBlock != nil { + if cidr := peer.IPBlock.CIDR; strings.HasSuffix(cidr, "/0") { + ipBlock = append(ipBlock, []string{"0.0.0.0/1", OptionTimeout, "0"}, []string{"128.0.0.0/1", OptionTimeout, "0"}) + } else { + ipBlock = append(ipBlock, []string{cidr, OptionTimeout, "0"}) + } + for _, except := range peer.IPBlock.Except { + if strings.HasSuffix(except, "/0") { + ipBlock = append(ipBlock, []string{"0.0.0.0/1", OptionTimeout, "0", OptionNoMatch}, []string{"128.0.0.0/1", OptionTimeout, "0", OptionNoMatch}) + } else { + ipBlock = append(ipBlock, []string{except, OptionTimeout, "0", OptionNoMatch}) + } + } + } + return ipBlock +} + +func (npc *NetworkPolicyController) grabNamedPortFromPod(pod *api.Pod, namedPort2eps *namedPort2eps) { + if pod == nil || namedPort2eps == nil { + return + } + for k := range pod.Spec.Containers { + for _, port := range pod.Spec.Containers[k].Ports { + name := port.Name + protocol := string(port.Protocol) + 
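Two details above are easy to miss: evalIPBlockPeer splits a /0 CIDR into 0.0.0.0/1 and 128.0.0.0/1, presumably because ipset's hash:net type cannot hold a zero-prefix member, and grabNamedPortFromPod indexes every container port by name, protocol and numeric port so that named ports referenced by a policy can be resolved to concrete endpoints. The sketch below builds that nested map by hand for a hypothetical pod 10.42.0.7 exposing containerPort 9100 named "metrics" (the simplified endPoints struct stands in for the controller's endPoints/protocolAndPort pair).

package main

import "fmt"

// endPoints mirrors the controller's endpoints-per-named-port value type.
type endPoints struct {
	ips      []string
	protocol string
	port     string
}

func main() {
	// named port -> protocol -> numeric port -> endpoints, as in namedPort2eps.
	namedPort2eps := map[string]map[string]map[string]*endPoints{}

	name, protocol, port, podIP := "metrics", "TCP", "9100", "10.42.0.7"
	if namedPort2eps[name] == nil {
		namedPort2eps[name] = map[string]map[string]*endPoints{}
	}
	if namedPort2eps[name][protocol] == nil {
		namedPort2eps[name][protocol] = map[string]*endPoints{}
	}
	if eps, ok := namedPort2eps[name][protocol][port]; ok {
		eps.ips = append(eps.ips, podIP) // another pod serving the same named port
	} else {
		namedPort2eps[name][protocol][port] = &endPoints{ips: []string{podIP}, protocol: protocol, port: port}
	}

	// A policy rule that references port "metrics"/TCP resolves to:
	for numeric, eps := range namedPort2eps[name][protocol] {
		fmt.Printf("named port %q/%s -> port %s, ips %v\n", name, protocol, numeric, eps.ips)
	}
}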
containerPort := strconv.Itoa(int(port.ContainerPort)) + + if (*namedPort2eps)[name] == nil { + (*namedPort2eps)[name] = make(protocol2eps) + } + if (*namedPort2eps)[name][protocol] == nil { + (*namedPort2eps)[name][protocol] = make(numericPort2eps) + } + if eps, ok := (*namedPort2eps)[name][protocol][containerPort]; !ok { + (*namedPort2eps)[name][protocol][containerPort] = &endPoints{ + ips: []string{pod.Status.PodIP}, + protocolAndPort: protocolAndPort{port: containerPort, protocol: protocol}, + } + } else { + eps.ips = append(eps.ips, pod.Status.PodIP) + } + } + } +} + +func (npc *NetworkPolicyController) buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) { + + NetworkPolicies := make([]networkPolicyInfo, 0) + + for _, policyObj := range npc.npLister.List() { + + policy, _ := policyObj.(*apiextensions.NetworkPolicy) + newPolicy := networkPolicyInfo{ + name: policy.Name, + namespace: policy.Namespace, + labels: policy.Spec.PodSelector.MatchLabels, + } + matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels) + newPolicy.targetPods = make(map[string]podInfo) + newPolicy.ingressRules = make([]ingressRule, 0) + namedPort2IngressEps := make(namedPort2eps) + if err == nil { + for _, matchingPod := range matchingPods { + if matchingPod.Status.PodIP == "" { + continue + } + newPolicy.targetPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP, + name: matchingPod.ObjectMeta.Name, + namespace: matchingPod.ObjectMeta.Namespace, + labels: matchingPod.ObjectMeta.Labels} + npc.grabNamedPortFromPod(matchingPod, &namedPort2IngressEps) + } + } + + for _, specIngressRule := range policy.Spec.Ingress { + ingressRule := ingressRule{} + + ingressRule.ports = make([]protocolAndPort, 0) + ingressRule.namedPorts = make([]endPoints, 0) + ingressRule.ports, ingressRule.namedPorts = npc.processBetaNetworkPolicyPorts(specIngressRule.Ports, namedPort2IngressEps) + ingressRule.srcPods = make([]podInfo, 0) + for _, peer := range specIngressRule.From { + matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels) + if err == nil { + for _, matchingPod := range matchingPods { + if matchingPod.Status.PodIP == "" { + continue + } + ingressRule.srcPods = append(ingressRule.srcPods, + podInfo{ip: matchingPod.Status.PodIP, + name: matchingPod.ObjectMeta.Name, + namespace: matchingPod.ObjectMeta.Namespace, + labels: matchingPod.ObjectMeta.Labels}) + } + } + } + newPolicy.ingressRules = append(newPolicy.ingressRules, ingressRule) + } + NetworkPolicies = append(NetworkPolicies, newPolicy) + } + + return &NetworkPolicies, nil +} + +func podFirewallChainName(namespace, podName string, version string) string { + hash := sha256.Sum256([]byte(namespace + podName + version)) + encoded := base32.StdEncoding.EncodeToString(hash[:]) + return kubePodFirewallChainPrefix + encoded[:16] +} + +func networkPolicyChainName(namespace, policyName string, version string) string { + hash := sha256.Sum256([]byte(namespace + policyName + version)) + encoded := base32.StdEncoding.EncodeToString(hash[:]) + return kubeNetworkPolicyChainPrefix + encoded[:16] +} + +func policySourcePodIPSetName(namespace, policyName string) string { + hash := sha256.Sum256([]byte(namespace + policyName)) + encoded := base32.StdEncoding.EncodeToString(hash[:]) + return kubeSourceIPSetPrefix + encoded[:16] +} + +func policyDestinationPodIPSetName(namespace, policyName string) string { + hash := sha256.Sum256([]byte(namespace + policyName)) + encoded := 
base32.StdEncoding.EncodeToString(hash[:])
+	return kubeDestinationIPSetPrefix + encoded[:16]
+}
+
+func policyIndexedSourcePodIPSetName(namespace, policyName string, ingressRuleNo int) string {
+	hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + "pod"))
+	encoded := base32.StdEncoding.EncodeToString(hash[:])
+	return kubeSourceIPSetPrefix + encoded[:16]
+}
+
+func policyIndexedDestinationPodIPSetName(namespace, policyName string, egressRuleNo int) string {
+	hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + "pod"))
+	encoded := base32.StdEncoding.EncodeToString(hash[:])
+	return kubeDestinationIPSetPrefix + encoded[:16]
+}
+
+func policyIndexedSourceIPBlockIPSetName(namespace, policyName string, ingressRuleNo int) string {
+	hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + "ipblock"))
+	encoded := base32.StdEncoding.EncodeToString(hash[:])
+	return kubeSourceIPSetPrefix + encoded[:16]
+}
+
+func policyIndexedDestinationIPBlockIPSetName(namespace, policyName string, egressRuleNo int) string {
+	hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + "ipblock"))
+	encoded := base32.StdEncoding.EncodeToString(hash[:])
+	return kubeDestinationIPSetPrefix + encoded[:16]
+}
+
+func policyIndexedIngressNamedPortIPSetName(namespace, policyName string, ingressRuleNo, namedPortNo int) string {
+	hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + strconv.Itoa(namedPortNo) + "namedport"))
+	encoded := base32.StdEncoding.EncodeToString(hash[:])
+	return kubeDestinationIPSetPrefix + encoded[:16]
+}
+
+func policyIndexedEgressNamedPortIPSetName(namespace, policyName string, egressRuleNo, namedPortNo int) string {
+	hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + strconv.Itoa(namedPortNo) + "namedport"))
+	encoded := base32.StdEncoding.EncodeToString(hash[:])
+	return kubeDestinationIPSetPrefix + encoded[:16]
+}
+
+// Cleanup removes all iptables and ipset configuration created by the network policy controller
+func (npc *NetworkPolicyController) Cleanup() {
+
+	log.Info("Cleaning up iptables configuration done by kube-router")
+
+	iptablesCmdHandler, err := iptables.New()
+	if err != nil {
+		log.Errorf("Failed to initialize iptables executor: %s", err.Error())
+	}
+
+	// delete jump rules in FORWARD chain to pod specific firewall chain
+	forwardChainRules, err := iptablesCmdHandler.List("filter", "FORWARD")
+	if err != nil {
+		log.Errorf("Failed to delete iptables rules as part of cleanup")
+		return
+	}
+
+	// TODO: need a better way to delete a rule than addressing it by its index number
+	var realRuleNo int
+	for i, rule := range forwardChainRules {
+		if strings.Contains(rule, kubePodFirewallChainPrefix) {
+			err = iptablesCmdHandler.Delete("filter", "FORWARD", strconv.Itoa(i-realRuleNo))
+			realRuleNo++
+		}
+	}
+
+	// delete jump rules in OUTPUT chain to pod specific firewall chain
+	forwardChainRules, err = iptablesCmdHandler.List("filter", "OUTPUT")
+	if err != nil {
+		log.Errorf("Failed to delete iptables rules as part of cleanup")
+		return
+	}
+
+	// TODO: need a better way to delete a rule than addressing it by its index number
+	realRuleNo = 0
+	for i, rule := range forwardChainRules {
+		if strings.Contains(rule, kubePodFirewallChainPrefix) {
+			err = iptablesCmdHandler.Delete("filter", "OUTPUT", strconv.Itoa(i-realRuleNo))
+			realRuleNo++
+		}
+	}
+
+	// flush and delete pod specific firewall chain
+ chains, err := iptablesCmdHandler.ListChains("filter") + for _, chain := range chains { + if strings.HasPrefix(chain, kubePodFirewallChainPrefix) { + err = iptablesCmdHandler.ClearChain("filter", chain) + if err != nil { + log.Errorf("Failed to cleanup iptables rules: " + err.Error()) + return + } + err = iptablesCmdHandler.DeleteChain("filter", chain) + if err != nil { + log.Errorf("Failed to cleanup iptables rules: " + err.Error()) + return + } + } + } + + // flush and delete per network policy specific chain + chains, err = iptablesCmdHandler.ListChains("filter") + for _, chain := range chains { + if strings.HasPrefix(chain, kubeNetworkPolicyChainPrefix) { + err = iptablesCmdHandler.ClearChain("filter", chain) + if err != nil { + log.Errorf("Failed to cleanup iptables rules: " + err.Error()) + return + } + err = iptablesCmdHandler.DeleteChain("filter", chain) + if err != nil { + log.Errorf("Failed to cleanup iptables rules: " + err.Error()) + return + } + } + } + + // delete all ipsets + ipset, err := NewIPSet(false) + if err != nil { + log.Errorf("Failed to clean up ipsets: " + err.Error()) + } + err = ipset.Save() + if err != nil { + log.Errorf("Failed to clean up ipsets: " + err.Error()) + } + err = ipset.DestroyAllWithin() + if err != nil { + log.Errorf("Failed to clean up ipsets: " + err.Error()) + } + log.Infof("Successfully cleaned the iptables configuration done by kube-router") +} + +func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + npc.OnPodUpdate(obj) + + }, + UpdateFunc: func(oldObj, newObj interface{}) { + newPoObj := newObj.(*api.Pod) + oldPoObj := oldObj.(*api.Pod) + if newPoObj.Status.Phase != oldPoObj.Status.Phase || newPoObj.Status.PodIP != oldPoObj.Status.PodIP { + // for the network policies, we are only interested in pod status phase change or IP change + npc.OnPodUpdate(newObj) + } + }, + DeleteFunc: func(obj interface{}) { + npc.OnPodUpdate(obj) + }, + } +} + +func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + npc.OnNamespaceUpdate(obj) + + }, + UpdateFunc: func(oldObj, newObj interface{}) { + npc.OnNamespaceUpdate(newObj) + + }, + DeleteFunc: func(obj interface{}) { + npc.OnNamespaceUpdate(obj) + + }, + } +} + +func (npc *NetworkPolicyController) newNetworkPolicyEventHandler() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + npc.OnNetworkPolicyUpdate(obj) + + }, + UpdateFunc: func(oldObj, newObj interface{}) { + npc.OnNetworkPolicyUpdate(newObj) + }, + DeleteFunc: func(obj interface{}) { + npc.OnNetworkPolicyUpdate(obj) + + }, + } +} + +// NewNetworkPolicyController returns new NetworkPolicyController object +func NewNetworkPolicyController( + stopCh <-chan struct{}, + clientset kubernetes.Interface, + ipTablesSyncPeriod time.Duration, + hostnameOverride string) (*NetworkPolicyController, error) { + + npc := NetworkPolicyController{} + + informerFactory := informers.NewSharedInformerFactory(clientset, 0) + podInformer := informerFactory.Core().V1().Pods().Informer() + nsInformer := informerFactory.Core().V1().Namespaces().Informer() + npInformer := informerFactory.Networking().V1().NetworkPolicies().Informer() + informerFactory.Start(stopCh) + + // if config.MetricsEnabled { + // //Register the metrics for this controller + // 
prometheus.MustRegister(metrics.ControllerIPtablesSyncTime)
+	// 	prometheus.MustRegister(metrics.ControllerPolicyChainsSyncTime)
+	// 	npc.MetricsEnabled = true
+	// }
+
+	npc.syncPeriod = ipTablesSyncPeriod
+
+	npc.v1NetworkPolicy = true
+	v, err := clientset.Discovery().ServerVersion()
+	if err != nil {
+		return nil, err
+	}
+	valid := regexp.MustCompile("[0-9]")
+	v.Minor = strings.Join(valid.FindAllString(v.Minor, -1), "")
+	minorVer, _ := strconv.Atoi(v.Minor)
+	if v.Major == "1" && minorVer < 7 {
+		npc.v1NetworkPolicy = false
+	}
+
+	node, err := clientset.CoreV1().Nodes().Get(hostnameOverride, metav1.GetOptions{})
+	if err != nil {
+		return nil, err
+	}
+
+	npc.nodeHostName = node.Name
+
+	nodeIP, err := GetNodeIP(node)
+	if err != nil {
+		return nil, err
+	}
+	npc.nodeIP = nodeIP
+
+	ipset, err := NewIPSet(false)
+	if err != nil {
+		return nil, err
+	}
+	err = ipset.Save()
+	if err != nil {
+		return nil, err
+	}
+	npc.ipSetHandler = ipset
+
+	npc.podLister = podInformer.GetIndexer()
+	npc.PodEventHandler = npc.newPodEventHandler()
+
+	npc.nsLister = nsInformer.GetIndexer()
+	npc.NamespaceEventHandler = npc.newNamespaceEventHandler()
+
+	npc.npLister = npInformer.GetIndexer()
+	npc.NetworkPolicyEventHandler = npc.newNetworkPolicyEventHandler()
+
+	return &npc, nil
+}
diff --git a/pkg/agent/netpol/utils.go b/pkg/agent/netpol/utils.go
new file mode 100644
index 0000000000..74980890f9
--- /dev/null
+++ b/pkg/agent/netpol/utils.go
@@ -0,0 +1,534 @@
+// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
+// - modified from https://github.com/cloudnativelabs/kube-router/tree/d6f9f31a7b/pkg/utils
+
+package netpol
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"net"
+	"os/exec"
+	"strings"
+
+	apiv1 "k8s.io/api/core/v1"
+)
+
+var (
+	// errIpsetNotFound is returned when the ipset binary is not found.
+	errIpsetNotFound = errors.New("Ipset utility not found")
+)
+
+const (
+	// FamillyInet IPv4.
+	FamillyInet = "inet"
+	// FamillyInet6 IPv6.
+	FamillyInet6 = "inet6"
+
+	// DefaultMaxElem Default OptionMaxElem value.
+	DefaultMaxElem = "65536"
+	// DefaultHasSize Default OptionHashSize value.
+	DefaultHasSize = "1024"
+
+	// TypeHashIP The hash:ip set type uses a hash to store IP host addresses (default) or network addresses. Zero valued IP address cannot be stored in a hash:ip type of set.
+	TypeHashIP = "hash:ip"
+	// TypeHashMac The hash:mac set type uses a hash to store MAC addresses. Zero valued MAC addresses cannot be stored in a hash:mac type of set.
+	TypeHashMac = "hash:mac"
+	// TypeHashNet The hash:net set type uses a hash to store different sized IP network addresses. Network address with zero prefix size cannot be stored in this type of set.
+	TypeHashNet = "hash:net"
+	// TypeHashNetNet The hash:net,net set type uses a hash to store pairs of different sized IP network addresses. Bear in mind that the first parameter has precedence over the second, so a nomatch entry could potentially be ineffective if a more specific first parameter existed with a suitable second parameter. Network address with zero prefix size cannot be stored in this type of set.
+	TypeHashNetNet = "hash:net,net"
+	// TypeHashIPPort The hash:ip,port set type uses a hash to store IP address and port number pairs. The port number is interpreted together with a protocol (default TCP) and zero protocol number cannot be used.
+	TypeHashIPPort = "hash:ip,port"
+	// TypeHashNetPort The hash:net,port set type uses a hash to store different sized IP network address and port pairs.
The port number is interpreted together with a protocol (default TCP) and zero protocol number cannot be used. Network address with zero prefix size is not accepted either. + TypeHashNetPort = "hash:net,port" + // TypeHashIPPortIP The hash:ip,port,ip set type uses a hash to store IP address, port number and a second IP address triples. The port number is interpreted together with a protocol (default TCP) and zero protocol number cannot be used. + TypeHashIPPortIP = "hash:ip,port,ip" + // TypeHashIPPortNet The hash:ip,port,net set type uses a hash to store IP address, port number and IP network address triples. The port number is interpreted together with a protocol (default TCP) and zero protocol number cannot be used. Network address with zero prefix size cannot be stored either. + TypeHashIPPortNet = "hash:ip,port,net" + // TypeHashIPMark The hash:ip,mark set type uses a hash to store IP address and packet mark pairs. + TypeHashIPMark = "hash:ip,mark" + // TypeHashIPNetPortNet The hash:net,port,net set type behaves similarly to hash:ip,port,net but accepts a cidr value for both the first and last parameter. Either subnet is permitted to be a /0 should you wish to match port between all destinations. + TypeHashIPNetPortNet = "hash:net,port,net" + // TypeHashNetIface The hash:net,iface set type uses a hash to store different sized IP network address and interface name pairs. + TypeHashNetIface = "hash:net,iface" + // TypeListSet The list:set type uses a simple list in which you can store set names. + TypeListSet = "list:set" + + // OptionTimeout All set types supports the optional timeout parameter when creating a set and adding entries. The value of the timeout parameter for the create command means the default timeout value (in seconds) for new entries. If a set is created with timeout support, then the same timeout option can be used to specify non-default timeout values when adding entries. Zero timeout value means the entry is added permanent to the set. The timeout value of already added elements can be changed by readding the element using the -exist option. When listing the set, the number of entries printed in the header might be larger than the listed number of entries for sets with the timeout extensions: the number of entries in the set is updated when elements added/deleted to the set and periodically when the garbage colletor evicts the timed out entries.` + OptionTimeout = "timeout" + // OptionCounters All set types support the optional counters option when creating a set. If the option is specified then the set is created with packet and byte counters per element support. The packet and byte counters are initialized to zero when the elements are (re-)added to the set, unless the packet and byte counter values are explicitly specified by the packets and bytes options. An example when an element is added to a set with non-zero counter values. + OptionCounters = "counters" + // OptionPackets All set types support the optional counters option when creating a set. If the option is specified then the set is created with packet and byte counters per element support. The packet and byte counters are initialized to zero when the elements are (re-)added to the set, unless the packet and byte counter values are explicitly specified by the packets and bytes options. An example when an element is added to a set with non-zero counter values. + OptionPackets = "packets" + // OptionBytes All set types support the optional counters option when creating a set. 
If the option is specified then the set is created with packet and byte counters per element support. The packet and byte counters are initialized to zero when the elements are (re-)added to the set, unless the packet and byte counter values are explicitly specified by the packets and bytes options. An example when an element is added to a set with non-zero counter values. + OptionBytes = "bytes" + // OptionComment All set types support the optional comment extension. Enabling this extension on an ipset enables you to annotate an ipset entry with an arbitrary string. This string is completely ignored by both the kernel and ipset itself and is purely for providing a convenient means to document the reason for an entry's existence. Comments must not contain any quotation marks and the usual escape character (\) has no meaning + OptionComment = "comment" + // OptionSkbinfo All set types support the optional skbinfo extension. This extension allow to store the metainfo (firewall mark, tc class and hardware queue) with every entry and map it to packets by usage of SET netfilter target with --map-set option. skbmark option format: MARK or MARK/MASK, where MARK and MASK are 32bit hex numbers with 0x prefix. If only mark is specified mask 0xffffffff are used. skbprio option has tc class format: MAJOR:MINOR, where major and minor numbers are hex without 0x prefix. skbqueue option is just decimal number. + OptionSkbinfo = "skbinfo" + // OptionSkbmark All set types support the optional skbinfo extension. This extension allow to store the metainfo (firewall mark, tc class and hardware queue) with every entry and map it to packets by usage of SET netfilter target with --map-set option. skbmark option format: MARK or MARK/MASK, where MARK and MASK are 32bit hex numbers with 0x prefix. If only mark is specified mask 0xffffffff are used. skbprio option has tc class format: MAJOR:MINOR, where major and minor numbers are hex without 0x prefix. skbqueue option is just decimal number. + OptionSkbmark = "skbmark" + // OptionSkbprio All set types support the optional skbinfo extension. This extension allow to store the metainfo (firewall mark, tc class and hardware queue) with every entry and map it to packets by usage of SET netfilter target with --map-set option. skbmark option format: MARK or MARK/MASK, where MARK and MASK are 32bit hex numbers with 0x prefix. If only mark is specified mask 0xffffffff are used. skbprio option has tc class format: MAJOR:MINOR, where major and minor numbers are hex without 0x prefix. skbqueue option is just decimal number. + OptionSkbprio = "skbprio" + // OptionSkbqueue All set types support the optional skbinfo extension. This extension allow to store the metainfo (firewall mark, tc class and hardware queue) with every entry and map it to packets by usage of SET netfilter target with --map-set option. skbmark option format: MARK or MARK/MASK, where MARK and MASK are 32bit hex numbers with 0x prefix. If only mark is specified mask 0xffffffff are used. skbprio option has tc class format: MAJOR:MINOR, where major and minor numbers are hex without 0x prefix. skbqueue option is just decimal number. + OptionSkbqueue = "skbqueue" + // OptionHashSize This parameter is valid for the create command of all hash type sets. It defines the initial hash size for the set, default is 1024. The hash size must be a power of two, the kernel automatically rounds up non power of two hash sizes to the first correct value. 
+	OptionHashSize = "hashsize"
+	// OptionMaxElem This parameter is valid for the create command of all hash type sets. It defines the maximal number of elements which can be stored in the set, default 65536.
+	OptionMaxElem = "maxelem"
+	// OptionFamilly This parameter is valid for the create command of all hash type sets except for hash:mac. It defines the protocol family of the IP addresses to be stored in the set. The default is inet, i.e. IPv4.
+	OptionFamilly = "family"
+	// OptionNoMatch The hash set types which can store net type of data (i.e. hash:*net*) support the optional nomatch option when adding entries. When matching elements in the set, entries marked as nomatch are skipped as if those were not added to the set, which makes it possible to build up sets with exceptions. See the example at hash type hash:net below. When elements are tested by ipset, the nomatch flags are taken into account. If one wants to test the existence of an element marked with nomatch in a set, then the flag must be specified too.
+	OptionNoMatch = "nomatch"
+	// OptionForceAdd All hash set types support the optional forceadd parameter when creating a set. When sets created with this option become full the next addition to the set may succeed and evict a random entry from the set.
+	OptionForceAdd = "forceadd"
+)
+
+// IPSet represents the collection of ipset sets managed through the ipset binary.
+type IPSet struct {
+	ipSetPath *string
+	Sets      map[string]*Set
+	isIpv6    bool
+}
+
+// Set represents a single ipset set.
+type Set struct {
+	Parent  *IPSet
+	Name    string
+	Entries []*Entry
+	Options []string
+}
+
+// Entry of an ipset Set.
+type Entry struct {
+	Set     *Set
+	Options []string
+}
+
+// getIPSetPath returns the ipset binary path or an error if it is not found.
+func getIPSetPath() (*string, error) {
+	path, err := exec.LookPath("ipset")
+	if err != nil {
+		return nil, errIpsetNotFound
+	}
+	return &path, nil
+}
+
+// run executes the ipset binary with args and returns stdout.
+func (ipset *IPSet) run(args ...string) (string, error) {
+	var stderr bytes.Buffer
+	var stdout bytes.Buffer
+	cmd := exec.Cmd{
+		Path:   *ipset.ipSetPath,
+		Args:   append([]string{*ipset.ipSetPath}, args...),
+		Stderr: &stderr,
+		Stdout: &stdout,
+	}
+
+	if err := cmd.Run(); err != nil {
+		return "", errors.New(stderr.String())
+	}
+
+	return stdout.String(), nil
+}
+
+// runWithStdin executes the ipset binary with args, feeding the given buffer to stdin, and returns stdout.
+func (ipset *IPSet) runWithStdin(stdin *bytes.Buffer, args ...string) (string, error) {
+	var stderr bytes.Buffer
+	var stdout bytes.Buffer
+	cmd := exec.Cmd{
+		Path:   *ipset.ipSetPath,
+		Args:   append([]string{*ipset.ipSetPath}, args...),
+		Stderr: &stderr,
+		Stdout: &stdout,
+		Stdin:  stdin,
+	}
+
+	if err := cmd.Run(); err != nil {
+		return "", errors.New(stderr.String())
+	}
+
+	return stdout.String(), nil
+}
+
+// NewIPSet creates a new IPSet with ipSetPath initialized.
+func NewIPSet(isIpv6 bool) (*IPSet, error) {
+	ipSetPath, err := getIPSetPath()
+	if err != nil {
+		return nil, err
+	}
+	ipSet := &IPSet{
+		ipSetPath: ipSetPath,
+		Sets:      make(map[string]*Set),
+		isIpv6:    isIpv6,
+	}
+	return ipSet, nil
+}
+
+// Create a set identified with setname and specified type. The type may
+// require type specific options. Does not create set on the system if it
+// already exists by the same name.
+func (ipset *IPSet) Create(setName string, createOptions ...string) (*Set, error) { + // Populate Set map if needed + if ipset.Get(setName) == nil { + ipset.Sets[setName] = &Set{ + Name: setName, + Options: createOptions, + Parent: ipset, + } + } + + // Determine if set with the same name is already active on the system + setIsActive, err := ipset.Sets[setName].IsActive() + if err != nil { + return nil, fmt.Errorf("Failed to determine if ipset set %s exists: %s", + setName, err) + } + + // Create set if missing from the system + if !setIsActive { + if ipset.isIpv6 { + // Add "family inet6" option and a "inet6:" prefix for IPv6 sets. + args := []string{"create", "-exist", ipset.Sets[setName].name()} + args = append(args, createOptions...) + args = append(args, "family", "inet6") + if _, err := ipset.run(args...); err != nil { + return nil, fmt.Errorf("Failed to create ipset set on system: %s", err) + } + } else { + _, err := ipset.run(append([]string{"create", "-exist", setName}, + createOptions...)...) + if err != nil { + return nil, fmt.Errorf("Failed to create ipset set on system: %s", err) + } + } + } + return ipset.Sets[setName], nil +} + +// Adds a given Set to an IPSet +func (ipset *IPSet) Add(set *Set) error { + _, err := ipset.Create(set.Name, set.Options...) + if err != nil { + return err + } + + for _, entry := range set.Entries { + _, err := ipset.Get(set.Name).Add(entry.Options...) + if err != nil { + return err + } + } + + return nil +} + +// Add a given entry to the set. If the -exist option is specified, ipset +// ignores if the entry already added to the set. +func (set *Set) Add(addOptions ...string) (*Entry, error) { + entry := &Entry{ + Set: set, + Options: addOptions, + } + set.Entries = append(set.Entries, entry) + _, err := set.Parent.run(append([]string{"add", "-exist", entry.Set.name()}, addOptions...)...) + if err != nil { + return nil, err + } + return entry, nil +} + +// Del an entry from a set. If the -exist option is specified and the entry is +// not in the set (maybe already expired), then the command is ignored. +func (entry *Entry) Del() error { + _, err := entry.Set.Parent.run(append([]string{"del", entry.Set.name()}, entry.Options...)...) + if err != nil { + return err + } + entry.Set.Parent.Save() + return nil +} + +// Test wether an entry is in a set or not. Exit status number is zero if the +// tested entry is in the set and nonzero if it is missing from the set. +func (set *Set) Test(testOptions ...string) (bool, error) { + _, err := set.Parent.run(append([]string{"test", set.name()}, testOptions...)...) + if err != nil { + return false, err + } + return true, nil +} + +// Destroy the specified set or all the sets if none is given. If the set has +// got reference(s), nothing is done and no set destroyed. +func (set *Set) Destroy() error { + _, err := set.Parent.run("destroy", set.name()) + if err != nil { + return err + } + + delete(set.Parent.Sets, set.Name) + return nil +} + +// Destroy the specified set by name. If the set has got reference(s), nothing +// is done and no set destroyed. If the IPSet does not contain the named set +// then Destroy is a no-op. +func (ipset *IPSet) Destroy(setName string) error { + set := ipset.Get(setName) + if set == nil { + return nil + } + + err := set.Destroy() + if err != nil { + return err + } + + return nil +} + +// DestroyAllWithin destroys all sets contained within the IPSet's Sets. 
+func (ipset *IPSet) DestroyAllWithin() error { + for _, v := range ipset.Sets { + err := v.Destroy() + if err != nil { + return err + } + } + + return nil +} + +// IsActive checks if a set exists on the system with the same name. +func (set *Set) IsActive() (bool, error) { + _, err := set.Parent.run("list", set.name()) + if err != nil { + if strings.Contains(err.Error(), "name does not exist") { + return false, nil + } + return false, err + } + return true, nil +} + +func (set *Set) name() string { + if set.Parent.isIpv6 { + return "inet6:" + set.Name + } + return set.Name +} + +// Parse ipset save stdout. +// ex: +// create KUBE-DST-3YNVZWWGX3UQQ4VQ hash:ip family inet hashsize 1024 maxelem 65536 timeout 0 +// add KUBE-DST-3YNVZWWGX3UQQ4VQ 100.96.1.6 timeout 0 +func parseIPSetSave(ipset *IPSet, result string) map[string]*Set { + sets := make(map[string]*Set) + // Save is always in order + lines := strings.Split(result, "\n") + for _, line := range lines { + content := strings.Split(line, " ") + if content[0] == "create" { + sets[content[1]] = &Set{ + Parent: ipset, + Name: content[1], + Options: content[2:], + } + } else if content[0] == "add" { + set := sets[content[1]] + set.Entries = append(set.Entries, &Entry{ + Set: set, + Options: content[2:], + }) + } + } + + return sets +} + +// Build ipset restore input +// ex: +// create KUBE-DST-3YNVZWWGX3UQQ4VQ hash:ip family inet hashsize 1024 maxelem 65536 timeout 0 +// add KUBE-DST-3YNVZWWGX3UQQ4VQ 100.96.1.6 timeout 0 +func buildIPSetRestore(ipset *IPSet) string { + ipSetRestore := "" + for _, set := range ipset.Sets { + ipSetRestore += fmt.Sprintf("create %s %s\n", set.Name, strings.Join(set.Options[:], " ")) + for _, entry := range set.Entries { + ipSetRestore += fmt.Sprintf("add %s %s\n", set.Name, strings.Join(entry.Options[:], " ")) + } + } + return ipSetRestore +} + +// Save the given set, or all sets if none is given to stdout in a format that +// restore can read. The option -file can be used to specify a filename instead +// of stdout. +// save "ipset save" command output to ipset.sets. +func (ipset *IPSet) Save() error { + stdout, err := ipset.run("save") + if err != nil { + return err + } + ipset.Sets = parseIPSetSave(ipset, stdout) + return nil +} + +// Restore a saved session generated by save. The saved session can be fed from +// stdin or the option -file can be used to specify a filename instead of +// stdin. Please note, existing sets and elements are not erased by restore +// unless specified so in the restore file. All commands are allowed in restore +// mode except list, help, version, interactive mode and restore itself. +// Send formated ipset.sets into stdin of "ipset restore" command. +func (ipset *IPSet) Restore() error { + stdin := bytes.NewBufferString(buildIPSetRestore(ipset)) + _, err := ipset.runWithStdin(stdin, "restore", "-exist") + if err != nil { + return err + } + return nil +} + +// Flush all entries from the specified set or flush all sets if none is given. +func (set *Set) Flush() error { + _, err := set.Parent.run("flush", set.Name) + if err != nil { + return err + } + return nil +} + +// Flush all entries from the specified set or flush all sets if none is given. +func (ipset *IPSet) Flush() error { + _, err := ipset.run("flush") + if err != nil { + return err + } + return nil +} + +// Get Set by Name. +func (ipset *IPSet) Get(setName string) *Set { + set, ok := ipset.Sets[setName] + if !ok { + return nil + } + + return set +} + +// Rename a set. Set identified by SETNAME-TO must not exist. 
+func (set *Set) Rename(newName string) error {
+	if set.Parent.isIpv6 {
+		// keep the prefix consistent with Set.name(), which uses "inet6:" for IPv6 sets
+		newName = "inet6:" + newName
+	}
+	_, err := set.Parent.run("rename", set.name(), newName)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// Swap the content of two sets, or in other words, exchange the names of two
+// sets. The referred sets must exist and only compatible types of sets can be
+// swapped.
+func (set *Set) Swap(setTo *Set) error {
+	_, err := set.Parent.run("swap", set.name(), setTo.name())
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// Refresh a Set with new entries by building a temporary set, swapping it with
+// the original, and then destroying the temporary set.
+func (set *Set) Refresh(entries []string, extraOptions ...string) error {
+	var err error
+	// The set-name must be < 32 characters!
+	tempName := set.Name + "-"
+
+	newSet := &Set{
+		Parent:  set.Parent,
+		Name:    tempName,
+		Options: set.Options,
+	}
+
+	err = set.Parent.Add(newSet)
+	if err != nil {
+		return err
+	}
+
+	for _, entry := range entries {
+		_, err = newSet.Add(entry)
+		if err != nil {
+			return err
+		}
+	}
+
+	err = set.Swap(newSet)
+	if err != nil {
+		return err
+	}
+
+	err = set.Parent.Destroy(tempName)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// RefreshWithBuiltinOptions refreshes a Set with new entries, where each entry
+// carries its own built-in options.
+func (set *Set) RefreshWithBuiltinOptions(entries [][]string) error {
+	var err error
+	tempName := set.Name + "-temp"
+	newSet := &Set{
+		Parent:  set.Parent,
+		Name:    tempName,
+		Options: set.Options,
+	}
+
+	err = set.Parent.Add(newSet)
+	if err != nil {
+		return err
+	}
+
+	for _, entry := range entries {
+		_, err = newSet.Add(entry...)
+		if err != nil {
+			return err
+		}
+	}
+
+	err = set.Swap(newSet)
+	if err != nil {
+		return err
+	}
+
+	err = set.Parent.Destroy(tempName)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// GetNodeIP returns the most valid external facing IP address for a node.
+// Order of preference:
+// 1. NodeInternalIP
+// 2.
NodeExternalIP (Only set on cloud providers usually) +func GetNodeIP(node *apiv1.Node) (net.IP, error) { + addresses := node.Status.Addresses + addressMap := make(map[apiv1.NodeAddressType][]apiv1.NodeAddress) + for i := range addresses { + addressMap[addresses[i].Type] = append(addressMap[addresses[i].Type], addresses[i]) + } + if addresses, ok := addressMap[apiv1.NodeInternalIP]; ok { + return net.ParseIP(addresses[0].Address), nil + } + if addresses, ok := addressMap[apiv1.NodeExternalIP]; ok { + return net.ParseIP(addresses[0].Address), nil + } + return nil, errors.New("host IP unknown") +} diff --git a/pkg/agent/run.go b/pkg/agent/run.go index aa2f34b809..8c96d89b0e 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -13,6 +13,7 @@ import ( "github.com/rancher/k3s/pkg/agent/containerd" "github.com/rancher/k3s/pkg/agent/flannel" "github.com/rancher/k3s/pkg/agent/loadbalancer" + "github.com/rancher/k3s/pkg/agent/netpol" "github.com/rancher/k3s/pkg/agent/syssetup" "github.com/rancher/k3s/pkg/agent/tunnel" "github.com/rancher/k3s/pkg/cli/cmds" @@ -75,6 +76,12 @@ func run(ctx context.Context, cfg cmds.Agent, lb *loadbalancer.LoadBalancer) err } } + if !nodeConfig.AgentConfig.DisableNPC { + if err := netpol.Run(ctx, nodeConfig); err != nil { + return err + } + } + <-ctx.Done() return ctx.Err() } diff --git a/pkg/cli/cmds/server.go b/pkg/cli/cmds/server.go index 377d67a967..7da1a40ba3 100644 --- a/pkg/cli/cmds/server.go +++ b/pkg/cli/cmds/server.go @@ -37,6 +37,7 @@ type Server struct { FlannelBackend string DefaultLocalStoragePath string DisableCCM bool + DisableNPC bool } var ServerConfig Server @@ -206,6 +207,11 @@ func NewServerCommand(action func(*cli.Context) error) cli.Command { Usage: "Disable k3s default cloud controller manager", Destination: &ServerConfig.DisableCCM, }, + cli.BoolFlag{ + Name: "disable-network-policy", + Usage: "Disable k3s default network policy controller", + Destination: &ServerConfig.DisableNPC, + }, cli.StringFlag{ Name: "flannel-backend", Usage: fmt.Sprintf("(experimental) One of '%s', '%s', '%s', or '%s'", config.FlannelBackendNone, config.FlannelBackendVXLAN, config.FlannelBackendIPSEC, config.FlannelBackendWireguard), diff --git a/pkg/cli/server/server.go b/pkg/cli/server/server.go index 2d612806c0..674d33eccd 100644 --- a/pkg/cli/server/server.go +++ b/pkg/cli/server/server.go @@ -87,6 +87,7 @@ func run(app *cli.Context, cfg *cmds.Server) error { serverConfig.ControlConfig.FlannelBackend = cfg.FlannelBackend serverConfig.ControlConfig.ExtraCloudControllerArgs = cfg.ExtraCloudControllerArgs serverConfig.ControlConfig.DisableCCM = cfg.DisableCCM + serverConfig.ControlConfig.DisableNPC = cfg.DisableNPC if cmds.AgentConfig.FlannelIface != "" && cmds.AgentConfig.NodeIP == "" { cmds.AgentConfig.NodeIP = netutil.GetIPFromInterface(cmds.AgentConfig.FlannelIface) diff --git a/pkg/daemons/config/types.go b/pkg/daemons/config/types.go index e1dbce6ae6..d84f433643 100644 --- a/pkg/daemons/config/types.go +++ b/pkg/daemons/config/types.go @@ -79,6 +79,7 @@ type Agent struct { StrongSwanDir string PrivateRegistry string DisableCCM bool + DisableNPC bool } type Control struct { @@ -108,6 +109,7 @@ type Control struct { IPSECPSK string DefaultLocalStoragePath string DisableCCM bool + DisableNPC bool Runtime *ControlRuntime `json:"-"` }
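Note for reviewers: the new controller can be switched off with the flag added above, e.g. `k3s server --disable-network-policy`. To illustrate how the patch keeps iptables chain names valid, the following stand-alone sketch (not part of the diff) mirrors the name-derivation helper podFirewallChainName from network_policy_controller.go: the namespace, pod name, and version are hashed with SHA-256 and a 16-character slice of the base32 encoding is appended to a fixed prefix, yielding a deterministic name short enough for iptables. The namespace/pod/version values used here are hypothetical examples.

package main

import (
	"crypto/sha256"
	"encoding/base32"
	"fmt"
)

const kubePodFirewallChainPrefix = "KUBE-POD-FW-"

// podFirewallChainName mirrors the helper in network_policy_controller.go:
// hash the identifying strings, base32-encode, and keep the first 16 characters.
func podFirewallChainName(namespace, podName, version string) string {
	hash := sha256.Sum256([]byte(namespace + podName + version))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	return kubePodFirewallChainPrefix + encoded[:16]
}

func main() {
	// Hypothetical pod; prints something like KUBE-POD-FW-XXXXXXXXXXXXXXXX.
	fmt.Println(podFirewallChainName("default", "nginx-abc123", "1"))
}

The same truncated-hash scheme is used for the KUBE-NWPLCY-, KUBE-SRC-, and KUBE-DST- names, so chain and ipset names stay stable across syncs for a given policy or pod version.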