Skip to content

Commit

Permalink
Release 1.11 merge netpol (#2355)
Browse files Browse the repository at this point in the history
* egress networkpolicy acl add option apply-after-lb

* add named port

* fix egress node and gateway acl should apply after lb.

* 1. create np's acl rules before the pod completely starts
2. fix not allowing gateway packet pass

* refactor
  • Loading branch information
changluyi authored Feb 20, 2023
1 parent 14a8b9b commit 578b392
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 40 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/build-x86-image.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,8 @@ jobs:
needs:
- build-centos-compile
- k8s-conformance-e2e
# - k8s-netpol-e2e
- k8s-netpol-e2e
- k8s-netpol-legacy-e2e
- cyclonus-netpol-e2e
- kube-ovn-conformance-e2e
- kube-ovn-ic-conformance-e2e
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Controller struct {
//subnetVpcMap *sync.Map
podSubnetMap *sync.Map
ipam *ovnipam.IPAM
namedPort *NamedPort

ovnLegacyClient *ovs.LegacyClient
ovnClient *ovs.OvnClient
Expand Down Expand Up @@ -260,6 +261,7 @@ func NewController(config *Configuration) *Controller {
ovnLegacyClient: ovs.NewLegacyClient(config.OvnNbAddr, config.OvnTimeout, config.OvnSbAddr, config.ClusterRouter, config.ClusterTcpLoadBalancer, config.ClusterUdpLoadBalancer, config.ClusterTcpSessionLoadBalancer, config.ClusterUdpSessionLoadBalancer, config.NodeSwitch, config.NodeSwitchCIDR),
ovnPgKeyMutex: keymutex.New(97),
ipam: ovnipam.NewIPAM(),
namedPort: NewNamedPort(),

vpcsLister: vpcInformer.Lister(),
vpcSynced: vpcInformer.Informer().HasSynced,
Expand Down
44 changes: 27 additions & 17 deletions pkg/controller/network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ func (c *Controller) handleUpdateNp(key string) error {
return err
}

namedPortMap := c.namedPort.GetNamedPortByNs(np.Namespace)
ports, err := c.fetchSelectedPorts(np.Namespace, &np.Spec.PodSelector)
if err != nil {
klog.Errorf("failed to fetch ports, %v", err)
Expand Down Expand Up @@ -302,9 +303,9 @@ func (c *Controller) handleUpdateNp(key string) error {
}

if len(allows) != 0 || len(excepts) != 0 {
ingressAclCmd = c.ovnLegacyClient.CombineIngressACLCmd(pgName, ingressAllowAsName, ingressExceptAsName, protocol, npr.Ports, logEnable, ingressAclCmd, idx)
ingressAclCmd = c.ovnLegacyClient.CombineIngressACLCmd(pgName, ingressAllowAsName, ingressExceptAsName, protocol, npr.Ports, logEnable, ingressAclCmd, idx, namedPortMap)
} else {
ingressAclCmd = c.ovnLegacyClient.CombineIngressACLCmd(pgName, ingressAllowAsName, ingressExceptAsName, protocol, []netv1.NetworkPolicyPort{}, logEnable, ingressAclCmd, idx)
ingressAclCmd = c.ovnLegacyClient.CombineIngressACLCmd(pgName, ingressAllowAsName, ingressExceptAsName, protocol, []netv1.NetworkPolicyPort{}, logEnable, ingressAclCmd, idx, namedPortMap)
}
}
if len(np.Spec.Ingress) == 0 {
Expand All @@ -320,7 +321,7 @@ func (c *Controller) handleUpdateNp(key string) error {
return err
}
ingressPorts := []netv1.NetworkPolicyPort{}
ingressAclCmd = c.ovnLegacyClient.CombineIngressACLCmd(pgName, ingressAllowAsName, ingressExceptAsName, protocol, ingressPorts, logEnable, ingressAclCmd, 0)
ingressAclCmd = c.ovnLegacyClient.CombineIngressACLCmd(pgName, ingressAllowAsName, ingressExceptAsName, protocol, ingressPorts, logEnable, ingressAclCmd, 0, namedPortMap)
}

klog.Infof("create ingress acl cmd is: %v", ingressAclCmd)
Expand Down Expand Up @@ -445,7 +446,7 @@ func (c *Controller) handleUpdateNp(key string) error {
}

if len(allows) != 0 || len(excepts) != 0 {
egressAclCmd = c.ovnLegacyClient.CombineEgressACLCmd(pgName, egressAllowAsName, egressExceptAsName, protocol, npr.Ports, logEnable, egressAclCmd, idx)
egressAclCmd = c.ovnLegacyClient.CombineEgressACLCmd(pgName, egressAllowAsName, egressExceptAsName, protocol, npr.Ports, logEnable, egressAclCmd, idx, namedPortMap)
}
}
if len(np.Spec.Egress) == 0 {
Expand All @@ -461,7 +462,7 @@ func (c *Controller) handleUpdateNp(key string) error {
return err
}
egressPorts := []netv1.NetworkPolicyPort{}
egressAclCmd = c.ovnLegacyClient.CombineEgressACLCmd(pgName, egressAllowAsName, egressExceptAsName, protocol, egressPorts, logEnable, egressAclCmd, 0)
egressAclCmd = c.ovnLegacyClient.CombineEgressACLCmd(pgName, egressAllowAsName, egressExceptAsName, protocol, egressPorts, logEnable, egressAclCmd, 0, namedPortMap)
}

klog.Infof("create egress acl cmd is: %v", egressAclCmd)
Expand Down Expand Up @@ -519,7 +520,7 @@ func (c *Controller) handleUpdateNp(key string) error {
}
}

if err = c.ovnLegacyClient.CreateGatewayACL(pgName, subnet.Spec.Gateway, subnet.Spec.CIDRBlock); err != nil {
if err = c.ovnLegacyClient.CreateGatewayACL("", pgName, subnet.Spec.Gateway, subnet.Spec.CIDRBlock); err != nil {
klog.Errorf("failed to create gateway acl, %v", err)
return err
}
Expand Down Expand Up @@ -593,7 +594,7 @@ func (c *Controller) fetchSelectedPorts(namespace string, selector *metav1.Label

ports := make([]string, 0, len(pods))
for _, pod := range pods {
if !isPodAlive(pod) || pod.Spec.HostNetwork {
if pod.Spec.HostNetwork {
continue
}
podName := c.getNameByPod(pod)
Expand Down Expand Up @@ -723,19 +724,28 @@ func (c *Controller) fetchPolicySelectedAddresses(namespace, protocol string, np
}

for _, pod := range pods {
for _, podIP := range pod.Status.PodIPs {
if podIP.IP != "" && util.CheckProtocol(podIP.IP) == protocol {
selectedAddresses = append(selectedAddresses, podIP.IP)
if len(svcs) == 0 {
continue
podNets, err := c.getPodKubeovnNets(pod)
if err != nil {
klog.Errorf("failed to get pod nets %v", err)
return nil, nil, err
}
for _, podNet := range podNets {
podIPAnnotation := pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, podNet.ProviderName)]
podIPs := strings.Split(podIPAnnotation, ",")
for _, podIP := range podIPs {
if podIP != "" && util.CheckProtocol(podIP) == protocol {
selectedAddresses = append(selectedAddresses, podIP)
}
}
if len(svcs) == 0 {
continue
}

svcIPs, err := svcMatchPods(svcs, pod, protocol)
if err != nil {
return nil, nil, err
}
selectedAddresses = append(selectedAddresses, svcIPs...)
svcIPs, err := svcMatchPods(svcs, pod, protocol)
if err != nil {
return nil, nil, err
}
selectedAddresses = append(selectedAddresses, svcIPs...)
}
}
}
Expand Down
136 changes: 127 additions & 9 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"time"

"gopkg.in/k8snetworkplumbingwg/multus-cni.v3/pkg/logging"
Expand All @@ -28,6 +29,117 @@ import (
"github.com/kubeovn/kube-ovn/pkg/util"
)

type NamedPort struct {
mutex sync.RWMutex
// first key is namespace, second key is portName
namedPortMap map[string]map[string]*util.NamedPortInfo
}

func NewNamedPort() *NamedPort {
return &NamedPort{
mutex: sync.RWMutex{},
namedPortMap: map[string]map[string]*util.NamedPortInfo{},
}
}

func (n *NamedPort) AddNamedPortByPod(pod *v1.Pod) {
n.mutex.Lock()
defer n.mutex.Unlock()
ns := pod.Namespace
podName := pod.Name

if pod.Spec.Containers == nil {
return
}

for _, container := range pod.Spec.Containers {
if container.Ports == nil {
continue
}

for _, port := range container.Ports {
if port.Name == "" || port.ContainerPort == 0 {
continue
}

if _, ok := n.namedPortMap[ns]; ok {
if _, ok := n.namedPortMap[ns][port.Name]; ok {
if n.namedPortMap[ns][port.Name].PortId == port.ContainerPort {
n.namedPortMap[ns][port.Name].Pods[podName] = ""
} else {
klog.Warning("named port %s has already be defined with portID %d",
port.Name, n.namedPortMap[ns][port.Name].PortId)
}
continue
}
} else {
n.namedPortMap[ns] = make(map[string]*util.NamedPortInfo)
}
n.namedPortMap[ns][port.Name] = &util.NamedPortInfo{
PortId: port.ContainerPort,
Pods: map[string]string{podName: ""},
}
}
}
}

func (n *NamedPort) DeleteNamedPortByPod(pod *v1.Pod) {
n.mutex.Lock()
defer n.mutex.Unlock()

ns := pod.Namespace
podName := pod.Name

if pod.Spec.Containers == nil {
return
}

for _, container := range pod.Spec.Containers {
if container.Ports == nil {
continue
}

for _, port := range container.Ports {
if port.Name == "" {
continue
}

if _, ok := n.namedPortMap[ns]; !ok {
continue
}

if _, ok := n.namedPortMap[ns][port.Name]; !ok {
continue
}

if _, ok := n.namedPortMap[ns][port.Name].Pods[podName]; !ok {
continue
}

delete(n.namedPortMap[ns][port.Name].Pods, podName)
if len(n.namedPortMap[ns][port.Name].Pods) == 0 {
delete(n.namedPortMap[ns], port.Name)
if len(n.namedPortMap[ns]) == 0 {
delete(n.namedPortMap, ns)
}
}
}
}
}

func (n *NamedPort) GetNamedPortByNs(namespace string) map[string]*util.NamedPortInfo {
n.mutex.RLock()
defer n.mutex.RUnlock()

if result, ok := n.namedPortMap[namespace]; ok {
for portName, info := range result {
klog.Infof("namespace %s's namedPort portname is %s with info %v ", namespace, portName, info)
}
return result
}
return nil
}

func isPodAlive(p *v1.Pod) bool {
if p.DeletionTimestamp != nil && p.DeletionGracePeriodSeconds != nil {
now := time.Now()
Expand Down Expand Up @@ -62,9 +174,12 @@ func (c *Controller) enqueueAddPod(obj interface{}) {

p := obj.(*v1.Pod)
// TODO: we need to find a way to reduce duplicated np added to the queue
if c.config.EnableNP && p.Status.PodIP != "" {
for _, np := range c.podMatchNetworkPolicies(p) {
c.updateNpQueue.Add(np)
if c.config.EnableNP {
c.namedPort.AddNamedPortByPod(p)
if p.Status.PodIP != "" {
for _, np := range c.podMatchNetworkPolicies(p) {
c.updateNpQueue.Add(np)
}
}
}

Expand Down Expand Up @@ -130,6 +245,7 @@ func (c *Controller) enqueueDeletePod(obj interface{}) {

p := obj.(*v1.Pod)
if c.config.EnableNP {
c.namedPort.DeleteNamedPortByPod(p)
for _, np := range c.podMatchNetworkPolicies(p) {
c.updateNpQueue.Add(np)
}
Expand All @@ -151,19 +267,14 @@ func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) {
}

if c.config.EnableNP {
c.namedPort.AddNamedPortByPod(newPod)
if !reflect.DeepEqual(oldPod.Labels, newPod.Labels) {
oldNp := c.podMatchNetworkPolicies(oldPod)
newNp := c.podMatchNetworkPolicies(newPod)
for _, np := range util.DiffStringSlice(oldNp, newNp) {
c.updateNpQueue.Add(np)
}
}

if oldPod.Status.PodIP != newPod.Status.PodIP {
for _, np := range c.podMatchNetworkPolicies(newPod) {
c.updateNpQueue.Add(np)
}
}
}

if newPod.Spec.HostNetwork {
Expand Down Expand Up @@ -227,6 +338,13 @@ func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) {

if newPod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] == "true" && newPod.Spec.NodeName != "" {
if newPod.Annotations[fmt.Sprintf(util.RoutedAnnotationTemplate, podNet.ProviderName)] != "true" {
if c.config.EnableNP {
for _, np := range c.podMatchNetworkPolicies(newPod) {
klog.V(3).Infof("enqueue update pod %s' network policy", key)
c.updateNpQueue.Add(np)
}
}

klog.V(3).Infof("enqueue update pod %s", key)
c.updatePodQueue.Add(key)
break
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,11 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
c.patchSubnetStatus(subnet, "ResetLogicalSwitchAclSuccess", "")
}

if err := c.ovnLegacyClient.CreateGatewayACL(subnet.Name, "", subnet.Spec.Gateway, subnet.Spec.CIDRBlock); err != nil {
klog.Errorf("create gateway acl %s failed, %v", subnet.Name, err)
return err
}

if err := c.ovnLegacyClient.UpdateSubnetACL(subnet.Name, subnet.Spec.Acls); err != nil {
c.patchSubnetStatus(subnet, "SetLogicalSwitchAclsFailed", err.Error())
return err
Expand Down
Loading

0 comments on commit 578b392

Please sign in to comment.