Skip to content

Commit

Permalink
Merge pull request projectcalico#1498 from fasaxc/bg-conntrack
Browse files Browse the repository at this point in the history
Do conntrack deletions in the background.
  • Loading branch information
fasaxc authored Jul 12, 2017
2 parents f54ac44 + fd834d8 commit 3ee61d7
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 5 deletions.
53 changes: 52 additions & 1 deletion routetable/route_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ type RouteTable struct {
ifaceNameToTargets map[string][]Target
pendingIfaceNameToTargets map[string][]Target

pendingConntrackCleanups map[ip.Addr]chan struct{}

inSync bool

// dataplane is our shim for the netlink/arp interface. In production, it maps directly
Expand Down Expand Up @@ -117,6 +119,7 @@ func NewWithShims(interfacePrefixes []string, ipVersion uint8, nl dataplaneIface
pendingIfaceNameToTargets: map[string][]Target{},
dirtyIfaces: set.New(),
dataplane: nl,
pendingConntrackCleanups: map[ip.Addr]chan struct{}{},
}
}

Expand Down Expand Up @@ -199,6 +202,8 @@ func (r *RouteTable) Apply() error {
return set.RemoveItem
})

r.cleanUpPendingConntrackDeletions()

if r.dirtyIfaces.Len() > 0 {
r.logCxt.Warn("Some interfaces still out-of sync.")
r.inSync = false
Expand Down Expand Up @@ -250,7 +255,7 @@ func (r *RouteTable) syncRoutesForLink(ifaceName string) error {
defer oldCIDRs.Iter(func(item interface{}) error {
// Remove and conntrack entries that should no longer be there.
dest := item.(ip.CIDR)
r.dataplane.RemoveConntrackFlows(dest.Version(), dest.Addr().AsNetIP())
r.startConntrackDeletion(dest.Addr())
return nil
})

Expand Down Expand Up @@ -323,6 +328,9 @@ func (r *RouteTable) syncRoutesForLink(ifaceName string) error {
Protocol: syscall.RTPROT_BOOT,
Scope: netlink.SCOPE_LINK,
}
// In case this IP is being re-used, wait for any previous conntrack entry
// to be cleaned up. (No-op if there are no pending deletes.)
r.waitForPendingConntrackDeletion(cidr.Addr())
if err := r.dataplane.RouteAdd(&route); err != nil {
logCxt.WithError(err).Warn("Failed to add route")
updatesFailed = true
Expand All @@ -347,6 +355,49 @@ func (r *RouteTable) syncRoutesForLink(ifaceName string) error {
return nil
}

// startConntrackDeletion starts the deletion of conntrack entries for the given CIDR in the background. Pending
// deletions are tracked in the pendingConntrackCleanups map so we can block waiting for them later.
//
// It's important to do the conntrack deletions in the background because scanning the conntrack
// table is very slow if there are a lot of entries. Previously, we did the deletion synchronously
// but that led to lengthy Apply() calls on the critical path.
func (r *RouteTable) startConntrackDeletion(ipAddr ip.Addr) {
log.WithField("ip", ipAddr).Debug("Starting goroutine to delete conntrack entries")
done := make(chan struct{})
r.pendingConntrackCleanups[ipAddr] = done
go func() {
defer close(done)
r.dataplane.RemoveConntrackFlows(r.ipVersion, ipAddr.AsNetIP())
log.WithField("ip", ipAddr).Debug("Deleted conntrack entries")
}()
}

// cleanUpPendingConntrackDeletions scans the pendingConntrackCleanups map for completed entries and removes them.
func (r *RouteTable) cleanUpPendingConntrackDeletions() {
for ipAddr, c := range r.pendingConntrackCleanups {
select {
case <-c:
log.WithField("ip", ipAddr).Debug(
"Background goroutine finished deleting conntrack entries")
delete(r.pendingConntrackCleanups, ipAddr)
default:
log.WithField("ip", ipAddr).Debug(
"Background goroutine yet to finish deleting conntrack entries")
continue
}
}
}

// waitForPendingConntrackDeletion waits for any pending conntrack deletions (if any) for the given IP to complete.
func (r *RouteTable) waitForPendingConntrackDeletion(ipAddr ip.Addr) {
if c := r.pendingConntrackCleanups[ipAddr]; c != nil {
log.WithField("ip", ipAddr).Info("Waiting for pending conntrack deletion to finish")
<-c
log.WithField("ip", ipAddr).Info("Done waiting for pending conntrack deletion to finish")
delete(r.pendingConntrackCleanups, ipAddr)
}
}

// filterErrorByIfaceState checks the current state of the interface; if it's down or gone, it
// returns IfaceDown or IfaceNotPresent, otherwise, it returns the given defaultErr.
func (r *RouteTable) filterErrorByIfaceState(ifaceName string, currentErr, defaultErr error) error {
Expand Down
63 changes: 59 additions & 4 deletions routetable/route_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@ import (
"errors"
"fmt"
"net"
"strings"
"sync"
"syscall"
"time"

log "github.com/Sirupsen/logrus"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vishvananda/netlink"

"github.com/projectcalico/felix/ifacemonitor"
"github.com/projectcalico/felix/ip"
"github.com/projectcalico/felix/set"
"github.com/projectcalico/felix/testutils"

"strings"

"github.com/projectcalico/felix/ifacemonitor"
)

var (
Expand Down Expand Up @@ -107,6 +107,44 @@ var _ = Describe("RouteTable", func() {
Expect(dataplane.routeKeyToRoute).To(ConsistOf(gatewayRoute))
Expect(dataplane.addedRouteKeys).To(BeEmpty())
})
It("should delete only our conntrack entries", func() {
rt.Apply()
Eventually(dataplane.GetDeletedConntrackEntries).Should(ConsistOf(
net.ParseIP("10.0.0.1").To4(),
net.ParseIP("10.0.0.3").To4(),
))
})

Describe("with a slow conntrack deletion", func() {
const delay = 300 * time.Millisecond
BeforeEach(func() {
dataplane.ConntrackSleep = delay
})
It("should block a route add until conntrack finished", func() {
// Initial apply starts a background thread to delete
// 10.0.0.1 and 10.0.0.3.
rt.Apply()
// We try to add 10.0.0.1 back in.
rt.SetRoutes("cali1", []Target{
{CIDR: ip.MustParseCIDR("10.0.0.1/32"), DestMAC: mac1},
})
start := time.Now()
rt.Apply()
Expect(time.Since(start)).To(BeNumerically(">=", delay))
})
It("should not block an unrelated route add ", func() {
// Initial apply starts a background thread to delete
// 10.0.0.1 and 10.0.0.3.
rt.Apply()
// We try to add 10.0.0.10, which hasn't been seen before.
rt.SetRoutes("cali1", []Target{
{CIDR: ip.MustParseCIDR("10.0.0.10/32"), DestMAC: mac1},
})
start := time.Now()
rt.Apply()
Expect(time.Since(start)).To(BeNumerically("<", delay/2))
})
})

// We do the following tests in different failure (and non-failure) scenarios. In
// each case, we make the failure transient so that only the first Apply() should
Expand Down Expand Up @@ -379,6 +417,10 @@ type mockDataplane struct {
deletedRouteKeys set.Set

failuresToSimulate failFlags

mutex sync.Mutex
deletedConntrackEntries []net.IP
ConntrackSleep time.Duration
}

func (d *mockDataplane) addIface(idx int, name string, up bool, running bool) *mockLink {
Expand Down Expand Up @@ -506,7 +548,20 @@ func (d *mockDataplane) RemoveConntrackFlows(ipVersion uint8, ipAddr net.IP) {
log.WithFields(log.Fields{
"ipVersion": ipVersion,
"ipAddr": ipAddr,
"sleepTime": d.ConntrackSleep,
}).Info("Mock dataplane: Removing conntrack flows")
d.mutex.Lock()
d.deletedConntrackEntries = append(d.deletedConntrackEntries, ipAddr)
d.mutex.Unlock()
time.Sleep(d.ConntrackSleep)
}

func (d *mockDataplane) GetDeletedConntrackEntries() []net.IP {
d.mutex.Lock()
defer d.mutex.Unlock()
cpy := make([]net.IP, len(d.deletedConntrackEntries))
copy(cpy, d.deletedConntrackEntries)
return cpy
}

func keyForRoute(route *netlink.Route) string {
Expand Down

0 comments on commit 3ee61d7

Please sign in to comment.