From 8b7668ea9ccc79dbd069f8349d160566a0e83411 Mon Sep 17 00:00:00 2001 From: Shaun Crampton Date: Fri, 7 Jul 2017 16:00:15 +0100 Subject: [PATCH] Do conntrack deletions in the background. Block on pending deletions if IP is reused. --- routetable/route_table.go | 49 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/routetable/route_table.go b/routetable/route_table.go index 5a5329236e..2793b158cb 100644 --- a/routetable/route_table.go +++ b/routetable/route_table.go @@ -75,6 +75,8 @@ type RouteTable struct { ifaceNameToTargets map[string][]Target pendingIfaceNameToTargets map[string][]Target + pendingConntrackCleanups map[ip.Addr]chan struct{} + inSync bool // dataplane is our shim for the netlink/arp interface. In production, it maps directly @@ -117,6 +119,7 @@ func NewWithShims(interfacePrefixes []string, ipVersion uint8, nl dataplaneIface pendingIfaceNameToTargets: map[string][]Target{}, dirtyIfaces: set.New(), dataplane: nl, + pendingConntrackCleanups: map[ip.Addr]chan struct{}{}, } } @@ -199,6 +202,8 @@ func (r *RouteTable) Apply() error { return set.RemoveItem }) + r.cleanUpPendingConntrackDeletes() + if r.dirtyIfaces.Len() > 0 { r.logCxt.Warn("Some interfaces still out-of sync.") r.inSync = false @@ -250,7 +255,7 @@ func (r *RouteTable) syncRoutesForLink(ifaceName string) error { defer oldCIDRs.Iter(func(item interface{}) error { // Remove and conntrack entries that should no longer be there. dest := item.(ip.CIDR) - r.dataplane.RemoveConntrackFlows(dest.Version(), dest.Addr().AsNetIP()) + r.startConntrackDeletion(dest.Addr()) return nil }) @@ -323,6 +328,9 @@ func (r *RouteTable) syncRoutesForLink(ifaceName string) error { Protocol: syscall.RTPROT_BOOT, Scope: netlink.SCOPE_LINK, } + // In case this IP is being re-used, wait for any previous conntrack entry + // to be cleaned up. (No-op if there are no pending deletes.) + r.waitForPendingConntrackDelete(cidr.Addr()) if err := r.dataplane.RouteAdd(&route); err != nil { logCxt.WithError(err).Warn("Failed to add route") updatesFailed = true @@ -347,6 +355,45 @@ func (r *RouteTable) syncRoutesForLink(ifaceName string) error { return nil } +// startConntrackDeletion starts the deletion of conntrack entries for the given CIDR in the background. Pending +// deletions are tracked in the pendingConntrackCleanups map so we can block waiting for them later. +func (r *RouteTable) startConntrackDeletion(ipAddr ip.Addr) { + log.WithField("ip", ipAddr).Debug("Starting goroutine to delete conntrack entries") + done := make(chan struct{}) + r.pendingConntrackCleanups[ipAddr] = done + go func() { + defer close(done) + r.dataplane.RemoveConntrackFlows(r.ipVersion, ipAddr.AsNetIP()) + log.WithField("ip", ipAddr).Debug("Deleted conntrack entries") + }() +} + +// cleanUpPendingConntrackDeletes scans the pendingConntrackCleanups map for completed entries and removes them. +func (r *RouteTable) cleanUpPendingConntrackDeletes() { + for ipAddr, c := range r.pendingConntrackCleanups { + select { + case <-c: + log.WithField("ip", ipAddr).Debug( + "Background goroutine finished deleting conntrack entries") + delete(r.pendingConntrackCleanups, ipAddr) + default: + log.WithField("ip", ipAddr).Debug( + "Background goroutine yet to finish deleting conntrack entries") + continue + } + } +} + +// waitForPendingConntrackDelete waits for any pending conntrack deletions (if any) for the given IP to complete. +func (r *RouteTable) waitForPendingConntrackDelete(ipAddr ip.Addr) { + if c := r.pendingConntrackCleanups[ipAddr]; c != nil { + log.WithField("ip", ipAddr).Info("Waiting for pending conntrack deletion to finish") + <-c + log.WithField("ip", ipAddr).Info("Done waiting for pending conntrack deletion to finish") + delete(r.pendingConntrackCleanups, ipAddr) + } +} + // filterErrorByIfaceState checks the current state of the interface; if it's down or gone, it // returns IfaceDown or IfaceNotPresent, otherwise, it returns the given defaultErr. func (r *RouteTable) filterErrorByIfaceState(ifaceName string, currentErr, defaultErr error) error {