This commit is contained in:
yuzhiquan 2019-11-28 19:24:19 +08:00
parent 00e8a29b88
commit 7cc0110081
2 changed files with 13 additions and 13 deletions

View File

@ -76,13 +76,13 @@ func Run(ctx context.Context, cfg *config.Node) error {
}() }()
for { for {
addr, dailer, err := util.GetAddressAndDialer("unix://" + cfg.Containerd.Address) addr, dialer, err := util.GetAddressAndDialer("unix://" + cfg.Containerd.Address)
if err != nil { if err != nil {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
continue continue
} }
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(3*time.Second), grpc.WithDialer(dailer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize))) conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(3*time.Second), grpc.WithDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
if err != nil { if err != nil {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
continue continue

View File

@ -49,11 +49,11 @@ const (
// filter table a rule is added to jump the traffic originating (in case of egress network policy) from the pod // filter table a rule is added to jump the traffic originating (in case of egress network policy) from the pod
// or destined (in case of ingress network policy) to the pod specific iptables chain. Each // or destined (in case of ingress network policy) to the pod specific iptables chain. Each
// pod specific iptables chain has rules to jump to the network polices chains, that pod matches. So packet // pod specific iptables chain has rules to jump to the network polices chains, that pod matches. So packet
// originating/destined from/to pod goes through fitler table's, FORWARD chain, followed by pod specific chain, // originating/destined from/to pod goes through filter table's, FORWARD chain, followed by pod specific chain,
// followed by one or more network policy chains, till there is a match which will accept the packet, or gets // followed by one or more network policy chains, till there is a match which will accept the packet, or gets
// dropped by the rule in the pod chain, if there is no match. // dropped by the rule in the pod chain, if there is no match.
// NetworkPolicyController strcut to hold information required by NetworkPolicyController // NetworkPolicyController struct to hold information required by NetworkPolicyController
type NetworkPolicyController struct { type NetworkPolicyController struct {
nodeIP net.IP nodeIP net.IP
nodeHostName string nodeHostName string
@ -138,7 +138,7 @@ type numericPort2eps map[string]*endPoints
type protocol2eps map[string]numericPort2eps type protocol2eps map[string]numericPort2eps
type namedPort2eps map[string]protocol2eps type namedPort2eps map[string]protocol2eps
// Run runs forver till we receive notification on stopCh // Run runs forever till we receive notification on stopCh
func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}) { func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}) {
t := time.NewTicker(npc.syncPeriod) t := time.NewTicker(npc.syncPeriod)
defer t.Stop() defer t.Stop()
@ -320,16 +320,16 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(version string) (map
activePolicyIPSets[targetDestPodIPSet.Name] = true activePolicyIPSets[targetDestPodIPSet.Name] = true
activePolicyIPSets[targetSourcePodIPSet.Name] = true activePolicyIPSets[targetSourcePodIPSet.Name] = true
currnetPodIPs := make([]string, 0, len(policy.targetPods)) currentPodIPs := make([]string, 0, len(policy.targetPods))
for ip := range policy.targetPods { for ip := range policy.targetPods {
currnetPodIPs = append(currnetPodIPs, ip) currentPodIPs = append(currentPodIPs, ip)
} }
err = targetSourcePodIPSet.Refresh(currnetPodIPs, OptionTimeout, "0") err = targetSourcePodIPSet.Refresh(currentPodIPs, OptionTimeout, "0")
if err != nil { if err != nil {
log.Errorf("failed to refresh targetSourcePodIPSet: " + err.Error()) log.Errorf("failed to refresh targetSourcePodIPSet: " + err.Error())
} }
err = targetDestPodIPSet.Refresh(currnetPodIPs, OptionTimeout, "0") err = targetDestPodIPSet.Refresh(currentPodIPs, OptionTimeout, "0")
if err != nil { if err != nil {
log.Errorf("failed to refresh targetDestPodIPSet: " + err.Error()) log.Errorf("failed to refresh targetDestPodIPSet: " + err.Error())
} }
@ -463,7 +463,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
} }
} }
// case where nether ports nor from details are speified in the ingress rule // case where nether ports nor from details are specified in the ingress rule
// so match on all ports, protocol, source IP's // so match on all ports, protocol, source IP's
if ingressRule.matchAllSource && ingressRule.matchAllPorts { if ingressRule.matchAllSource && ingressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
@ -621,7 +621,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
} }
} }
// case where nether ports nor from details are speified in the egress rule // case where nether ports nor from details are specified in the egress rule
// so match on all ports, protocol, source IP's // so match on all ports, protocol, source IP's
if egressRule.matchAllDestinations && egressRule.matchAllPorts { if egressRule.matchAllDestinations && egressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " +
@ -808,7 +808,7 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(version string) (map[s
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
} }
// ensure statefull firewall, that permits return traffic for the traffic originated by the pod // ensure stateful firewall, that permits return traffic for the traffic originated by the pod
comment = "rule for stateful firewall for pod" comment = "rule for stateful firewall for pod"
args = []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"} args = []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"}
exists, err = iptablesCmdHandler.Exists("filter", podFwChainName, args...) exists, err = iptablesCmdHandler.Exists("filter", podFwChainName, args...)
@ -906,7 +906,7 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(version string) (map[s
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error()) return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
} }
// ensure statefull firewall, that permits return traffic for the traffic originated by the pod // ensure stateful firewall, that permits return traffic for the traffic originated by the pod
comment = "rule for stateful firewall for pod" comment = "rule for stateful firewall for pod"
args = []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"} args = []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"}
exists, err = iptablesCmdHandler.Exists("filter", podFwChainName, args...) exists, err = iptablesCmdHandler.Exists("filter", podFwChainName, args...)