From be3c84164d7ddbc84b3a62f7ad6bd5b08e3f8b93 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Tue, 7 Mar 2017 13:06:56 -0500 Subject: [PATCH] Move multicast flow logic into ovsController, and fix a bug The table 111 -> table 120 rule only got added once UpdateVXLANMulticastFlows() got called, which only happened after a non-local node was added, so on a single-node cluster, multicast would never be correctly set up. (Also, simplify the rule in the "no remote nodes" case; there's no need to set tun_id when we aren't sending over VXLAN.) --- pkg/sdn/plugin/ovscontroller.go | 50 +++++++++++- pkg/sdn/plugin/ovscontroller_test.go | 115 +++++++++++++++++++++++++++ pkg/sdn/plugin/pod.go | 43 +++------- pkg/sdn/plugin/pod_test.go | 46 ----------- pkg/sdn/plugin/subnets.go | 14 +--- 5 files changed, 173 insertions(+), 95 deletions(-) diff --git a/pkg/sdn/plugin/ovscontroller.go b/pkg/sdn/plugin/ovscontroller.go index 930d9ebaade3..bf20b3e56d34 100644 --- a/pkg/sdn/plugin/ovscontroller.go +++ b/pkg/sdn/plugin/ovscontroller.go @@ -2,6 +2,7 @@ package plugin import ( "fmt" + "sort" "strings" "github.com/golang/glog" @@ -150,15 +151,15 @@ func (oc *ovsController) SetupOVS(clusterNetworkCIDR, serviceNetworkCIDR, localS // eg, "table=100, reg0=${tenant_id}, priority=2, ip, nw_dst=${external_cidr}, actions=drop otx.AddFlow("table=100, priority=0, actions=output:2") - // Table 110: outbound multicast filtering, updated by updateLocalMulticastFlows() in pod.go + // Table 110: outbound multicast filtering, updated by UpdateLocalMulticastFlows() // eg, "table=110, priority=100, reg0=${tenant_id}, actions=goto_table:111 otx.AddFlow("table=110, priority=0, actions=drop") - // Table 111: multicast delivery from local pods to the VXLAN; only one rule, updated by updateVXLANMulticastRules() in subnets.go + // Table 111: multicast delivery from local pods to the VXLAN; only one rule, updated by UpdateVXLANMulticastRules() // eg, "table=111, priority=100, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31],set_field:${remote_node_ip_1}->tun_dst,output:1,set_field:${remote_node_ip_2}->tun_dst,output:1,goto_table:120" - otx.AddFlow("table=111, priority=0, actions=drop") + otx.AddFlow("table=111, priority=100, actions=goto_table:120") - // Table 120: multicast delivery to local pods (either from VXLAN or local pods); updated by updateLocalMulticastFlows() in pod.go + // Table 120: multicast delivery to local pods (either from VXLAN or local pods); updated by UpdateLocalMulticastFlows() // eg, "table=120, priority=100, reg0=${tenant_id}, actions=output:${ovs_port_1},output:${ovs_port_2}" otx.AddFlow("table=120, priority=0, actions=drop") @@ -304,3 +305,44 @@ func generateBaseAddServiceRule(IP string, protocol kapi.Protocol, port int) (st } return generateBaseServiceRule(IP) + dst, nil } + +func (oc *ovsController) UpdateLocalMulticastFlows(vnid uint32, enabled bool, ofports []int) error { + otx := oc.ovs.NewTransaction() + + if enabled { + otx.AddFlow("table=110, reg0=%d, actions=goto_table:111", vnid) + } else { + otx.DeleteFlows("table=110, reg0=%d", vnid) + } + + var actions []string + if enabled && len(ofports) > 0 { + actions = make([]string, len(ofports)) + for i, ofport := range ofports { + actions[i] = fmt.Sprintf("output:%d", ofport) + } + sort.Strings(actions) + otx.AddFlow("table=120, priority=100, reg0=%d, actions=%s", vnid, strings.Join(actions, ",")) + } else { + otx.DeleteFlows("table=120, reg0=%d", vnid) + } + + return otx.EndTransaction() +} + +func (oc *ovsController) UpdateVXLANMulticastFlows(remoteIPs []string) error { + otx := oc.ovs.NewTransaction() + + if len(remoteIPs) > 0 { + actions := make([]string, len(remoteIPs)) + for i, ip := range remoteIPs { + actions[i] = fmt.Sprintf("set_field:%s->tun_dst,output:1", ip) + } + sort.Strings(actions) + otx.AddFlow("table=111, priority=100, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31],%s,goto_table:120", strings.Join(actions, ",")) + } else { + otx.AddFlow("table=111, priority=100, actions=goto_table:120") + } + + return otx.EndTransaction() +} diff --git a/pkg/sdn/plugin/ovscontroller_test.go b/pkg/sdn/plugin/ovscontroller_test.go index 9dc16ac5ef7c..4d5c3e9a4d35 100644 --- a/pkg/sdn/plugin/ovscontroller_test.go +++ b/pkg/sdn/plugin/ovscontroller_test.go @@ -213,6 +213,121 @@ func TestOVSService(t *testing.T) { } } +func TestOVSMulticast(t *testing.T) { + ovsif, oc, origFlows := setup(t) + + // local flows + err := oc.UpdateLocalMulticastFlows(99, true, []int{4, 5, 6}) + if err != nil { + t.Fatalf("Unexpected error adding multicast flows: %v", err) + } + flows, err := ovsif.DumpFlows() + if err != nil { + t.Fatalf("Unexpected error dumping flows: %v", err) + } + err = assertFlowChanges(origFlows, flows, + flowChange{ + kind: flowAdded, + match: []string{"table=110", "reg0=99", "goto_table:111"}, + }, + flowChange{ + kind: flowAdded, + match: []string{"table=120", "reg0=99", "output:4,output:5,output:6"}, + }, + ) + if err != nil { + t.Fatalf("Unexpected flow changes: %v\nOrig: %v\nNew: %v", err, origFlows, flows) + } + + err = oc.UpdateLocalMulticastFlows(88, false, []int{7, 8}) + if err != nil { + t.Fatalf("Unexpected error adding multicast flows: %v", err) + } + lastFlows := flows + flows, err = ovsif.DumpFlows() + if err != nil { + t.Fatalf("Unexpected error dumping flows: %v", err) + } + err = assertFlowChanges(lastFlows, flows) // no changes + if err != nil { + t.Fatalf("Unexpected flow changes: %v\nOrig: %v\nNew: %v", err, origFlows, flows) + } + + err = oc.UpdateLocalMulticastFlows(99, false, []int{4, 5}) + if err != nil { + t.Fatalf("Unexpected error adding multicast flows: %v", err) + } + flows, err = ovsif.DumpFlows() + if err != nil { + t.Fatalf("Unexpected error dumping flows: %v", err) + } + err = assertFlowChanges(origFlows, flows) // no changes + if err != nil { + t.Fatalf("Unexpected flow changes: %v\nOrig: %v\nNew: %v", err, origFlows, flows) + } + + // VXLAN + err = oc.UpdateVXLANMulticastFlows([]string{"192.168.1.2", "192.168.1.5", "192.168.1.3"}) + if err != nil { + t.Fatalf("Unexpected error adding multicast flows: %v", err) + } + flows, err = ovsif.DumpFlows() + if err != nil { + t.Fatalf("Unexpected error dumping flows: %v", err) + } + err = assertFlowChanges(origFlows, flows, + flowChange{ + kind: flowRemoved, + match: []string{"table=111", "goto_table:120"}, + noMatch: []string{"->tun_dst"}, + }, + flowChange{ + kind: flowAdded, + match: []string{"table=111", "192.168.1.2->tun_dst", "192.168.1.3->tun_dst", "192.168.1.5->tun_dst"}, + }, + ) + if err != nil { + t.Fatalf("Unexpected flow changes: %v\nOrig: %v\nNew: %v", err, origFlows, flows) + } + + err = oc.UpdateVXLANMulticastFlows([]string{"192.168.1.5", "192.168.1.3"}) + if err != nil { + t.Fatalf("Unexpected error adding multicast flows: %v", err) + } + flows, err = ovsif.DumpFlows() + if err != nil { + t.Fatalf("Unexpected error dumping flows: %v", err) + } + err = assertFlowChanges(origFlows, flows, + flowChange{ + kind: flowRemoved, + match: []string{"table=111", "goto_table:120"}, + noMatch: []string{"->tun_dst"}, + }, + flowChange{ + kind: flowAdded, + match: []string{"table=111", "192.168.1.3->tun_dst", "192.168.1.5->tun_dst"}, + noMatch: []string{"192.168.1.2"}, + }, + ) + if err != nil { + t.Fatalf("Unexpected flow changes: %v\nOrig: %v\nNew: %v", err, origFlows, flows) + } + + err = oc.UpdateVXLANMulticastFlows([]string{}) + if err != nil { + t.Fatalf("Unexpected error adding multicast flows: %v", err) + } + flows, err = ovsif.DumpFlows() + if err != nil { + t.Fatalf("Unexpected error dumping flows: %v", err) + } + err = assertFlowChanges(origFlows, flows) // no changes + if err != nil { + t.Fatalf("Unexpected flow changes: %v\nOrig: %v\nNew: %v", err, origFlows, flows) + } +} + var enp1 = osapi.EgressNetworkPolicy{ TypeMeta: kapiunversioned.TypeMeta{ Kind: "EgressNetworkPolicy", diff --git a/pkg/sdn/plugin/pod.go b/pkg/sdn/plugin/pod.go index c84356acce7c..8110c2b5fff4 100644 --- a/pkg/sdn/plugin/pod.go +++ b/pkg/sdn/plugin/pod.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "net" - "sort" "sync" "github.com/openshift/origin/pkg/sdn/plugin/cniserver" @@ -181,44 +180,20 @@ func (m *podManager) handleCNIRequest(request *cniserver.PodRequest) ([]byte, er return result.Response, result.Err } -func localMulticastOutputs(runningPods map[string]*runningPod, vnid uint32) string { +func (m *podManager) updateLocalMulticastRulesWithLock(vnid uint32) { var ofports []int - for _, pod := range runningPods { - if pod.vnid == vnid { - ofports = append(ofports, pod.ofport) + enabled := m.policy.GetMulticastEnabled(vnid) + if enabled { + for _, pod := range m.runningPods { + if pod.vnid == vnid { + ofports = append(ofports, pod.ofport) + } } } - if len(ofports) == 0 { - return "" - } - sort.Ints(ofports) - outputs := "" - for _, ofport := range ofports { - if len(outputs) > 0 { - outputs += "," - } - outputs += fmt.Sprintf("output:%d", ofport) - } - return outputs -} - -func (m *podManager) updateLocalMulticastRulesWithLock(vnid uint32) { - var outputs string - otx := m.oc.NewTransaction() - if m.policy.GetMulticastEnabled(vnid) { - outputs = localMulticastOutputs(m.runningPods, vnid) - otx.AddFlow("table=110, reg0=%d, actions=goto_table:111", vnid) - } else { - otx.DeleteFlows("table=110, reg0=%d", vnid) - } - if len(outputs) > 0 { - otx.AddFlow("table=120, priority=100, reg0=%d, actions=%s", vnid, outputs) - } else { - otx.DeleteFlows("table=120, reg0=%d", vnid) - } - if err := otx.EndTransaction(); err != nil { + if err := m.oc.UpdateLocalMulticastFlows(vnid, enabled, ofports); err != nil { glog.Errorf("Error updating OVS multicast flows for VNID %d: %v", vnid, err) + } } diff --git a/pkg/sdn/plugin/pod_test.go b/pkg/sdn/plugin/pod_test.go index 750b76c77677..c7342caf9de6 100644 --- a/pkg/sdn/plugin/pod_test.go +++ b/pkg/sdn/plugin/pod_test.go @@ -487,49 +487,3 @@ func TestDirectPodUpdate(t *testing.T) { t.Fatalf("failed to update pod: %v", err) } } - -func TestUpdateMulticastFlows(t *testing.T) { - pods := map[string]*runningPod{ - "blah": { - vnid: 5, - ofport: 2, - }, - "baz": { - vnid: 5, - ofport: 8, - }, - "foobar": { - vnid: 5, - ofport: 7, - }, - "blah2": { - vnid: 6, - ofport: 3, - }, - "baz2": { - vnid: 6, - ofport: 9, - }, - "bork": { - vnid: 8, - ofport: 10, - }, - } - - outputs := localMulticastOutputs(pods, 0) - if outputs != "" { - t.Fatalf("Unexpected outputs for vnid 0: %s", outputs) - } - outputs = localMulticastOutputs(pods, 5) - if outputs != "output:2,output:7,output:8" { - t.Fatalf("Unexpected outputs for vnid 5: %s", outputs) - } - outputs = localMulticastOutputs(pods, 6) - if outputs != "output:3,output:9" { - t.Fatalf("Unexpected outputs for vnid 6: %s", outputs) - } - outputs = localMulticastOutputs(pods, 8) - if outputs != "output:10" { - t.Fatalf("Unexpected outputs for vnid 0: %s", outputs) - } -} diff --git a/pkg/sdn/plugin/subnets.go b/pkg/sdn/plugin/subnets.go index 673898526c9a..f5bc1f558943 100644 --- a/pkg/sdn/plugin/subnets.go +++ b/pkg/sdn/plugin/subnets.go @@ -3,9 +3,7 @@ package plugin import ( "fmt" "net" - "sort" "strconv" - "strings" log "github.com/golang/glog" @@ -270,19 +268,13 @@ func (master *OsdnMaster) watchSubnets() { type hostSubnetMap map[string]*osapi.HostSubnet func (plugin *OsdnNode) updateVXLANMulticastRules(subnets hostSubnetMap) { - otx := plugin.oc.NewTransaction() - - // Build the list of all nodes for multicast forwarding - tun_dsts := make([]string, 0, len(subnets)) + remoteIPs := make([]string, 0, len(subnets)-1) for _, subnet := range subnets { if subnet.HostIP != plugin.localIP { - tun_dsts = append(tun_dsts, fmt.Sprintf(",set_field:%s->tun_dst,output:1", subnet.HostIP)) + remoteIPs = append(remoteIPs, subnet.HostIP) } } - sort.Strings(tun_dsts) - otx.AddFlow("table=111, priority=100, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31]%s,goto_table:120", strings.Join(tun_dsts, "")) - - if err := otx.EndTransaction(); err != nil { + if err := plugin.oc.UpdateVXLANMulticastFlows(remoteIPs); err != nil { log.Errorf("Error updating OVS VXLAN multicast flows: %v", err) } }