From 2b089acf103390afd1f9d7899fd19f36fccc7969 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Sun, 24 Jun 2018 10:46:46 -0400 Subject: [PATCH] Track HostSubnet.EgressCIDR, assign Egress IPs from it on the master --- pkg/network/common/egressip.go | 154 ++++++++++++++++- pkg/network/common/egressip_test.go | 246 ++++++++++++++++++++++++++++ pkg/network/master/egressip.go | 95 +++++++++++ pkg/network/master/master.go | 3 + pkg/network/node/egressip.go | 3 + 5 files changed, 492 insertions(+), 9 deletions(-) create mode 100644 pkg/network/master/egressip.go diff --git a/pkg/network/common/egressip.go b/pkg/network/common/egressip.go index 05f29adcb46c..73217b9166d7 100644 --- a/pkg/network/common/egressip.go +++ b/pkg/network/common/egressip.go @@ -18,10 +18,15 @@ import ( ) type nodeEgress struct { - nodeIP string - sdnIP string - requestedIPs sets.String - offline bool + nodeName string + nodeIP string + sdnIP string + + requestedIPs sets.String + requestedCIDRs sets.String + parsedCIDRs map[string]*net.IPNet + + offline bool } type namespaceEgress struct { @@ -32,7 +37,8 @@ type namespaceEgress struct { } type egressIPInfo struct { - ip string + ip string + parsed net.IP nodes []*nodeEgress namespaces []*namespaceEgress @@ -48,6 +54,8 @@ type EgressIPWatcher interface { SetNamespaceEgressNormal(vnid uint32) SetNamespaceEgressDropped(vnid uint32) SetNamespaceEgressViaEgressIP(vnid uint32, egressIP, nodeIP string) + + UpdateEgressCIDRs() } type EgressIPTracker struct { @@ -58,9 +66,11 @@ type EgressIPTracker struct { nodesByNodeIP map[string]*nodeEgress namespacesByVNID map[uint32]*namespaceEgress egressIPs map[string]*egressIPInfo + nodesWithCIDRs int changedEgressIPs map[*egressIPInfo]bool changedNamespaces map[*namespaceEgress]bool + updateEgressCIDRs bool } func NewEgressIPTracker(watcher EgressIPWatcher) *EgressIPTracker { @@ -84,7 +94,7 @@ func (eit *EgressIPTracker) Start(hostSubnetInformer networkinformers.HostSubnet func (eit *EgressIPTracker) ensureEgressIPInfo(egressIP string) *egressIPInfo { eg := eit.egressIPs[egressIP] if eg == nil { - eg = &egressIPInfo{ip: egressIP} + eg = &egressIPInfo{ip: egressIP, parsed: net.ParseIP(egressIP)} eit.egressIPs[egressIP] = eg } return eg @@ -177,10 +187,11 @@ func (eit *EgressIPTracker) UpdateHostSubnetEgress(hs *networkapi.HostSubnet) { node := eit.nodesByNodeIP[hs.HostIP] if node == nil { - if len(hs.EgressIPs) == 0 { + if len(hs.EgressIPs) == 0 && len(hs.EgressCIDRs) == 0 { return } node = &nodeEgress{ + nodeName: hs.Host, nodeIP: hs.HostIP, sdnIP: sdnIP, requestedIPs: sets.NewString(), @@ -189,10 +200,27 @@ func (eit *EgressIPTracker) UpdateHostSubnetEgress(hs *networkapi.HostSubnet) { } else if len(hs.EgressIPs) == 0 { delete(eit.nodesByNodeIP, hs.HostIP) } - oldRequestedIPs := node.requestedIPs - node.requestedIPs = sets.NewString(hs.EgressIPs...) + + // Process EgressCIDRs + newRequestedCIDRs := sets.NewString(hs.EgressCIDRs...) + if !node.requestedCIDRs.Equal(newRequestedCIDRs) { + if len(hs.EgressCIDRs) == 0 { + eit.nodesWithCIDRs-- + } else if node.requestedCIDRs.Len() == 0 { + eit.nodesWithCIDRs++ + } + node.requestedCIDRs = newRequestedCIDRs + node.parsedCIDRs = make(map[string]*net.IPNet) + for _, cidr := range hs.EgressCIDRs { + _, parsed, _ := net.ParseCIDR(cidr) + node.parsedCIDRs[cidr] = parsed + } + eit.updateEgressCIDRs = true + } // Process new and removed EgressIPs + oldRequestedIPs := node.requestedIPs + node.requestedIPs = sets.NewString(hs.EgressIPs...) for _, ip := range node.requestedIPs.Difference(oldRequestedIPs).UnsortedList() { eit.addNodeEgressIP(node, ip) } @@ -301,6 +329,13 @@ func (eit *EgressIPTracker) syncEgressIPs() { for ns := range changedNamespaces { eit.syncEgressNamespaceState(ns) } + + if eit.updateEgressCIDRs { + eit.updateEgressCIDRs = false + if eit.nodesWithCIDRs > 0 { + eit.watcher.UpdateEgressCIDRs() + } + } } func (eit *EgressIPTracker) syncEgressNodeState(eg *egressIPInfo, active bool) { @@ -313,6 +348,10 @@ func (eit *EgressIPTracker) syncEgressNodeState(eg *egressIPInfo, active bool) { eit.watcher.ReleaseEgressIP(eg.ip, eg.assignedNodeIP) eg.assignedNodeIP = "" } + + if eg.assignedNodeIP == "" { + eit.updateEgressCIDRs = true + } } func (eit *EgressIPTracker) syncEgressNamespaceState(ns *namespaceEgress) { @@ -402,3 +441,100 @@ func (eit *EgressIPTracker) Ping(ip string, timeout time.Duration) bool { return true } } + +// Finds the best node to allocate the egress IP to, given the existing allocation. The +// boolean return value indicates whether multiple nodes could host the IP. +func (eit *EgressIPTracker) findEgressIPAllocation(ip net.IP, allocation map[string][]string) (string, bool) { + bestNode := "" + otherNodes := false + + for _, node := range eit.nodesByNodeIP { + egressIPs, exists := allocation[node.nodeName] + if !exists { + continue + } + for _, parsed := range node.parsedCIDRs { + if parsed.Contains(ip) { + if bestNode != "" { + otherNodes = true + if len(allocation[bestNode]) < len(egressIPs) { + continue + } + } + bestNode = node.nodeName + break + } + } + } + + return bestNode, otherNodes +} + +// ReallocateEgressIPs returns a map from Node name to array-of-Egress-IP. Unchanged nodes are not included. +func (eit *EgressIPTracker) ReallocateEgressIPs() map[string][]string { + eit.Lock() + defer eit.Unlock() + + allocation := make(map[string][]string) + changed := make(map[string]bool) + for _, node := range eit.nodesByNodeIP { + if len(node.parsedCIDRs) > 0 { + allocation[node.nodeName] = make([]string, 0, node.requestedIPs.Len()) + } + } + // For each active egress IP, if it still fits within some egress CIDR on its node, + // add it to that node's allocation. (Otherwise add the node to the "changed" map, + // since we'll be removing this egress IP from it.) + for egressIP, eip := range eit.egressIPs { + if eip.assignedNodeIP == "" { + continue + } + node := eip.nodes[0] + found := false + for _, parsed := range node.parsedCIDRs { + if parsed.Contains(eip.parsed) { + found = true + break + } + } + if found { + allocation[node.nodeName] = append(allocation[node.nodeName], egressIP) + } else { + changed[node.nodeName] = true + } + } + + // Allocate pending egress IPs that can only go to a single node + alreadyAllocated := make(map[string]bool) + for egressIP, eip := range eit.egressIPs { + if eip.assignedNodeIP != "" { + alreadyAllocated[egressIP] = true + continue + } + nodeName, otherNodes := eit.findEgressIPAllocation(eip.parsed, allocation) + if nodeName != "" && !otherNodes { + allocation[nodeName] = append(allocation[nodeName], egressIP) + changed[nodeName] = true + alreadyAllocated[egressIP] = true + } + } + // Allocate any other pending egress IPs that we can + for egressIP, eip := range eit.egressIPs { + if alreadyAllocated[egressIP] { + continue + } + nodeName, _ := eit.findEgressIPAllocation(eip.parsed, allocation) + if nodeName != "" { + allocation[nodeName] = append(allocation[nodeName], egressIP) + changed[nodeName] = true + } + } + + // Remove unchanged nodes from the return value + for _, node := range eit.nodesByNodeIP { + if !changed[node.nodeName] { + delete(allocation, node.nodeName) + } + } + return allocation +} diff --git a/pkg/network/common/egressip_test.go b/pkg/network/common/egressip_test.go index bdb9db0dbcd6..427c62de30c9 100644 --- a/pkg/network/common/egressip_test.go +++ b/pkg/network/common/egressip_test.go @@ -31,6 +31,10 @@ func (w *testEIPWatcher) SetNamespaceEgressViaEgressIP(vnid uint32, egressIP, no w.changes = append(w.changes, fmt.Sprintf("namespace %d via %s on %s", int(vnid), egressIP, nodeIP)) } +func (w *testEIPWatcher) UpdateEgressCIDRs() { + w.changes = append(w.changes, "update egress CIDRs") +} + func (w *testEIPWatcher) assertChanges(expected ...string) error { changed := w.changes w.changes = []string{} @@ -742,3 +746,245 @@ func TestOfflineEgressIPs(t *testing.T) { t.Fatalf("%v", err) } } + +func updateAllocations(eit *EgressIPTracker, allocation map[string][]string) { + for nodeName, egressIPs := range allocation { + for _, node := range eit.nodesByNodeIP { + if node.nodeName == nodeName { + eit.UpdateHostSubnetEgress(&networkapi.HostSubnet{ + Host: nodeName, + HostIP: node.nodeIP, + EgressIPs: egressIPs, + EgressCIDRs: node.requestedCIDRs.List(), + }) + break + } + } + } +} + +func TestEgressCIDRAllocation(t *testing.T) { + eit, w := setupEgressIPTracker(t) + + eit.UpdateHostSubnetEgress(&networkapi.HostSubnet{ + Host: "node-3", + HostIP: "172.17.0.3", + EgressIPs: []string{}, + EgressCIDRs: []string{"172.17.0.100/32", "172.17.0.101/32", "172.17.0.102/32", "172.17.0.103/32", "172.17.1.0/24"}, + }) + eit.UpdateHostSubnetEgress(&networkapi.HostSubnet{ + Host: "node-4", + HostIP: "172.17.0.4", + EgressIPs: []string{}, + EgressCIDRs: []string{"172.17.0.0/24"}, + }) + eit.UpdateHostSubnetEgress(&networkapi.HostSubnet{ + Host: "node-5", + HostIP: "172.17.0.5", + EgressIPs: []string{}, + EgressCIDRs: []string{}, + }) + err := w.assertChanges( + "update egress CIDRs", + "update egress CIDRs", + // no "update egress CIDRs" for node-5 since it has no EgressCIDRs + ) + if err != nil { + t.Fatalf("%v", err) + } + + // Either of these could be assigned to either node, but they should be balanced + eit.UpdateNetNamespaceEgress(&networkapi.NetNamespace{ + NetName: "ns-42", + NetID: 42, + EgressIPs: []string{"172.17.0.100"}, + }) + eit.UpdateNetNamespaceEgress(&networkapi.NetNamespace{ + NetName: "ns-43", + NetID: 43, + EgressIPs: []string{"172.17.0.101"}, + }) + err = w.assertChanges( + "namespace 42 dropped", + "update egress CIDRs", + "namespace 43 dropped", + "update egress CIDRs", + ) + if err != nil { + t.Fatalf("%v", err) + } + + allocation := eit.ReallocateEgressIPs() + node3ips := allocation["node-3"] + node4ips := allocation["node-4"] + if len(node3ips) != 1 || len(node4ips) != 1 { + t.Fatalf("Bad IP allocation: %#v", allocation) + } + var n42, n43 string + if node3ips[0] == "172.17.0.100" && node4ips[0] == "172.17.0.101" { + n42 = "172.17.0.3" + n43 = "172.17.0.4" + } else if node3ips[0] == "172.17.0.101" && node4ips[0] == "172.17.0.100" { + n42 = "172.17.0.4" + n43 = "172.17.0.3" + } else { + t.Fatalf("Bad IP allocation: %#v", allocation) + } + + updateAllocations(eit, allocation) + err = w.assertChanges( + fmt.Sprintf("claim 172.17.0.100 on %s for namespace 42", n42), + fmt.Sprintf("namespace 42 via 172.17.0.100 on %s", n42), + fmt.Sprintf("claim 172.17.0.101 on %s for namespace 43", n43), + fmt.Sprintf("namespace 43 via 172.17.0.101 on %s", n43), + ) + if err != nil { + t.Fatalf("%v", err) + } + + // First can only be assigned to node3. Second *could* be assigned to either, but + // must get assigned to node4 for balance + eit.UpdateNetNamespaceEgress(&networkapi.NetNamespace{ + NetName: "ns-44", + NetID: 44, + EgressIPs: []string{"172.17.1.1"}, + }) + eit.UpdateNetNamespaceEgress(&networkapi.NetNamespace{ + NetName: "ns-45", + NetID: 45, + EgressIPs: []string{"172.17.0.102"}, + }) + err = w.assertChanges( + "namespace 44 dropped", + "update egress CIDRs", + "namespace 45 dropped", + "update egress CIDRs", + ) + if err != nil { + t.Fatalf("%v", err) + } + + allocation = eit.ReallocateEgressIPs() + updateAllocations(eit, allocation) + err = w.assertChanges( + "claim 172.17.1.1 on 172.17.0.3 for namespace 44", + "namespace 44 via 172.17.1.1 on 172.17.0.3", + "claim 172.17.0.102 on 172.17.0.4 for namespace 45", + "namespace 45 via 172.17.0.102 on 172.17.0.4", + ) + if err != nil { + t.Fatalf("%v", err) + } + + // Manually assigning egress IPs to the node with no EgressCIDRs should have no + // effect on automatic assignments (though it will result in a spurious "update + // egress CIDRs" notification). + eit.UpdateHostSubnetEgress(&networkapi.HostSubnet{ + Host: "node-5", + HostIP: "172.17.0.5", + EgressIPs: []string{"172.17.2.100"}, + }) + eit.UpdateNetNamespaceEgress(&networkapi.NetNamespace{ + NetName: "ns-50", + NetID: 50, + EgressIPs: []string{"172.17.2.100"}, + }) + err = w.assertChanges( + "claim 172.17.2.100 on 172.17.0.5 for namespace 50", + "namespace 50 via 172.17.2.100 on 172.17.0.5", + "update egress CIDRs", + ) + if err != nil { + t.Fatalf("%v", err) + } + allocation = eit.ReallocateEgressIPs() + if len(allocation) != 0 { + t.Fatalf("Unexpected allocation: %#v", allocation) + } + updateAllocations(eit, allocation) + err = w.assertNoChanges() + if err != nil { + t.Fatalf("%v", err) + } + + // First two can only be assigned to node4. Last must get assigned to node3 for balance + eit.UpdateNetNamespaceEgress(&networkapi.NetNamespace{ + NetName: "ns-46", + NetID: 46, + EgressIPs: []string{"172.17.0.200"}, + }) + eit.UpdateNetNamespaceEgress(&networkapi.NetNamespace{ + NetName: "ns-47", + NetID: 47, + EgressIPs: []string{"172.17.0.201"}, + }) + eit.UpdateNetNamespaceEgress(&networkapi.NetNamespace{ + NetName: "ns-48", + NetID: 48, + EgressIPs: []string{"172.17.0.103"}, + }) + err = w.assertChanges( + "namespace 46 dropped", + "update egress CIDRs", + "namespace 47 dropped", + "update egress CIDRs", + "namespace 48 dropped", + "update egress CIDRs", + ) + if err != nil { + t.Fatalf("%v", err) + } + + allocation = eit.ReallocateEgressIPs() + updateAllocations(eit, allocation) + err = w.assertChanges( + "claim 172.17.0.200 on 172.17.0.4 for namespace 46", + "namespace 46 via 172.17.0.200 on 172.17.0.4", + "claim 172.17.0.201 on 172.17.0.4 for namespace 47", + "namespace 47 via 172.17.0.201 on 172.17.0.4", + "claim 172.17.0.103 on 172.17.0.3 for namespace 48", + "namespace 48 via 172.17.0.103 on 172.17.0.3", + ) + if err != nil { + t.Fatalf("%v", err) + } + + // Dropping an Egress CIDR will drop the Egress IP(s) that came from that CIDR. + // If we then reallocate, the dropped Egress IP(s) might be allocated to new nodes. + eit.UpdateHostSubnetEgress(&networkapi.HostSubnet{ + Host: "node-3", + HostIP: "172.17.0.3", + + // EgressIPs here is unchanged from its current value after the above allocations; + // we're only changing EgressCIDRs (to remove "172.17.0.103/32") + EgressIPs: []string{"172.17.0.101", "172.17.0.103", "172.17.1.1"}, + EgressCIDRs: []string{"172.17.0.100/32", "172.17.0.101/32", "172.17.0.102/32", "172.17.1.0/24"}, + }) + err = w.assertChanges( + "update egress CIDRs", + ) + if err != nil { + t.Fatalf("%v", err) + } + allocation = eit.ReallocateEgressIPs() + updateAllocations(eit, allocation) + err = w.assertChanges( + "release 172.17.0.103 on 172.17.0.3", + "namespace 48 dropped", + // Now that the egress IP has been unassigned, the tracker sees that it + // could be assigned to a new node. + "update egress CIDRs", + ) + if err != nil { + t.Fatalf("%v", err) + } + allocation = eit.ReallocateEgressIPs() + updateAllocations(eit, allocation) + err = w.assertChanges( + "claim 172.17.0.103 on 172.17.0.4 for namespace 48", + "namespace 48 via 172.17.0.103 on 172.17.0.4", + ) + if err != nil { + t.Fatalf("%v", err) + } +} diff --git a/pkg/network/master/egressip.go b/pkg/network/master/egressip.go new file mode 100644 index 000000000000..ee479bcc98d9 --- /dev/null +++ b/pkg/network/master/egressip.go @@ -0,0 +1,95 @@ +package master + +import ( + "fmt" + "sync" + "time" + + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + utilwait "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/retry" + + "github.com/openshift/origin/pkg/network/common" + networkinformers "github.com/openshift/origin/pkg/network/generated/informers/internalversion/network/internalversion" + networkclient "github.com/openshift/origin/pkg/network/generated/internalclientset" +) + +type egressIPManager struct { + sync.Mutex + + tracker *common.EgressIPTracker + networkClient networkclient.Interface + hostSubnetInformer networkinformers.HostSubnetInformer + + updatePending bool + updatedAgain bool +} + +func newEgressIPManager() *egressIPManager { + eim := &egressIPManager{} + eim.tracker = common.NewEgressIPTracker(eim) + return eim +} + +func (eim *egressIPManager) Start(networkClient networkclient.Interface, hostSubnetInformer networkinformers.HostSubnetInformer, netNamespaceInformer networkinformers.NetNamespaceInformer) { + eim.networkClient = networkClient + eim.hostSubnetInformer = hostSubnetInformer + eim.tracker.Start(hostSubnetInformer, netNamespaceInformer) +} + +func (eim *egressIPManager) UpdateEgressCIDRs() { + eim.Lock() + defer eim.Unlock() + + if eim.updatePending { + eim.updatedAgain = true + } else { + eim.updatePending = true + go utilwait.PollInfinite(time.Second, eim.maybeDoUpdateEgressCIDRs) + } +} + +func (eim *egressIPManager) maybeDoUpdateEgressCIDRs() (bool, error) { + eim.Lock() + defer eim.Unlock() + + if eim.updatedAgain { + eim.updatedAgain = false + return false, nil + } + eim.updatePending = false + + allocation := eim.tracker.ReallocateEgressIPs() + for nodeName, egressIPs := range allocation { + resultErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + hs, err := eim.hostSubnetInformer.Lister().Get(nodeName) + if err != nil { + return err + } + + hs.EgressIPs = egressIPs + _, err = eim.networkClient.Network().HostSubnets().Update(hs) + return err + }) + if resultErr != nil { + utilruntime.HandleError(fmt.Errorf("Could not update HostSubnet EgressIPs: %v", resultErr)) + } + } + + return true, nil +} + +func (eim *egressIPManager) ClaimEgressIP(vnid uint32, egressIP, nodeIP string) { +} + +func (eim *egressIPManager) ReleaseEgressIP(egressIP, nodeIP string) { +} + +func (eim *egressIPManager) SetNamespaceEgressNormal(vnid uint32) { +} + +func (eim *egressIPManager) SetNamespaceEgressDropped(vnid uint32) { +} + +func (eim *egressIPManager) SetNamespaceEgressViaEgressIP(vnid uint32, egressIP, nodeIP string) { +} diff --git a/pkg/network/master/master.go b/pkg/network/master/master.go index 6002edc92081..a5e723aa6d44 100644 --- a/pkg/network/master/master.go +++ b/pkg/network/master/master.go @@ -193,6 +193,9 @@ func (master *OsdnMaster) startSubSystems(pluginName string) { glog.Fatalf("failed to start VNID master: %v", err) } } + + eim := newEgressIPManager() + eim.Start(master.networkClient, master.hostSubnetInformer, master.netNamespaceInformer) } func (master *OsdnMaster) checkClusterNetworkAgainstLocalNetworks() error { diff --git a/pkg/network/node/egressip.go b/pkg/network/node/egressip.go index ffaa1f9d29d7..cc153732644c 100644 --- a/pkg/network/node/egressip.go +++ b/pkg/network/node/egressip.go @@ -103,6 +103,9 @@ func (eip *egressIPWatcher) ReleaseEgressIP(egressIP, nodeIP string) { } } +func (eip *egressIPWatcher) UpdateEgressCIDRs() { +} + func (eip *egressIPWatcher) SetNamespaceEgressNormal(vnid uint32) { if err := eip.oc.SetNamespaceEgressNormal(vnid); err != nil { utilruntime.HandleError(fmt.Errorf("Error updating Namespace egress rules for VNID %d: %v", vnid, err))