Skip to content

Commit

Permalink
use ipset save and restore to modify ipset to reduce exec calls
Browse files Browse the repository at this point in the history
  • Loading branch information
murali-reddy authored and aauren committed Mar 18, 2021
1 parent 888cac9 commit afd866c
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 109 deletions.
11 changes: 0 additions & 11 deletions pkg/controllers/netpol/network_policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ func (npc *NetworkPolicyController) fullPolicySync() {
return
}

fmt.Println(npc.filterTableRules.String())
if err := utils.Restore("filter", npc.filterTableRules.Bytes()); err != nil {
glog.Errorf("Aborting sync. Failed to run iptables-restore: %v" + err.Error())
return
Expand Down Expand Up @@ -652,16 +651,6 @@ func NewNetworkPolicyController(clientset kubernetes.Interface,
}
npc.nodeIP = nodeIP

ipset, err := utils.NewIPSet(false)
if err != nil {
return nil, err
}
err = ipset.Save()
if err != nil {
return nil, err
}
npc.ipSetHandler = ipset

npc.podLister = podInformer.GetIndexer()
npc.PodEventHandler = npc.newPodEventHandler()

Expand Down
157 changes: 59 additions & 98 deletions pkg/controllers/netpol/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo
metrics.ControllerPolicyChainsSyncTime.Observe(endTime.Seconds())
glog.V(2).Infof("Syncing network policy chains took %v", endTime)
}()

ipset, err := utils.NewIPSet(false)
if err != nil {
return nil, nil, err
}
err = ipset.Save()
if err != nil {
return nil, nil, err
}
npc.ipSetHandler = ipset

activePolicyChains := make(map[string]bool)
activePolicyIPSets := make(map[string]bool)

Expand All @@ -95,39 +106,36 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo
if policy.policyType == "both" || policy.policyType == "ingress" {
// create a ipset for all destination pod ip's matched by the policy spec PodSelector
targetDestPodIPSetName := policyDestinationPodIPSetName(policy.namespace, policy.name)
targetDestPodIPSet, err := npc.ipSetHandler.Create(targetDestPodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error())
}
err = targetDestPodIPSet.Refresh(currnetPodIps)
if err != nil {
glog.Errorf("failed to refresh targetDestPodIPSet,: " + err.Error())
setEntries := make([][]string, 0)
for _, podIP := range currnetPodIps {
setEntries = append(setEntries, []string{podIP, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(targetDestPodIPSetName, setEntries)
err = npc.processIngressRules(policy, targetDestPodIPSetName, activePolicyIPSets, version)
if err != nil {
return nil, nil, err
}
activePolicyIPSets[targetDestPodIPSet.Name] = true
activePolicyIPSets[targetDestPodIPSetName] = true
}

if policy.policyType == "both" || policy.policyType == "egress" {
// create a ipset for all source pod ip's matched by the policy spec PodSelector
targetSourcePodIPSetName := policySourcePodIPSetName(policy.namespace, policy.name)
targetSourcePodIPSet, err := npc.ipSetHandler.Create(targetSourcePodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error())
}
err = targetSourcePodIPSet.Refresh(currnetPodIps)
if err != nil {
glog.Errorf("failed to refresh targetSourcePodIPSet: " + err.Error())
setEntries := make([][]string, 0)
for _, podIP := range currnetPodIps {
setEntries = append(setEntries, []string{podIP, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(targetSourcePodIPSetName, setEntries)
err = npc.processEgressRules(policy, targetSourcePodIPSetName, activePolicyIPSets, version)
if err != nil {
return nil, nil, err
}
activePolicyIPSets[targetSourcePodIPSet.Name] = true
activePolicyIPSets[targetSourcePodIPSetName] = true
}
}

err = npc.ipSetHandler.Restore()
if err != nil {
return nil, nil, fmt.Errorf("failed to perform ipset restore: %s", err.Error())
}

glog.V(2).Infof("Iptables chains in the filter table are synchronized with the network policies.")
Expand All @@ -152,21 +160,12 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo

if len(ingressRule.srcPods) != 0 {
srcPodIPSetName := policyIndexedSourcePodIPSetName(policy.namespace, policy.name, i)
srcPodIPSet, err := npc.ipSetHandler.Create(srcPodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}

activePolicyIPSets[srcPodIPSet.Name] = true

ingressRuleSrcPodIPs := make([]string, 0, len(ingressRule.srcPods))
activePolicyIPSets[srcPodIPSetName] = true
setEntries := make([][]string, 0)
for _, pod := range ingressRule.srcPods {
ingressRuleSrcPodIPs = append(ingressRuleSrcPodIPs, pod.ip)
}
err = srcPodIPSet.Refresh(ingressRuleSrcPodIPs)
if err != nil {
glog.Errorf("failed to refresh srcPodIPSet: " + err.Error())
setEntries = append(setEntries, []string{pod.ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(srcPodIPSetName, setEntries)

if len(ingressRule.ports) != 0 {
// case where 'ports' details and 'from' details specified in the ingress rule
Expand All @@ -183,15 +182,13 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
if len(ingressRule.namedPorts) != 0 {
for j, endPoints := range ingressRule.namedPorts {
namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j)
namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIPSets[namedPortIPSet.Name] = true
err = namedPortIPSet.Refresh(endPoints.ips)
if err != nil {
glog.Errorf("failed to refresh namedPortIPSet: " + err.Error())
activePolicyIPSets[namedPortIPSetName] = true
setEntries := make([][]string, 0)
for _, ip := range endPoints.ips {
setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries)

comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil {
Expand Down Expand Up @@ -224,17 +221,13 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo

for j, endPoints := range ingressRule.namedPorts {
namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j)
namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
activePolicyIPSets[namedPortIPSetName] = true
setEntries := make([][]string, 0)
for _, ip := range endPoints.ips {
setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries)

activePolicyIPSets[namedPortIPSet.Name] = true

err = namedPortIPSet.Refresh(endPoints.ips)
if err != nil {
glog.Errorf("failed to refresh namedPortIPSet: " + err.Error())
}
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil {
Expand All @@ -255,15 +248,9 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo

if len(ingressRule.srcIPBlocks) != 0 {
srcIPBlockIPSetName := policyIndexedSourceIPBlockIPSetName(policy.namespace, policy.name, i)
srcIPBlockIPSet, err := npc.ipSetHandler.Create(srcIPBlockIPSetName, utils.TypeHashNet, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIPSets[srcIPBlockIPSet.Name] = true
err = srcIPBlockIPSet.RefreshWithBuiltinOptions(ingressRule.srcIPBlocks)
if err != nil {
glog.Errorf("failed to refresh srcIPBlockIPSet: " + err.Error())
}
activePolicyIPSets[srcIPBlockIPSetName] = true
npc.ipSetHandler.RefreshSet(srcIPBlockIPSetName, ingressRule.srcIPBlocks)

if !ingressRule.matchAllPorts {
for _, portProtocol := range ingressRule.ports {
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
Expand All @@ -275,17 +262,12 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo

for j, endPoints := range ingressRule.namedPorts {
namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j)
namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}

activePolicyIPSets[namedPortIPSet.Name] = true

err = namedPortIPSet.Refresh(endPoints.ips)
if err != nil {
glog.Errorf("failed to refresh namedPortIPSet: " + err.Error())
activePolicyIPSets[namedPortIPSetName] = true
setEntries := make([][]string, 0)
for _, ip := range endPoints.ips {
setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries)
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil {
Expand Down Expand Up @@ -323,21 +305,12 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,

if len(egressRule.dstPods) != 0 {
dstPodIPSetName := policyIndexedDestinationPodIPSetName(policy.namespace, policy.name, i)
dstPodIPSet, err := npc.ipSetHandler.Create(dstPodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}

activePolicyIPSets[dstPodIPSet.Name] = true

egressRuleDstPodIps := make([]string, 0, len(egressRule.dstPods))
activePolicyIPSets[dstPodIPSetName] = true
setEntries := make([][]string, 0)
for _, pod := range egressRule.dstPods {
egressRuleDstPodIps = append(egressRuleDstPodIps, pod.ip)
}
err = dstPodIPSet.Refresh(egressRuleDstPodIps)
if err != nil {
glog.Errorf("failed to refresh dstPodIPSet: " + err.Error())
setEntries = append(setEntries, []string{pod.ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(dstPodIPSetName, setEntries)
if len(egressRule.ports) != 0 {
// case where 'ports' details and 'from' details specified in the egress rule
// so match on specified source and destination ip's and specified port (if any) and protocol
Expand All @@ -353,17 +326,12 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
if len(egressRule.namedPorts) != 0 {
for j, endPoints := range egressRule.namedPorts {
namedPortIPSetName := policyIndexedEgressNamedPortIPSetName(policy.namespace, policy.name, i, j)
namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}

activePolicyIPSets[namedPortIPSet.Name] = true

err = namedPortIPSet.Refresh(endPoints.ips)
if err != nil {
glog.Errorf("failed to refresh namedPortIPSet: " + err.Error())
activePolicyIPSets[namedPortIPSetName] = true
setEntries := make([][]string, 0)
for _, ip := range endPoints.ips {
setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries)
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil {
Expand Down Expand Up @@ -407,15 +375,8 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
}
if len(egressRule.dstIPBlocks) != 0 {
dstIPBlockIPSetName := policyIndexedDestinationIPBlockIPSetName(policy.namespace, policy.name, i)
dstIPBlockIPSet, err := npc.ipSetHandler.Create(dstIPBlockIPSetName, utils.TypeHashNet, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIPSets[dstIPBlockIPSet.Name] = true
err = dstIPBlockIPSet.RefreshWithBuiltinOptions(egressRule.dstIPBlocks)
if err != nil {
glog.Errorf("failed to refresh dstIPBlockIPSet: " + err.Error())
}
activePolicyIPSets[dstIPBlockIPSetName] = true
npc.ipSetHandler.RefreshSet(dstIPBlockIPSetName, egressRule.dstIPBlocks)
if !egressRule.matchAllPorts {
for _, portProtocol := range egressRule.ports {
comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " +
Expand Down
16 changes: 16 additions & 0 deletions pkg/utils/ipset.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,22 @@ func (ipset *IPSet) Add(set *Set) error {
return nil
}

// RefreshSet add/update internal Sets with a Set of entries but does not run restore command
func (ipset *IPSet) RefreshSet(setName string, entriesWithOptions [][]string) {
if ipset.Get(setName) == nil {
ipset.Sets[setName] = &Set{
Name: setName,
Options: []string{TypeHashIP, OptionTimeout, "0"},
Parent: ipset,
}
}
entries := make([]*Entry, len(entriesWithOptions))
for i, entry := range entriesWithOptions {
entries[i] = &Entry{Set: ipset.Sets[setName], Options: entry}
}
ipset.Get(setName).Entries = entries
}

// Add a given entry to the set. If the -exist option is specified, ipset
// ignores if the entry already added to the set.
// Note: if you need to add multiple entries (e.g., in a loop), use BatchAdd instead,
Expand Down

0 comments on commit afd866c

Please sign in to comment.