Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HA for fully-automatic egress IPs #20485

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 22 additions & 21 deletions pkg/network/common/egressip.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,22 +450,32 @@ func (eit *EgressIPTracker) SetNodeOffline(nodeIP string, offline bool) {
eit.egressIPChanged(eg)
}
}

if node.requestedCIDRs.Len() != 0 {
eit.updateEgressCIDRs = true
}

eit.syncEgressIPs()
}

// lookupNodeIP translates a node's public IP to its SDN IP when the IP
// belongs to a node the tracker knows about; otherwise it returns the
// input IP unchanged.
func (eit *EgressIPTracker) lookupNodeIP(ip string) string {
	eit.Lock()
	defer eit.Unlock()

	node := eit.nodesByNodeIP[ip]
	if node == nil {
		return ip
	}
	return node.sdnIP
}

// Ping a node and return whether or not it is online. We do this by trying to open a TCP
// connection to the "discard" service (port 9); if the node is offline, the attempt will
// time out with no response (and we will return false). If the node is online then we
// presumably will get a "connection refused" error; the code below assumes that anything
// other than timing out indicates that the node is online.
func (eit *EgressIPTracker) Ping(ip string, timeout time.Duration) bool {
eit.Lock()
defer eit.Unlock()

// If the caller used a public node IP, replace it with the SDN IP
if node := eit.nodesByNodeIP[ip]; node != nil {
ip = node.sdnIP
}
ip = eit.lookupNodeIP(ip)

conn, err := net.DialTimeout("tcp", ip+":9", timeout)
if conn != nil {
Expand All @@ -485,6 +495,9 @@ func (eit *EgressIPTracker) findEgressIPAllocation(ip net.IP, allocation map[str
otherNodes := false

for _, node := range eit.nodes {
if node.offline {
continue
}
egressIPs, exists := allocation[node.nodeName]
if !exists {
continue
Expand All @@ -506,22 +519,20 @@ func (eit *EgressIPTracker) findEgressIPAllocation(ip net.IP, allocation map[str
return bestNode, otherNodes
}

// ReallocateEgressIPs returns a map from Node name to array-of-Egress-IP. Unchanged nodes are not included.
// ReallocateEgressIPs returns a map from Node name to array-of-Egress-IP for all auto-allocated egress IPs
func (eit *EgressIPTracker) ReallocateEgressIPs() map[string][]string {
eit.Lock()
defer eit.Unlock()

allocation := make(map[string][]string)
changed := make(map[string]bool)
alreadyAllocated := make(map[string]bool)
for _, node := range eit.nodes {
if len(node.parsedCIDRs) > 0 {
allocation[node.nodeName] = make([]string, 0, node.requestedIPs.Len())
}
}
// For each active egress IP, if it still fits within some egress CIDR on its node,
// add it to that node's allocation. (Otherwise add the node to the "changed" map,
// since we'll be removing this egress IP from it.)
// add it to that node's allocation.
for egressIP, eip := range eit.egressIPs {
if eip.assignedNodeIP == "" {
continue
Expand All @@ -534,10 +545,8 @@ func (eit *EgressIPTracker) ReallocateEgressIPs() map[string][]string {
break
}
}
if found {
if found && !node.offline {
allocation[node.nodeName] = append(allocation[node.nodeName], egressIP)
} else {
changed[node.nodeName] = true
}
// (We set alreadyAllocated even if the egressIP will be removed from
// its current node; we can't assign it to a new node until the next
Expand All @@ -553,7 +562,6 @@ func (eit *EgressIPTracker) ReallocateEgressIPs() map[string][]string {
nodeName, otherNodes := eit.findEgressIPAllocation(eip.parsed, allocation)
if nodeName != "" && !otherNodes {
allocation[nodeName] = append(allocation[nodeName], egressIP)
changed[nodeName] = true
alreadyAllocated[egressIP] = true
}
}
Expand All @@ -565,15 +573,8 @@ func (eit *EgressIPTracker) ReallocateEgressIPs() map[string][]string {
nodeName, _ := eit.findEgressIPAllocation(eip.parsed, allocation)
if nodeName != "" {
allocation[nodeName] = append(allocation[nodeName], egressIP)
changed[nodeName] = true
}
}

// Remove unchanged nodes from the return value
for _, node := range eit.nodes {
if !changed[node.nodeName] {
delete(allocation, node.nodeName)
}
}
return allocation
}
112 changes: 109 additions & 3 deletions pkg/network/common/egressip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,20 @@ func (w *testEIPWatcher) assertNoChanges() error {
return w.assertChanges()
}

// flushChanges discards all change notifications recorded so far.
func (w *testEIPWatcher) flushChanges() {
	w.changes = make([]string, 0)
}

// assertUpdateEgressCIDRsNotification checks that an "update egress CIDRs"
// notification was recorded (possibly among others); on success it flushes
// all recorded changes, otherwise it returns an error describing what was
// actually seen.
func (w *testEIPWatcher) assertUpdateEgressCIDRsNotification() error {
	for i := range w.changes {
		if w.changes[i] != "update egress CIDRs" {
			continue
		}
		w.flushChanges()
		return nil
	}
	return fmt.Errorf("expected change \"update egress CIDRs\", got %#v", w.changes)
}

func setupEgressIPTracker(t *testing.T) (*EgressIPTracker, *testEIPWatcher) {
watcher := &testEIPWatcher{}
return NewEgressIPTracker(watcher), watcher
Expand Down Expand Up @@ -864,9 +878,6 @@ func TestEgressCIDRAllocation(t *testing.T) {
t.Fatalf("%v", err)
}
allocation = eit.ReallocateEgressIPs()
if len(allocation) != 0 {
t.Fatalf("Unexpected allocation: %#v", allocation)
}
updateAllocations(eit, allocation)
err = w.assertNoChanges()
if err != nil {
Expand Down Expand Up @@ -1031,3 +1042,98 @@ func TestEgressNodeRenumbering(t *testing.T) {
t.Fatalf("%v", err)
}
}

// TestEgressCIDRAllocationOffline verifies that taking a node offline causes
// its auto-allocated egress IPs to be removed on one reallocation pass and
// reassigned to the surviving nodes on the next.
func TestEgressCIDRAllocationOffline(t *testing.T) {
	eit, w := setupEgressIPTracker(t)

	// Register three nodes with overlapping egress CIDRs: node-3 can host IPs
	// from both ranges, node-4 only 172.17.0.x, node-5 only 172.17.1.x.
	for _, hs := range []*networkapi.HostSubnet{
		{
			HostIP:      "172.17.0.3",
			EgressIPs:   []string{},
			EgressCIDRs: []string{"172.17.0.0/24", "172.17.1.0/24"},
		},
		{
			HostIP:      "172.17.0.4",
			EgressIPs:   []string{},
			EgressCIDRs: []string{"172.17.0.0/24"},
		},
		{
			HostIP:      "172.17.0.5",
			EgressIPs:   []string{},
			EgressCIDRs: []string{"172.17.1.0/24"},
		},
	} {
		updateHostSubnetEgress(eit, hs)
	}

	// Six namespaces, each requesting a single automatic egress IP.
	for _, ns := range []struct {
		netID    uint32
		egressIP string
	}{
		{100, "172.17.0.100"},
		{101, "172.17.0.101"},
		{102, "172.17.0.102"},
		{200, "172.17.1.200"},
		{201, "172.17.1.201"},
		{202, "172.17.1.202"},
	} {
		updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
			NetID:     ns.netID,
			EgressIPs: []string{ns.egressIP},
		})
	}

	// In a perfect world, we'd get 2 IPs on each node, but depending on
	// processing order, this isn't guaranteed. Eg, if the three 172.17.0.x IPs
	// get processed first, we could get two of them on node-3 and one on
	// node-4. Then the first two 172.17.1.x IPs get assigned to node-5, and
	// the last one could go to either node-3 or node-5. Regardless of order,
	// node-3 is guaranteed to get at least two IPs since there's no way either
	// node-4 or node-5 could be assigned a third IP if node-3 still only had one.
	allocation := eit.ReallocateEgressIPs()
	n3 := len(allocation["node-3"])
	n4 := len(allocation["node-4"])
	n5 := len(allocation["node-5"])
	if n3 < 2 || n4 == 0 || n5 == 0 || n3+n4+n5 != 6 {
		t.Fatalf("Bad IP allocation: %#v", allocation)
	}
	updateAllocations(eit, allocation)

	w.flushChanges()

	// Now take node-3 offline; the tracker should request a CIDR update.
	eit.SetNodeOffline("172.17.0.3", true)
	if err := w.assertUpdateEgressCIDRsNotification(); err != nil {
		t.Fatalf("%v", err)
	}

	// First reallocation should empty out node-3.
	allocation = eit.ReallocateEgressIPs()
	if ips, ok := allocation["node-3"]; !ok || len(ips) != 0 {
		t.Fatalf("Bad IP allocation: %#v", allocation)
	}
	updateAllocations(eit, allocation)

	if err := w.assertUpdateEgressCIDRsNotification(); err != nil {
		t.Fatalf("%v", err)
	}

	// Next reallocation should reassign the egress IPs to node-4 and node-5.
	allocation = eit.ReallocateEgressIPs()
	if len(allocation["node-3"]) != 0 || len(allocation["node-4"]) != 3 || len(allocation["node-5"]) != 3 {
		t.Fatalf("Bad IP allocation: %#v", allocation)
	}
	updateAllocations(eit, allocation)
}
100 changes: 98 additions & 2 deletions pkg/network/master/egressip.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"sync"
"time"

"github.com/golang/glog"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"

Expand All @@ -23,6 +26,15 @@ type egressIPManager struct {

updatePending bool
updatedAgain bool

monitorNodes map[string]*egressNode
stop chan struct{}
}

// egressNode tracks the ping-health state of one node that currently has
// auto-allocated egress IPs assigned to it.
type egressNode struct {
	// ip is the node IP that the health-check loop pings.
	ip string
	// offline is set once the node has failed more than maxRetries
	// consecutive pings, and cleared again when a ping succeeds.
	offline bool
	// retries counts failed pings; it is reset to 0 when the node is
	// declared offline.
	retries int
}

func newEgressIPManager() *egressIPManager {
Expand Down Expand Up @@ -75,25 +87,109 @@ func (eim *egressIPManager) maybeDoUpdateEgressCIDRs() (bool, error) {
// we won't process that until this reallocation is complete.

allocation := eim.tracker.ReallocateEgressIPs()
monitorNodes := make(map[string]*egressNode, len(allocation))
for nodeName, egressIPs := range allocation {
resultErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
hs, err := eim.hostSubnetInformer.Lister().Get(nodeName)
if err != nil {
return err
}

hs.EgressIPs = egressIPs
_, err = eim.networkClient.Network().HostSubnets().Update(hs)
if node := eim.monitorNodes[hs.HostIP]; node != nil {
monitorNodes[hs.HostIP] = node
} else {
monitorNodes[hs.HostIP] = &egressNode{ip: hs.HostIP}
}

oldIPs := sets.NewString(hs.EgressIPs...)
newIPs := sets.NewString(egressIPs...)
if !oldIPs.Equal(newIPs) {
hs.EgressIPs = egressIPs
_, err = eim.networkClient.Network().HostSubnets().Update(hs)
}
return err
})
if resultErr != nil {
utilruntime.HandleError(fmt.Errorf("Could not update HostSubnet EgressIPs: %v", resultErr))
}
}

eim.monitorNodes = monitorNodes
if len(monitorNodes) > 0 {
if eim.stop == nil {
eim.stop = make(chan struct{})
go eim.poll(eim.stop)
}
} else {
if eim.stop != nil {
close(eim.stop)
eim.stop = nil
}
}

return true, nil
}

const (
	// pollInterval is both the minimum period of the polling loop and the
	// ping timeout used on a normal (non-retry) health-check pass.
	pollInterval = 5 * time.Second
	// repollInterval is the shorter ping timeout used when re-checking
	// nodes that failed their previous ping.
	repollInterval = time.Second
	// maxRetries bounds the retry escalation: a node is declared offline
	// once its failed-ping count exceeds maxRetries.
	maxRetries = 2
)

func (eim *egressIPManager) poll(stop chan struct{}) {
retry := false
for {
select {
case <-stop:
return
default:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be more idomatic to do something like:

var timeout time.Duration
select {
case <-stop:
	return
case <-time.After(timeout):
	start := time.Now()
	retry := eim.check(retry)
	if !retry {
		//  If less than pollInterval has passed since start, then sleep until it has
		timeout = time.Until(start.Add(pollInterval))
	}
}

}

start := time.Now()
retry := eim.check(retry)
if !retry {
// If less than pollInterval has passed since start, then sleep until it has
time.Sleep(start.Add(pollInterval).Sub(time.Now()))
}
}
}

// check pings every monitored node (or, when retrying, only the nodes with a
// pending retry count) and updates the tracker's online/offline view of each.
// It returns true if some node failed a ping but has not yet exhausted its
// retries, meaning the caller should re-run check soon in retry mode.
func (eim *egressIPManager) check(retrying bool) bool {
	timeout := pollInterval
	if retrying {
		timeout = repollInterval
	}

	needRetry := false
	for _, node := range eim.monitorNodes {
		if retrying && node.retries == 0 {
			// On a retry pass, skip nodes that aren't mid-retry.
			continue
		}

		online := eim.tracker.Ping(node.ip, timeout)
		switch {
		case node.offline && online:
			glog.Infof("Node %s is back online", node.ip)
			node.offline = false
			eim.tracker.SetNodeOffline(node.ip, false)
		case !node.offline && !online:
			node.retries++
			if node.retries > maxRetries {
				glog.Warningf("Node %s is offline", node.ip)
				node.retries = 0
				node.offline = true
				eim.tracker.SetNodeOffline(node.ip, true)
			} else {
				glog.V(2).Infof("Node %s may be offline... retrying", node.ip)
				needRetry = true
			}
		}
	}

	return needRetry
}

func (eim *egressIPManager) ClaimEgressIP(vnid uint32, egressIP, nodeIP string) {
}

Expand Down