diff --git a/routes.go b/routes.go
index 35bed9d..858da78 100644
--- a/routes.go
+++ b/routes.go
@@ -3,6 +3,7 @@ package mesh
 import (
 	"math"
 	"sync"
+	"time"
 )
 
 type unicastRoutes map[PeerName]PeerName
@@ -11,23 +12,29 @@ type broadcastRoutes map[PeerName][]PeerName
 // routes aggregates unicast and broadcast routes for our peer.
 type routes struct {
 	sync.RWMutex
-	ourself      *localPeer
-	peers        *Peers
-	onChange     []func()
-	unicast      unicastRoutes
-	unicastAll   unicastRoutes // [1]
-	broadcast    broadcastRoutes
-	broadcastAll broadcastRoutes // [1]
-	recalc       chan<- *struct{}
-	wait         chan chan struct{}
-	action       chan<- func()
+	ourself       *localPeer
+	peers         *Peers
+	onChange      []func()
+	unicast       unicastRoutes
+	unicastAll    unicastRoutes // [1]
+	broadcast     broadcastRoutes
+	broadcastAll  broadcastRoutes // [1]
+	recalcTimer   *time.Timer
+	pendingRecalc bool
+	wait          chan chan struct{}
+	action        chan<- func()
 	// [1] based on *all* connections, not just established &
 	// symmetric ones
 }
 
+const (
+	// We defer recalculation requests by up to 100ms, in order to
+	// coalesce multiple recalcs together.
+	recalcDeferTime = 100 * time.Millisecond
+)
+
 // newRoutes returns a usable Routes based on the LocalPeer and existing Peers.
 func newRoutes(ourself *localPeer, peers *Peers) *routes {
-	recalculate := make(chan *struct{}, 1)
 	wait := make(chan chan struct{})
 	action := make(chan func())
 	r := &routes{
@@ -37,11 +44,12 @@ func newRoutes(ourself *localPeer, peers *Peers) *routes {
 		unicastAll:   unicastRoutes{ourself.Name: UnknownPeerName},
 		broadcast:    broadcastRoutes{ourself.Name: []PeerName{}},
 		broadcastAll: broadcastRoutes{ourself.Name: []PeerName{}},
-		recalc:       recalculate,
+		recalcTimer:  time.NewTimer(time.Hour),
 		wait:         wait,
 		action:       action,
 	}
-	go r.run(recalculate, wait, action)
+	r.recalcTimer.Stop()
+	go r.run(wait, action)
 	return r
 }
 
@@ -156,13 +164,18 @@ func (r *routes) randomNeighbours(except PeerName) []PeerName {
 // can effectively be made synchronous with a subsequent call to
 // EnsureRecalculated.
 func (r *routes) recalculate() {
-	// The use of a 1-capacity channel in combination with the
-	// non-blocking send is an optimisation that results in multiple
-	// requests being coalesced.
-	select {
-	case r.recalc <- nil:
-	default:
+	r.Lock()
+	if !r.pendingRecalc {
+		r.recalcTimer.Reset(recalcDeferTime)
+		r.pendingRecalc = true
 	}
+	r.Unlock()
+}
+
+func (r *routes) clearPendingRecalcFlag() {
+	r.Lock()
+	r.pendingRecalc = false
+	r.Unlock()
 }
 
 // EnsureRecalculated waits for any preceding Recalculate requests to finish.
@@ -178,16 +191,20 @@ func (r *routes) ensureRecalculated() {
 	<-done
 }
 
-func (r *routes) run(recalculate <-chan *struct{}, wait <-chan chan struct{}, action <-chan func()) {
+func (r *routes) run(wait <-chan chan struct{}, action <-chan func()) {
 	for {
 		select {
-		case <-recalculate:
+		case <-r.recalcTimer.C:
+			r.clearPendingRecalcFlag()
 			r.calculate()
 		case done := <-wait:
-			select {
-			case <-recalculate:
+			r.Lock()
+			pending := r.pendingRecalc
+			r.Unlock()
+			if pending {
+				<-r.recalcTimer.C
+				r.clearPendingRecalcFlag()
 				r.calculate()
-			default:
 			}
 			close(done)
 		case f := <-action:
@@ -196,6 +213,8 @@ func (r *routes) run(recalculate <-chan *struct{}, wait <-chan chan struct{}, ac
 	}
 }
 
+// Calculate unicast and broadcast routes from r.ourself, and reset
+// the broadcast route cache.
 func (r *routes) calculate() {
 	r.peers.RLock()
 	r.ourself.RLock()
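
For reference, below is a minimal standalone sketch (not part of the patch) of the defer-and-coalesce pattern this change introduces: a request arms a stopped timer and sets a pending flag, a run loop drains the timer and clears the flag, so a burst of requests inside the defer window collapses into a single recalculation. The coalescer type and the newCoalescer/request names are invented for illustration only.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// coalescer defers work by up to deferTime so that a burst of request()
// calls results in a single execution of fn. (Illustrative sketch only.)
type coalescer struct {
	mu        sync.Mutex
	timer     *time.Timer
	pending   bool
	deferTime time.Duration
	fn        func()
}

func newCoalescer(d time.Duration, fn func()) *coalescer {
	c := &coalescer{deferTime: d, fn: fn}
	// Create the timer stopped; request() arms it on demand, mirroring
	// newRoutes, which calls time.NewTimer and immediately stops it.
	c.timer = time.NewTimer(time.Hour)
	c.timer.Stop()
	go c.run()
	return c
}

// request schedules fn to run after deferTime unless a run is already pending.
func (c *coalescer) request() {
	c.mu.Lock()
	if !c.pending {
		c.timer.Reset(c.deferTime)
		c.pending = true
	}
	c.mu.Unlock()
}

// run waits for the timer to fire, clears the pending flag, and runs fn.
func (c *coalescer) run() {
	for range c.timer.C {
		c.mu.Lock()
		c.pending = false
		c.mu.Unlock()
		c.fn()
	}
}

func main() {
	var calls int32
	c := newCoalescer(100*time.Millisecond, func() { atomic.AddInt32(&calls, 1) })
	for i := 0; i < 10; i++ {
		c.request() // ten requests within the 100ms window...
	}
	time.Sleep(300 * time.Millisecond)
	fmt.Println("calls:", atomic.LoadInt32(&calls)) // ...should report 1
}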