Skip to content

Commit

Permalink
[Multicast] Use group as flow actions for multicast traffic
Browse files Browse the repository at this point in the history
Add local pod receivers into an OpenFlow "all" type group for each
multicast group, and use that group in the flow action. Remove the pod
from group buckets if the pod has left the multicast group or is
deleted.

Signed-off-by: wenyingd <wenyingd@vmware.com>
  • Loading branch information
wenyingd committed Mar 23, 2022
1 parent 2c7d486 commit 6becda3
Show file tree
Hide file tree
Showing 19 changed files with 310 additions and 68 deletions.
1 change: 1 addition & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ func run(o *Options) error {
}
mcastController := multicast.NewMulticastController(
ofClient,
v4GroupIDAllocator,
nodeConfig,
ifaceStore,
multicastSocket,
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ func (pc *podConfigurator) removeInterfaces(containerID string) error {
if err := pc.routeClient.DeleteLocalAntreaFlexibleIPAMPodRule(containerConfig.IPs); err != nil {
return err
}

return nil
}

Expand Down
10 changes: 9 additions & 1 deletion pkg/agent/interfacestore/interface_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ const (
// Todo: add periodic task to sync local cache with container veth pair

type interfaceCache struct {
cache cache.Indexer
cache cache.Indexer
podInterfaceDeletionHandlers []chan *InterfaceConfig
}

func (c *interfaceCache) Initialize(interfaces []*InterfaceConfig) {
Expand Down Expand Up @@ -107,6 +108,9 @@ func (c *interfaceCache) DeleteInterface(interfaceConfig *InterfaceConfig) {
c.cache.Delete(interfaceConfig)

if interfaceConfig.Type == ContainerInterface {
for _, h := range c.podInterfaceDeletionHandlers {
h <- interfaceConfig
}
metrics.PodCount.Dec()
}
}
Expand Down Expand Up @@ -208,6 +212,10 @@ func (c *interfaceCache) GetInterfaceByOFPort(ofPort uint32) (*InterfaceConfig,
return interfaceConfigs[0].(*InterfaceConfig), true
}

func (c *interfaceCache) RegisterInterfaceDeletionHandler(handler chan *InterfaceConfig) {
c.podInterfaceDeletionHandlers = append(c.podInterfaceDeletionHandlers, handler)
}

func interfaceNameIndexFunc(obj interface{}) ([]string, error) {
interfaceConfig := obj.(*InterfaceConfig)
return []string{interfaceConfig.InterfaceName}, nil
Expand Down
14 changes: 13 additions & 1 deletion pkg/agent/interfacestore/testing/mock_interfacestore.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/agent/interfacestore/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type InterfaceStore interface {
GetInterfacesByType(interfaceType InterfaceType) []*InterfaceConfig
Len() int
GetInterfaceKeysByType(interfaceType InterfaceType) []string
RegisterInterfaceDeletionHandler(handler chan *InterfaceConfig)
}

// NewContainerInterface creates InterfaceConfig for a Pod.
Expand Down
109 changes: 89 additions & 20 deletions pkg/agent/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/openflow"
binding "antrea.io/antrea/pkg/ovs/openflow"
"antrea.io/antrea/pkg/ovs/ovsconfig"
)

Expand Down Expand Up @@ -63,6 +64,7 @@ type GroupMemberStatus struct {
localMembers sets.String
lastIGMPReport time.Time
mutex sync.RWMutex
ofGroupID binding.GroupIDType
}

// eventHandler process the multicast Group membership report or leave messages.
Expand All @@ -83,6 +85,7 @@ func (c *Controller) addGroupMemberStatus(e *mcastGroupEvent) {
group: e.group,
lastIGMPReport: e.time,
localMembers: sets.NewString(e.iface.InterfaceName),
ofGroupID: c.v4GroupAllocator.Allocate(),
}
c.groupCache.Add(status)
c.queue.Add(e.group.String())
Expand Down Expand Up @@ -111,9 +114,9 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent
if !exist {
newStatus.localMembers.Insert(e.iface.InterfaceName)
klog.InfoS("Added member to multicast group", "group", e.group.String(), "member", e.iface.InterfaceName)
c.queue.Add(newStatus.group.String())
}
c.groupCache.Update(newStatus)
c.queue.Add(newStatus.group.String())
case groupLeave:
if exist {
newStatus.localMembers.Delete(e.iface.InterfaceName)
Expand All @@ -122,6 +125,8 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent
if len(newStatus.localMembers) == 0 {
klog.InfoS("Check last member in multicast group", "group", e.group.String(), "member", e.iface.InterfaceName)
c.checkLastMember(e.group)
} else {
c.queue.Add(newStatus.group.String())
}
}
}
Expand Down Expand Up @@ -154,37 +159,61 @@ func (c *Controller) clearStaleGroups() {
}
}

// removeLocalInterface searches the GroupMemberStatus which the deleted interface has joined, and then trigger a member
// leave event so that antrea can remove the corresponding interface from local multicast receivers on OVS. This function
// should be called if the removed Pod receiver fails to send IGMP leave message before deletion.
func (c *Controller) removeLocalInterface(ifConfig *interfacestore.InterfaceConfig) {
groupStatuses := c.getGroupMemberStatusesByPod(ifConfig.InterfaceName)
for _, g := range groupStatuses {
event := &mcastGroupEvent{
group: g.group,
eType: groupLeave,
time: time.Now(),
iface: ifConfig,
}
c.groupEventCh <- event
}
}

type Controller struct {
ofClient openflow.Client
nodeConfig *config.NodeConfig
igmpSnooper *IGMPSnooper
groupEventCh chan *mcastGroupEvent
groupCache cache.Indexer
queue workqueue.RateLimitingInterface
ofClient openflow.Client
v4GroupAllocator openflow.GroupAllocator
ifaceStore interfacestore.InterfaceStore
nodeConfig *config.NodeConfig
igmpSnooper *IGMPSnooper
groupEventCh chan *mcastGroupEvent
groupCache cache.Indexer
queue workqueue.RateLimitingInterface
// installedGroups saves the groups which are configured on both OVS and the host.
installedGroups sets.String
installedGroupsMutex sync.RWMutex
mRouteClient *MRouteClient
ovsBridgeClient ovsconfig.OVSBridgeClient
podDeletionCh chan *interfacestore.InterfaceConfig
}

func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeConfig, ifaceStore interfacestore.InterfaceStore, multicastSocket RouteInterface, multicastInterfaces sets.String, ovsBridgeClient ovsconfig.OVSBridgeClient) *Controller {
func NewMulticastController(ofClient openflow.Client, v4GroupAllocator openflow.GroupAllocator, nodeConfig *config.NodeConfig, ifaceStore interfacestore.InterfaceStore, multicastSocket RouteInterface, multicastInterfaces sets.String, ovsBridgeClient ovsconfig.OVSBridgeClient) *Controller {
eventCh := make(chan *mcastGroupEvent, workerCount)
groupSnooper := newSnooper(ofClient, ifaceStore, eventCh)
groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{
podInterfaceIndex: podInterfaceIndexFunc,
})
multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces)
podDeletionCh := make(chan *interfacestore.InterfaceConfig, 10)
ifaceStore.RegisterInterfaceDeletionHandler(podDeletionCh)
return &Controller{
ofClient: ofClient,
nodeConfig: nodeConfig,
igmpSnooper: groupSnooper,
groupEventCh: eventCh,
groupCache: groupCache,
installedGroups: sets.NewString(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"),
mRouteClient: multicastRouteClient,
ovsBridgeClient: ovsBridgeClient,
ofClient: ofClient,
ifaceStore: ifaceStore,
v4GroupAllocator: v4GroupAllocator,
nodeConfig: nodeConfig,
igmpSnooper: groupSnooper,
groupEventCh: eventCh,
groupCache: groupCache,
installedGroups: sets.NewString(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"),
mRouteClient: multicastRouteClient,
ovsBridgeClient: ovsBridgeClient,
podDeletionCh: podDeletionCh,
}
}

Expand Down Expand Up @@ -222,6 +251,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
// Periodically check the group member status, and remove the groups in which no members exist
go wait.NonSlidingUntil(c.clearStaleGroups, queryInterval, stopCh)

go c.podDeletionHandler(stopCh)
go c.eventHandler(stopCh)

for i := 0; i < int(workerCount); i++ {
Expand Down Expand Up @@ -298,15 +328,30 @@ func (c *Controller) syncGroup(groupKey string) error {
return nil
}
status := obj.(*GroupMemberStatus)
memberPorts := make([]uint32, len(status.localMembers))
status.mutex.Lock()
defer status.mutex.Unlock()
for i := range status.localMembers.List() {
memberInterfaceName := status.localMembers.List()[i]
obj, found := c.ifaceStore.GetInterfaceByName(memberInterfaceName)
if !found {
klog.InfoS("Failed to find interface from cache", "interface", memberInterfaceName)
continue
}
memberPorts[i] = uint32(obj.OFPort)
}
if c.groupHasInstalled(groupKey) {
status.mutex.Lock()
defer status.mutex.Unlock()
if c.groupIsStale(status) {
// Remove the multicast flow entry if no local Pod is in the group.
if err := c.ofClient.UninstallMulticastFlow(status.group); err != nil {
klog.ErrorS(err, "Failed to uninstall multicast flows", "group", groupKey)
return err
}
// Remove the multicast flow entry if no local Pod is in the group.
if err := c.ofClient.UninstallGroup(status.ofGroupID); err != nil {
klog.ErrorS(err, "Failed to uninstall multicast group", "group", groupKey)
return err
}
err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group)
if err != nil {
klog.ErrorS(err, "Cannot delete multicast group", "group", groupKey)
Expand All @@ -320,10 +365,22 @@ func (c *Controller) syncGroup(groupKey string) error {
c.delInstalledGroup(groupKey)
c.groupCache.Delete(status)
klog.InfoS("Removed multicast group from cache after all members left", "group", groupKey)
return nil
}
// Reinstall OpenFlow group because the local pod receivers have changed.
if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts); err != nil {
return err
}
klog.V(2).InfoS("Updated OpenFlow group for local receivers for multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts)
return nil
}
if err := c.ofClient.InstallMulticastFlow(status.group); err != nil {
// Install OpenFlow group for a new multicast group in which local pod receivers have joined.
if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts); err != nil {
return err
}
klog.V(2).InfoS("Installed OpenFlow group for local receivers for multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts)
// Install OpenFlow flow to forward packets to local pod receivers which are included in the group.
if err := c.ofClient.InstallMulticastFlow(status.group, status.ofGroupID); err != nil {
klog.ErrorS(err, "Failed to install multicast flows", "group", status.group)
return err
}
Expand Down Expand Up @@ -377,6 +434,18 @@ func (c *Controller) addOrUpdateGroupEvent(e *mcastGroupEvent) {
c.updateGroupMemberStatus(obj, e)
}
}
return
}

func (c *Controller) podDeletionHandler(stopCh <-chan struct{}) {
for {
select {
case e := <-c.podDeletionCh:
c.removeLocalInterface(e)
case <-stopCh:
return
}
}
}

func podInterfaceIndexFunc(obj interface{}) ([]string, error) {
Expand Down
26 changes: 23 additions & 3 deletions pkg/agent/multicast/mcast_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"antrea.io/antrea/pkg/agent/interfacestore"
ifaceStoretest "antrea.io/antrea/pkg/agent/interfacestore/testing"
multicasttest "antrea.io/antrea/pkg/agent/multicast/testing"
"antrea.io/antrea/pkg/agent/openflow"
openflowtest "antrea.io/antrea/pkg/agent/openflow/testing"
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
)
Expand All @@ -50,11 +51,17 @@ var (
Type: interfacestore.ContainerInterface,
InterfaceName: "if1",
IPs: []net.IP{net.ParseIP("192.168.1.1")},
OVSPortConfig: &interfacestore.OVSPortConfig{
OFPort: 1,
},
}
if2 = &interfacestore.InterfaceConfig{
Type: interfacestore.ContainerInterface,
InterfaceName: "if2",
IPs: []net.IP{net.ParseIP("192.168.1.2")},
OVSPortConfig: &interfacestore.OVSPortConfig{
OFPort: 2,
},
}
nodeIf1IP = net.ParseIP("192.168.20.22")
externalInterfaceIP = net.ParseIP("192.168.50.23")
Expand Down Expand Up @@ -82,7 +89,9 @@ func TestAddGroupMemberStatus(t *testing.T) {
key, ok := obj.(string)
assert.True(t, ok)
assert.Equal(t, mgroup.String(), key)
mockOFClient.EXPECT().InstallMulticastFlow(mgroup).Times(1)
mockIfaceStore.EXPECT().GetInterfaceByName(if1.InterfaceName).Return(if1, true)
mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any())
mockOFClient.EXPECT().InstallMulticastFlow(mgroup, gomock.Any()).Times(1)
mockMulticastSocket.EXPECT().MulticastInterfaceJoinMgroup(mgroup.To4(), nodeIf1IP.To4(), if1.InterfaceName).Times(1)
err = mctrl.syncGroup(key)
assert.Nil(t, err)
Expand Down Expand Up @@ -167,6 +176,9 @@ func TestCheckLastMember(t *testing.T) {
}
mctrl.queue.Forget(obj)
}
mockIfaceStore.EXPECT().GetInterfaceByName(if1.InterfaceName).Return(if1, true).Times(1)
mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()).Times(1)
mockOFClient.EXPECT().UninstallGroup(gomock.Any()).Times(2)
mockOFClient.EXPECT().UninstallMulticastFlow(gomock.Any()).Times(2)
for _, tc := range []struct {
ev *mcastGroupEvent
Expand Down Expand Up @@ -194,7 +206,6 @@ func TestClearStaleGroups(t *testing.T) {
mctrl.worker()
wg.Done()
}()

now := time.Now()
validGroups := []*GroupMemberStatus{
{
Expand Down Expand Up @@ -225,11 +236,18 @@ func TestClearStaleGroups(t *testing.T) {
assert.Nil(t, err)
mctrl.addInstalledGroup(g.group.String())
}
fakePort := int32(1)
for _, g := range staleGroups {
err := mctrl.groupCache.Add(g)
assert.Nil(t, err)
mctrl.addInstalledGroup(g.group.String())
for _, m := range g.localMembers.List() {
mockIface := &interfacestore.InterfaceConfig{InterfaceName: m, OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: fakePort}}
mockIfaceStore.EXPECT().GetInterfaceByName(m).Return(mockIface, true)
fakePort++
}
}
mockOFClient.EXPECT().UninstallGroup(gomock.Any()).Times(len(staleGroups))
mockOFClient.EXPECT().UninstallMulticastFlow(gomock.Any()).Times(len(staleGroups))
mockMulticastSocket.EXPECT().MulticastInterfaceLeaveMgroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(len(staleGroups))
mctrl.clearStaleGroups()
Expand Down Expand Up @@ -324,12 +342,14 @@ func newMockMulticastController(t *testing.T) *Controller {
controller := gomock.NewController(t)
mockOFClient = openflowtest.NewMockClient(controller)
mockIfaceStore = ifaceStoretest.NewMockInterfaceStore(controller)
mockIfaceStore.EXPECT().RegisterInterfaceDeletionHandler(gomock.Any()).Times(1)
mockMulticastSocket = multicasttest.NewMockRouteInterface(controller)
ovsClient = ovsconfigtest.NewMockOVSBridgeClient(controller)
addr := &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}
nodeConfig := &config.NodeConfig{GatewayConfig: &config.GatewayConfig{Name: "antrea-gw0"}, NodeIPv4Addr: addr}
mockOFClient.EXPECT().RegisterPacketInHandler(gomock.Any(), gomock.Any(), gomock.Any()).Times(1)
mctrl := NewMulticastController(mockOFClient, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.NewString(), ovsClient)
groupAllocator := openflow.NewGroupAllocator(false)
mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.NewString(), ovsClient)
return mctrl
}

Expand Down
Loading

0 comments on commit 6becda3

Please sign in to comment.