From 1ca467ac5b0a94fe4307a7667d42095540814a3c Mon Sep 17 00:00:00 2001 From: wenying Date: Tue, 4 Jan 2022 11:16:45 +0800 Subject: [PATCH] [Multicast] Use group as flow actions for multicast traffic 1. Add local Pod receivers into an OpenFlow type "all" group for each multicast group, and use such groups in the flow actions. Remove a Pod from group buckets if the Pod has left the multicast group or is deleted before leaving the multicast group. 2. Improve multicast e2e tests. Signed-off-by: wenyingd Co-authored-by: Ruochen Shen --- cmd/antrea-agent/agent.go | 4 +- pkg/agent/cniserver/pod_configuration.go | 17 +- .../cniserver/pod_configuration_windows.go | 9 +- .../controller/egress/egress_controller.go | 5 +- .../egress/egress_controller_test.go | 7 +- pkg/agent/controller/networkpolicy/cache.go | 9 +- .../controller/networkpolicy/cache_test.go | 9 +- pkg/agent/multicast/mcast_controller.go | 188 +++++++++++++----- pkg/agent/multicast/mcast_controller_test.go | 75 +++++-- pkg/agent/multicast/mcast_discovery.go | 6 +- pkg/agent/multicast/mcast_route_test.go | 5 +- pkg/agent/openflow/client.go | 39 ++-- pkg/agent/openflow/framework.go | 3 +- pkg/agent/openflow/multicast.go | 50 ++++- pkg/agent/openflow/pipeline.go | 28 +-- pkg/agent/openflow/testing/mock_openflow.go | 66 +++--- pkg/agent/proxy/proxier.go | 6 +- pkg/agent/proxy/proxier_test.go | 10 +- pkg/agent/types/event.go | 22 ++ pkg/ovs/openflow/interfaces.go | 1 + pkg/ovs/openflow/ofctrl_bridge.go | 10 +- pkg/ovs/openflow/testing/mock_openflow.go | 14 ++ pkg/util/channel/channel.go | 10 +- pkg/util/channel/channel_test.go | 4 +- test/e2e/multicast_test.go | 48 +++-- test/integration/agent/openflow_test.go | 2 +- test/integration/ovs/ofctrl_test.go | 26 ++- 27 files changed, 490 insertions(+), 183 deletions(-) create mode 100644 pkg/agent/types/event.go diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 0d3a8f8e797..7076e3c33e8 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -577,11 +577,13 @@ func run(o *Options) error { } mcastController := multicast.NewMulticastController( ofClient, + v4GroupIDAllocator, nodeConfig, ifaceStore, multicastSocket, sets.NewString(append(o.config.MulticastInterfaces, nodeConfig.NodeTransportInterfaceName)...), - ovsBridgeClient) + ovsBridgeClient, + podUpdateChannel) if err := mcastController.Initialize(); err != nil { return err } diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index 0c2027df658..026cb7af413 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -32,6 +32,7 @@ import ( "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/route" "antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache" + types2 "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/util/channel" @@ -312,6 +313,7 @@ func (pc *podConfigurator) removeInterfaces(containerID string) error { if err := pc.routeClient.DeleteLocalAntreaFlexibleIPAMPodRule(containerConfig.IPs); err != nil { return err } + return nil } @@ -495,7 +497,13 @@ func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName string, conta // Add containerConfig into local cache pc.ifaceStore.AddInterface(containerConfig) // Notify the Pod update event to required components. - pc.podUpdateNotifier.Notify(k8s.NamespacedName(containerConfig.PodNamespace, containerConfig.PodName)) + event := types2.PodUpdate{ + PodName: containerConfig.PodName, + PodNamespace: containerConfig.PodNamespace, + IsAdd: true, + ContainerID: containerConfig.ContainerID, + } + pc.podUpdateNotifier.Notify(event) return nil } @@ -518,6 +526,13 @@ func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interface } // Remove container configuration from cache. pc.ifaceStore.DeleteInterface(containerConfig) + event := types2.PodUpdate{ + PodName: containerConfig.PodName, + PodNamespace: containerConfig.PodNamespace, + IsAdd: false, + ContainerID: containerConfig.ContainerID, + } + pc.podUpdateNotifier.Notify(event) klog.Infof("Removed interfaces for container %s", containerID) return nil } diff --git a/pkg/agent/cniserver/pod_configuration_windows.go b/pkg/agent/cniserver/pod_configuration_windows.go index a2dc2834081..16f824f847e 100644 --- a/pkg/agent/cniserver/pod_configuration_windows.go +++ b/pkg/agent/cniserver/pod_configuration_windows.go @@ -24,6 +24,7 @@ import ( "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" "antrea.io/antrea/pkg/util/k8s" ) @@ -49,7 +50,13 @@ func (pc *podConfigurator) connectInterfaceToOVSAsync(ifConfig *interfacestore.I // Update interface config with the ofPort. ifConfig.OVSPortConfig.OFPort = ofPort // Notify the Pod update event to required components. - pc.podUpdateNotifier.Notify(k8s.NamespacedName(ifConfig.PodNamespace, ifConfig.PodName)) + event := types.PodUpdate{ + PodName: ifConfig.PodName, + PodNamespace: ifConfig.PodNamespace, + IsAdd: true, + ContainerID: ifConfig.ContainerID, + } + pc.podUpdateNotifier.Notify(event) return nil }) } diff --git a/pkg/agent/controller/egress/egress_controller.go b/pkg/agent/controller/egress/egress_controller.go index 167d0756a7f..a683a8d84e0 100644 --- a/pkg/agent/controller/egress/egress_controller.go +++ b/pkg/agent/controller/egress/egress_controller.go @@ -41,6 +41,7 @@ import ( "antrea.io/antrea/pkg/agent/memberlist" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/route" + "antrea.io/antrea/pkg/agent/types" cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2" crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" clientsetversioned "antrea.io/antrea/pkg/client/clientset/versioned" @@ -217,9 +218,11 @@ func NewEgressController( // processPodUpdate will be called when CNIServer publishes a Pod update event. // It triggers reconciling the effective Egress of the Pod. -func (c *EgressController) processPodUpdate(pod string) { +func (c *EgressController) processPodUpdate(e interface{}) { c.egressBindingsMutex.Lock() defer c.egressBindingsMutex.Unlock() + podEvent := e.(types.PodUpdate) + pod := k8s.NamespacedName(podEvent.PodNamespace, podEvent.PodName) binding, exists := c.egressBindings[pod] if !exists { return diff --git a/pkg/agent/controller/egress/egress_controller_test.go b/pkg/agent/controller/egress/egress_controller_test.go index 9c448dd1cf6..0d241bf7e8e 100644 --- a/pkg/agent/controller/egress/egress_controller_test.go +++ b/pkg/agent/controller/egress/egress_controller_test.go @@ -38,6 +38,7 @@ import ( ipassignertest "antrea.io/antrea/pkg/agent/ipassigner/testing" openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" routetest "antrea.io/antrea/pkg/agent/route/testing" + "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2" crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" @@ -582,7 +583,11 @@ func TestPodUpdateShouldSyncEgress(t *testing.T) { c.mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1) // Mock CNIServer addPodInterface(c.ifaceStore, "ns1", "pendingPod", 10) - c.podUpdateChannel.Notify("ns1/pendingPod") + ev := types.PodUpdate{ + PodName: "pendingPod", + PodNamespace: "ns1", + } + c.podUpdateChannel.Notify(ev) require.NoError(t, wait.PollImmediate(10*time.Millisecond, time.Second, func() (done bool, err error) { return c.queue.Len() == 1, nil })) diff --git a/pkg/agent/controller/networkpolicy/cache.go b/pkg/agent/controller/networkpolicy/cache.go index 32011eaf8cd..3db75ef08ea 100644 --- a/pkg/agent/controller/networkpolicy/cache.go +++ b/pkg/agent/controller/networkpolicy/cache.go @@ -28,6 +28,7 @@ import ( "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/metrics" + types2 "antrea.io/antrea/pkg/agent/types" v1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2" crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" "antrea.io/antrea/pkg/querier" @@ -361,12 +362,12 @@ func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Sub // done if antrea-controller has computed the Pods' policies and propagated // them to this Node by their labels and NodeName, instead of waiting for their // IPs are reported to kube-apiserver and processed by antrea-controller. -func (c *ruleCache) processPodUpdate(pod string) { - namespace, name := k8s.SplitNamespacedName(pod) +func (c *ruleCache) processPodUpdate(e interface{}) { + podEvent := e.(types2.PodUpdate) member := &v1beta.GroupMember{ Pod: &v1beta.PodReference{ - Name: name, - Namespace: namespace, + Name: podEvent.PodName, + Namespace: podEvent.PodNamespace, }, } c.appliedToSetLock.RLock() diff --git a/pkg/agent/controller/networkpolicy/cache_test.go b/pkg/agent/controller/networkpolicy/cache_test.go index 35878d26cfd..00c01d3ed25 100644 --- a/pkg/agent/controller/networkpolicy/cache_test.go +++ b/pkg/agent/controller/networkpolicy/cache_test.go @@ -25,8 +25,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/k8s" ) var ( @@ -1184,7 +1186,12 @@ func TestRuleCacheProcessPodUpdates(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) go podUpdateNotifier.Run(stopCh) - podUpdateNotifier.Notify(tt.podUpdate) + ns, name := k8s.SplitNamespacedName(tt.podUpdate) + e := types.PodUpdate{ + PodNamespace: ns, + PodName: name, + } + podUpdateNotifier.Notify(e) func() { // Drain the channel with 10 ms timeout so we can know it's done. for { diff --git a/pkg/agent/multicast/mcast_controller.go b/pkg/agent/multicast/mcast_controller.go index c11a923371f..2ac487f8a89 100644 --- a/pkg/agent/multicast/mcast_controller.go +++ b/pkg/agent/multicast/mcast_controller.go @@ -28,7 +28,11 @@ import ( "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" + "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/agent/util" + binding "antrea.io/antrea/pkg/ovs/openflow" "antrea.io/antrea/pkg/ovs/ovsconfig" + "antrea.io/antrea/pkg/util/channel" ) type eventType uint8 @@ -59,10 +63,12 @@ type mcastGroupEvent struct { type GroupMemberStatus struct { group net.IP - // localMembers is a set for the local Pod's interface name which has joined in the multicast group. - localMembers sets.String + // localMembers is a map for the local Pod member and its last update time, key is the Pod's interface name, + // and value is its last update time. + localMembers map[string]time.Time lastIGMPReport time.Time mutex sync.RWMutex + ofGroupID binding.GroupIDType } // eventHandler process the multicast Group membership report or leave messages. @@ -82,7 +88,8 @@ func (c *Controller) addGroupMemberStatus(e *mcastGroupEvent) { status := &GroupMemberStatus{ group: e.group, lastIGMPReport: e.time, - localMembers: sets.NewString(e.iface.InterfaceName), + localMembers: map[string]time.Time{e.iface.InterfaceName: e.time}, + ofGroupID: c.v4GroupAllocator.Allocate(), } c.groupCache.Add(status) c.queue.Add(e.group.String()) @@ -101,25 +108,35 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent defer status.mutex.Unlock() newStatus := &GroupMemberStatus{ group: status.group, - localMembers: status.localMembers.Union(nil), + localMembers: make(map[string]time.Time), lastIGMPReport: status.lastIGMPReport, + ofGroupID: status.ofGroupID, } - exist := status.localMembers.Has(e.iface.InterfaceName) + for m, t := range status.localMembers { + newStatus.localMembers[m] = t + } + _, exist := status.localMembers[e.iface.InterfaceName] switch e.eType { case groupJoin: newStatus.lastIGMPReport = e.time + newStatus.localMembers[e.iface.InterfaceName] = e.time + c.groupCache.Update(newStatus) if !exist { - newStatus.localMembers.Insert(e.iface.InterfaceName) klog.InfoS("Added member to multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) + c.queue.Add(newStatus.group.String()) } - c.groupCache.Update(newStatus) - c.queue.Add(newStatus.group.String()) case groupLeave: if exist { - newStatus.localMembers.Delete(e.iface.InterfaceName) + delete(newStatus.localMembers, e.iface.InterfaceName) c.groupCache.Update(newStatus) klog.InfoS("Deleted member from multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) - if len(newStatus.localMembers) == 0 { + _, found := c.ifaceStore.GetInterfaceByName(e.iface.InterfaceName) + // Notify worker immediately about the member leave event if the member doesn't exist on the Node, or there are + // other local members in the multicast group. + if !found || len(newStatus.localMembers) > 0 { + c.queue.Add(newStatus.group.String()) + } else { + // Check if all local members have left the multicast group. klog.InfoS("Check last member in multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) c.checkLastMember(e.group) } @@ -139,7 +156,7 @@ func (c *Controller) checkLastMember(group net.IP) { c.queue.AddAfter(group.String(), igmpMaxResponseTime) } -// clearStaleGroups checks the stale groups which have not been updated for mcastGroupTimeout, and then notifies worker +// clearStaleGroups checks the stale group members which have not been updated for mcastGroupTimeout, and then notifies worker // to remove them from groupCache. func (c *Controller) clearStaleGroups() { now := time.Now() @@ -147,20 +164,64 @@ func (c *Controller) clearStaleGroups() { status := obj.(*GroupMemberStatus) status.mutex.RLock() diff := now.Sub(status.lastIGMPReport) - status.mutex.RUnlock() if diff > mcastGroupTimeout { + // Notify worker to remove the group from groupCache if all its members are not updated before mcastGroupTimeout. c.queue.Add(status.group.String()) + } else { + // Create a "leave" event for a local member if it is not updated before mcastGroupTimeout. + for member, lastUpdate := range status.localMembers { + if now.Sub(lastUpdate) > mcastGroupTimeout { + ifConfig := &interfacestore.InterfaceConfig{ + InterfaceName: member, + } + event := &mcastGroupEvent{ + group: status.group, + eType: groupLeave, + time: now, + iface: ifConfig, + } + c.groupEventCh <- event + } + } + } + status.mutex.RUnlock() + } +} + +// removeLocalInterface searches the GroupMemberStatus which the deleted interface has joined, and then triggers a member +// leave event so that Antrea can remove the corresponding interface from local multicast receivers on OVS. This function +// should be called if the removed Pod receiver fails to send IGMP leave message before deletion. +func (c *Controller) removeLocalInterface(e interface{}) { + podEvent := e.(types.PodUpdate) + // Ignore Pod creation event. + if podEvent.IsAdd { + return + } + interfaceName := util.GenerateContainerInterfaceName(podEvent.PodName, podEvent.PodNamespace, podEvent.ContainerID) + ifConfig := &interfacestore.InterfaceConfig{ + InterfaceName: interfaceName, + } + groupStatuses := c.getGroupMemberStatusesByPod(interfaceName) + for _, g := range groupStatuses { + event := &mcastGroupEvent{ + group: g.group, + eType: groupLeave, + time: time.Now(), + iface: ifConfig, } + c.groupEventCh <- event } } type Controller struct { - ofClient openflow.Client - nodeConfig *config.NodeConfig - igmpSnooper *IGMPSnooper - groupEventCh chan *mcastGroupEvent - groupCache cache.Indexer - queue workqueue.RateLimitingInterface + ofClient openflow.Client + v4GroupAllocator openflow.GroupAllocator + ifaceStore interfacestore.InterfaceStore + nodeConfig *config.NodeConfig + igmpSnooper *IGMPSnooper + groupEventCh chan *mcastGroupEvent + groupCache cache.Indexer + queue workqueue.RateLimitingInterface // installedGroups saves the groups which are configured on both OVS and the host. installedGroups sets.String installedGroupsMutex sync.RWMutex @@ -168,33 +229,39 @@ type Controller struct { ovsBridgeClient ovsconfig.OVSBridgeClient } -func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeConfig, ifaceStore interfacestore.InterfaceStore, multicastSocket RouteInterface, multicastInterfaces sets.String, ovsBridgeClient ovsconfig.OVSBridgeClient) *Controller { +func NewMulticastController(ofClient openflow.Client, + v4GroupAllocator openflow.GroupAllocator, + nodeConfig *config.NodeConfig, + ifaceStore interfacestore.InterfaceStore, + multicastSocket RouteInterface, + multicastInterfaces sets.String, + ovsBridgeClient ovsconfig.OVSBridgeClient, + podUpdateSubscriber channel.Subscriber) *Controller { eventCh := make(chan *mcastGroupEvent, workerCount) groupSnooper := newSnooper(ofClient, ifaceStore, eventCh) groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ podInterfaceIndex: podInterfaceIndexFunc, }) multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces) - return &Controller{ - ofClient: ofClient, - nodeConfig: nodeConfig, - igmpSnooper: groupSnooper, - groupEventCh: eventCh, - groupCache: groupCache, - installedGroups: sets.NewString(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"), - mRouteClient: multicastRouteClient, - ovsBridgeClient: ovsBridgeClient, + c := &Controller{ + ofClient: ofClient, + ifaceStore: ifaceStore, + v4GroupAllocator: v4GroupAllocator, + nodeConfig: nodeConfig, + igmpSnooper: groupSnooper, + groupEventCh: eventCh, + groupCache: groupCache, + installedGroups: sets.NewString(), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"), + mRouteClient: multicastRouteClient, + ovsBridgeClient: ovsBridgeClient, } + podUpdateSubscriber.Subscribe(c.removeLocalInterface) + return c } func (c *Controller) Initialize() error { - err := c.configureOVSMulticast() - if err != nil { - klog.ErrorS(err, "Failed to configure multicast for the OVS bridge") - return err - } - err = c.mRouteClient.Initialize() + err := c.mRouteClient.Initialize() if err != nil { return err } @@ -221,7 +288,6 @@ func (c *Controller) Run(stopCh <-chan struct{}) { // Periodically check the group member status, and remove the groups in which no members exist go wait.NonSlidingUntil(c.clearStaleGroups, queryInterval, stopCh) - go c.eventHandler(stopCh) for i := 0; i < int(workerCount); i++ { @@ -298,15 +364,30 @@ func (c *Controller) syncGroup(groupKey string) error { return nil } status := obj.(*GroupMemberStatus) + memberPorts := make([]uint32, 0, len(status.localMembers)) + status.mutex.RLock() + defer status.mutex.RUnlock() + for memberInterfaceName := range status.localMembers { + obj, found := c.ifaceStore.GetInterfaceByName(memberInterfaceName) + if !found { + klog.InfoS("Failed to find interface from cache", "interface", memberInterfaceName) + continue + } + memberPorts = append(memberPorts, uint32(obj.OFPort)) + } if c.groupHasInstalled(groupKey) { - status.mutex.Lock() - defer status.mutex.Unlock() if c.groupIsStale(status) { // Remove the multicast flow entry if no local Pod is in the group. - if err := c.ofClient.UninstallMulticastFlow(status.group); err != nil { + if err := c.ofClient.UninstallMulticastFlows(status.group); err != nil { klog.ErrorS(err, "Failed to uninstall multicast flows", "group", groupKey) return err } + // Remove the multicast flow entry if no local Pod is in the group. + if err := c.ofClient.UninstallGroup(status.ofGroupID); err != nil { + klog.ErrorS(err, "Failed to uninstall multicast group", "group", groupKey) + return err + } + c.v4GroupAllocator.Release(status.ofGroupID) err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group) if err != nil { klog.ErrorS(err, "Cannot delete multicast group", "group", groupKey) @@ -320,10 +401,22 @@ func (c *Controller) syncGroup(groupKey string) error { c.delInstalledGroup(groupKey) c.groupCache.Delete(status) klog.InfoS("Removed multicast group from cache after all members left", "group", groupKey) + return nil } + // Reinstall OpenFlow group because the local Pod receivers have changed. + if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts); err != nil { + return err + } + klog.V(2).InfoS("Updated OpenFlow group for local receivers", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts) return nil } - if err := c.ofClient.InstallMulticastFlow(status.group); err != nil { + // Install OpenFlow group for a new multicast group which has local Pod receivers joined. + if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts); err != nil { + return err + } + klog.V(2).InfoS("Installed OpenFlow group for local receivers", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts) + // Install OpenFlow flow to forward packets to local Pod receivers which are included in the group. + if err := c.ofClient.InstallMulticastFlows(status.group, status.ofGroupID); err != nil { klog.ErrorS(err, "Failed to install multicast flows", "group", status.group) return err } @@ -388,21 +481,6 @@ func podInterfaceIndexFunc(obj interface{}) ([]string, error) { return podInterfaces, nil } -func (c *Controller) configureOVSMulticast() error { - // Configure bridge to enable multicast snooping. - err := c.ovsBridgeClient.SetBridgeMcastSnooping(true) - if err != nil { - return err - } - // Disable flooding of unregistered multicast packets to all ports. - otherConfig := map[string]interface{}{"mcast-snooping-disable-flood-unregistered": "true"} - err = c.ovsBridgeClient.AddBridgeOtherConfig(otherConfig) - if err != nil { - return err - } - return nil -} - func getGroupEventKey(obj interface{}) (string, error) { groupState := obj.(*GroupMemberStatus) return groupState.group.String(), nil diff --git a/pkg/agent/multicast/mcast_controller_test.go b/pkg/agent/multicast/mcast_controller_test.go index 9918b5ffeb6..0536dfda675 100644 --- a/pkg/agent/multicast/mcast_controller_test.go +++ b/pkg/agent/multicast/mcast_controller_test.go @@ -36,8 +36,10 @@ import ( "antrea.io/antrea/pkg/agent/interfacestore" ifaceStoretest "antrea.io/antrea/pkg/agent/interfacestore/testing" multicasttest "antrea.io/antrea/pkg/agent/multicast/testing" + "antrea.io/antrea/pkg/agent/openflow" openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" + "antrea.io/antrea/pkg/util/channel" ) var ( @@ -45,16 +47,21 @@ var ( mockMulticastSocket *multicasttest.MockRouteInterface mockIfaceStore *ifaceStoretest.MockInterfaceStore ovsClient *ovsconfigtest.MockOVSBridgeClient - mgroup = net.ParseIP("224.96.1.3") if1 = &interfacestore.InterfaceConfig{ Type: interfacestore.ContainerInterface, InterfaceName: "if1", IPs: []net.IP{net.ParseIP("192.168.1.1")}, + OVSPortConfig: &interfacestore.OVSPortConfig{ + OFPort: 1, + }, } if2 = &interfacestore.InterfaceConfig{ Type: interfacestore.ContainerInterface, InterfaceName: "if2", IPs: []net.IP{net.ParseIP("192.168.1.2")}, + OVSPortConfig: &interfacestore.OVSPortConfig{ + OFPort: 2, + }, } nodeIf1IP = net.ParseIP("192.168.20.22") externalInterfaceIP = net.ParseIP("192.168.50.23") @@ -63,6 +70,7 @@ var ( ) func TestAddGroupMemberStatus(t *testing.T) { + mgroup := net.ParseIP("224.96.1.3") event := &mcastGroupEvent{ group: mgroup, eType: groupJoin, @@ -82,7 +90,9 @@ func TestAddGroupMemberStatus(t *testing.T) { key, ok := obj.(string) assert.True(t, ok) assert.Equal(t, mgroup.String(), key) - mockOFClient.EXPECT().InstallMulticastFlow(mgroup).Times(1) + mockIfaceStore.EXPECT().GetInterfaceByName(if1.InterfaceName).Return(if1, true) + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()) + mockOFClient.EXPECT().InstallMulticastFlows(mgroup, gomock.Any()).Times(1) mockMulticastSocket.EXPECT().MulticastInterfaceJoinMgroup(mgroup.To4(), nodeIf1IP.To4(), if1.InterfaceName).Times(1) err = mctrl.syncGroup(key) assert.Nil(t, err) @@ -94,6 +104,7 @@ func TestUpdateGroupMemberStatus(t *testing.T) { err := mctrl.initialize(t) assert.Nil(t, err) igmpMaxResponseTime = time.Second * 1 + mgroup := net.ParseIP("224.96.1.4") event := &mcastGroupEvent{ group: mgroup, eType: groupJoin, @@ -101,7 +112,6 @@ func TestUpdateGroupMemberStatus(t *testing.T) { iface: if1, } mctrl.addGroupMemberStatus(event) - obj, _, _ := mctrl.groupCache.GetByKey(event.group.String()) mockOFClient.EXPECT().SendIGMPQueryPacketOut(igmpQueryDstMac, mcastAllHosts, uint32(openflow13.P_NORMAL), gomock.Any()).Times(len(queryVersions)) for _, e := range []*mcastGroupEvent{ {group: mgroup, eType: groupJoin, time: event.time.Add(time.Second * 20), iface: if1}, @@ -110,6 +120,10 @@ func TestUpdateGroupMemberStatus(t *testing.T) { {group: mgroup, eType: groupLeave, time: event.time.Add(time.Second * 61), iface: if1}, {group: mgroup, eType: groupLeave, time: event.time.Add(time.Second * 62), iface: if2}, } { + obj, _, _ := mctrl.groupCache.GetByKey(event.group.String()) + if e.eType == groupLeave { + mockIfaceStore.EXPECT().GetInterfaceByName(e.iface.InterfaceName).Return(e.iface, true) + } mctrl.updateGroupMemberStatus(obj, e) groupCache := mctrl.groupCache compareGroupStatus(t, groupCache, e) @@ -127,15 +141,22 @@ func TestCheckLastMember(t *testing.T) { workerCount = 1 igmpMaxResponseTime = time.Second * 1 lastProbe := time.Now() + mgroup := net.ParseIP("224.96.1.2") testCheckLastMember := func(ev *mcastGroupEvent, expExist bool) { status := &GroupMemberStatus{ - localMembers: sets.NewString(), + localMembers: map[string]time.Time{}, lastIGMPReport: lastProbe, } if ev != nil { status.group = ev.group + if ev.eType == groupLeave { + mockOFClient.EXPECT().UninstallGroup(gomock.Any()) + mockOFClient.EXPECT().UninstallMulticastFlows(status.group) + } } else { status.group = mgroup + mockOFClient.EXPECT().UninstallGroup(gomock.Any()) + mockOFClient.EXPECT().UninstallMulticastFlows(status.group) } _ = mctrl.groupCache.Add(status) mctrl.addInstalledGroup(status.group.String()) @@ -167,7 +188,8 @@ func TestCheckLastMember(t *testing.T) { } mctrl.queue.Forget(obj) } - mockOFClient.EXPECT().UninstallMulticastFlow(gomock.Any()).Times(2) + mockIfaceStore.EXPECT().GetInterfaceByName(if1.InterfaceName).Return(if1, true).Times(1) + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()).Times(1) for _, tc := range []struct { ev *mcastGroupEvent exists bool @@ -194,30 +216,31 @@ func TestClearStaleGroups(t *testing.T) { mctrl.worker() wg.Done() }() - now := time.Now() + validUpdateTime := now.Add(-queryInterval) validGroups := []*GroupMemberStatus{ { group: net.ParseIP("224.96.1.2"), - localMembers: sets.NewString("p1", "p2"), - lastIGMPReport: now.Add(-queryInterval), + localMembers: map[string]time.Time{"p1": now, "p2": validUpdateTime}, + lastIGMPReport: validUpdateTime, }, { group: net.ParseIP("224.96.1.3"), - localMembers: sets.NewString(), - lastIGMPReport: now.Add(-queryInterval), + localMembers: map[string]time.Time{"p2": validUpdateTime}, + lastIGMPReport: validUpdateTime, }, } + staleUpdateTime := now.Add(-mcastGroupTimeout - time.Second) staleGroups := []*GroupMemberStatus{ { group: net.ParseIP("224.96.1.4"), - localMembers: sets.NewString("p1", "p3"), - lastIGMPReport: now.Add(-mcastGroupTimeout - time.Second), + localMembers: map[string]time.Time{"p1": staleUpdateTime, "p3": staleUpdateTime}, + lastIGMPReport: staleUpdateTime, }, { group: net.ParseIP("224.96.1.5"), - localMembers: sets.NewString(), - lastIGMPReport: now.Add(-mcastGroupTimeout - time.Second), + localMembers: map[string]time.Time{}, + lastIGMPReport: staleUpdateTime, }, } for _, g := range validGroups { @@ -225,12 +248,19 @@ func TestClearStaleGroups(t *testing.T) { assert.Nil(t, err) mctrl.addInstalledGroup(g.group.String()) } + fakePort := int32(1) for _, g := range staleGroups { err := mctrl.groupCache.Add(g) assert.Nil(t, err) mctrl.addInstalledGroup(g.group.String()) + for m := range g.localMembers { + mockIface := &interfacestore.InterfaceConfig{InterfaceName: m, OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: fakePort}} + mockIfaceStore.EXPECT().GetInterfaceByName(m).Return(mockIface, true) + fakePort++ + } } - mockOFClient.EXPECT().UninstallMulticastFlow(gomock.Any()).Times(len(staleGroups)) + mockOFClient.EXPECT().UninstallGroup(gomock.Any()).Times(len(staleGroups)) + mockOFClient.EXPECT().UninstallMulticastFlows(gomock.Any()).Times(len(staleGroups)) mockMulticastSocket.EXPECT().MulticastInterfaceLeaveMgroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(len(staleGroups)) mctrl.clearStaleGroups() mctrl.queue.ShutDown() @@ -251,7 +281,9 @@ func TestProcessPacketIn(t *testing.T) { snooper := mockController.igmpSnooper stopCh := make(chan struct{}) defer close(stopCh) - go mockController.eventHandler(stopCh) + go func() { + mockController.eventHandler(stopCh) + }() getIPs := func(ipStrs []string) []net.IP { ips := make([]net.IP, len(ipStrs)) @@ -285,6 +317,7 @@ func TestProcessPacketIn(t *testing.T) { version: 3, }, } { + mockIfaceStore.EXPECT().GetInterfaceByName(tc.iface.InterfaceName).Return(tc.iface, true).AnyTimes() packets := createIGMPReportPacketIn(getIPs(tc.joinedGroups.List()), getIPs(tc.leftGroups.List()), tc.version, uint32(tc.iface.OFPort)) mockOFClient.EXPECT().SendIGMPQueryPacketOut(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() for _, pkt := range packets { @@ -311,11 +344,11 @@ func compareGroupStatus(t *testing.T, cache cache.Indexer, event *mcastGroupEven assert.Equal(t, event.group, status.group) if event.eType == groupJoin { assert.True(t, status.lastIGMPReport.Equal(event.time) || status.lastIGMPReport.After(event.time)) - exists := status.localMembers.Has(event.iface.InterfaceName) + _, exists := status.localMembers[event.iface.InterfaceName] assert.Truef(t, exists, "member is not added into cache") } else { assert.True(t, status.lastIGMPReport.Before(event.time)) - exists := status.localMembers.Has(event.iface.InterfaceName) + _, exists := status.localMembers[event.iface.InterfaceName] assert.Falsef(t, exists, "member is not removed from cache") } } @@ -329,13 +362,13 @@ func newMockMulticastController(t *testing.T) *Controller { addr := &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)} nodeConfig := &config.NodeConfig{GatewayConfig: &config.GatewayConfig{Name: "antrea-gw0"}, NodeIPv4Addr: addr} mockOFClient.EXPECT().RegisterPacketInHandler(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) - mctrl := NewMulticastController(mockOFClient, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.NewString(), ovsClient) + groupAllocator := openflow.NewGroupAllocator(false) + podUpdateSubscriber := channel.NewSubscribableChannel("PodUpdate", 100) + mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.NewString(), ovsClient, podUpdateSubscriber) return mctrl } func (c *Controller) initialize(t *testing.T) error { - ovsClient.EXPECT().SetBridgeMcastSnooping(true).Times(1) - ovsClient.EXPECT().AddBridgeOtherConfig(map[string]interface{}{"mcast-snooping-disable-flood-unregistered": "true"}).Times(1) mockOFClient.EXPECT().InstallMulticastInitialFlows(uint8(0)).Times(1) mockMulticastSocket.EXPECT().AllocateVIFs(gomock.Any(), uint16(0)).Times(1).Return([]uint16{0}, nil) mockMulticastSocket.EXPECT().AllocateVIFs(gomock.Any(), uint16(1)).Times(1).Return([]uint16{1, 2}, nil) diff --git a/pkg/agent/multicast/mcast_discovery.go b/pkg/agent/multicast/mcast_discovery.go index 5f418513ce3..7d4b7fe91fc 100644 --- a/pkg/agent/multicast/mcast_discovery.go +++ b/pkg/agent/multicast/mcast_discovery.go @@ -131,7 +131,7 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { fallthrough case protocol.IGMPv2Report: mgroup := igmp.(*protocol.IGMPv1or2).GroupAddress - klog.InfoS("Received IGMPv1or2 Report message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName) + klog.V(2).InfoS("Received IGMPv1or2 Report message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName) event := &mcastGroupEvent{ group: mgroup, eType: groupJoin, @@ -143,7 +143,7 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { msg := igmp.(*protocol.IGMPv3MembershipReport) for _, gr := range msg.GroupRecords { mgroup := gr.MulticastAddress - klog.InfoS("Received IGMPv3 Report message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName, "recordType", gr.Type, "sourceCount", gr.NumberOfSources) + klog.V(2).InfoS("Received IGMPv3 Report message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName, "recordType", gr.Type, "sourceCount", gr.NumberOfSources) evtType := groupJoin if (gr.Type == protocol.IGMPIsIn || gr.Type == protocol.IGMPToIn) && gr.NumberOfSources == 0 { evtType = groupLeave @@ -159,7 +159,7 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { case protocol.IGMPv2LeaveGroup: mgroup := igmp.(*protocol.IGMPv1or2).GroupAddress - klog.InfoS("Received IGMPv2 Leave message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName) + klog.V(2).InfoS("Received IGMPv2 Leave message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName) event := &mcastGroupEvent{ group: mgroup, eType: groupLeave, diff --git a/pkg/agent/multicast/mcast_route_test.go b/pkg/agent/multicast/mcast_route_test.go index 081171e7c00..b9f4c14d69c 100644 --- a/pkg/agent/multicast/mcast_route_test.go +++ b/pkg/agent/multicast/mcast_route_test.go @@ -21,6 +21,7 @@ import ( "fmt" "net" "testing" + "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -76,12 +77,12 @@ func TestProcessIGMPNocacheMsg(t *testing.T) { mRoute.internalInterfaceVIF = uint16(0) status1 := &GroupMemberStatus{ group: net.ParseIP("224.3.3.8"), - localMembers: sets.NewString("aa"), + localMembers: map[string]time.Time{"aa": time.Now()}, } mRoute.groupCache.Add(status1) status2 := &GroupMemberStatus{ group: net.ParseIP("224.3.3.9"), - localMembers: sets.NewString(), + localMembers: map[string]time.Time{}, } mRoute.groupCache.Add(status2) for _, m := range []struct { diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 1ef15b5b503..a8aad96d2b9 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -82,9 +82,9 @@ type Client interface { // InstallServiceGroup installs a group for Service LB. Each endpoint // is a bucket of the group. For now, each bucket has the same weight. InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error - // UninstallServiceGroup removes the group and its buckets that are - // installed by InstallServiceGroup. - UninstallServiceGroup(groupID binding.GroupIDType) error + // UninstallGroup removes the group and its buckets that are + // installed by InstallServiceGroup or InstallMulticastGroup. + UninstallGroup(groupID binding.GroupIDType) error // InstallEndpointFlows installs flows for accessing Endpoints. // If an Endpoint is on the current Node, then flows for hairpin and endpoint @@ -268,17 +268,18 @@ type Client interface { // InstallMulticastInitialFlows installs OpenFlow to packetIn the IGMP messages and output the Multicast traffic to // antrea-gw0 so that local Pods could access external Multicast servers. InstallMulticastInitialFlows(pktInReason uint8) error - // InstallMulticastFlow installs the flow to forward Multicast traffic normally, and output it to antrea-gw0 + // InstallMulticastFlows installs the flow to forward Multicast traffic normally, and output it to antrea-gw0 // to ensure it can be forwarded to the external addresses. - InstallMulticastFlow(multicastIP net.IP) error - // UninstallMulticastFlow removes the flow matching the given multicastIP. - UninstallMulticastFlow(multicastIP net.IP) error + InstallMulticastFlows(multicastIP net.IP, groupID binding.GroupIDType) error + // UninstallMulticastFlows removes the flow matching the given multicastIP. + UninstallMulticastFlows(multicastIP net.IP) error // SendIGMPQueryPacketOut sends the IGMPQuery packet as a packet-out to OVS from the gateway port. SendIGMPQueryPacketOut( dstMAC net.HardwareAddr, dstIP net.IP, outPort uint32, igmp ofutil.Message) error + InstallMulticastGroup(ofGroupID binding.GroupIDType, localReceivers []uint32) error } // GetFlowTableStatus returns an array of flow table status. @@ -532,7 +533,7 @@ func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAff return nil } -func (c *client) UninstallServiceGroup(groupID binding.GroupIDType) error { +func (c *client) UninstallGroup(groupID binding.GroupIDType) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() if !c.bridge.DeleteGroup(groupID) { @@ -730,7 +731,7 @@ func (c *client) generatePipelines() { if c.enableMulticast { // TODO: add support for IPv6 protocol - c.featureMulticast = newFeatureMulticast(c.cookieAllocator, []binding.Protocol{binding.ProtocolIP}) + c.featureMulticast = newFeatureMulticast(c.cookieAllocator, []binding.Protocol{binding.ProtocolIP}, c.bridge) c.activatedFeatures = append(c.activatedFeatures, c.featureMulticast) } c.featureTraceflow = newFeatureTraceflow() @@ -820,6 +821,9 @@ func (c *client) ReplayFlows() { } c.featureService.replayGroups() + if c.enableMulticast { + c.featureMulticast.replayGroups() + } for _, activeFeature := range c.activatedFeatures { if err := c.ofEntryOperations.AddAll(activeFeature.replayFlows()); err != nil { @@ -1095,15 +1099,15 @@ func (c *client) InstallMulticastInitialFlows(pktInReason uint8) error { return c.addFlows(c.featureMulticast.cachedFlows, cacheKey, flows) } -func (c *client) InstallMulticastFlow(multicastIP net.IP) error { - flows := c.featureMulticast.localMulticastForwardFlow(multicastIP) +func (c *client) InstallMulticastFlows(multicastIP net.IP, groupID binding.GroupIDType) error { + flows := c.featureMulticast.localMulticastForwardFlows(multicastIP, groupID) cacheKey := fmt.Sprintf("multicast_%s", multicastIP.String()) c.replayMutex.RLock() defer c.replayMutex.RUnlock() return c.addFlows(c.featureMulticast.cachedFlows, cacheKey, flows) } -func (c *client) UninstallMulticastFlow(multicastIP net.IP) error { +func (c *client) UninstallMulticastFlows(multicastIP net.IP) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() cacheKey := fmt.Sprintf("multicast_%s", multicastIP.String()) @@ -1129,3 +1133,14 @@ func (c *client) SendIGMPQueryPacketOut( packetOutObj := packetOutBuilder.Done() return c.bridge.SendPacketOut(packetOutObj) } + +func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceivers []uint32) error { + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + + targetPorts := append([]uint32{config.HostGatewayOFPort}, localReceivers...) + if err := c.featureMulticast.multicastReceiversGroup(groupID, targetPorts...); err != nil { + return err + } + return nil +} diff --git a/pkg/agent/openflow/framework.go b/pkg/agent/openflow/framework.go index c01edea5ca3..4d26157da46 100644 --- a/pkg/agent/openflow/framework.go +++ b/pkg/agent/openflow/framework.go @@ -242,7 +242,8 @@ func (f *featureEgress) getRequiredTables() []*Table { func (f *featureMulticast) getRequiredTables() []*Table { return []*Table{ - MulticastTable, + MulticastRoutingTable, + MulticastOutputTable, } } diff --git a/pkg/agent/openflow/multicast.go b/pkg/agent/openflow/multicast.go index 3d416aa71fa..8a2adedfcc1 100644 --- a/pkg/agent/openflow/multicast.go +++ b/pkg/agent/openflow/multicast.go @@ -15,7 +15,11 @@ package openflow import ( + "fmt" "net" + "sync" + + "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/openflow/cookie" binding "antrea.io/antrea/pkg/ovs/openflow" @@ -26,8 +30,10 @@ var _, mcastCIDR, _ = net.ParseCIDR("224.0.0.0/4") type featureMulticast struct { cookieAllocator cookie.Allocator ipProtocols []binding.Protocol + bridge binding.Bridge cachedFlows *flowCategoryCache + groupCache sync.Map category cookie.Category } @@ -36,12 +42,14 @@ func (f *featureMulticast) getFeatureName() string { return "Multicast" } -func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding.Protocol) *featureMulticast { +func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding.Protocol, bridge binding.Bridge) *featureMulticast { return &featureMulticast{ cookieAllocator: cookieAllocator, ipProtocols: ipProtocols, cachedFlows: newFlowCategoryCache(), + bridge: bridge, category: cookie.Multicast, + groupCache: sync.Map{}, } } @@ -56,10 +64,48 @@ func multicastPipelineClassifyFlow(cookieID uint64, pipeline binding.Pipeline) b } func (f *featureMulticast) initFlows() []binding.Flow { - return []binding.Flow{} + cookieID := f.cookieAllocator.Request(f.category).Raw() + return []binding.Flow{ + f.multicastOutputFlow(cookieID), + } } func (f *featureMulticast) replayFlows() []binding.Flow { // Get cached flows. return getCachedFlows(f.cachedFlows) } + +func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, ports ...uint32) error { + group := f.bridge.CreateGroupTypeAll(groupID).ResetBuckets() + for i := range ports { + group = group.Bucket(). + LoadToRegField(OFPortFoundRegMark.GetField(), OFPortFoundRegMark.GetValue()). + LoadToRegField(TargetOFPortField, ports[i]). + ResubmitToTable(MulticastRoutingTable.GetNext()). + Done() + } + if err := group.Add(); err != nil { + return fmt.Errorf("error when installing Multicast receiver Group: %w", err) + } + f.groupCache.Store(groupID, group) + return nil +} + +func (f *featureMulticast) multicastOutputFlow(cookieID uint64) binding.Flow { + return MulticastOutputTable.ofTable.BuildFlow(priorityNormal). + Cookie(cookieID). + MatchRegMark(OFPortFoundRegMark). + Action().OutputToRegField(TargetOFPortField). + Done() +} + +func (f *featureMulticast) replayGroups() { + f.groupCache.Range(func(id, value interface{}) bool { + group := value.(binding.Group) + group.Reset() + if err := group.Add(); err != nil { + klog.ErrorS(err, "Error when replaying cached group", "group", id) + } + return true + }) +} diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index ccd527accaa..b4e8b0939fc 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -180,7 +180,10 @@ var ( // Tables of pipelineMulticast are declared below. Do don't declare any tables of other pipelines here! // Tables in stageRouting: - MulticastTable = newTable("Multicast", stageRouting, pipelineMulticast) + MulticastRoutingTable = newTable("MulticastRouting", stageRouting, pipelineMulticast) + + // Tables in stageOutput + MulticastOutputTable = newTable("MulticastOutput", stageOutput, pipelineMulticast) // Flow priority level priorityHigh = uint16(210) @@ -2637,8 +2640,8 @@ func pipelineClassifyFlow(cookieID uint64, protocol binding.Protocol, pipeline b Done() } -// igmpPktInFlows generates the flow to load CustomReasonIGMPRegMark to mark the IGMP packet in MulticastTable and sends -// it to antrea-agent on MulticastTable. +// igmpPktInFlows generates the flow to load CustomReasonIGMPRegMark to mark the IGMP packet in MulticastRoutingTable and sends +// it to antrea-agent. func (f *featureMulticast) igmpPktInFlows(reason uint8) []binding.Flow { flows := []binding.Flow{ // Set a custom reason for the IGMP packets, and then send it to antrea-agent and forward it normally in the @@ -2646,7 +2649,7 @@ func (f *featureMulticast) igmpPktInFlows(reason uint8) []binding.Flow { // group and its members in the meanwhile. // Do not set dst IP address because IGMPv1 report message uses target multicast group as IP destination in // the packet. - MulticastTable.ofTable.BuildFlow(priorityHigh). + MulticastRoutingTable.ofTable.BuildFlow(priorityHigh). Cookie(f.cookieAllocator.Request(f.category).Raw()). MatchProtocol(binding.ProtocolIGMP). MatchRegMark(FromLocalRegMark). @@ -2658,33 +2661,34 @@ func (f *featureMulticast) igmpPktInFlows(reason uint8) []binding.Flow { return flows } -// localMulticastForwardFlow generates the flow to forward multicast packets with OVS action "normal", and outputs +// localMulticastForwardFlows generates the flow to forward multicast packets with OVS action "normal", and outputs // it to Antrea gateway in the meanwhile, so that the packet can be forwarded to local Pods which have joined the Multicast // group and to the external receivers. For external multicast packets accessing to the given multicast IP also hits the // flow, and the packet is not sent back to Antrea gateway because OVS datapath will drop it when it finds the output // port is the same as the input port. -func (f *featureMulticast) localMulticastForwardFlow(multicastIP net.IP) []binding.Flow { +func (f *featureMulticast) localMulticastForwardFlows(multicastIP net.IP, groupID binding.GroupIDType) []binding.Flow { return []binding.Flow{ - MulticastTable.ofTable.BuildFlow(priorityNormal). + MulticastRoutingTable.ofTable.BuildFlow(priorityNormal). Cookie(f.cookieAllocator.Request(f.category).Raw()). MatchProtocol(binding.ProtocolIP). MatchDstIP(multicastIP). - Action().Output(config.HostGatewayOFPort). - Action().Normal(). + Action().Group(groupID). Done(), } } // externalMulticastReceiverFlow generates the flow to output multicast packets to Antrea gateway, so that local Pods can // send multicast packets to access the external receivers. For the case that one or more local Pods have joined the target -// multicast group, it is handled by the flows created by function "localMulticastForwardFlow" after local Pods report the +// multicast group, it is handled by the flows created by function "localMulticastForwardFlows" after local Pods report the // IGMP membership. func (f *featureMulticast) externalMulticastReceiverFlow() binding.Flow { - return MulticastTable.ofTable.BuildFlow(priorityLow). + return MulticastRoutingTable.ofTable.BuildFlow(priorityLow). Cookie(f.cookieAllocator.Request(f.category).Raw()). MatchProtocol(binding.ProtocolIP). MatchDstIPNet(*mcastCIDR). - Action().Output(config.HostGatewayOFPort). + Action().LoadRegMark(OFPortFoundRegMark). + Action().LoadToRegField(TargetOFPortField, config.HostGatewayOFPort). + Action().NextTable(). Done() } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 740cad169db..c306bc7590f 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -281,18 +281,32 @@ func (mr *MockClientMockRecorder) InstallEndpointFlows(arg0, arg1 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallEndpointFlows", reflect.TypeOf((*MockClient)(nil).InstallEndpointFlows), arg0, arg1) } -// InstallMulticastFlow mocks base method -func (m *MockClient) InstallMulticastFlow(arg0 net.IP) error { +// InstallMulticastFlows mocks base method +func (m *MockClient) InstallMulticastFlows(arg0 net.IP, arg1 openflow.GroupIDType) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallMulticastFlow", arg0) + ret := m.ctrl.Call(m, "InstallMulticastFlows", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } -// InstallMulticastFlow indicates an expected call of InstallMulticastFlow -func (mr *MockClientMockRecorder) InstallMulticastFlow(arg0 interface{}) *gomock.Call { +// InstallMulticastFlows indicates an expected call of InstallMulticastFlows +func (mr *MockClientMockRecorder) InstallMulticastFlows(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastFlow", reflect.TypeOf((*MockClient)(nil).InstallMulticastFlow), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticastFlows), arg0, arg1) +} + +// InstallMulticastGroup mocks base method +func (m *MockClient) InstallMulticastGroup(arg0 openflow.GroupIDType, arg1 []uint32) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallMulticastGroup", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallMulticastGroup indicates an expected call of InstallMulticastGroup +func (mr *MockClientMockRecorder) InstallMulticastGroup(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastGroup", reflect.TypeOf((*MockClient)(nil).InstallMulticastGroup), arg0, arg1) } // InstallMulticastInitialFlows mocks base method @@ -611,18 +625,32 @@ func (mr *MockClientMockRecorder) UninstallEndpointFlows(arg0, arg1 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallEndpointFlows", reflect.TypeOf((*MockClient)(nil).UninstallEndpointFlows), arg0, arg1) } -// UninstallMulticastFlow mocks base method -func (m *MockClient) UninstallMulticastFlow(arg0 net.IP) error { +// UninstallGroup mocks base method +func (m *MockClient) UninstallGroup(arg0 openflow.GroupIDType) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UninstallGroup", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// UninstallGroup indicates an expected call of UninstallGroup +func (mr *MockClientMockRecorder) UninstallGroup(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallGroup", reflect.TypeOf((*MockClient)(nil).UninstallGroup), arg0) +} + +// UninstallMulticastFlows mocks base method +func (m *MockClient) UninstallMulticastFlows(arg0 net.IP) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UninstallMulticastFlow", arg0) + ret := m.ctrl.Call(m, "UninstallMulticastFlows", arg0) ret0, _ := ret[0].(error) return ret0 } -// UninstallMulticastFlow indicates an expected call of UninstallMulticastFlow -func (mr *MockClientMockRecorder) UninstallMulticastFlow(arg0 interface{}) *gomock.Call { +// UninstallMulticastFlows indicates an expected call of UninstallMulticastFlows +func (mr *MockClientMockRecorder) UninstallMulticastFlows(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallMulticastFlow", reflect.TypeOf((*MockClient)(nil).UninstallMulticastFlow), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallMulticastFlows", reflect.TypeOf((*MockClient)(nil).UninstallMulticastFlows), arg0) } // UninstallNodeFlows mocks base method @@ -710,20 +738,6 @@ func (mr *MockClientMockRecorder) UninstallServiceFlows(arg0, arg1, arg2 interfa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallServiceFlows", reflect.TypeOf((*MockClient)(nil).UninstallServiceFlows), arg0, arg1, arg2) } -// UninstallServiceGroup mocks base method -func (m *MockClient) UninstallServiceGroup(arg0 openflow.GroupIDType) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UninstallServiceGroup", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// UninstallServiceGroup indicates an expected call of UninstallServiceGroup -func (mr *MockClientMockRecorder) UninstallServiceGroup(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallServiceGroup", reflect.TypeOf((*MockClient)(nil).UninstallServiceGroup), arg0) -} - // UninstallTraceflowFlows mocks base method func (m *MockClient) UninstallTraceflowFlows(arg0 byte) error { m.ctrl.T.Helper() diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index d3e6a4d31ea..96f10c3c4d8 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -173,7 +173,7 @@ func (p *proxier) removeStaleServices() { // Remove Service group whose Endpoints are local. if svcInfo.NodeLocalExternal() { if groupIDLocal, exist := p.groupCounter.Get(svcPortName, true); exist { - if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil { + if err := p.ofClient.UninstallGroup(groupIDLocal); err != nil { klog.ErrorS(err, "Failed to remove Group of local Endpoints for Service", "Service", svcPortName) continue } @@ -182,7 +182,7 @@ func (p *proxier) removeStaleServices() { } // Remove Service group which has all Endpoints. if groupID, exist := p.groupCounter.Get(svcPortName, false); exist { - if err := p.ofClient.UninstallServiceGroup(groupID); err != nil { + if err := p.ofClient.UninstallGroup(groupID); err != nil { klog.ErrorS(err, "Failed to remove Group of all Endpoints for Service", "Service", svcPortName) continue } @@ -555,7 +555,7 @@ func (p *proxier) installServices() { continue } if groupID, exist := p.groupCounter.Get(svcPortName, !nodeLocalVal); exist { - if err := p.ofClient.UninstallServiceGroup(groupID); err != nil { + if err := p.ofClient.UninstallGroup(groupID); err != nil { klog.ErrorS(err, "Failed to uninstall Group of all Endpoints for Service", "Service", svcPortName) continue } diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index fc809925fad..6c6ec07c2ae 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -347,7 +347,7 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), nodeLocalVal, corev1.ServiceTypeLoadBalancer).Times(1) } groupID = fp.groupCounter.AllocateIfNotExist(svcPortName, !nodeLocalVal) - mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1) + mockOFClient.EXPECT().UninstallGroup(groupID).Times(1) } mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) if proxyLoadBalancerIPs { @@ -466,7 +466,7 @@ func testNodePort(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), uint16(svcNodePort), bindingProtocol, uint16(0), nodeLocalVal, corev1.ServiceTypeNodePort).Times(1) groupID = fp.groupCounter.AllocateIfNotExist(svcPortName, !nodeLocalVal) - mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1) + mockOFClient.EXPECT().UninstallGroup(groupID).Times(1) } mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().AddNodePort(gomock.Any(), uint16(svcNodePort), bindingProtocol).Times(1) @@ -748,7 +748,7 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) - mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1) + mockOFClient.EXPECT().UninstallGroup(gomock.Any()).Times(1) fp.syncProxyRules() fp.serviceChanges.OnServiceUpdate(service, nil) @@ -1231,8 +1231,8 @@ func TestServicesWithSameEndpoints(t *testing.T) { mockOFClient.EXPECT().InstallServiceFlows(groupID2, svcIP2, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP1, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP2, uint16(svcPort), bindingProtocol).Times(1) - mockOFClient.EXPECT().UninstallServiceGroup(groupID1).Times(1) - mockOFClient.EXPECT().UninstallServiceGroup(groupID2).Times(1) + mockOFClient.EXPECT().UninstallGroup(groupID1).Times(1) + mockOFClient.EXPECT().UninstallGroup(groupID2).Times(1) // Since these two Services reference to the same Endpoint, there should only be one operation. mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) diff --git a/pkg/agent/types/event.go b/pkg/agent/types/event.go new file mode 100644 index 00000000000..eea70f66e42 --- /dev/null +++ b/pkg/agent/types/event.go @@ -0,0 +1,22 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +type PodUpdate struct { + PodNamespace string + PodName string + IsAdd bool + ContainerID string +} diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index a086f9a4044..74e362e69b0 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -94,6 +94,7 @@ type Bridge interface { CreateTable(table Table, next uint8, missAction MissActionType) Table // AddTable adds table on the Bridge. Return true if the operation succeeds, otherwise return false. DeleteTable(id uint8) bool + CreateGroupTypeAll(id GroupIDType) Group CreateGroup(id GroupIDType) Group DeleteGroup(id GroupIDType) bool CreateMeter(id MeterIDType, flags ofctrl.MeterFlag) Meter diff --git a/pkg/ovs/openflow/ofctrl_bridge.go b/pkg/ovs/openflow/ofctrl_bridge.go index c1b5e11dffb..e5b2b886cfc 100644 --- a/pkg/ovs/openflow/ofctrl_bridge.go +++ b/pkg/ovs/openflow/ofctrl_bridge.go @@ -189,8 +189,16 @@ type OFBridge struct { multipartReplyChs map[uint32]chan *openflow13.MultipartReply } +func (b *OFBridge) CreateGroupTypeAll(id GroupIDType) Group { + return b.createGroupWithType(id, ofctrl.GroupAll) +} + func (b *OFBridge) CreateGroup(id GroupIDType) Group { - ofctrlGroup, err := b.ofSwitch.NewGroup(uint32(id), ofctrl.GroupSelect) + return b.createGroupWithType(id, ofctrl.GroupSelect) +} + +func (b *OFBridge) createGroupWithType(id GroupIDType, groupType ofctrl.GroupType) Group { + ofctrlGroup, err := b.ofSwitch.NewGroup(uint32(id), groupType) if err != nil { // group already exists ofctrlGroup = b.ofSwitch.GetGroup(uint32(id)) } diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go index 283a606c538..45a1e3f474c 100644 --- a/pkg/ovs/openflow/testing/mock_openflow.go +++ b/pkg/ovs/openflow/testing/mock_openflow.go @@ -134,6 +134,20 @@ func (mr *MockBridgeMockRecorder) CreateGroup(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateGroup", reflect.TypeOf((*MockBridge)(nil).CreateGroup), arg0) } +// CreateGroupTypeAll mocks base method +func (m *MockBridge) CreateGroupTypeAll(arg0 openflow.GroupIDType) openflow.Group { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateGroupTypeAll", arg0) + ret0, _ := ret[0].(openflow.Group) + return ret0 +} + +// CreateGroupTypeAll indicates an expected call of CreateGroupTypeAll +func (mr *MockBridgeMockRecorder) CreateGroupTypeAll(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateGroupTypeAll", reflect.TypeOf((*MockBridge)(nil).CreateGroupTypeAll), arg0) +} + // CreateMeter mocks base method func (m *MockBridge) CreateMeter(arg0 openflow.MeterIDType, arg1 ofctrl.MeterFlag) openflow.Meter { m.ctrl.T.Helper() diff --git a/pkg/util/channel/channel.go b/pkg/util/channel/channel.go index b10e9b4d4f3..019eee536d1 100644 --- a/pkg/util/channel/channel.go +++ b/pkg/util/channel/channel.go @@ -25,7 +25,7 @@ const ( notifyTimeout = time.Second ) -type eventHandler func(string) +type eventHandler func(interface{}) type Subscriber interface { // Subscribe registers an eventHandler which will be called when an event is sent to the channel. @@ -37,7 +37,7 @@ type Subscriber interface { type Notifier interface { // Notify sends an event to the channel. - Notify(string) bool + Notify(interface{}) bool } // SubscribableChannel is different from the Go channel which dispatches every event to only single consumer regardless @@ -47,7 +47,7 @@ type SubscribableChannel struct { // The name of the channel, used for logging purpose to differentiate multiple channels. name string // eventCh is the channel used for buffering the pending events. - eventCh chan string + eventCh chan interface{} // handlers is a slice of callbacks registered by consumers. handlers []eventHandler } @@ -55,7 +55,7 @@ type SubscribableChannel struct { func NewSubscribableChannel(name string, bufferSize int) *SubscribableChannel { n := &SubscribableChannel{ name: name, - eventCh: make(chan string, bufferSize), + eventCh: make(chan interface{}, bufferSize), } return n } @@ -64,7 +64,7 @@ func (n *SubscribableChannel) Subscribe(h eventHandler) { n.handlers = append(n.handlers, h) } -func (n *SubscribableChannel) Notify(e string) bool { +func (n *SubscribableChannel) Notify(e interface{}) bool { timer := time.NewTimer(notifyTimeout) defer timer.Stop() select { diff --git a/pkg/util/channel/channel_test.go b/pkg/util/channel/channel_test.go index 2eee503f71f..e848bd0194a 100644 --- a/pkg/util/channel/channel_test.go +++ b/pkg/util/channel/channel_test.go @@ -36,10 +36,10 @@ func newEventReceiver() *eventReceiver { } } -func (r *eventReceiver) receive(e string) { +func (r *eventReceiver) receive(e interface{}) { r.mutex.Lock() defer r.mutex.Unlock() - r.receivedEvents.Insert(e) + r.receivedEvents.Insert(e.(string)) } func (r *eventReceiver) received() sets.String { diff --git a/test/e2e/multicast_test.go b/test/e2e/multicast_test.go index 31a49751bbe..0d7755550ff 100644 --- a/test/e2e/multicast_test.go +++ b/test/e2e/multicast_test.go @@ -83,6 +83,7 @@ func TestMulticast(t *testing.T) { for _, mc := range testcases { mc := mc t.Run(mc.name, func(t *testing.T) { + t.Parallel() runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces) }) } @@ -122,6 +123,7 @@ func TestMulticast(t *testing.T) { for _, mc := range testcases { mc := mc t.Run(mc.name, func(t *testing.T) { + t.Parallel() runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces) }) } @@ -189,7 +191,7 @@ func runTestMulticastForwardToMultipleInterfaces(t *testing.T, data *TestData, s } } -func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestcase, nodeMulticastInterfaces [][]string) { +func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestcase, nodeMulticastInterfaces map[int][]string) { mcjoinWaitTimeout := defaultTimeout / time.Second gatewayInterface, err := data.GetGatewayInterfaceName(antreaNamespace) failOnError(err, t) @@ -209,10 +211,10 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc defer wg.Done() // The following command joins a multicast group and sets the timeout to 100 seconds(-W 100) before exit. // The command will return after receiving 1 packet(-c 1). - receiveMulticastCommand := []string{"/bin/sh", "-c", fmt.Sprintf("mcjoin -c 1 -o -p %d -W %d %s", mc.port, mcjoinWaitTimeout, mc.group.String())} + receiveMulticastCommand := []string{"/bin/sh", "-c", fmt.Sprintf("mcjoin -c 10 -o -p %d -W %d %s", mc.port, mcjoinWaitTimeout, mc.group.String())} res, _, err := data.RunCommandFromPod(testNamespace, r, mcjoinContainerName, receiveMulticastCommand) failOnError(err, t) - assert.Contains(t, res, "Total: 1 packets") + assert.Contains(t, res, "Total: 10 packets") }() } // Wait 2 seconds(-w 2) before sending multicast traffic. @@ -222,23 +224,32 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc data.RunCommandFromPod(testNamespace, senderName, mcjoinContainerName, sendMulticastCommand) }() - if err := wait.Poll(5*time.Second, defaultTimeout, func() (bool, error) { - // Sender pods should add an outbound multicast route except running as HostNetwork. - _, mrouteResult, _, err := data.RunCommandOnNode(nodeName(mc.senderConfig.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s | grep '%s'", mc.group.String(), gatewayInterface, strings.Join(nodeMulticastInterfaces[mc.senderConfig.nodeIdx], " "))) - if err != nil { - return false, err - } - if !mc.senderConfig.isHostNetwork { - if len(mrouteResult) == 0 { - return false, nil + readyReceivers := sets.NewInt() + senderReady := false + if err := wait.Poll(3*time.Second, defaultTimeout, func() (bool, error) { + if !senderReady { + // Sender pods should add an outbound multicast route except running as HostNetwork. + _, mrouteResult, _, err := data.RunCommandOnNode(nodeName(mc.senderConfig.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s | grep '%s'", mc.group.String(), gatewayInterface, strings.Join(nodeMulticastInterfaces[mc.senderConfig.nodeIdx], " "))) + if err != nil { + return false, err } - } else { - if len(mrouteResult) != 0 { - return false, nil + if !mc.senderConfig.isHostNetwork { + if len(mrouteResult) == 0 { + return false, nil + } + } else { + if len(mrouteResult) != 0 { + return false, nil + } } + senderReady = true } + // Check inbound multicast route and whether multicast interfaces has joined the multicast group. for _, receiver := range mc.receiverConfigs { + if readyReceivers.Has(receiver.nodeIdx) { + continue + } for _, receiverMulticastInterface := range nodeMulticastInterfaces[receiver.nodeIdx] { _, mRouteResult, _, err := data.RunCommandOnNode(nodeName(receiver.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s ", mc.group.String(), receiverMulticastInterface)) if err != nil { @@ -272,6 +283,7 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc } } } + readyReceivers = readyReceivers.Insert(receiver.nodeIdx) } return true, nil }); err != nil { @@ -282,7 +294,7 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc // computeMulticastInterfaces computes multicastInterfaces for each node. // It returns [][]string with its index as node index and value as multicastInterfaces for this node. -func computeMulticastInterfaces(t *testing.T, data *TestData) ([][]string, error) { +func computeMulticastInterfaces(t *testing.T, data *TestData) (map[int][]string, error) { multicastInterfaces, err := data.GetMulticastInterfaces(antreaNamespace) if err != nil { return nil, err @@ -291,7 +303,7 @@ func computeMulticastInterfaces(t *testing.T, data *TestData) ([][]string, error if err != nil { t.Fatalf("Error getting transport interfaces: %v", err) } - nodeMulticastInterfaces := make([][]string, 0, len(clusterInfo.nodes)) + nodeMulticastInterfaces := make(map[int][]string) for nodeIdx := range clusterInfo.nodes { _, localInterfacesStr, _, err := data.RunCommandOnNode(nodeName(nodeIdx), "ls /sys/class/net") if err != nil { @@ -303,7 +315,7 @@ func computeMulticastInterfaces(t *testing.T, data *TestData) ([][]string, error externalMulticastInterfaces := localInterfacesSet.Intersection(multicastInterfaceSet) currNodeMulticastInterfaces := externalMulticastInterfaces.Insert(transportInterface).List() t.Logf("Multicast interfaces for node index %d is %+v", nodeIdx, currNodeMulticastInterfaces) - nodeMulticastInterfaces = append(nodeMulticastInterfaces, currNodeMulticastInterfaces) + nodeMulticastInterfaces[nodeIdx] = currNodeMulticastInterfaces } return nodeMulticastInterfaces, nil } diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index 7bc59f477e4..588e0dceae9 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -703,7 +703,7 @@ func uninstallServiceFlowsFunc(t *testing.T, gid uint32, svc svcConfig, endpoint groupID := ofconfig.GroupIDType(gid) err := c.UninstallServiceFlows(svc.ip, svc.port, svc.protocol) assert.Nil(t, err) - err = c.UninstallServiceGroup(groupID) + err = c.UninstallGroup(groupID) assert.Nil(t, err) for _, ep := range endpointList { err := c.UninstallEndpointFlows(svc.protocol, ep) diff --git a/test/integration/ovs/ofctrl_test.go b/test/integration/ovs/ofctrl_test.go index b646cceeb39..73769a10e58 100644 --- a/test/integration/ovs/ofctrl_test.go +++ b/test/integration/ovs/ofctrl_test.go @@ -269,6 +269,8 @@ func TestOFctrlGroup(t *testing.T) { ovsCtlClient := ovsctl.NewClient(brName) + id := 1 + for name, buckets := range map[string][]struct { weight uint16 // Must have non-zero value. reg2reg [][4]uint32 // regNum, data, startIndex, endIndex @@ -278,12 +280,25 @@ func TestOFctrlGroup(t *testing.T) { {weight: 100, reg2reg: [][4]uint32{{0, 1, 0, 31}, {1, 2, 15, 31}}, resubmitTable: 31}, {weight: 110, resubmitTable: 42}, }, + "TypeAll": { + {weight: 100, reg2reg: [][4]uint32{{0, 1, 0, 31}, {1, 2, 15, 31}}, resubmitTable: 31}, + {weight: 110, resubmitTable: 42}, + }, } { t.Run(name, func(t *testing.T) { - group := br.CreateGroup(1) + var group binding.Group + gid := binding.GroupIDType(id) + if name == "TypeAll" { + group = br.CreateGroupTypeAll(gid) + } else { + group = br.CreateGroup(gid) + } for _, bucket := range buckets { require.NotZero(t, bucket.weight, "Weight value of a bucket must be specified") - bucketBuilder := group.Bucket().Weight(bucket.weight) + bucketBuilder := group.Bucket() + if name == "Normal" { + bucketBuilder = bucketBuilder.Weight(bucket.weight) + } if bucket.resubmitTable != 0 { bucketBuilder = bucketBuilder.ResubmitToTable(bucket.resubmitTable) } @@ -303,8 +318,10 @@ func TestOFctrlGroup(t *testing.T) { }), "Failed to install group") dumpedGroup := groups[0] for i, bucket := range buckets { - // Must have weight - assert.True(t, strings.Contains(dumpedGroup[i+1], fmt.Sprintf("weight:%d", bucket.weight))) + if name == "Normal" { + // Must have weight + assert.True(t, strings.Contains(dumpedGroup[i+1], fmt.Sprintf("weight:%d", bucket.weight))) + } for _, loading := range bucket.reg2reg { rngStr := "[]" if !(loading[2] == 0 && loading[3] == 31) { @@ -326,6 +343,7 @@ func TestOFctrlGroup(t *testing.T) { return len(groups) == 0, nil }), "Failed to delete group") }) + id++ } }