diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 0f3bd816c05..19f96b7a10f 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -579,6 +579,7 @@ func run(o *Options) error { } mcastController := multicast.NewMulticastController( ofClient, + v4GroupIDAllocator, nodeConfig, ifaceStore, multicastSocket, diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index dd4d251225e..98182083675 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -311,6 +311,7 @@ func (pc *podConfigurator) removeInterfaces(containerID string) error { if err := pc.routeClient.DeleteLocalAntreaFlexibleIPAMPodRule(containerConfig.IPs); err != nil { return err } + return nil } diff --git a/pkg/agent/interfacestore/interface_cache.go b/pkg/agent/interfacestore/interface_cache.go index 0045d29f4b7..88e23f10875 100644 --- a/pkg/agent/interfacestore/interface_cache.go +++ b/pkg/agent/interfacestore/interface_cache.go @@ -64,7 +64,8 @@ const ( // Todo: add periodic task to sync local cache with container veth pair type interfaceCache struct { - cache cache.Indexer + cache cache.Indexer + podInterfaceDeletionHandlers []chan *InterfaceConfig } func (c *interfaceCache) Initialize(interfaces []*InterfaceConfig) { @@ -107,6 +108,9 @@ func (c *interfaceCache) DeleteInterface(interfaceConfig *InterfaceConfig) { c.cache.Delete(interfaceConfig) if interfaceConfig.Type == ContainerInterface { + for _, h := range c.podInterfaceDeletionHandlers { + h <- interfaceConfig + } metrics.PodCount.Dec() } } @@ -208,6 +212,10 @@ func (c *interfaceCache) GetInterfaceByOFPort(ofPort uint32) (*InterfaceConfig, return interfaceConfigs[0].(*InterfaceConfig), true } +func (c *interfaceCache) RegisterInterfaceDeletionHandler(handler chan *InterfaceConfig) { + c.podInterfaceDeletionHandlers = append(c.podInterfaceDeletionHandlers, handler) +} + func interfaceNameIndexFunc(obj interface{}) ([]string, error) { interfaceConfig := obj.(*InterfaceConfig) return []string{interfaceConfig.InterfaceName}, nil diff --git a/pkg/agent/interfacestore/testing/mock_interfacestore.go b/pkg/agent/interfacestore/testing/mock_interfacestore.go index 3e8aa50fbc5..887ff94b6ba 100644 --- a/pkg/agent/interfacestore/testing/mock_interfacestore.go +++ b/pkg/agent/interfacestore/testing/mock_interfacestore.go @@ -1,4 +1,4 @@ -// Copyright 2021 Antrea Authors +// Copyright 2022 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -257,3 +257,15 @@ func (mr *MockInterfaceStoreMockRecorder) Len() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Len", reflect.TypeOf((*MockInterfaceStore)(nil).Len)) } + +// RegisterInterfaceDeletionHandler mocks base method +func (m *MockInterfaceStore) RegisterInterfaceDeletionHandler(arg0 chan *interfacestore.InterfaceConfig) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RegisterInterfaceDeletionHandler", arg0) +} + +// RegisterInterfaceDeletionHandler indicates an expected call of RegisterInterfaceDeletionHandler +func (mr *MockInterfaceStoreMockRecorder) RegisterInterfaceDeletionHandler(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterInterfaceDeletionHandler", reflect.TypeOf((*MockInterfaceStore)(nil).RegisterInterfaceDeletionHandler), arg0) +} diff --git a/pkg/agent/interfacestore/types.go b/pkg/agent/interfacestore/types.go index a443dd6dd6d..323f53917ba 100644 --- a/pkg/agent/interfacestore/types.go +++ b/pkg/agent/interfacestore/types.go @@ -105,6 +105,7 @@ type InterfaceStore interface { GetInterfacesByType(interfaceType InterfaceType) []*InterfaceConfig Len() int GetInterfaceKeysByType(interfaceType InterfaceType) []string + RegisterInterfaceDeletionHandler(handler chan *InterfaceConfig) } // NewContainerInterface creates InterfaceConfig for a Pod. diff --git a/pkg/agent/multicast/mcast_controller.go b/pkg/agent/multicast/mcast_controller.go index c11a923371f..1def41afd11 100644 --- a/pkg/agent/multicast/mcast_controller.go +++ b/pkg/agent/multicast/mcast_controller.go @@ -28,6 +28,7 @@ import ( "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" + binding "antrea.io/antrea/pkg/ovs/openflow" "antrea.io/antrea/pkg/ovs/ovsconfig" ) @@ -60,9 +61,10 @@ type mcastGroupEvent struct { type GroupMemberStatus struct { group net.IP // localMembers is a set for the local Pod's interface name which has joined in the multicast group. - localMembers sets.String + localMembers map[string]time.Time lastIGMPReport time.Time mutex sync.RWMutex + ofGroupID binding.GroupIDType } // eventHandler process the multicast Group membership report or leave messages. @@ -82,7 +84,8 @@ func (c *Controller) addGroupMemberStatus(e *mcastGroupEvent) { status := &GroupMemberStatus{ group: e.group, lastIGMPReport: e.time, - localMembers: sets.NewString(e.iface.InterfaceName), + localMembers: map[string]time.Time{e.iface.InterfaceName: e.time}, + ofGroupID: c.v4GroupAllocator.Allocate(), } c.groupCache.Add(status) c.queue.Add(e.group.String()) @@ -101,25 +104,35 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent defer status.mutex.Unlock() newStatus := &GroupMemberStatus{ group: status.group, - localMembers: status.localMembers.Union(nil), + localMembers: make(map[string]time.Time), lastIGMPReport: status.lastIGMPReport, + ofGroupID: status.ofGroupID, } - exist := status.localMembers.Has(e.iface.InterfaceName) + for m, t := range status.localMembers { + newStatus.localMembers[m] = t + } + _, exist := status.localMembers[e.iface.InterfaceName] switch e.eType { case groupJoin: newStatus.lastIGMPReport = e.time + newStatus.localMembers[e.iface.InterfaceName] = e.time + c.groupCache.Update(newStatus) if !exist { - newStatus.localMembers.Insert(e.iface.InterfaceName) klog.InfoS("Added member to multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) + c.queue.Add(newStatus.group.String()) } - c.groupCache.Update(newStatus) - c.queue.Add(newStatus.group.String()) case groupLeave: if exist { - newStatus.localMembers.Delete(e.iface.InterfaceName) + delete(newStatus.localMembers, e.iface.InterfaceName) c.groupCache.Update(newStatus) klog.InfoS("Deleted member from multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) - if len(newStatus.localMembers) == 0 { + _, found := c.ifaceStore.GetInterfaceByName(e.iface.InterfaceName) + // Notify worker immediately about member leave event if the member doesn't exist on the Node, or there are + // other local member in the multicast group. + if !found || len(newStatus.localMembers) > 0 { + c.queue.Add(newStatus.group.String()) + } else { + // Check if all local members have left the multicast group. klog.InfoS("Check last member in multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) c.checkLastMember(e.group) } @@ -139,7 +152,7 @@ func (c *Controller) checkLastMember(group net.IP) { c.queue.AddAfter(group.String(), igmpMaxResponseTime) } -// clearStaleGroups checks the stale groups which have not been updated for mcastGroupTimeout, and then notifies worker +// clearStaleGroups checks the stale group members which have not been updated for mcastGroupTimeout, and then notifies worker // to remove them from groupCache. func (c *Controller) clearStaleGroups() { now := time.Now() @@ -147,44 +160,83 @@ func (c *Controller) clearStaleGroups() { status := obj.(*GroupMemberStatus) status.mutex.RLock() diff := now.Sub(status.lastIGMPReport) - status.mutex.RUnlock() if diff > mcastGroupTimeout { + // Notify worker to remove the group from groupCache if all members in tt are not updated mcastGroupTimeout. c.queue.Add(status.group.String()) + } else { + // Create a "leave" event for local member if it is not updated for mcastGroupTimeout. + for member, lastUpdate := range status.localMembers { + if now.Sub(lastUpdate) > mcastGroupTimeout { + ifConfig, _ := c.ifaceStore.GetInterfaceByName(member) + event := &mcastGroupEvent{ + group: status.group, + eType: groupLeave, + time: now, + iface: ifConfig, + } + c.groupEventCh <- event + } + } } + status.mutex.RUnlock() + } +} + +// removeLocalInterface searches the GroupMemberStatus which the deleted interface has joined, and then trigger a member +// leave event so that antrea can remove the corresponding interface from local multicast receivers on OVS. This function +// should be called if the removed Pod receiver fails to send IGMP leave message before deletion. +func (c *Controller) removeLocalInterface(ifConfig *interfacestore.InterfaceConfig) { + groupStatuses := c.getGroupMemberStatusesByPod(ifConfig.InterfaceName) + for _, g := range groupStatuses { + event := &mcastGroupEvent{ + group: g.group, + eType: groupLeave, + time: time.Now(), + iface: ifConfig, + } + c.groupEventCh <- event } } type Controller struct { - ofClient openflow.Client - nodeConfig *config.NodeConfig - igmpSnooper *IGMPSnooper - groupEventCh chan *mcastGroupEvent - groupCache cache.Indexer - queue workqueue.RateLimitingInterface + ofClient openflow.Client + v4GroupAllocator openflow.GroupAllocator + ifaceStore interfacestore.InterfaceStore + nodeConfig *config.NodeConfig + igmpSnooper *IGMPSnooper + groupEventCh chan *mcastGroupEvent + groupCache cache.Indexer + queue workqueue.RateLimitingInterface // installedGroups saves the groups which are configured on both OVS and the host. installedGroups sets.String installedGroupsMutex sync.RWMutex mRouteClient *MRouteClient ovsBridgeClient ovsconfig.OVSBridgeClient + podDeletionCh chan *interfacestore.InterfaceConfig } -func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeConfig, ifaceStore interfacestore.InterfaceStore, multicastSocket RouteInterface, multicastInterfaces sets.String, ovsBridgeClient ovsconfig.OVSBridgeClient) *Controller { +func NewMulticastController(ofClient openflow.Client, v4GroupAllocator openflow.GroupAllocator, nodeConfig *config.NodeConfig, ifaceStore interfacestore.InterfaceStore, multicastSocket RouteInterface, multicastInterfaces sets.String, ovsBridgeClient ovsconfig.OVSBridgeClient) *Controller { eventCh := make(chan *mcastGroupEvent, workerCount) groupSnooper := newSnooper(ofClient, ifaceStore, eventCh) groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ podInterfaceIndex: podInterfaceIndexFunc, }) multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces) + podDeletionCh := make(chan *interfacestore.InterfaceConfig, 10) + ifaceStore.RegisterInterfaceDeletionHandler(podDeletionCh) return &Controller{ - ofClient: ofClient, - nodeConfig: nodeConfig, - igmpSnooper: groupSnooper, - groupEventCh: eventCh, - groupCache: groupCache, - installedGroups: sets.NewString(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"), - mRouteClient: multicastRouteClient, - ovsBridgeClient: ovsBridgeClient, + ofClient: ofClient, + ifaceStore: ifaceStore, + v4GroupAllocator: v4GroupAllocator, + nodeConfig: nodeConfig, + igmpSnooper: groupSnooper, + groupEventCh: eventCh, + groupCache: groupCache, + installedGroups: sets.NewString(), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"), + mRouteClient: multicastRouteClient, + ovsBridgeClient: ovsBridgeClient, + podDeletionCh: podDeletionCh, } } @@ -222,6 +274,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { // Periodically check the group member status, and remove the groups in which no members exist go wait.NonSlidingUntil(c.clearStaleGroups, queryInterval, stopCh) + go c.podDeletionHandler(stopCh) go c.eventHandler(stopCh) for i := 0; i < int(workerCount); i++ { @@ -298,15 +351,29 @@ func (c *Controller) syncGroup(groupKey string) error { return nil } status := obj.(*GroupMemberStatus) + memberPorts := make([]uint32, 0) + status.mutex.Lock() + defer status.mutex.Unlock() + for memberInterfaceName := range status.localMembers { + obj, found := c.ifaceStore.GetInterfaceByName(memberInterfaceName) + if !found { + klog.InfoS("Failed to find interface from cache", "interface", memberInterfaceName) + continue + } + memberPorts = append(memberPorts, uint32(obj.OFPort)) + } if c.groupHasInstalled(groupKey) { - status.mutex.Lock() - defer status.mutex.Unlock() if c.groupIsStale(status) { // Remove the multicast flow entry if no local Pod is in the group. if err := c.ofClient.UninstallMulticastFlow(status.group); err != nil { klog.ErrorS(err, "Failed to uninstall multicast flows", "group", groupKey) return err } + // Remove the multicast flow entry if no local Pod is in the group. + if err := c.ofClient.UninstallGroup(status.ofGroupID); err != nil { + klog.ErrorS(err, "Failed to uninstall multicast group", "group", groupKey) + return err + } err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group) if err != nil { klog.ErrorS(err, "Cannot delete multicast group", "group", groupKey) @@ -320,10 +387,22 @@ func (c *Controller) syncGroup(groupKey string) error { c.delInstalledGroup(groupKey) c.groupCache.Delete(status) klog.InfoS("Removed multicast group from cache after all members left", "group", groupKey) + return nil + } + // Reinstall OpenFlow group because the local pod receivers have changed. + if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts); err != nil { + return err } + klog.V(2).InfoS("Updated OpenFlow group for local receivers for multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts) return nil } - if err := c.ofClient.InstallMulticastFlow(status.group); err != nil { + // Install OpenFlow group for a new multicast group in which local pod receivers have joined. + if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts); err != nil { + return err + } + klog.V(2).InfoS("Installed OpenFlow group for local receivers for multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts) + // Install OpenFlow flow to forward packets to local pod receivers which are included in the group. + if err := c.ofClient.InstallMulticastFlow(status.group, status.ofGroupID); err != nil { klog.ErrorS(err, "Failed to install multicast flows", "group", status.group) return err } @@ -377,11 +456,23 @@ func (c *Controller) addOrUpdateGroupEvent(e *mcastGroupEvent) { c.updateGroupMemberStatus(obj, e) } } + return +} + +func (c *Controller) podDeletionHandler(stopCh <-chan struct{}) { + for { + select { + case e := <-c.podDeletionCh: + c.removeLocalInterface(e) + case <-stopCh: + return + } + } } func podInterfaceIndexFunc(obj interface{}) ([]string, error) { groupState := obj.(*GroupMemberStatus) - podInterfaces := make([]string, 0, len(groupState.localMembers)) + podInterfaces := make([]string, 0) for podInterface := range groupState.localMembers { podInterfaces = append(podInterfaces, podInterface) } diff --git a/pkg/agent/multicast/mcast_controller_test.go b/pkg/agent/multicast/mcast_controller_test.go index 9918b5ffeb6..0fe1521eb25 100644 --- a/pkg/agent/multicast/mcast_controller_test.go +++ b/pkg/agent/multicast/mcast_controller_test.go @@ -36,6 +36,7 @@ import ( "antrea.io/antrea/pkg/agent/interfacestore" ifaceStoretest "antrea.io/antrea/pkg/agent/interfacestore/testing" multicasttest "antrea.io/antrea/pkg/agent/multicast/testing" + "antrea.io/antrea/pkg/agent/openflow" openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" ) @@ -45,16 +46,21 @@ var ( mockMulticastSocket *multicasttest.MockRouteInterface mockIfaceStore *ifaceStoretest.MockInterfaceStore ovsClient *ovsconfigtest.MockOVSBridgeClient - mgroup = net.ParseIP("224.96.1.3") if1 = &interfacestore.InterfaceConfig{ Type: interfacestore.ContainerInterface, InterfaceName: "if1", IPs: []net.IP{net.ParseIP("192.168.1.1")}, + OVSPortConfig: &interfacestore.OVSPortConfig{ + OFPort: 1, + }, } if2 = &interfacestore.InterfaceConfig{ Type: interfacestore.ContainerInterface, InterfaceName: "if2", IPs: []net.IP{net.ParseIP("192.168.1.2")}, + OVSPortConfig: &interfacestore.OVSPortConfig{ + OFPort: 2, + }, } nodeIf1IP = net.ParseIP("192.168.20.22") externalInterfaceIP = net.ParseIP("192.168.50.23") @@ -63,6 +69,7 @@ var ( ) func TestAddGroupMemberStatus(t *testing.T) { + mgroup := net.ParseIP("224.96.1.3") event := &mcastGroupEvent{ group: mgroup, eType: groupJoin, @@ -82,7 +89,9 @@ func TestAddGroupMemberStatus(t *testing.T) { key, ok := obj.(string) assert.True(t, ok) assert.Equal(t, mgroup.String(), key) - mockOFClient.EXPECT().InstallMulticastFlow(mgroup).Times(1) + mockIfaceStore.EXPECT().GetInterfaceByName(if1.InterfaceName).Return(if1, true) + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()) + mockOFClient.EXPECT().InstallMulticastFlow(mgroup, gomock.Any()).Times(1) mockMulticastSocket.EXPECT().MulticastInterfaceJoinMgroup(mgroup.To4(), nodeIf1IP.To4(), if1.InterfaceName).Times(1) err = mctrl.syncGroup(key) assert.Nil(t, err) @@ -94,6 +103,7 @@ func TestUpdateGroupMemberStatus(t *testing.T) { err := mctrl.initialize(t) assert.Nil(t, err) igmpMaxResponseTime = time.Second * 1 + mgroup := net.ParseIP("224.96.1.4") event := &mcastGroupEvent{ group: mgroup, eType: groupJoin, @@ -101,7 +111,6 @@ func TestUpdateGroupMemberStatus(t *testing.T) { iface: if1, } mctrl.addGroupMemberStatus(event) - obj, _, _ := mctrl.groupCache.GetByKey(event.group.String()) mockOFClient.EXPECT().SendIGMPQueryPacketOut(igmpQueryDstMac, mcastAllHosts, uint32(openflow13.P_NORMAL), gomock.Any()).Times(len(queryVersions)) for _, e := range []*mcastGroupEvent{ {group: mgroup, eType: groupJoin, time: event.time.Add(time.Second * 20), iface: if1}, @@ -110,6 +119,10 @@ func TestUpdateGroupMemberStatus(t *testing.T) { {group: mgroup, eType: groupLeave, time: event.time.Add(time.Second * 61), iface: if1}, {group: mgroup, eType: groupLeave, time: event.time.Add(time.Second * 62), iface: if2}, } { + obj, _, _ := mctrl.groupCache.GetByKey(event.group.String()) + if e.eType == groupLeave { + mockIfaceStore.EXPECT().GetInterfaceByName(e.iface.InterfaceName).Return(e.iface, true) + } mctrl.updateGroupMemberStatus(obj, e) groupCache := mctrl.groupCache compareGroupStatus(t, groupCache, e) @@ -127,9 +140,10 @@ func TestCheckLastMember(t *testing.T) { workerCount = 1 igmpMaxResponseTime = time.Second * 1 lastProbe := time.Now() + mgroup := net.ParseIP("224.96.1.2") testCheckLastMember := func(ev *mcastGroupEvent, expExist bool) { status := &GroupMemberStatus{ - localMembers: sets.NewString(), + localMembers: map[string]time.Time{}, lastIGMPReport: lastProbe, } if ev != nil { @@ -167,6 +181,9 @@ func TestCheckLastMember(t *testing.T) { } mctrl.queue.Forget(obj) } + mockIfaceStore.EXPECT().GetInterfaceByName(if1.InterfaceName).Return(if1, true).Times(1) + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()).Times(1) + mockOFClient.EXPECT().UninstallGroup(gomock.Any()).Times(2) mockOFClient.EXPECT().UninstallMulticastFlow(gomock.Any()).Times(2) for _, tc := range []struct { ev *mcastGroupEvent @@ -194,30 +211,31 @@ func TestClearStaleGroups(t *testing.T) { mctrl.worker() wg.Done() }() - now := time.Now() + validUpdateTime := now.Add(-queryInterval) validGroups := []*GroupMemberStatus{ { group: net.ParseIP("224.96.1.2"), - localMembers: sets.NewString("p1", "p2"), - lastIGMPReport: now.Add(-queryInterval), + localMembers: map[string]time.Time{"p1": now, "p2": validUpdateTime}, + lastIGMPReport: validUpdateTime, }, { group: net.ParseIP("224.96.1.3"), - localMembers: sets.NewString(), - lastIGMPReport: now.Add(-queryInterval), + localMembers: map[string]time.Time{"p2": validUpdateTime}, + lastIGMPReport: validUpdateTime, }, } + staleUpdateTime := now.Add(-mcastGroupTimeout - time.Second) staleGroups := []*GroupMemberStatus{ { group: net.ParseIP("224.96.1.4"), - localMembers: sets.NewString("p1", "p3"), - lastIGMPReport: now.Add(-mcastGroupTimeout - time.Second), + localMembers: map[string]time.Time{"p1": staleUpdateTime, "p3": staleUpdateTime}, + lastIGMPReport: staleUpdateTime, }, { group: net.ParseIP("224.96.1.5"), - localMembers: sets.NewString(), - lastIGMPReport: now.Add(-mcastGroupTimeout - time.Second), + localMembers: map[string]time.Time{}, + lastIGMPReport: staleUpdateTime, }, } for _, g := range validGroups { @@ -225,11 +243,18 @@ func TestClearStaleGroups(t *testing.T) { assert.Nil(t, err) mctrl.addInstalledGroup(g.group.String()) } + fakePort := int32(1) for _, g := range staleGroups { err := mctrl.groupCache.Add(g) assert.Nil(t, err) mctrl.addInstalledGroup(g.group.String()) + for m := range g.localMembers { + mockIface := &interfacestore.InterfaceConfig{InterfaceName: m, OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: fakePort}} + mockIfaceStore.EXPECT().GetInterfaceByName(m).Return(mockIface, true) + fakePort++ + } } + mockOFClient.EXPECT().UninstallGroup(gomock.Any()).Times(len(staleGroups)) mockOFClient.EXPECT().UninstallMulticastFlow(gomock.Any()).Times(len(staleGroups)) mockMulticastSocket.EXPECT().MulticastInterfaceLeaveMgroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(len(staleGroups)) mctrl.clearStaleGroups() @@ -251,7 +276,9 @@ func TestProcessPacketIn(t *testing.T) { snooper := mockController.igmpSnooper stopCh := make(chan struct{}) defer close(stopCh) - go mockController.eventHandler(stopCh) + go func() { + mockController.eventHandler(stopCh) + }() getIPs := func(ipStrs []string) []net.IP { ips := make([]net.IP, len(ipStrs)) @@ -285,6 +312,7 @@ func TestProcessPacketIn(t *testing.T) { version: 3, }, } { + mockIfaceStore.EXPECT().GetInterfaceByName(tc.iface.InterfaceName).Return(tc.iface, true).AnyTimes() packets := createIGMPReportPacketIn(getIPs(tc.joinedGroups.List()), getIPs(tc.leftGroups.List()), tc.version, uint32(tc.iface.OFPort)) mockOFClient.EXPECT().SendIGMPQueryPacketOut(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() for _, pkt := range packets { @@ -311,11 +339,11 @@ func compareGroupStatus(t *testing.T, cache cache.Indexer, event *mcastGroupEven assert.Equal(t, event.group, status.group) if event.eType == groupJoin { assert.True(t, status.lastIGMPReport.Equal(event.time) || status.lastIGMPReport.After(event.time)) - exists := status.localMembers.Has(event.iface.InterfaceName) + _, exists := status.localMembers[event.iface.InterfaceName] assert.Truef(t, exists, "member is not added into cache") } else { assert.True(t, status.lastIGMPReport.Before(event.time)) - exists := status.localMembers.Has(event.iface.InterfaceName) + _, exists := status.localMembers[event.iface.InterfaceName] assert.Falsef(t, exists, "member is not removed from cache") } } @@ -324,12 +352,14 @@ func newMockMulticastController(t *testing.T) *Controller { controller := gomock.NewController(t) mockOFClient = openflowtest.NewMockClient(controller) mockIfaceStore = ifaceStoretest.NewMockInterfaceStore(controller) + mockIfaceStore.EXPECT().RegisterInterfaceDeletionHandler(gomock.Any()).Times(1) mockMulticastSocket = multicasttest.NewMockRouteInterface(controller) ovsClient = ovsconfigtest.NewMockOVSBridgeClient(controller) addr := &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)} nodeConfig := &config.NodeConfig{GatewayConfig: &config.GatewayConfig{Name: "antrea-gw0"}, NodeIPv4Addr: addr} mockOFClient.EXPECT().RegisterPacketInHandler(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) - mctrl := NewMulticastController(mockOFClient, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.NewString(), ovsClient) + groupAllocator := openflow.NewGroupAllocator(false) + mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.NewString(), ovsClient) return mctrl } diff --git a/pkg/agent/multicast/mcast_discovery.go b/pkg/agent/multicast/mcast_discovery.go index 5f418513ce3..7d4b7fe91fc 100644 --- a/pkg/agent/multicast/mcast_discovery.go +++ b/pkg/agent/multicast/mcast_discovery.go @@ -131,7 +131,7 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { fallthrough case protocol.IGMPv2Report: mgroup := igmp.(*protocol.IGMPv1or2).GroupAddress - klog.InfoS("Received IGMPv1or2 Report message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName) + klog.V(2).InfoS("Received IGMPv1or2 Report message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName) event := &mcastGroupEvent{ group: mgroup, eType: groupJoin, @@ -143,7 +143,7 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { msg := igmp.(*protocol.IGMPv3MembershipReport) for _, gr := range msg.GroupRecords { mgroup := gr.MulticastAddress - klog.InfoS("Received IGMPv3 Report message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName, "recordType", gr.Type, "sourceCount", gr.NumberOfSources) + klog.V(2).InfoS("Received IGMPv3 Report message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName, "recordType", gr.Type, "sourceCount", gr.NumberOfSources) evtType := groupJoin if (gr.Type == protocol.IGMPIsIn || gr.Type == protocol.IGMPToIn) && gr.NumberOfSources == 0 { evtType = groupLeave @@ -159,7 +159,7 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { case protocol.IGMPv2LeaveGroup: mgroup := igmp.(*protocol.IGMPv1or2).GroupAddress - klog.InfoS("Received IGMPv2 Leave message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName) + klog.V(2).InfoS("Received IGMPv2 Leave message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName) event := &mcastGroupEvent{ group: mgroup, eType: groupLeave, diff --git a/pkg/agent/multicast/mcast_route_test.go b/pkg/agent/multicast/mcast_route_test.go index 081171e7c00..b9f4c14d69c 100644 --- a/pkg/agent/multicast/mcast_route_test.go +++ b/pkg/agent/multicast/mcast_route_test.go @@ -21,6 +21,7 @@ import ( "fmt" "net" "testing" + "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -76,12 +77,12 @@ func TestProcessIGMPNocacheMsg(t *testing.T) { mRoute.internalInterfaceVIF = uint16(0) status1 := &GroupMemberStatus{ group: net.ParseIP("224.3.3.8"), - localMembers: sets.NewString("aa"), + localMembers: map[string]time.Time{"aa": time.Now()}, } mRoute.groupCache.Add(status1) status2 := &GroupMemberStatus{ group: net.ParseIP("224.3.3.9"), - localMembers: sets.NewString(), + localMembers: map[string]time.Time{}, } mRoute.groupCache.Add(status2) for _, m := range []struct { diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index ef097e9d778..bbd73107722 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -94,9 +94,9 @@ type Client interface { // InstallServiceGroup installs a group for Service LB. Each endpoint // is a bucket of the group. For now, each bucket has the same weight. InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error - // UninstallServiceGroup removes the group and its buckets that are - // installed by InstallServiceGroup. - UninstallServiceGroup(groupID binding.GroupIDType) error + // UninstallGroup removes the group and its buckets that are + // installed by InstallServiceGroup or InstallMulticastGroup. + UninstallGroup(groupID binding.GroupIDType) error // InstallEndpointFlows installs flows for accessing Endpoints. // If an Endpoint is on the current Node, then flows for hairpin and endpoint @@ -292,7 +292,7 @@ type Client interface { InstallMulticastInitialFlows(pktInReason uint8) error // InstallMulticastFlow installs the flow to forward Multicast traffic normally, and output it to antrea-gw0 // to ensure it can be forwarded to the external addresses. - InstallMulticastFlow(multicastIP net.IP) error + InstallMulticastFlow(multicastIP net.IP, groupID binding.GroupIDType) error // UninstallMulticastFlow removes the flow matching the given multicastIP. UninstallMulticastFlow(multicastIP net.IP) error // SendIGMPQueryPacketOut sends the IGMPQuery packet as a packet-out to OVS from the gateway port. @@ -301,6 +301,7 @@ type Client interface { dstIP net.IP, outPort uint32, igmp ofutil.Message) error + InstallMulticastGroup(ofGroupID binding.GroupIDType, localReceivers []uint32) error } // GetFlowTableStatus returns an array of flow table status. @@ -554,7 +555,7 @@ func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAff return nil } -func (c *client) UninstallServiceGroup(groupID binding.GroupIDType) error { +func (c *client) UninstallGroup(groupID binding.GroupIDType) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() if !c.bridge.DeleteGroup(groupID) { @@ -729,6 +730,11 @@ func (c *client) initialize() error { return fmt.Errorf("failed to install OpenFlow meter entry (meterID:%d, rate:%d) for TraceFlow packet-in rate limiting: %v", PacketInMeterIDTF, PacketInMeterRateTF, err) } } + if c.enableMulticast { + if err := c.ofEntryOperations.AddAll(c.featureMulticast.initialize(cookie.Default)); err != nil { + return fmt.Errorf("failed to install feature Multicast initial flows: %v", err) + } + } return nil } @@ -817,7 +823,7 @@ func (c *client) generatePipelines() { if c.enableMulticast { // TODO: add support for IPv6 protocol - c.featureMulticast = newFeatureMulticast(c.cookieAllocator, []binding.Protocol{binding.ProtocolIP}) + c.featureMulticast = newFeatureMulticast(c.cookieAllocator, []binding.Protocol{binding.ProtocolIP}, c.bridge) c.activatedFeatures = append(c.activatedFeatures, c.featureMulticast) } c.featureTraceflow = newFeatureTraceflow() @@ -918,6 +924,9 @@ func (c *client) ReplayFlows() { } c.featureService.replayGroups() + if c.enableMulticast { + c.featureMulticast.replayGroups() + } for _, activeFeature := range c.activatedFeatures { if err := c.ofEntryOperations.AddAll(activeFeature.replayFlows()); err != nil { @@ -1193,8 +1202,8 @@ func (c *client) InstallMulticastInitialFlows(pktInReason uint8) error { return c.addFlows(c.featureMulticast.mcastFlowCache, cacheKey, flows) } -func (c *client) InstallMulticastFlow(multicastIP net.IP) error { - flows := c.featureMulticast.localMulticastForwardFlow(multicastIP) +func (c *client) InstallMulticastFlow(multicastIP net.IP, groupID binding.GroupIDType) error { + flows := c.featureMulticast.localMulticastForwardFlow(multicastIP, groupID) cacheKey := fmt.Sprintf("multicast_%s", multicastIP.String()) c.replayMutex.RLock() defer c.replayMutex.RUnlock() @@ -1246,3 +1255,14 @@ func (c *client) InstallBridgeUplinkFlows() error { } return nil } + +func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceivers []uint32) error { + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + + targetPorts := append([]uint32{config.HostGatewayOFPort}, localReceivers...) + if err := c.featureMulticast.multicastReceiversGroup(groupID, targetPorts...); err != nil { + return err + } + return nil +} diff --git a/pkg/agent/openflow/framework.go b/pkg/agent/openflow/framework.go index c01edea5ca3..349932fdc2e 100644 --- a/pkg/agent/openflow/framework.go +++ b/pkg/agent/openflow/framework.go @@ -243,6 +243,7 @@ func (f *featureEgress) getRequiredTables() []*Table { func (f *featureMulticast) getRequiredTables() []*Table { return []*Table{ MulticastTable, + MulticastOutputTable, } } diff --git a/pkg/agent/openflow/multicast.go b/pkg/agent/openflow/multicast.go index 63bd69736dd..2c2b22fa008 100644 --- a/pkg/agent/openflow/multicast.go +++ b/pkg/agent/openflow/multicast.go @@ -15,7 +15,11 @@ package openflow import ( + "fmt" "net" + "sync" + + "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/openflow/cookie" binding "antrea.io/antrea/pkg/ovs/openflow" @@ -26,8 +30,10 @@ var _, mcastCIDR, _ = net.ParseCIDR("224.0.0.0/4") type featureMulticast struct { cookieAllocator cookie.Allocator ipProtocols []binding.Protocol + bridge binding.Bridge mcastFlowCache *flowCategoryCache + groupCache sync.Map category cookie.Category } @@ -36,12 +42,14 @@ func (f *featureMulticast) getFeatureName() string { return "Multicast" } -func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding.Protocol) *featureMulticast { +func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding.Protocol, bridge binding.Bridge) *featureMulticast { return &featureMulticast{ cookieAllocator: cookieAllocator, ipProtocols: ipProtocols, + bridge: bridge, mcastFlowCache: newFlowCategoryCache(), category: cookie.Multicast, + groupCache: sync.Map{}, } } @@ -63,3 +71,45 @@ func (f *featureMulticast) replayFlows() []binding.Flow { // Get cached flows. return getCachedFlows(f.mcastFlowCache) } + +func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, ports ...uint32) error { + group := f.bridge.CreateGroupTypeAll(groupID).ResetBuckets() + for i := range ports { + group = group.Bucket(). + LoadToRegField(OFPortFoundRegMark.GetField(), OFPortFoundRegMark.GetValue()). + LoadToRegField(TargetOFPortField, ports[i]). + ResubmitToTable(MulticastTable.GetNext()). + Done() + } + if err := group.Add(); err != nil { + return fmt.Errorf("error when installing Multicast receiver Group: %w", err) + } + f.groupCache.Store(groupID, group) + return nil +} + +func (f *featureMulticast) multicastOutputFlows(cookieID uint64) binding.Flow { + return MulticastOutputTable.ofTable.BuildFlow(priorityNormal). + Cookie(cookieID). + MatchRegMark(OFPortFoundRegMark). + Action().OutputToRegField(TargetOFPortField). + Done() +} + +func (f *featureMulticast) initialize(category cookie.Category) []binding.Flow { + cookieID := f.cookieAllocator.Request(category).Raw() + return []binding.Flow{ + f.multicastOutputFlows(cookieID), + } +} + +func (f *featureMulticast) replayGroups() { + f.groupCache.Range(func(id, value interface{}) bool { + group := value.(binding.Group) + group.Reset() + if err := group.Add(); err != nil { + klog.Errorf("Error when replaying cached group %d: %v", id, err) + } + return true + }) +} diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 6c8c3a79a9a..393bd1f789c 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -182,6 +182,9 @@ var ( // Tables in stageRouting: MulticastTable = newTable("Multicast", stageRouting, pipelineMulticast) + // Tables in stageOutput + MulticastOutputTable = newTable("MulticastOutput", stageOutput, pipelineMulticast) + // Flow priority level priorityHigh = uint16(210) priorityNormal = uint16(200) @@ -2652,14 +2655,13 @@ func (f *featureMulticast) igmpPktInFlows(reason uint8) []binding.Flow { // group and to the external receivers. For external multicast packets accessing to the given multicast IP also hits the // flow, and the packet is not sent back to Antrea gateway because OVS datapath will drop it when it finds the output // port is the same as the input port. -func (f *featureMulticast) localMulticastForwardFlow(multicastIP net.IP) []binding.Flow { +func (f *featureMulticast) localMulticastForwardFlow(multicastIP net.IP, groupID binding.GroupIDType) []binding.Flow { return []binding.Flow{ MulticastTable.ofTable.BuildFlow(priorityNormal). Cookie(f.cookieAllocator.Request(f.category).Raw()). MatchProtocol(binding.ProtocolIP). MatchDstIP(multicastIP). - Action().Output(config.HostGatewayOFPort). - Action().Normal(). + Action().Group(groupID). Done(), } } @@ -2673,7 +2675,9 @@ func (f *featureMulticast) externalMulticastReceiverFlow() binding.Flow { Cookie(f.cookieAllocator.Request(f.category).Raw()). MatchProtocol(binding.ProtocolIP). MatchDstIPNet(*mcastCIDR). - Action().Output(config.HostGatewayOFPort). + Action().LoadRegMark(OFPortFoundRegMark). + Action().LoadToRegField(TargetOFPortField, config.HostGatewayOFPort). + Action().GotoTable(MulticastTable.GetNext()). Done() } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index a03e5047b4d..56d7547833c 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -366,17 +366,31 @@ func (mr *MockClientMockRecorder) InstallGatewayFlows() *gomock.Call { } // InstallMulticastFlow mocks base method -func (m *MockClient) InstallMulticastFlow(arg0 net.IP) error { +func (m *MockClient) InstallMulticastFlow(arg0 net.IP, arg1 openflow.GroupIDType) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallMulticastFlow", arg0) + ret := m.ctrl.Call(m, "InstallMulticastFlow", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // InstallMulticastFlow indicates an expected call of InstallMulticastFlow -func (mr *MockClientMockRecorder) InstallMulticastFlow(arg0 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) InstallMulticastFlow(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastFlow", reflect.TypeOf((*MockClient)(nil).InstallMulticastFlow), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastFlow", reflect.TypeOf((*MockClient)(nil).InstallMulticastFlow), arg0, arg1) +} + +// InstallMulticastGroup mocks base method +func (m *MockClient) InstallMulticastGroup(arg0 openflow.GroupIDType, arg1 []uint32) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallMulticastGroup", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallMulticastGroup indicates an expected call of InstallMulticastGroup +func (mr *MockClientMockRecorder) InstallMulticastGroup(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastGroup", reflect.TypeOf((*MockClient)(nil).InstallMulticastGroup), arg0, arg1) } // InstallMulticastInitialFlows mocks base method @@ -695,6 +709,20 @@ func (mr *MockClientMockRecorder) UninstallEndpointFlows(arg0, arg1 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallEndpointFlows", reflect.TypeOf((*MockClient)(nil).UninstallEndpointFlows), arg0, arg1) } +// UninstallGroup mocks base method +func (m *MockClient) UninstallGroup(arg0 openflow.GroupIDType) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UninstallGroup", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// UninstallGroup indicates an expected call of UninstallGroup +func (mr *MockClientMockRecorder) UninstallGroup(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallGroup", reflect.TypeOf((*MockClient)(nil).UninstallGroup), arg0) +} + // UninstallMulticastFlow mocks base method func (m *MockClient) UninstallMulticastFlow(arg0 net.IP) error { m.ctrl.T.Helper() @@ -794,20 +822,6 @@ func (mr *MockClientMockRecorder) UninstallServiceFlows(arg0, arg1, arg2 interfa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallServiceFlows", reflect.TypeOf((*MockClient)(nil).UninstallServiceFlows), arg0, arg1, arg2) } -// UninstallServiceGroup mocks base method -func (m *MockClient) UninstallServiceGroup(arg0 openflow.GroupIDType) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UninstallServiceGroup", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// UninstallServiceGroup indicates an expected call of UninstallServiceGroup -func (mr *MockClientMockRecorder) UninstallServiceGroup(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallServiceGroup", reflect.TypeOf((*MockClient)(nil).UninstallServiceGroup), arg0) -} - // UninstallTraceflowFlows mocks base method func (m *MockClient) UninstallTraceflowFlows(arg0 byte) error { m.ctrl.T.Helper() diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 2ab8f7eeb7b..6dcb1a9dcc0 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -169,7 +169,7 @@ func (p *proxier) removeStaleServices() { // Remove Service group whose Endpoints are local. if svcInfo.NodeLocalExternal() { if groupIDLocal, exist := p.groupCounter.Get(svcPortName, true); exist { - if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil { + if err := p.ofClient.UninstallGroup(groupIDLocal); err != nil { klog.ErrorS(err, "Failed to remove Group of local Endpoints for Service", "Service", svcPortName) continue } @@ -178,7 +178,7 @@ func (p *proxier) removeStaleServices() { } // Remove Service group which has all Endpoints. if groupID, exist := p.groupCounter.Get(svcPortName, false); exist { - if err := p.ofClient.UninstallServiceGroup(groupID); err != nil { + if err := p.ofClient.UninstallGroup(groupID); err != nil { klog.ErrorS(err, "Failed to remove Group of all Endpoints for Service", "Service", svcPortName) continue } @@ -469,7 +469,7 @@ func (p *proxier) installServices() { // group exists on OVS, and after it is uninstalled successfully, then the return value will be also // nil. if groupIDLocal, exist := p.groupCounter.Get(svcPortName, true); exist { - if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil { + if err := p.ofClient.UninstallGroup(groupIDLocal); err != nil { klog.ErrorS(err, "Failed to remove Group of local Endpoints for Service", "Service", svcPortName) continue } diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index f5685e3cb49..440707a3424 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -625,7 +625,7 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) - mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1) + mockOFClient.EXPECT().UninstallGroup(gomock.Any()).Times(1) fp.syncProxyRules() fp.serviceChanges.OnServiceUpdate(service, nil) @@ -1094,8 +1094,8 @@ func TestServicesWithSameEndpoints(t *testing.T) { mockOFClient.EXPECT().InstallServiceFlows(groupID2, svcIP2, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP1, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP2, uint16(svcPort), bindingProtocol).Times(1) - mockOFClient.EXPECT().UninstallServiceGroup(groupID1).Times(1) - mockOFClient.EXPECT().UninstallServiceGroup(groupID2).Times(1) + mockOFClient.EXPECT().UninstallGroup(groupID1).Times(1) + mockOFClient.EXPECT().UninstallGroup(groupID2).Times(1) // Since these two Services reference to the same Endpoint, there should only be one operation. mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index b6511edc0be..8c219e0f7dc 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -94,6 +94,7 @@ type Bridge interface { CreateTable(table Table, next uint8, missAction MissActionType) Table // AddTable adds table on the Bridge. Return true if the operation succeeds, otherwise return false. DeleteTable(id uint8) bool + CreateGroupTypeAll(id GroupIDType) Group CreateGroup(id GroupIDType) Group DeleteGroup(id GroupIDType) bool CreateMeter(id MeterIDType, flags ofctrl.MeterFlag) Meter diff --git a/pkg/ovs/openflow/ofctrl_bridge.go b/pkg/ovs/openflow/ofctrl_bridge.go index c1b5e11dffb..7acf5a9bbbc 100644 --- a/pkg/ovs/openflow/ofctrl_bridge.go +++ b/pkg/ovs/openflow/ofctrl_bridge.go @@ -189,8 +189,16 @@ type OFBridge struct { multipartReplyChs map[uint32]chan *openflow13.MultipartReply } +func (b *OFBridge) CreateGroupTypeAll(id GroupIDType) Group { + return b.createGroup(id, ofctrl.GroupAll) +} + func (b *OFBridge) CreateGroup(id GroupIDType) Group { - ofctrlGroup, err := b.ofSwitch.NewGroup(uint32(id), ofctrl.GroupSelect) + return b.createGroup(id, ofctrl.GroupSelect) +} + +func (b *OFBridge) createGroup(id GroupIDType, groupType ofctrl.GroupType) Group { + ofctrlGroup, err := b.ofSwitch.NewGroup(uint32(id), groupType) if err != nil { // group already exists ofctrlGroup = b.ofSwitch.GetGroup(uint32(id)) } diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go index f79cafa2cb9..e162dd2885f 100644 --- a/pkg/ovs/openflow/testing/mock_openflow.go +++ b/pkg/ovs/openflow/testing/mock_openflow.go @@ -134,6 +134,20 @@ func (mr *MockBridgeMockRecorder) CreateGroup(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateGroup", reflect.TypeOf((*MockBridge)(nil).CreateGroup), arg0) } +// CreateGroupTypeAll mocks base method +func (m *MockBridge) CreateGroupTypeAll(arg0 openflow.GroupIDType) openflow.Group { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateGroupTypeAll", arg0) + ret0, _ := ret[0].(openflow.Group) + return ret0 +} + +// CreateGroupTypeAll indicates an expected call of CreateGroupTypeAll +func (mr *MockBridgeMockRecorder) CreateGroupTypeAll(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateGroupTypeAll", reflect.TypeOf((*MockBridge)(nil).CreateGroupTypeAll), arg0) +} + // CreateMeter mocks base method func (m *MockBridge) CreateMeter(arg0 openflow.MeterIDType, arg1 ofctrl.MeterFlag) openflow.Meter { m.ctrl.T.Helper() diff --git a/test/e2e/multicast_test.go b/test/e2e/multicast_test.go index 31a49751bbe..0ca3807f5e0 100644 --- a/test/e2e/multicast_test.go +++ b/test/e2e/multicast_test.go @@ -189,7 +189,7 @@ func runTestMulticastForwardToMultipleInterfaces(t *testing.T, data *TestData, s } } -func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestcase, nodeMulticastInterfaces [][]string) { +func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestcase, nodeMulticastInterfaces map[int][]string) { mcjoinWaitTimeout := defaultTimeout / time.Second gatewayInterface, err := data.GetGatewayInterfaceName(antreaNamespace) failOnError(err, t) @@ -209,10 +209,10 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc defer wg.Done() // The following command joins a multicast group and sets the timeout to 100 seconds(-W 100) before exit. // The command will return after receiving 1 packet(-c 1). - receiveMulticastCommand := []string{"/bin/sh", "-c", fmt.Sprintf("mcjoin -c 1 -o -p %d -W %d %s", mc.port, mcjoinWaitTimeout, mc.group.String())} + receiveMulticastCommand := []string{"/bin/sh", "-c", fmt.Sprintf("mcjoin -c 10 -o -p %d -W %d %s", mc.port, mcjoinWaitTimeout, mc.group.String())} res, _, err := data.RunCommandFromPod(testNamespace, r, mcjoinContainerName, receiveMulticastCommand) failOnError(err, t) - assert.Contains(t, res, "Total: 1 packets") + assert.Contains(t, res, "Total: 10 packets") }() } // Wait 2 seconds(-w 2) before sending multicast traffic. @@ -222,23 +222,32 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc data.RunCommandFromPod(testNamespace, senderName, mcjoinContainerName, sendMulticastCommand) }() - if err := wait.Poll(5*time.Second, defaultTimeout, func() (bool, error) { - // Sender pods should add an outbound multicast route except running as HostNetwork. - _, mrouteResult, _, err := data.RunCommandOnNode(nodeName(mc.senderConfig.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s | grep '%s'", mc.group.String(), gatewayInterface, strings.Join(nodeMulticastInterfaces[mc.senderConfig.nodeIdx], " "))) - if err != nil { - return false, err - } - if !mc.senderConfig.isHostNetwork { - if len(mrouteResult) == 0 { - return false, nil + readyReceivers := sets.NewInt() + senderReady := false + if err := wait.Poll(3*time.Second, defaultTimeout, func() (bool, error) { + if !senderReady { + // Sender pods should add an outbound multicast route except running as HostNetwork. + _, mrouteResult, _, err := data.RunCommandOnNode(nodeName(mc.senderConfig.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s | grep '%s'", mc.group.String(), gatewayInterface, strings.Join(nodeMulticastInterfaces[mc.senderConfig.nodeIdx], " "))) + if err != nil { + return false, err } - } else { - if len(mrouteResult) != 0 { - return false, nil + if !mc.senderConfig.isHostNetwork { + if len(mrouteResult) == 0 { + return false, nil + } + } else { + if len(mrouteResult) != 0 { + return false, nil + } } + senderReady = true } + // Check inbound multicast route and whether multicast interfaces has joined the multicast group. for _, receiver := range mc.receiverConfigs { + if readyReceivers.Has(receiver.nodeIdx) { + continue + } for _, receiverMulticastInterface := range nodeMulticastInterfaces[receiver.nodeIdx] { _, mRouteResult, _, err := data.RunCommandOnNode(nodeName(receiver.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s ", mc.group.String(), receiverMulticastInterface)) if err != nil { @@ -272,6 +281,7 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc } } } + readyReceivers = readyReceivers.Insert(receiver.nodeIdx) } return true, nil }); err != nil { @@ -282,7 +292,7 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc // computeMulticastInterfaces computes multicastInterfaces for each node. // It returns [][]string with its index as node index and value as multicastInterfaces for this node. -func computeMulticastInterfaces(t *testing.T, data *TestData) ([][]string, error) { +func computeMulticastInterfaces(t *testing.T, data *TestData) (map[int][]string, error) { multicastInterfaces, err := data.GetMulticastInterfaces(antreaNamespace) if err != nil { return nil, err @@ -291,7 +301,7 @@ func computeMulticastInterfaces(t *testing.T, data *TestData) ([][]string, error if err != nil { t.Fatalf("Error getting transport interfaces: %v", err) } - nodeMulticastInterfaces := make([][]string, 0, len(clusterInfo.nodes)) + nodeMulticastInterfaces := make(map[int][]string) for nodeIdx := range clusterInfo.nodes { _, localInterfacesStr, _, err := data.RunCommandOnNode(nodeName(nodeIdx), "ls /sys/class/net") if err != nil { @@ -303,7 +313,7 @@ func computeMulticastInterfaces(t *testing.T, data *TestData) ([][]string, error externalMulticastInterfaces := localInterfacesSet.Intersection(multicastInterfaceSet) currNodeMulticastInterfaces := externalMulticastInterfaces.Insert(transportInterface).List() t.Logf("Multicast interfaces for node index %d is %+v", nodeIdx, currNodeMulticastInterfaces) - nodeMulticastInterfaces = append(nodeMulticastInterfaces, currNodeMulticastInterfaces) + nodeMulticastInterfaces[nodeIdx] = currNodeMulticastInterfaces } return nodeMulticastInterfaces, nil } diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index 834af6536fd..98727c89fd3 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -716,7 +716,7 @@ func uninstallServiceFlowsFunc(t *testing.T, gid uint32, svc svcConfig, endpoint groupID := ofconfig.GroupIDType(gid) err := c.UninstallServiceFlows(svc.ip, svc.port, svc.protocol) assert.Nil(t, err) - err = c.UninstallServiceGroup(groupID) + err = c.UninstallGroup(groupID) assert.Nil(t, err) for _, ep := range endpointList { err := c.UninstallEndpointFlows(svc.protocol, ep) diff --git a/test/integration/ovs/ofctrl_test.go b/test/integration/ovs/ofctrl_test.go index b646cceeb39..73769a10e58 100644 --- a/test/integration/ovs/ofctrl_test.go +++ b/test/integration/ovs/ofctrl_test.go @@ -269,6 +269,8 @@ func TestOFctrlGroup(t *testing.T) { ovsCtlClient := ovsctl.NewClient(brName) + id := 1 + for name, buckets := range map[string][]struct { weight uint16 // Must have non-zero value. reg2reg [][4]uint32 // regNum, data, startIndex, endIndex @@ -278,12 +280,25 @@ func TestOFctrlGroup(t *testing.T) { {weight: 100, reg2reg: [][4]uint32{{0, 1, 0, 31}, {1, 2, 15, 31}}, resubmitTable: 31}, {weight: 110, resubmitTable: 42}, }, + "TypeAll": { + {weight: 100, reg2reg: [][4]uint32{{0, 1, 0, 31}, {1, 2, 15, 31}}, resubmitTable: 31}, + {weight: 110, resubmitTable: 42}, + }, } { t.Run(name, func(t *testing.T) { - group := br.CreateGroup(1) + var group binding.Group + gid := binding.GroupIDType(id) + if name == "TypeAll" { + group = br.CreateGroupTypeAll(gid) + } else { + group = br.CreateGroup(gid) + } for _, bucket := range buckets { require.NotZero(t, bucket.weight, "Weight value of a bucket must be specified") - bucketBuilder := group.Bucket().Weight(bucket.weight) + bucketBuilder := group.Bucket() + if name == "Normal" { + bucketBuilder = bucketBuilder.Weight(bucket.weight) + } if bucket.resubmitTable != 0 { bucketBuilder = bucketBuilder.ResubmitToTable(bucket.resubmitTable) } @@ -303,8 +318,10 @@ func TestOFctrlGroup(t *testing.T) { }), "Failed to install group") dumpedGroup := groups[0] for i, bucket := range buckets { - // Must have weight - assert.True(t, strings.Contains(dumpedGroup[i+1], fmt.Sprintf("weight:%d", bucket.weight))) + if name == "Normal" { + // Must have weight + assert.True(t, strings.Contains(dumpedGroup[i+1], fmt.Sprintf("weight:%d", bucket.weight))) + } for _, loading := range bucket.reg2reg { rngStr := "[]" if !(loading[2] == 0 && loading[3] == 31) { @@ -326,6 +343,7 @@ func TestOFctrlGroup(t *testing.T) { return len(groups) == 0, nil }), "Failed to delete group") }) + id++ } }