diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 0d3a8f8e797..7076e3c33e8 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -577,11 +577,13 @@ func run(o *Options) error { } mcastController := multicast.NewMulticastController( ofClient, + v4GroupIDAllocator, nodeConfig, ifaceStore, multicastSocket, sets.NewString(append(o.config.MulticastInterfaces, nodeConfig.NodeTransportInterfaceName)...), - ovsBridgeClient) + ovsBridgeClient, + podUpdateChannel) if err := mcastController.Initialize(); err != nil { return err } diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index 0c2027df658..026cb7af413 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -32,6 +32,7 @@ import ( "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/route" "antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache" + types2 "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/util/channel" @@ -312,6 +313,7 @@ func (pc *podConfigurator) removeInterfaces(containerID string) error { if err := pc.routeClient.DeleteLocalAntreaFlexibleIPAMPodRule(containerConfig.IPs); err != nil { return err } + return nil } @@ -495,7 +497,13 @@ func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName string, conta // Add containerConfig into local cache pc.ifaceStore.AddInterface(containerConfig) // Notify the Pod update event to required components. - pc.podUpdateNotifier.Notify(k8s.NamespacedName(containerConfig.PodNamespace, containerConfig.PodName)) + event := types2.PodUpdate{ + PodName: containerConfig.PodName, + PodNamespace: containerConfig.PodNamespace, + IsAdd: true, + ContainerID: containerConfig.ContainerID, + } + pc.podUpdateNotifier.Notify(event) return nil } @@ -518,6 +526,13 @@ func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interface } // Remove container configuration from cache. pc.ifaceStore.DeleteInterface(containerConfig) + event := types2.PodUpdate{ + PodName: containerConfig.PodName, + PodNamespace: containerConfig.PodNamespace, + IsAdd: false, + ContainerID: containerConfig.ContainerID, + } + pc.podUpdateNotifier.Notify(event) klog.Infof("Removed interfaces for container %s", containerID) return nil } diff --git a/pkg/agent/cniserver/pod_configuration_windows.go b/pkg/agent/cniserver/pod_configuration_windows.go index a2dc2834081..16f824f847e 100644 --- a/pkg/agent/cniserver/pod_configuration_windows.go +++ b/pkg/agent/cniserver/pod_configuration_windows.go @@ -24,6 +24,7 @@ import ( "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" "antrea.io/antrea/pkg/util/k8s" ) @@ -49,7 +50,13 @@ func (pc *podConfigurator) connectInterfaceToOVSAsync(ifConfig *interfacestore.I // Update interface config with the ofPort. ifConfig.OVSPortConfig.OFPort = ofPort // Notify the Pod update event to required components. 
- pc.podUpdateNotifier.Notify(k8s.NamespacedName(ifConfig.PodNamespace, ifConfig.PodName)) + event := types.PodUpdate{ + PodName: ifConfig.PodName, + PodNamespace: ifConfig.PodNamespace, + IsAdd: true, + ContainerID: ifConfig.ContainerID, + } + pc.podUpdateNotifier.Notify(event) return nil }) } diff --git a/pkg/agent/controller/egress/egress_controller.go b/pkg/agent/controller/egress/egress_controller.go index 167d0756a7f..a683a8d84e0 100644 --- a/pkg/agent/controller/egress/egress_controller.go +++ b/pkg/agent/controller/egress/egress_controller.go @@ -41,6 +41,7 @@ import ( "antrea.io/antrea/pkg/agent/memberlist" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/route" + "antrea.io/antrea/pkg/agent/types" cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2" crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" clientsetversioned "antrea.io/antrea/pkg/client/clientset/versioned" @@ -217,9 +218,11 @@ func NewEgressController( // processPodUpdate will be called when CNIServer publishes a Pod update event. // It triggers reconciling the effective Egress of the Pod. -func (c *EgressController) processPodUpdate(pod string) { +func (c *EgressController) processPodUpdate(e interface{}) { c.egressBindingsMutex.Lock() defer c.egressBindingsMutex.Unlock() + podEvent := e.(types.PodUpdate) + pod := k8s.NamespacedName(podEvent.PodNamespace, podEvent.PodName) binding, exists := c.egressBindings[pod] if !exists { return diff --git a/pkg/agent/controller/egress/egress_controller_test.go b/pkg/agent/controller/egress/egress_controller_test.go index 9c448dd1cf6..0d241bf7e8e 100644 --- a/pkg/agent/controller/egress/egress_controller_test.go +++ b/pkg/agent/controller/egress/egress_controller_test.go @@ -38,6 +38,7 @@ import ( ipassignertest "antrea.io/antrea/pkg/agent/ipassigner/testing" openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" routetest "antrea.io/antrea/pkg/agent/route/testing" + "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2" crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" @@ -582,7 +583,11 @@ func TestPodUpdateShouldSyncEgress(t *testing.T) { c.mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1) // Mock CNIServer addPodInterface(c.ifaceStore, "ns1", "pendingPod", 10) - c.podUpdateChannel.Notify("ns1/pendingPod") + ev := types.PodUpdate{ + PodName: "pendingPod", + PodNamespace: "ns1", + } + c.podUpdateChannel.Notify(ev) require.NoError(t, wait.PollImmediate(10*time.Millisecond, time.Second, func() (done bool, err error) { return c.queue.Len() == 1, nil })) diff --git a/pkg/agent/controller/networkpolicy/cache.go b/pkg/agent/controller/networkpolicy/cache.go index 32011eaf8cd..3db75ef08ea 100644 --- a/pkg/agent/controller/networkpolicy/cache.go +++ b/pkg/agent/controller/networkpolicy/cache.go @@ -28,6 +28,7 @@ import ( "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/metrics" + types2 "antrea.io/antrea/pkg/agent/types" v1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2" crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" "antrea.io/antrea/pkg/querier" @@ -361,12 +362,12 @@ func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Sub // done if antrea-controller has computed the Pods' policies and propagated // them to this Node by their labels and NodeName, instead of waiting for their // IPs are reported to kube-apiserver and processed by antrea-controller. 
-func (c *ruleCache) processPodUpdate(pod string) { - namespace, name := k8s.SplitNamespacedName(pod) +func (c *ruleCache) processPodUpdate(e interface{}) { + podEvent := e.(types2.PodUpdate) member := &v1beta.GroupMember{ Pod: &v1beta.PodReference{ - Name: name, - Namespace: namespace, + Name: podEvent.PodName, + Namespace: podEvent.PodNamespace, }, } c.appliedToSetLock.RLock() diff --git a/pkg/agent/controller/networkpolicy/cache_test.go b/pkg/agent/controller/networkpolicy/cache_test.go index 35878d26cfd..00c01d3ed25 100644 --- a/pkg/agent/controller/networkpolicy/cache_test.go +++ b/pkg/agent/controller/networkpolicy/cache_test.go @@ -25,8 +25,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/k8s" ) var ( @@ -1184,7 +1186,12 @@ func TestRuleCacheProcessPodUpdates(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) go podUpdateNotifier.Run(stopCh) - podUpdateNotifier.Notify(tt.podUpdate) + ns, name := k8s.SplitNamespacedName(tt.podUpdate) + e := types.PodUpdate{ + PodNamespace: ns, + PodName: name, + } + podUpdateNotifier.Notify(e) func() { // Drain the channel with 10 ms timeout so we can know it's done. for { diff --git a/pkg/agent/multicast/mcast_controller.go b/pkg/agent/multicast/mcast_controller.go index c11a923371f..83ef3ae2daf 100644 --- a/pkg/agent/multicast/mcast_controller.go +++ b/pkg/agent/multicast/mcast_controller.go @@ -28,7 +28,11 @@ import ( "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" + "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/agent/util" + binding "antrea.io/antrea/pkg/ovs/openflow" "antrea.io/antrea/pkg/ovs/ovsconfig" + "antrea.io/antrea/pkg/util/channel" ) type eventType uint8 @@ -59,10 +63,12 @@ type mcastGroupEvent struct { type GroupMemberStatus struct { group net.IP - // localMembers is a set for the local Pod's interface name which has joined in the multicast group. - localMembers sets.String + // localMembers is a map for the local Pod member and its last update time, key is the Pod's interface name, + // and value is its last update time. + localMembers map[string]time.Time lastIGMPReport time.Time mutex sync.RWMutex + ofGroupID binding.GroupIDType } // eventHandler process the multicast Group membership report or leave messages. 
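Editor's note: the hunks above change the payload of the Pod update channel from a namespaced-name string to the new types.PodUpdate struct, so every subscriber now receives an interface{} and has to type-assert it before use. A minimal sketch of a subscriber under that contract (the handlePodUpdate/wire names and the log messages are illustrative only, not part of this patch):

package example

import (
	"k8s.io/klog/v2"

	"antrea.io/antrea/pkg/agent/types"
	"antrea.io/antrea/pkg/util/channel"
	"antrea.io/antrea/pkg/util/k8s"
)

// handlePodUpdate mirrors the pattern used by the rule cache, the Egress
// controller and the multicast controller: assert the event type first,
// then branch on IsAdd.
func handlePodUpdate(e interface{}) {
	podEvent := e.(types.PodUpdate)
	pod := k8s.NamespacedName(podEvent.PodNamespace, podEvent.PodName)
	if podEvent.IsAdd {
		klog.V(2).InfoS("Pod interface created", "pod", pod, "containerID", podEvent.ContainerID)
		return
	}
	klog.V(2).InfoS("Pod interface deleted", "pod", pod, "containerID", podEvent.ContainerID)
}

func wire(podUpdateChannel *channel.SubscribableChannel) {
	// Register the handler before the channel's Run loop starts dispatching events.
	podUpdateChannel.Subscribe(handlePodUpdate)
}

Notify keeps its bool return value, so producers such as the CNI server can still detect when the buffered channel is full.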
@@ -82,7 +88,8 @@ func (c *Controller) addGroupMemberStatus(e *mcastGroupEvent) { status := &GroupMemberStatus{ group: e.group, lastIGMPReport: e.time, - localMembers: sets.NewString(e.iface.InterfaceName), + localMembers: map[string]time.Time{e.iface.InterfaceName: e.time}, + ofGroupID: c.v4GroupAllocator.Allocate(), } c.groupCache.Add(status) c.queue.Add(e.group.String()) @@ -101,25 +108,35 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent defer status.mutex.Unlock() newStatus := &GroupMemberStatus{ group: status.group, - localMembers: status.localMembers.Union(nil), + localMembers: make(map[string]time.Time), lastIGMPReport: status.lastIGMPReport, + ofGroupID: status.ofGroupID, } - exist := status.localMembers.Has(e.iface.InterfaceName) + for m, t := range status.localMembers { + newStatus.localMembers[m] = t + } + _, exist := status.localMembers[e.iface.InterfaceName] switch e.eType { case groupJoin: newStatus.lastIGMPReport = e.time + newStatus.localMembers[e.iface.InterfaceName] = e.time + c.groupCache.Update(newStatus) if !exist { - newStatus.localMembers.Insert(e.iface.InterfaceName) klog.InfoS("Added member to multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) + c.queue.Add(newStatus.group.String()) } - c.groupCache.Update(newStatus) - c.queue.Add(newStatus.group.String()) case groupLeave: if exist { - newStatus.localMembers.Delete(e.iface.InterfaceName) + delete(newStatus.localMembers, e.iface.InterfaceName) c.groupCache.Update(newStatus) klog.InfoS("Deleted member from multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) - if len(newStatus.localMembers) == 0 { + _, found := c.ifaceStore.GetInterfaceByName(e.iface.InterfaceName) + // Notify worker immediately about the member leave event if the member doesn't exist on the Node, or there are + // other local members in the multicast group. + if !found || len(newStatus.localMembers) > 0 { + c.queue.Add(newStatus.group.String()) + } else { + // Check if all local members have left the multicast group. klog.InfoS("Check last member in multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) c.checkLastMember(e.group) } @@ -139,7 +156,7 @@ func (c *Controller) checkLastMember(group net.IP) { c.queue.AddAfter(group.String(), igmpMaxResponseTime) } -// clearStaleGroups checks the stale groups which have not been updated for mcastGroupTimeout, and then notifies worker +// clearStaleGroups checks the stale group members which have not been updated for mcastGroupTimeout, and then notifies worker // to remove them from groupCache. func (c *Controller) clearStaleGroups() { now := time.Now() @@ -147,20 +164,64 @@ func (c *Controller) clearStaleGroups() { status := obj.(*GroupMemberStatus) status.mutex.RLock() diff := now.Sub(status.lastIGMPReport) - status.mutex.RUnlock() if diff > mcastGroupTimeout { + // Notify worker to remove the group from groupCache if all its members are not updated before mcastGroupTimeout. c.queue.Add(status.group.String()) + } else { + // Create a "leave" event for a local member if it is not updated before mcastGroupTimeout. 
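+                            // The synthesized leave event goes through groupEventCh, so eventHandler processes it in the same way as a leave reported via an IGMP message.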
+ for member, lastUpdate := range status.localMembers { + if now.Sub(lastUpdate) > mcastGroupTimeout { + ifConfig := &interfacestore.InterfaceConfig{ + InterfaceName: member, + } + event := &mcastGroupEvent{ + group: status.group, + eType: groupLeave, + time: now, + iface: ifConfig, + } + c.groupEventCh <- event + } + } } + status.mutex.RUnlock() + } +} + +// removeLocalInterface searches the GroupMemberStatus which the deleted interface has joined, and then triggers a member +// leave event so that Antrea can remove the corresponding interface from local multicast receivers on OVS. This function +// should be called if the removed Pod receiver fails to send IGMP leave message before deletion. +func (c *Controller) removeLocalInterface(e interface{}) { + podEvent := e.(types.PodUpdate) + // Ignore Pod creation event. + if podEvent.IsAdd { + return + } + interfaceName := util.GenerateContainerInterfaceName(podEvent.PodName, podEvent.PodNamespace, podEvent.ContainerID) + ifConfig := &interfacestore.InterfaceConfig{ + InterfaceName: interfaceName, + } + groupStatuses := c.getGroupMemberStatusesByPod(interfaceName) + for _, g := range groupStatuses { + event := &mcastGroupEvent{ + group: g.group, + eType: groupLeave, + time: time.Now(), + iface: ifConfig, + } + c.groupEventCh <- event } } type Controller struct { - ofClient openflow.Client - nodeConfig *config.NodeConfig - igmpSnooper *IGMPSnooper - groupEventCh chan *mcastGroupEvent - groupCache cache.Indexer - queue workqueue.RateLimitingInterface + ofClient openflow.Client + v4GroupAllocator openflow.GroupAllocator + ifaceStore interfacestore.InterfaceStore + nodeConfig *config.NodeConfig + igmpSnooper *IGMPSnooper + groupEventCh chan *mcastGroupEvent + groupCache cache.Indexer + queue workqueue.RateLimitingInterface // installedGroups saves the groups which are configured on both OVS and the host. 
installedGroups sets.String installedGroupsMutex sync.RWMutex @@ -168,33 +229,39 @@ type Controller struct { ovsBridgeClient ovsconfig.OVSBridgeClient } -func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeConfig, ifaceStore interfacestore.InterfaceStore, multicastSocket RouteInterface, multicastInterfaces sets.String, ovsBridgeClient ovsconfig.OVSBridgeClient) *Controller { +func NewMulticastController(ofClient openflow.Client, + v4GroupAllocator openflow.GroupAllocator, + nodeConfig *config.NodeConfig, + ifaceStore interfacestore.InterfaceStore, + multicastSocket RouteInterface, + multicastInterfaces sets.String, + ovsBridgeClient ovsconfig.OVSBridgeClient, + podUpdateSubscriber channel.Subscriber) *Controller { eventCh := make(chan *mcastGroupEvent, workerCount) groupSnooper := newSnooper(ofClient, ifaceStore, eventCh) groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ podInterfaceIndex: podInterfaceIndexFunc, }) multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces) - return &Controller{ - ofClient: ofClient, - nodeConfig: nodeConfig, - igmpSnooper: groupSnooper, - groupEventCh: eventCh, - groupCache: groupCache, - installedGroups: sets.NewString(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"), - mRouteClient: multicastRouteClient, - ovsBridgeClient: ovsBridgeClient, + c := &Controller{ + ofClient: ofClient, + ifaceStore: ifaceStore, + v4GroupAllocator: v4GroupAllocator, + nodeConfig: nodeConfig, + igmpSnooper: groupSnooper, + groupEventCh: eventCh, + groupCache: groupCache, + installedGroups: sets.NewString(), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"), + mRouteClient: multicastRouteClient, + ovsBridgeClient: ovsBridgeClient, } + podUpdateSubscriber.Subscribe(c.removeLocalInterface) + return c } func (c *Controller) Initialize() error { - err := c.configureOVSMulticast() - if err != nil { - klog.ErrorS(err, "Failed to configure multicast for the OVS bridge") - return err - } - err = c.mRouteClient.Initialize() + err := c.mRouteClient.Initialize() if err != nil { return err } @@ -221,7 +288,6 @@ func (c *Controller) Run(stopCh <-chan struct{}) { // Periodically check the group member status, and remove the groups in which no members exist go wait.NonSlidingUntil(c.clearStaleGroups, queryInterval, stopCh) - go c.eventHandler(stopCh) for i := 0; i < int(workerCount); i++ { @@ -298,15 +364,30 @@ func (c *Controller) syncGroup(groupKey string) error { return nil } status := obj.(*GroupMemberStatus) + memberPorts := make([]uint32, 0, len(status.localMembers)) + status.mutex.Lock() + defer status.mutex.Unlock() + for memberInterfaceName := range status.localMembers { + obj, found := c.ifaceStore.GetInterfaceByName(memberInterfaceName) + if !found { + klog.InfoS("Failed to find interface from cache", "interface", memberInterfaceName) + continue + } + memberPorts = append(memberPorts, uint32(obj.OFPort)) + } if c.groupHasInstalled(groupKey) { - status.mutex.Lock() - defer status.mutex.Unlock() if c.groupIsStale(status) { // Remove the multicast flow entry if no local Pod is in the group. 
- if err := c.ofClient.UninstallMulticastFlow(status.group); err != nil { + if err := c.ofClient.UninstallMulticastFlows(status.group); err != nil { klog.ErrorS(err, "Failed to uninstall multicast flows", "group", groupKey) return err } + // Remove the multicast flow entry if no local Pod is in the group. + if err := c.ofClient.UninstallGroup(status.ofGroupID); err != nil { + klog.ErrorS(err, "Failed to uninstall multicast group", "group", groupKey) + return err + } + c.v4GroupAllocator.Release(status.ofGroupID) err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group) if err != nil { klog.ErrorS(err, "Cannot delete multicast group", "group", groupKey) @@ -320,10 +401,22 @@ func (c *Controller) syncGroup(groupKey string) error { c.delInstalledGroup(groupKey) c.groupCache.Delete(status) klog.InfoS("Removed multicast group from cache after all members left", "group", groupKey) + return nil + } + // Reinstall OpenFlow group because the local Pod receivers have changed. + if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts); err != nil { + return err } + klog.V(2).InfoS("Updated OpenFlow group for local receivers", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts) return nil } - if err := c.ofClient.InstallMulticastFlow(status.group); err != nil { + // Install OpenFlow group for a new multicast group which has local Pod receivers joined. + if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts); err != nil { + return err + } + klog.V(2).InfoS("Installed OpenFlow group for local receivers", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts) + // Install OpenFlow flow to forward packets to local Pod receivers which are included in the group. + if err := c.ofClient.InstallMulticastFlows(status.group, status.ofGroupID); err != nil { klog.ErrorS(err, "Failed to install multicast flows", "group", status.group) return err } @@ -377,6 +470,7 @@ func (c *Controller) addOrUpdateGroupEvent(e *mcastGroupEvent) { c.updateGroupMemberStatus(obj, e) } } + return } func podInterfaceIndexFunc(obj interface{}) ([]string, error) { @@ -388,21 +482,6 @@ func podInterfaceIndexFunc(obj interface{}) ([]string, error) { return podInterfaces, nil } -func (c *Controller) configureOVSMulticast() error { - // Configure bridge to enable multicast snooping. - err := c.ovsBridgeClient.SetBridgeMcastSnooping(true) - if err != nil { - return err - } - // Disable flooding of unregistered multicast packets to all ports. 
- otherConfig := map[string]interface{}{"mcast-snooping-disable-flood-unregistered": "true"} - err = c.ovsBridgeClient.AddBridgeOtherConfig(otherConfig) - if err != nil { - return err - } - return nil -} - func getGroupEventKey(obj interface{}) (string, error) { groupState := obj.(*GroupMemberStatus) return groupState.group.String(), nil diff --git a/pkg/agent/multicast/mcast_controller_test.go b/pkg/agent/multicast/mcast_controller_test.go index 9918b5ffeb6..c07b0514eb9 100644 --- a/pkg/agent/multicast/mcast_controller_test.go +++ b/pkg/agent/multicast/mcast_controller_test.go @@ -36,8 +36,10 @@ import ( "antrea.io/antrea/pkg/agent/interfacestore" ifaceStoretest "antrea.io/antrea/pkg/agent/interfacestore/testing" multicasttest "antrea.io/antrea/pkg/agent/multicast/testing" + "antrea.io/antrea/pkg/agent/openflow" openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" + "antrea.io/antrea/pkg/util/channel" ) var ( @@ -45,16 +47,21 @@ var ( mockMulticastSocket *multicasttest.MockRouteInterface mockIfaceStore *ifaceStoretest.MockInterfaceStore ovsClient *ovsconfigtest.MockOVSBridgeClient - mgroup = net.ParseIP("224.96.1.3") if1 = &interfacestore.InterfaceConfig{ Type: interfacestore.ContainerInterface, InterfaceName: "if1", IPs: []net.IP{net.ParseIP("192.168.1.1")}, + OVSPortConfig: &interfacestore.OVSPortConfig{ + OFPort: 1, + }, } if2 = &interfacestore.InterfaceConfig{ Type: interfacestore.ContainerInterface, InterfaceName: "if2", IPs: []net.IP{net.ParseIP("192.168.1.2")}, + OVSPortConfig: &interfacestore.OVSPortConfig{ + OFPort: 2, + }, } nodeIf1IP = net.ParseIP("192.168.20.22") externalInterfaceIP = net.ParseIP("192.168.50.23") @@ -63,6 +70,7 @@ var ( ) func TestAddGroupMemberStatus(t *testing.T) { + mgroup := net.ParseIP("224.96.1.3") event := &mcastGroupEvent{ group: mgroup, eType: groupJoin, @@ -82,7 +90,9 @@ func TestAddGroupMemberStatus(t *testing.T) { key, ok := obj.(string) assert.True(t, ok) assert.Equal(t, mgroup.String(), key) - mockOFClient.EXPECT().InstallMulticastFlow(mgroup).Times(1) + mockIfaceStore.EXPECT().GetInterfaceByName(if1.InterfaceName).Return(if1, true) + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()) + mockOFClient.EXPECT().InstallMulticastFlows(mgroup, gomock.Any()).Times(1) mockMulticastSocket.EXPECT().MulticastInterfaceJoinMgroup(mgroup.To4(), nodeIf1IP.To4(), if1.InterfaceName).Times(1) err = mctrl.syncGroup(key) assert.Nil(t, err) @@ -94,6 +104,7 @@ func TestUpdateGroupMemberStatus(t *testing.T) { err := mctrl.initialize(t) assert.Nil(t, err) igmpMaxResponseTime = time.Second * 1 + mgroup := net.ParseIP("224.96.1.4") event := &mcastGroupEvent{ group: mgroup, eType: groupJoin, @@ -101,7 +112,6 @@ func TestUpdateGroupMemberStatus(t *testing.T) { iface: if1, } mctrl.addGroupMemberStatus(event) - obj, _, _ := mctrl.groupCache.GetByKey(event.group.String()) mockOFClient.EXPECT().SendIGMPQueryPacketOut(igmpQueryDstMac, mcastAllHosts, uint32(openflow13.P_NORMAL), gomock.Any()).Times(len(queryVersions)) for _, e := range []*mcastGroupEvent{ {group: mgroup, eType: groupJoin, time: event.time.Add(time.Second * 20), iface: if1}, @@ -110,6 +120,10 @@ func TestUpdateGroupMemberStatus(t *testing.T) { {group: mgroup, eType: groupLeave, time: event.time.Add(time.Second * 61), iface: if1}, {group: mgroup, eType: groupLeave, time: event.time.Add(time.Second * 62), iface: if2}, } { + obj, _, _ := mctrl.groupCache.GetByKey(event.group.String()) + if e.eType == groupLeave { + 
mockIfaceStore.EXPECT().GetInterfaceByName(e.iface.InterfaceName).Return(e.iface, true) + } mctrl.updateGroupMemberStatus(obj, e) groupCache := mctrl.groupCache compareGroupStatus(t, groupCache, e) @@ -127,15 +141,22 @@ func TestCheckLastMember(t *testing.T) { workerCount = 1 igmpMaxResponseTime = time.Second * 1 lastProbe := time.Now() + mgroup := net.ParseIP("224.96.1.2") testCheckLastMember := func(ev *mcastGroupEvent, expExist bool) { status := &GroupMemberStatus{ - localMembers: sets.NewString(), + localMembers: map[string]time.Time{}, lastIGMPReport: lastProbe, } if ev != nil { status.group = ev.group + if ev.eType == groupLeave { + mockOFClient.EXPECT().UninstallGroup(gomock.Any()) + mockOFClient.EXPECT().UninstallMulticastFlows(status.group) + } } else { status.group = mgroup + mockOFClient.EXPECT().UninstallGroup(gomock.Any()) + mockOFClient.EXPECT().UninstallMulticastFlows(status.group) } _ = mctrl.groupCache.Add(status) mctrl.addInstalledGroup(status.group.String()) @@ -167,7 +188,8 @@ func TestCheckLastMember(t *testing.T) { } mctrl.queue.Forget(obj) } - mockOFClient.EXPECT().UninstallMulticastFlow(gomock.Any()).Times(2) + mockIfaceStore.EXPECT().GetInterfaceByName(if1.InterfaceName).Return(if1, true).Times(1) + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()).Times(1) for _, tc := range []struct { ev *mcastGroupEvent exists bool @@ -194,30 +216,31 @@ func TestClearStaleGroups(t *testing.T) { mctrl.worker() wg.Done() }() - now := time.Now() + validUpdateTime := now.Add(-queryInterval) validGroups := []*GroupMemberStatus{ { group: net.ParseIP("224.96.1.2"), - localMembers: sets.NewString("p1", "p2"), - lastIGMPReport: now.Add(-queryInterval), + localMembers: map[string]time.Time{"p1": now, "p2": validUpdateTime}, + lastIGMPReport: validUpdateTime, }, { group: net.ParseIP("224.96.1.3"), - localMembers: sets.NewString(), - lastIGMPReport: now.Add(-queryInterval), + localMembers: map[string]time.Time{"p2": validUpdateTime}, + lastIGMPReport: validUpdateTime, }, } + staleUpdateTime := now.Add(-mcastGroupTimeout - time.Second) staleGroups := []*GroupMemberStatus{ { group: net.ParseIP("224.96.1.4"), - localMembers: sets.NewString("p1", "p3"), - lastIGMPReport: now.Add(-mcastGroupTimeout - time.Second), + localMembers: map[string]time.Time{"p1": staleUpdateTime, "p3": staleUpdateTime}, + lastIGMPReport: staleUpdateTime, }, { group: net.ParseIP("224.96.1.5"), - localMembers: sets.NewString(), - lastIGMPReport: now.Add(-mcastGroupTimeout - time.Second), + localMembers: map[string]time.Time{}, + lastIGMPReport: staleUpdateTime, }, } for _, g := range validGroups { @@ -225,12 +248,19 @@ func TestClearStaleGroups(t *testing.T) { assert.Nil(t, err) mctrl.addInstalledGroup(g.group.String()) } + fakePort := int32(1) for _, g := range staleGroups { err := mctrl.groupCache.Add(g) assert.Nil(t, err) mctrl.addInstalledGroup(g.group.String()) + for m := range g.localMembers { + mockIface := &interfacestore.InterfaceConfig{InterfaceName: m, OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: fakePort}} + mockIfaceStore.EXPECT().GetInterfaceByName(m).Return(mockIface, true) + fakePort++ + } } - mockOFClient.EXPECT().UninstallMulticastFlow(gomock.Any()).Times(len(staleGroups)) + mockOFClient.EXPECT().UninstallGroup(gomock.Any()).Times(len(staleGroups)) + mockOFClient.EXPECT().UninstallMulticastFlows(gomock.Any()).Times(len(staleGroups)) mockMulticastSocket.EXPECT().MulticastInterfaceLeaveMgroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(len(staleGroups)) 
mctrl.clearStaleGroups() mctrl.queue.ShutDown() @@ -251,7 +281,9 @@ func TestProcessPacketIn(t *testing.T) { snooper := mockController.igmpSnooper stopCh := make(chan struct{}) defer close(stopCh) - go mockController.eventHandler(stopCh) + go func() { + mockController.eventHandler(stopCh) + }() getIPs := func(ipStrs []string) []net.IP { ips := make([]net.IP, len(ipStrs)) @@ -285,6 +317,7 @@ func TestProcessPacketIn(t *testing.T) { version: 3, }, } { + mockIfaceStore.EXPECT().GetInterfaceByName(tc.iface.InterfaceName).Return(tc.iface, true).AnyTimes() packets := createIGMPReportPacketIn(getIPs(tc.joinedGroups.List()), getIPs(tc.leftGroups.List()), tc.version, uint32(tc.iface.OFPort)) mockOFClient.EXPECT().SendIGMPQueryPacketOut(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() for _, pkt := range packets { @@ -311,11 +344,11 @@ func compareGroupStatus(t *testing.T, cache cache.Indexer, event *mcastGroupEven assert.Equal(t, event.group, status.group) if event.eType == groupJoin { assert.True(t, status.lastIGMPReport.Equal(event.time) || status.lastIGMPReport.After(event.time)) - exists := status.localMembers.Has(event.iface.InterfaceName) + _, exists := status.localMembers[event.iface.InterfaceName] assert.Truef(t, exists, "member is not added into cache") } else { assert.True(t, status.lastIGMPReport.Before(event.time)) - exists := status.localMembers.Has(event.iface.InterfaceName) + _, exists := status.localMembers[event.iface.InterfaceName] assert.Falsef(t, exists, "member is not removed from cache") } } @@ -329,7 +362,9 @@ func newMockMulticastController(t *testing.T) *Controller { addr := &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)} nodeConfig := &config.NodeConfig{GatewayConfig: &config.GatewayConfig{Name: "antrea-gw0"}, NodeIPv4Addr: addr} mockOFClient.EXPECT().RegisterPacketInHandler(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) - mctrl := NewMulticastController(mockOFClient, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.NewString(), ovsClient) + groupAllocator := openflow.NewGroupAllocator(false) + podUpdateSubscriber := channel.NewSubscribableChannel("PodUpdate", 100) + mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.NewString(), ovsClient, podUpdateSubscriber) return mctrl } diff --git a/pkg/agent/multicast/mcast_discovery.go b/pkg/agent/multicast/mcast_discovery.go index 5f418513ce3..7d4b7fe91fc 100644 --- a/pkg/agent/multicast/mcast_discovery.go +++ b/pkg/agent/multicast/mcast_discovery.go @@ -131,7 +131,7 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { fallthrough case protocol.IGMPv2Report: mgroup := igmp.(*protocol.IGMPv1or2).GroupAddress - klog.InfoS("Received IGMPv1or2 Report message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName) + klog.V(2).InfoS("Received IGMPv1or2 Report message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName) event := &mcastGroupEvent{ group: mgroup, eType: groupJoin, @@ -143,7 +143,7 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { msg := igmp.(*protocol.IGMPv3MembershipReport) for _, gr := range msg.GroupRecords { mgroup := gr.MulticastAddress - klog.InfoS("Received IGMPv3 Report message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName, "recordType", gr.Type, "sourceCount", gr.NumberOfSources) + klog.V(2).InfoS("Received IGMPv3 Report message", "group", mgroup.String(), "interface", 
iface.InterfaceName, "pod", podName, "recordType", gr.Type, "sourceCount", gr.NumberOfSources) evtType := groupJoin if (gr.Type == protocol.IGMPIsIn || gr.Type == protocol.IGMPToIn) && gr.NumberOfSources == 0 { evtType = groupLeave @@ -159,7 +159,7 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { case protocol.IGMPv2LeaveGroup: mgroup := igmp.(*protocol.IGMPv1or2).GroupAddress - klog.InfoS("Received IGMPv2 Leave message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName) + klog.V(2).InfoS("Received IGMPv2 Leave message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName) event := &mcastGroupEvent{ group: mgroup, eType: groupLeave, diff --git a/pkg/agent/multicast/mcast_route_test.go b/pkg/agent/multicast/mcast_route_test.go index 081171e7c00..b9f4c14d69c 100644 --- a/pkg/agent/multicast/mcast_route_test.go +++ b/pkg/agent/multicast/mcast_route_test.go @@ -21,6 +21,7 @@ import ( "fmt" "net" "testing" + "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -76,12 +77,12 @@ func TestProcessIGMPNocacheMsg(t *testing.T) { mRoute.internalInterfaceVIF = uint16(0) status1 := &GroupMemberStatus{ group: net.ParseIP("224.3.3.8"), - localMembers: sets.NewString("aa"), + localMembers: map[string]time.Time{"aa": time.Now()}, } mRoute.groupCache.Add(status1) status2 := &GroupMemberStatus{ group: net.ParseIP("224.3.3.9"), - localMembers: sets.NewString(), + localMembers: map[string]time.Time{}, } mRoute.groupCache.Add(status2) for _, m := range []struct { diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 1ef15b5b503..a8aad96d2b9 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -82,9 +82,9 @@ type Client interface { // InstallServiceGroup installs a group for Service LB. Each endpoint // is a bucket of the group. For now, each bucket has the same weight. InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error - // UninstallServiceGroup removes the group and its buckets that are - // installed by InstallServiceGroup. - UninstallServiceGroup(groupID binding.GroupIDType) error + // UninstallGroup removes the group and its buckets that are + // installed by InstallServiceGroup or InstallMulticastGroup. + UninstallGroup(groupID binding.GroupIDType) error // InstallEndpointFlows installs flows for accessing Endpoints. // If an Endpoint is on the current Node, then flows for hairpin and endpoint @@ -268,17 +268,18 @@ type Client interface { // InstallMulticastInitialFlows installs OpenFlow to packetIn the IGMP messages and output the Multicast traffic to // antrea-gw0 so that local Pods could access external Multicast servers. InstallMulticastInitialFlows(pktInReason uint8) error - // InstallMulticastFlow installs the flow to forward Multicast traffic normally, and output it to antrea-gw0 + // InstallMulticastFlows installs the flow to forward Multicast traffic normally, and output it to antrea-gw0 // to ensure it can be forwarded to the external addresses. - InstallMulticastFlow(multicastIP net.IP) error - // UninstallMulticastFlow removes the flow matching the given multicastIP. - UninstallMulticastFlow(multicastIP net.IP) error + InstallMulticastFlows(multicastIP net.IP, groupID binding.GroupIDType) error + // UninstallMulticastFlows removes the flow matching the given multicastIP. 
+ UninstallMulticastFlows(multicastIP net.IP) error // SendIGMPQueryPacketOut sends the IGMPQuery packet as a packet-out to OVS from the gateway port. SendIGMPQueryPacketOut( dstMAC net.HardwareAddr, dstIP net.IP, outPort uint32, igmp ofutil.Message) error + InstallMulticastGroup(ofGroupID binding.GroupIDType, localReceivers []uint32) error } // GetFlowTableStatus returns an array of flow table status. @@ -532,7 +533,7 @@ func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAff return nil } -func (c *client) UninstallServiceGroup(groupID binding.GroupIDType) error { +func (c *client) UninstallGroup(groupID binding.GroupIDType) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() if !c.bridge.DeleteGroup(groupID) { @@ -730,7 +731,7 @@ func (c *client) generatePipelines() { if c.enableMulticast { // TODO: add support for IPv6 protocol - c.featureMulticast = newFeatureMulticast(c.cookieAllocator, []binding.Protocol{binding.ProtocolIP}) + c.featureMulticast = newFeatureMulticast(c.cookieAllocator, []binding.Protocol{binding.ProtocolIP}, c.bridge) c.activatedFeatures = append(c.activatedFeatures, c.featureMulticast) } c.featureTraceflow = newFeatureTraceflow() @@ -820,6 +821,9 @@ func (c *client) ReplayFlows() { } c.featureService.replayGroups() + if c.enableMulticast { + c.featureMulticast.replayGroups() + } for _, activeFeature := range c.activatedFeatures { if err := c.ofEntryOperations.AddAll(activeFeature.replayFlows()); err != nil { @@ -1095,15 +1099,15 @@ func (c *client) InstallMulticastInitialFlows(pktInReason uint8) error { return c.addFlows(c.featureMulticast.cachedFlows, cacheKey, flows) } -func (c *client) InstallMulticastFlow(multicastIP net.IP) error { - flows := c.featureMulticast.localMulticastForwardFlow(multicastIP) +func (c *client) InstallMulticastFlows(multicastIP net.IP, groupID binding.GroupIDType) error { + flows := c.featureMulticast.localMulticastForwardFlows(multicastIP, groupID) cacheKey := fmt.Sprintf("multicast_%s", multicastIP.String()) c.replayMutex.RLock() defer c.replayMutex.RUnlock() return c.addFlows(c.featureMulticast.cachedFlows, cacheKey, flows) } -func (c *client) UninstallMulticastFlow(multicastIP net.IP) error { +func (c *client) UninstallMulticastFlows(multicastIP net.IP) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() cacheKey := fmt.Sprintf("multicast_%s", multicastIP.String()) @@ -1129,3 +1133,14 @@ func (c *client) SendIGMPQueryPacketOut( packetOutObj := packetOutBuilder.Done() return c.bridge.SendPacketOut(packetOutObj) } + +func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceivers []uint32) error { + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + + targetPorts := append([]uint32{config.HostGatewayOFPort}, localReceivers...) 
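+	// config.HostGatewayOFPort is prepended above so that antrea-gw0 remains a receiver of every multicast group, keeping external receivers outside the Node reachable.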
+ if err := c.featureMulticast.multicastReceiversGroup(groupID, targetPorts...); err != nil { + return err + } + return nil +} diff --git a/pkg/agent/openflow/framework.go b/pkg/agent/openflow/framework.go index c01edea5ca3..4d26157da46 100644 --- a/pkg/agent/openflow/framework.go +++ b/pkg/agent/openflow/framework.go @@ -242,7 +242,8 @@ func (f *featureEgress) getRequiredTables() []*Table { func (f *featureMulticast) getRequiredTables() []*Table { return []*Table{ - MulticastTable, + MulticastRoutingTable, + MulticastOutputTable, } } diff --git a/pkg/agent/openflow/multicast.go b/pkg/agent/openflow/multicast.go index 3d416aa71fa..871ecc91b68 100644 --- a/pkg/agent/openflow/multicast.go +++ b/pkg/agent/openflow/multicast.go @@ -15,7 +15,11 @@ package openflow import ( + "fmt" "net" + "sync" + + "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/openflow/cookie" binding "antrea.io/antrea/pkg/ovs/openflow" @@ -26,8 +30,10 @@ var _, mcastCIDR, _ = net.ParseCIDR("224.0.0.0/4") type featureMulticast struct { cookieAllocator cookie.Allocator ipProtocols []binding.Protocol + bridge binding.Bridge cachedFlows *flowCategoryCache + groupCache sync.Map category cookie.Category } @@ -36,12 +42,14 @@ func (f *featureMulticast) getFeatureName() string { return "Multicast" } -func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding.Protocol) *featureMulticast { +func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding.Protocol, bridge binding.Bridge) *featureMulticast { return &featureMulticast{ cookieAllocator: cookieAllocator, ipProtocols: ipProtocols, cachedFlows: newFlowCategoryCache(), + bridge: bridge, category: cookie.Multicast, + groupCache: sync.Map{}, } } @@ -56,10 +64,48 @@ func multicastPipelineClassifyFlow(cookieID uint64, pipeline binding.Pipeline) b } func (f *featureMulticast) initFlows() []binding.Flow { - return []binding.Flow{} + cookieID := f.cookieAllocator.Request(f.category).Raw() + return []binding.Flow{ + f.multicastOutputFlow(cookieID), + } } func (f *featureMulticast) replayFlows() []binding.Flow { // Get cached flows. return getCachedFlows(f.cachedFlows) } + +func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, ports ...uint32) error { + group := f.bridge.CreateGroupTypeAll(groupID).ResetBuckets() + for i := range ports { + group = group.Bucket(). + LoadToRegField(OFPortFoundRegMark.GetField(), OFPortFoundRegMark.GetValue()). + LoadToRegField(TargetOFPortField, ports[i]). + ResubmitToTable(MulticastRoutingTable.GetNext()). + Done() + } + if err := group.Add(); err != nil { + return fmt.Errorf("error when installing Multicast receiver Group: %w", err) + } + f.groupCache.Store(groupID, group) + return nil +} + +func (f *featureMulticast) multicastOutputFlow(cookieID uint64) binding.Flow { + return MulticastOutputTable.ofTable.BuildFlow(priorityNormal). + Cookie(cookieID). + MatchRegMark(OFPortFoundRegMark). + Action().OutputToRegField(TargetOFPortField). + Done() +} + +func (f *featureMulticast) replayGroups() { + f.groupCache.Range(func(id, value interface{}) bool { + group := value.(binding.Group) + group.Reset() + if err := group.Add(); err != nil { + klog.Errorf("Error when replaying cached group %d: %v", id, err) + } + return true + }) +} diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index ccd527accaa..b4e8b0939fc 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -180,7 +180,10 @@ var ( // Tables of pipelineMulticast are declared below. 
Do don't declare any tables of other pipelines here! // Tables in stageRouting: - MulticastTable = newTable("Multicast", stageRouting, pipelineMulticast) + MulticastRoutingTable = newTable("MulticastRouting", stageRouting, pipelineMulticast) + + // Tables in stageOutput + MulticastOutputTable = newTable("MulticastOutput", stageOutput, pipelineMulticast) // Flow priority level priorityHigh = uint16(210) @@ -2637,8 +2640,8 @@ func pipelineClassifyFlow(cookieID uint64, protocol binding.Protocol, pipeline b Done() } -// igmpPktInFlows generates the flow to load CustomReasonIGMPRegMark to mark the IGMP packet in MulticastTable and sends -// it to antrea-agent on MulticastTable. +// igmpPktInFlows generates the flow to load CustomReasonIGMPRegMark to mark the IGMP packet in MulticastRoutingTable and sends +// it to antrea-agent. func (f *featureMulticast) igmpPktInFlows(reason uint8) []binding.Flow { flows := []binding.Flow{ // Set a custom reason for the IGMP packets, and then send it to antrea-agent and forward it normally in the @@ -2646,7 +2649,7 @@ func (f *featureMulticast) igmpPktInFlows(reason uint8) []binding.Flow { // group and its members in the meanwhile. // Do not set dst IP address because IGMPv1 report message uses target multicast group as IP destination in // the packet. - MulticastTable.ofTable.BuildFlow(priorityHigh). + MulticastRoutingTable.ofTable.BuildFlow(priorityHigh). Cookie(f.cookieAllocator.Request(f.category).Raw()). MatchProtocol(binding.ProtocolIGMP). MatchRegMark(FromLocalRegMark). @@ -2658,33 +2661,34 @@ func (f *featureMulticast) igmpPktInFlows(reason uint8) []binding.Flow { return flows } -// localMulticastForwardFlow generates the flow to forward multicast packets with OVS action "normal", and outputs +// localMulticastForwardFlows generates the flow to forward multicast packets with OVS action "normal", and outputs // it to Antrea gateway in the meanwhile, so that the packet can be forwarded to local Pods which have joined the Multicast // group and to the external receivers. For external multicast packets accessing to the given multicast IP also hits the // flow, and the packet is not sent back to Antrea gateway because OVS datapath will drop it when it finds the output // port is the same as the input port. -func (f *featureMulticast) localMulticastForwardFlow(multicastIP net.IP) []binding.Flow { +func (f *featureMulticast) localMulticastForwardFlows(multicastIP net.IP, groupID binding.GroupIDType) []binding.Flow { return []binding.Flow{ - MulticastTable.ofTable.BuildFlow(priorityNormal). + MulticastRoutingTable.ofTable.BuildFlow(priorityNormal). Cookie(f.cookieAllocator.Request(f.category).Raw()). MatchProtocol(binding.ProtocolIP). MatchDstIP(multicastIP). - Action().Output(config.HostGatewayOFPort). - Action().Normal(). + Action().Group(groupID). Done(), } } // externalMulticastReceiverFlow generates the flow to output multicast packets to Antrea gateway, so that local Pods can // send multicast packets to access the external receivers. For the case that one or more local Pods have joined the target -// multicast group, it is handled by the flows created by function "localMulticastForwardFlow" after local Pods report the +// multicast group, it is handled by the flows created by function "localMulticastForwardFlows" after local Pods report the // IGMP membership. func (f *featureMulticast) externalMulticastReceiverFlow() binding.Flow { - return MulticastTable.ofTable.BuildFlow(priorityLow). + return MulticastRoutingTable.ofTable.BuildFlow(priorityLow). 
Cookie(f.cookieAllocator.Request(f.category).Raw()). MatchProtocol(binding.ProtocolIP). MatchDstIPNet(*mcastCIDR). - Action().Output(config.HostGatewayOFPort). + Action().LoadRegMark(OFPortFoundRegMark). + Action().LoadToRegField(TargetOFPortField, config.HostGatewayOFPort). + Action().NextTable(). Done() } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 740cad169db..c306bc7590f 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -281,18 +281,32 @@ func (mr *MockClientMockRecorder) InstallEndpointFlows(arg0, arg1 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallEndpointFlows", reflect.TypeOf((*MockClient)(nil).InstallEndpointFlows), arg0, arg1) } -// InstallMulticastFlow mocks base method -func (m *MockClient) InstallMulticastFlow(arg0 net.IP) error { +// InstallMulticastFlows mocks base method +func (m *MockClient) InstallMulticastFlows(arg0 net.IP, arg1 openflow.GroupIDType) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallMulticastFlow", arg0) + ret := m.ctrl.Call(m, "InstallMulticastFlows", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } -// InstallMulticastFlow indicates an expected call of InstallMulticastFlow -func (mr *MockClientMockRecorder) InstallMulticastFlow(arg0 interface{}) *gomock.Call { +// InstallMulticastFlows indicates an expected call of InstallMulticastFlows +func (mr *MockClientMockRecorder) InstallMulticastFlows(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastFlow", reflect.TypeOf((*MockClient)(nil).InstallMulticastFlow), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticastFlows), arg0, arg1) +} + +// InstallMulticastGroup mocks base method +func (m *MockClient) InstallMulticastGroup(arg0 openflow.GroupIDType, arg1 []uint32) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallMulticastGroup", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallMulticastGroup indicates an expected call of InstallMulticastGroup +func (mr *MockClientMockRecorder) InstallMulticastGroup(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastGroup", reflect.TypeOf((*MockClient)(nil).InstallMulticastGroup), arg0, arg1) } // InstallMulticastInitialFlows mocks base method @@ -611,18 +625,32 @@ func (mr *MockClientMockRecorder) UninstallEndpointFlows(arg0, arg1 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallEndpointFlows", reflect.TypeOf((*MockClient)(nil).UninstallEndpointFlows), arg0, arg1) } -// UninstallMulticastFlow mocks base method -func (m *MockClient) UninstallMulticastFlow(arg0 net.IP) error { +// UninstallGroup mocks base method +func (m *MockClient) UninstallGroup(arg0 openflow.GroupIDType) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UninstallGroup", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// UninstallGroup indicates an expected call of UninstallGroup +func (mr *MockClientMockRecorder) UninstallGroup(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallGroup", reflect.TypeOf((*MockClient)(nil).UninstallGroup), arg0) +} + +// UninstallMulticastFlows mocks base method +func (m *MockClient) UninstallMulticastFlows(arg0 net.IP) 
error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UninstallMulticastFlow", arg0) + ret := m.ctrl.Call(m, "UninstallMulticastFlows", arg0) ret0, _ := ret[0].(error) return ret0 } -// UninstallMulticastFlow indicates an expected call of UninstallMulticastFlow -func (mr *MockClientMockRecorder) UninstallMulticastFlow(arg0 interface{}) *gomock.Call { +// UninstallMulticastFlows indicates an expected call of UninstallMulticastFlows +func (mr *MockClientMockRecorder) UninstallMulticastFlows(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallMulticastFlow", reflect.TypeOf((*MockClient)(nil).UninstallMulticastFlow), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallMulticastFlows", reflect.TypeOf((*MockClient)(nil).UninstallMulticastFlows), arg0) } // UninstallNodeFlows mocks base method @@ -710,20 +738,6 @@ func (mr *MockClientMockRecorder) UninstallServiceFlows(arg0, arg1, arg2 interfa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallServiceFlows", reflect.TypeOf((*MockClient)(nil).UninstallServiceFlows), arg0, arg1, arg2) } -// UninstallServiceGroup mocks base method -func (m *MockClient) UninstallServiceGroup(arg0 openflow.GroupIDType) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UninstallServiceGroup", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// UninstallServiceGroup indicates an expected call of UninstallServiceGroup -func (mr *MockClientMockRecorder) UninstallServiceGroup(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallServiceGroup", reflect.TypeOf((*MockClient)(nil).UninstallServiceGroup), arg0) -} - // UninstallTraceflowFlows mocks base method func (m *MockClient) UninstallTraceflowFlows(arg0 byte) error { m.ctrl.T.Helper() diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index d3e6a4d31ea..96f10c3c4d8 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -173,7 +173,7 @@ func (p *proxier) removeStaleServices() { // Remove Service group whose Endpoints are local. if svcInfo.NodeLocalExternal() { if groupIDLocal, exist := p.groupCounter.Get(svcPortName, true); exist { - if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil { + if err := p.ofClient.UninstallGroup(groupIDLocal); err != nil { klog.ErrorS(err, "Failed to remove Group of local Endpoints for Service", "Service", svcPortName) continue } @@ -182,7 +182,7 @@ func (p *proxier) removeStaleServices() { } // Remove Service group which has all Endpoints. 
if groupID, exist := p.groupCounter.Get(svcPortName, false); exist { - if err := p.ofClient.UninstallServiceGroup(groupID); err != nil { + if err := p.ofClient.UninstallGroup(groupID); err != nil { klog.ErrorS(err, "Failed to remove Group of all Endpoints for Service", "Service", svcPortName) continue } @@ -555,7 +555,7 @@ func (p *proxier) installServices() { continue } if groupID, exist := p.groupCounter.Get(svcPortName, !nodeLocalVal); exist { - if err := p.ofClient.UninstallServiceGroup(groupID); err != nil { + if err := p.ofClient.UninstallGroup(groupID); err != nil { klog.ErrorS(err, "Failed to uninstall Group of all Endpoints for Service", "Service", svcPortName) continue } diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index fc809925fad..6c6ec07c2ae 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -347,7 +347,7 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), nodeLocalVal, corev1.ServiceTypeLoadBalancer).Times(1) } groupID = fp.groupCounter.AllocateIfNotExist(svcPortName, !nodeLocalVal) - mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1) + mockOFClient.EXPECT().UninstallGroup(groupID).Times(1) } mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) if proxyLoadBalancerIPs { @@ -466,7 +466,7 @@ func testNodePort(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), uint16(svcNodePort), bindingProtocol, uint16(0), nodeLocalVal, corev1.ServiceTypeNodePort).Times(1) groupID = fp.groupCounter.AllocateIfNotExist(svcPortName, !nodeLocalVal) - mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1) + mockOFClient.EXPECT().UninstallGroup(groupID).Times(1) } mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().AddNodePort(gomock.Any(), uint16(svcNodePort), bindingProtocol).Times(1) @@ -748,7 +748,7 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) - mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1) + mockOFClient.EXPECT().UninstallGroup(gomock.Any()).Times(1) fp.syncProxyRules() fp.serviceChanges.OnServiceUpdate(service, nil) @@ -1231,8 +1231,8 @@ func TestServicesWithSameEndpoints(t *testing.T) { mockOFClient.EXPECT().InstallServiceFlows(groupID2, svcIP2, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP1, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP2, uint16(svcPort), bindingProtocol).Times(1) - mockOFClient.EXPECT().UninstallServiceGroup(groupID1).Times(1) - mockOFClient.EXPECT().UninstallServiceGroup(groupID2).Times(1) + mockOFClient.EXPECT().UninstallGroup(groupID1).Times(1) + mockOFClient.EXPECT().UninstallGroup(groupID2).Times(1) // Since these two Services reference to the same Endpoint, there should only be one operation. 
mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) diff --git a/pkg/agent/types/event.go b/pkg/agent/types/event.go new file mode 100644 index 00000000000..eea70f66e42 --- /dev/null +++ b/pkg/agent/types/event.go @@ -0,0 +1,22 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +type PodUpdate struct { + PodNamespace string + PodName string + IsAdd bool + ContainerID string +} diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index a086f9a4044..74e362e69b0 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -94,6 +94,7 @@ type Bridge interface { CreateTable(table Table, next uint8, missAction MissActionType) Table // AddTable adds table on the Bridge. Return true if the operation succeeds, otherwise return false. DeleteTable(id uint8) bool + CreateGroupTypeAll(id GroupIDType) Group CreateGroup(id GroupIDType) Group DeleteGroup(id GroupIDType) bool CreateMeter(id MeterIDType, flags ofctrl.MeterFlag) Meter diff --git a/pkg/ovs/openflow/ofctrl_bridge.go b/pkg/ovs/openflow/ofctrl_bridge.go index c1b5e11dffb..e5b2b886cfc 100644 --- a/pkg/ovs/openflow/ofctrl_bridge.go +++ b/pkg/ovs/openflow/ofctrl_bridge.go @@ -189,8 +189,16 @@ type OFBridge struct { multipartReplyChs map[uint32]chan *openflow13.MultipartReply } +func (b *OFBridge) CreateGroupTypeAll(id GroupIDType) Group { + return b.createGroupWithType(id, ofctrl.GroupAll) +} + func (b *OFBridge) CreateGroup(id GroupIDType) Group { - ofctrlGroup, err := b.ofSwitch.NewGroup(uint32(id), ofctrl.GroupSelect) + return b.createGroupWithType(id, ofctrl.GroupSelect) +} + +func (b *OFBridge) createGroupWithType(id GroupIDType, groupType ofctrl.GroupType) Group { + ofctrlGroup, err := b.ofSwitch.NewGroup(uint32(id), groupType) if err != nil { // group already exists ofctrlGroup = b.ofSwitch.GetGroup(uint32(id)) } diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go index 283a606c538..45a1e3f474c 100644 --- a/pkg/ovs/openflow/testing/mock_openflow.go +++ b/pkg/ovs/openflow/testing/mock_openflow.go @@ -134,6 +134,20 @@ func (mr *MockBridgeMockRecorder) CreateGroup(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateGroup", reflect.TypeOf((*MockBridge)(nil).CreateGroup), arg0) } +// CreateGroupTypeAll mocks base method +func (m *MockBridge) CreateGroupTypeAll(arg0 openflow.GroupIDType) openflow.Group { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateGroupTypeAll", arg0) + ret0, _ := ret[0].(openflow.Group) + return ret0 +} + +// CreateGroupTypeAll indicates an expected call of CreateGroupTypeAll +func (mr *MockBridgeMockRecorder) CreateGroupTypeAll(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateGroupTypeAll", reflect.TypeOf((*MockBridge)(nil).CreateGroupTypeAll), arg0) +} + // CreateMeter mocks base method func (m *MockBridge) CreateMeter(arg0 
diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go
index 283a606c538..45a1e3f474c 100644
--- a/pkg/ovs/openflow/testing/mock_openflow.go
+++ b/pkg/ovs/openflow/testing/mock_openflow.go
@@ -134,6 +134,20 @@ func (mr *MockBridgeMockRecorder) CreateGroup(arg0 interface{}) *gomock.Call {
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateGroup", reflect.TypeOf((*MockBridge)(nil).CreateGroup), arg0)
 }
 
+// CreateGroupTypeAll mocks base method
+func (m *MockBridge) CreateGroupTypeAll(arg0 openflow.GroupIDType) openflow.Group {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "CreateGroupTypeAll", arg0)
+	ret0, _ := ret[0].(openflow.Group)
+	return ret0
+}
+
+// CreateGroupTypeAll indicates an expected call of CreateGroupTypeAll
+func (mr *MockBridgeMockRecorder) CreateGroupTypeAll(arg0 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateGroupTypeAll", reflect.TypeOf((*MockBridge)(nil).CreateGroupTypeAll), arg0)
+}
+
 // CreateMeter mocks base method
 func (m *MockBridge) CreateMeter(arg0 openflow.MeterIDType, arg1 ofctrl.MeterFlag) openflow.Meter {
 	m.ctrl.T.Helper()
diff --git a/pkg/util/channel/channel.go b/pkg/util/channel/channel.go
index b10e9b4d4f3..019eee536d1 100644
--- a/pkg/util/channel/channel.go
+++ b/pkg/util/channel/channel.go
@@ -25,7 +25,7 @@ const (
 	notifyTimeout = time.Second
 )
 
-type eventHandler func(string)
+type eventHandler func(interface{})
 
 type Subscriber interface {
 	// Subscribe registers an eventHandler which will be called when an event is sent to the channel.
@@ -37,7 +37,7 @@ type Subscriber interface {
 
 type Notifier interface {
 	// Notify sends an event to the channel.
-	Notify(string) bool
+	Notify(interface{}) bool
 }
 
 // SubscribableChannel is different from the Go channel which dispatches every event to only single consumer regardless
@@ -47,7 +47,7 @@ type SubscribableChannel struct {
 	// The name of the channel, used for logging purpose to differentiate multiple channels.
 	name string
 	// eventCh is the channel used for buffering the pending events.
-	eventCh chan string
+	eventCh chan interface{}
 	// handlers is a slice of callbacks registered by consumers.
 	handlers []eventHandler
 }
@@ -55,7 +55,7 @@ type SubscribableChannel struct {
 func NewSubscribableChannel(name string, bufferSize int) *SubscribableChannel {
 	n := &SubscribableChannel{
 		name:    name,
-		eventCh: make(chan string, bufferSize),
+		eventCh: make(chan interface{}, bufferSize),
 	}
 	return n
 }
@@ -64,7 +64,7 @@ func (n *SubscribableChannel) Subscribe(h eventHandler) {
 	n.handlers = append(n.handlers, h)
 }
 
-func (n *SubscribableChannel) Notify(e string) bool {
+func (n *SubscribableChannel) Notify(e interface{}) bool {
 	timer := time.NewTimer(notifyTimeout)
 	defer timer.Stop()
 	select {
diff --git a/pkg/util/channel/channel_test.go b/pkg/util/channel/channel_test.go
index 2eee503f71f..e848bd0194a 100644
--- a/pkg/util/channel/channel_test.go
+++ b/pkg/util/channel/channel_test.go
@@ -36,10 +36,10 @@ func newEventReceiver() *eventReceiver {
 	}
 }
 
-func (r *eventReceiver) receive(e string) {
+func (r *eventReceiver) receive(e interface{}) {
 	r.mutex.Lock()
 	defer r.mutex.Unlock()
-	r.receivedEvents.Insert(e)
+	r.receivedEvents.Insert(e.(string))
 }
 
 func (r *eventReceiver) received() sets.String {
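With the channel generalized from string to interface{}, subscribers are expected to type-assert the payload they care about. Below is a minimal, hypothetical sketch (not code from this patch) of a consumer of the structured PodUpdate events; it assumes the channel's dispatch loop, started elsewhere by the agent and not shown here, is what actually invokes the registered handler:

```go
package example

import (
	"fmt"

	"antrea.io/antrea/pkg/agent/types"
	"antrea.io/antrea/pkg/util/channel"
)

// subscribePodUpdates registers a handler that ignores payloads of unexpected types.
func subscribePodUpdates(podUpdateChannel *channel.SubscribableChannel) {
	podUpdateChannel.Subscribe(func(e interface{}) {
		podEvent, ok := e.(types.PodUpdate)
		if !ok {
			return
		}
		fmt.Printf("Pod %s/%s: add=%t, container=%s\n",
			podEvent.PodNamespace, podEvent.PodName, podEvent.IsAdd, podEvent.ContainerID)
	})
}

// notifyExample shows that publishers now pass a structured event instead of a
// "namespace/name" string; the field values here are placeholders.
func notifyExample(podUpdateChannel *channel.SubscribableChannel) {
	podUpdateChannel.Notify(types.PodUpdate{
		PodNamespace: "ns1",
		PodName:      "pod1",
		IsAdd:        true,
		ContainerID:  "container-id",
	})
}
```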
diff --git a/test/e2e/multicast_test.go b/test/e2e/multicast_test.go
index 31a49751bbe..0d7755550ff 100644
--- a/test/e2e/multicast_test.go
+++ b/test/e2e/multicast_test.go
@@ -83,6 +83,7 @@ func TestMulticast(t *testing.T) {
 	for _, mc := range testcases {
 		mc := mc
 		t.Run(mc.name, func(t *testing.T) {
+			t.Parallel()
 			runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces)
 		})
 	}
@@ -122,6 +123,7 @@ func TestMulticast(t *testing.T) {
 	for _, mc := range testcases {
 		mc := mc
 		t.Run(mc.name, func(t *testing.T) {
+			t.Parallel()
 			runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces)
 		})
 	}
@@ -189,7 +191,7 @@ func runTestMulticastForwardToMultipleInterfaces(t *testing.T, data *TestData, s
 	}
 }
 
-func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestcase, nodeMulticastInterfaces [][]string) {
+func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestcase, nodeMulticastInterfaces map[int][]string) {
 	mcjoinWaitTimeout := defaultTimeout / time.Second
 	gatewayInterface, err := data.GetGatewayInterfaceName(antreaNamespace)
 	failOnError(err, t)
@@ -209,10 +211,10 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc
 			defer wg.Done()
 			// The following command joins a multicast group and sets the timeout to 100 seconds(-W 100) before exit.
 			// The command will return after receiving 1 packet(-c 1).
-			receiveMulticastCommand := []string{"/bin/sh", "-c", fmt.Sprintf("mcjoin -c 1 -o -p %d -W %d %s", mc.port, mcjoinWaitTimeout, mc.group.String())}
+			receiveMulticastCommand := []string{"/bin/sh", "-c", fmt.Sprintf("mcjoin -c 10 -o -p %d -W %d %s", mc.port, mcjoinWaitTimeout, mc.group.String())}
 			res, _, err := data.RunCommandFromPod(testNamespace, r, mcjoinContainerName, receiveMulticastCommand)
 			failOnError(err, t)
-			assert.Contains(t, res, "Total: 1 packets")
+			assert.Contains(t, res, "Total: 10 packets")
 		}()
 	}
 	// Wait 2 seconds(-w 2) before sending multicast traffic.
@@ -222,23 +224,32 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc
 		data.RunCommandFromPod(testNamespace, senderName, mcjoinContainerName, sendMulticastCommand)
 	}()
 
-	if err := wait.Poll(5*time.Second, defaultTimeout, func() (bool, error) {
-		// Sender pods should add an outbound multicast route except running as HostNetwork.
-		_, mrouteResult, _, err := data.RunCommandOnNode(nodeName(mc.senderConfig.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s | grep '%s'", mc.group.String(), gatewayInterface, strings.Join(nodeMulticastInterfaces[mc.senderConfig.nodeIdx], " ")))
-		if err != nil {
-			return false, err
-		}
-		if !mc.senderConfig.isHostNetwork {
-			if len(mrouteResult) == 0 {
-				return false, nil
+	readyReceivers := sets.NewInt()
+	senderReady := false
+	if err := wait.Poll(3*time.Second, defaultTimeout, func() (bool, error) {
+		if !senderReady {
+			// Sender pods should add an outbound multicast route except running as HostNetwork.
+			_, mrouteResult, _, err := data.RunCommandOnNode(nodeName(mc.senderConfig.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s | grep '%s'", mc.group.String(), gatewayInterface, strings.Join(nodeMulticastInterfaces[mc.senderConfig.nodeIdx], " ")))
+			if err != nil {
+				return false, err
 			}
-		} else {
-			if len(mrouteResult) != 0 {
-				return false, nil
+			if !mc.senderConfig.isHostNetwork {
+				if len(mrouteResult) == 0 {
+					return false, nil
+				}
+			} else {
+				if len(mrouteResult) != 0 {
+					return false, nil
+				}
 			}
+			senderReady = true
 		}
+		// Check inbound multicast route and whether multicast interfaces have joined the multicast group.
 		for _, receiver := range mc.receiverConfigs {
+			if readyReceivers.Has(receiver.nodeIdx) {
+				continue
+			}
 			for _, receiverMulticastInterface := range nodeMulticastInterfaces[receiver.nodeIdx] {
 				_, mRouteResult, _, err := data.RunCommandOnNode(nodeName(receiver.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s ", mc.group.String(), receiverMulticastInterface))
 				if err != nil {
@@ -272,6 +283,7 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc
 				}
 			}
 		}
+			readyReceivers = readyReceivers.Insert(receiver.nodeIdx)
 		}
 		return true, nil
 	}); err != nil {
@@ -282,7 +294,7 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc
 
 // computeMulticastInterfaces computes multicastInterfaces for each node.
 // It returns [][]string with its index as node index and value as multicastInterfaces for this node.
-func computeMulticastInterfaces(t *testing.T, data *TestData) ([][]string, error) {
+func computeMulticastInterfaces(t *testing.T, data *TestData) (map[int][]string, error) {
 	multicastInterfaces, err := data.GetMulticastInterfaces(antreaNamespace)
 	if err != nil {
 		return nil, err
@@ -291,7 +303,7 @@ func computeMulticastInterfaces(t *testing.T, data *TestData) ([][]string, error
 	if err != nil {
 		t.Fatalf("Error getting transport interfaces: %v", err)
 	}
-	nodeMulticastInterfaces := make([][]string, 0, len(clusterInfo.nodes))
+	nodeMulticastInterfaces := make(map[int][]string)
 	for nodeIdx := range clusterInfo.nodes {
 		_, localInterfacesStr, _, err := data.RunCommandOnNode(nodeName(nodeIdx), "ls /sys/class/net")
 		if err != nil {
@@ -303,7 +315,7 @@ func computeMulticastInterfaces(t *testing.T, data *TestData) ([][]string, error
 		externalMulticastInterfaces := localInterfacesSet.Intersection(multicastInterfaceSet)
 		currNodeMulticastInterfaces := externalMulticastInterfaces.Insert(transportInterface).List()
 		t.Logf("Multicast interfaces for node index %d is %+v", nodeIdx, currNodeMulticastInterfaces)
-		nodeMulticastInterfaces = append(nodeMulticastInterfaces, currNodeMulticastInterfaces)
+		nodeMulticastInterfaces[nodeIdx] = currNodeMulticastInterfaces
 	}
 	return nodeMulticastInterfaces, nil
 }
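The two t.Parallel() additions in this test rely on the pre-existing mc := mc copy. A generic illustration of the pattern (not the test code itself, and assuming Go's pre-1.22 loop-variable semantics): parallel subtests are paused and only resume after the range loop has finished, so without the per-iteration copy every subtest would observe the last test case.

```go
package example

import "testing"

func runCases(t *testing.T, testcases []string) {
	for _, tc := range testcases {
		tc := tc // copy the loop variable so each parallel subtest captures its own value
		t.Run(tc, func(t *testing.T) {
			t.Parallel() // subtest pauses here and resumes after the loop completes
			t.Log(tc)
		})
	}
}
```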
diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go
index 7bc59f477e4..588e0dceae9 100644
--- a/test/integration/agent/openflow_test.go
+++ b/test/integration/agent/openflow_test.go
@@ -703,7 +703,7 @@ func uninstallServiceFlowsFunc(t *testing.T, gid uint32, svc svcConfig, endpoint
 	groupID := ofconfig.GroupIDType(gid)
 	err := c.UninstallServiceFlows(svc.ip, svc.port, svc.protocol)
 	assert.Nil(t, err)
-	err = c.UninstallServiceGroup(groupID)
+	err = c.UninstallGroup(groupID)
 	assert.Nil(t, err)
 	for _, ep := range endpointList {
 		err := c.UninstallEndpointFlows(svc.protocol, ep)
diff --git a/test/integration/ovs/ofctrl_test.go b/test/integration/ovs/ofctrl_test.go
index b646cceeb39..73769a10e58 100644
--- a/test/integration/ovs/ofctrl_test.go
+++ b/test/integration/ovs/ofctrl_test.go
@@ -269,6 +269,8 @@ func TestOFctrlGroup(t *testing.T) {
 
 	ovsCtlClient := ovsctl.NewClient(brName)
 
+	id := 1
+
 	for name, buckets := range map[string][]struct {
 		weight        uint16      // Must have non-zero value.
 		reg2reg       [][4]uint32 // regNum, data, startIndex, endIndex
@@ -278,12 +280,25 @@ func TestOFctrlGroup(t *testing.T) {
 			{weight: 100, reg2reg: [][4]uint32{{0, 1, 0, 31}, {1, 2, 15, 31}}, resubmitTable: 31},
 			{weight: 110, resubmitTable: 42},
 		},
+		"TypeAll": {
+			{weight: 100, reg2reg: [][4]uint32{{0, 1, 0, 31}, {1, 2, 15, 31}}, resubmitTable: 31},
+			{weight: 110, resubmitTable: 42},
+		},
 	} {
 		t.Run(name, func(t *testing.T) {
-			group := br.CreateGroup(1)
+			var group binding.Group
+			gid := binding.GroupIDType(id)
+			if name == "TypeAll" {
+				group = br.CreateGroupTypeAll(gid)
+			} else {
+				group = br.CreateGroup(gid)
+			}
 			for _, bucket := range buckets {
 				require.NotZero(t, bucket.weight, "Weight value of a bucket must be specified")
-				bucketBuilder := group.Bucket().Weight(bucket.weight)
+				bucketBuilder := group.Bucket()
+				if name == "Normal" {
+					bucketBuilder = bucketBuilder.Weight(bucket.weight)
+				}
 				if bucket.resubmitTable != 0 {
 					bucketBuilder = bucketBuilder.ResubmitToTable(bucket.resubmitTable)
 				}
@@ -303,8 +318,10 @@ func TestOFctrlGroup(t *testing.T) {
 			}), "Failed to install group")
 			dumpedGroup := groups[0]
 			for i, bucket := range buckets {
-				// Must have weight
-				assert.True(t, strings.Contains(dumpedGroup[i+1], fmt.Sprintf("weight:%d", bucket.weight)))
+				if name == "Normal" {
+					// Must have weight
+					assert.True(t, strings.Contains(dumpedGroup[i+1], fmt.Sprintf("weight:%d", bucket.weight)))
+				}
 				for _, loading := range bucket.reg2reg {
 					rngStr := "[]"
 					if !(loading[2] == 0 && loading[3] == 31) {
@@ -326,6 +343,7 @@ func TestOFctrlGroup(t *testing.T) {
 				return len(groups) == 0, nil
 			}), "Failed to delete group")
 		})
+		id++
 	}
 }
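For reference, a condensed sketch of the bucket-building difference the updated test exercises. The builder methods are the ones used above; the helper name and the uint8 table parameter are assumptions, and finalizing the bucket and installing the group are elided here just as they are in the excerpt.

```go
package example

import binding "antrea.io/antrea/pkg/ovs/openflow"

// addBucket is a hypothetical helper illustrating why only type=select buckets
// carry a Weight: select groups load-balance across buckets, while type=all
// groups replicate traffic to every bucket.
func addBucket(group binding.Group, selectType bool, weight uint16, table uint8) {
	bb := group.Bucket()
	if selectType {
		bb = bb.Weight(weight) // required for type=select; checked via the dumped "weight:<n>" field in the test
	}
	bb.ResubmitToTable(table) // the resubmit action is the same for both group types
	// Completing the bucket and installing the group follow the test flow above (elided).
}
```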