Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

serialize broadcast topology updates and coalesce updates #117

Merged
merged 6 commits into from
Oct 10, 2019
13 changes: 13 additions & 0 deletions gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@ func sendPendingGossip(routers ...*Router) {
}
}

func sendPendingTopologyUpdates(routers ...*Router) {
for _, router := range routers {
router.Ourself.Lock()
pendingUpdate := router.Ourself.pendingTopologyUpdate
router.Ourself.Unlock()
if pendingUpdate {
router.Ourself.broadcastPendingTopologyUpdates()
}
}
}

func addTestGossipConnection(r1, r2 *Router) {
c1 := r1.newTestGossipConnection(r2)
c2 := r2.newTestGossipConnection(r1)
Expand Down Expand Up @@ -121,6 +132,7 @@ func checkTopology(t *testing.T, router *Router, wantedPeers ...*Peer) {
}

func flushAndCheckTopology(t *testing.T, routers []*Router, wantedPeers ...*Peer) {
sendPendingTopologyUpdates(routers...)
sendPendingGossip(routers...)
for _, r := range routers {
checkTopology(t, r, wantedPeers...)
Expand Down Expand Up @@ -155,6 +167,7 @@ func TestGossipTopology(t *testing.T) {

// Drop the connection from 1 to 3
r1.DeleteTestGossipConnection(r3)
sendPendingTopologyUpdates(routers...)
sendPendingGossip(r1, r2, r3)
checkTopology(t, r1, r1.tp(r2), r2.tp(r1))
checkTopology(t, r2, r1.tp(r2), r2.tp(r1))
Expand Down
44 changes: 37 additions & 7 deletions local_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@ import (
"time"
)

const (
deferTopologyUpdateDuration = 1 * time.Second
)

// localPeer is the only "active" peer in the mesh. It extends Peer with
// additional behaviors, mostly to retrieve and manage connection state.
type localPeer struct {
sync.RWMutex
*Peer
router *Router
actionChan chan<- localPeerAction
router *Router
actionChan chan<- localPeerAction
topologyUpdates peerNameSet
timer *time.Timer
pendingTopologyUpdate bool
}

// The actor closure used by localPeer.
Expand All @@ -23,11 +30,15 @@ type localPeerAction func()
// newLocalPeer returns a usable LocalPeer.
func newLocalPeer(name PeerName, nickName string, router *Router) *localPeer {
actionChan := make(chan localPeerAction, ChannelSize)
topologyUpdates := make(peerNameSet)
peer := &localPeer{
Peer: newPeer(name, nickName, randomPeerUID(), 0, randomPeerShortID()),
router: router,
actionChan: actionChan,
Peer: newPeer(name, nickName, randomPeerUID(), 0, randomPeerShortID()),
router: router,
actionChan: actionChan,
topologyUpdates: topologyUpdates,
timer: time.NewTimer(deferTopologyUpdateDuration),
}
peer.timer.Stop()
go peer.actorLoop(actionChan)
return peer
}
Expand Down Expand Up @@ -147,10 +158,22 @@ func (peer *localPeer) actorLoop(actionChan <-chan localPeerAction) {
action()
case <-gossipTimer:
peer.router.sendAllGossip()
case <-peer.timer.C:
peer.broadcastPendingTopologyUpdates()
}
}
}

func (peer *localPeer) broadcastPendingTopologyUpdates() {
peer.Lock()
gossipData := peer.topologyUpdates
peer.topologyUpdates = make(peerNameSet)
peer.pendingTopologyUpdate = false
peer.Unlock()
gossipData[peer.Peer.Name] = struct{}{}
peer.router.broadcastTopologyUpdate(gossipData)
}

func (peer *localPeer) handleAddConnection(conn ourConnection, isRestartedPeer bool) error {
if peer.Peer != conn.getLocal() {
panic("Attempt made to add connection to peer where peer is not the source of connection")
Expand Down Expand Up @@ -194,7 +217,6 @@ func (peer *localPeer) handleAddConnection(conn ourConnection, isRestartedPeer b
conn.logf("connection added (new peer)")
peer.router.sendAllGossipDown(conn)
}

peer.router.Routes.recalculate()
peer.broadcastPeerUpdate(conn.Remote())

Expand Down Expand Up @@ -244,7 +266,15 @@ func (peer *localPeer) broadcastPeerUpdate(peers ...*Peer) {
// context of a test, but that will involve significant
// reworking of tests.
if peer.router != nil {
peer.router.broadcastTopologyUpdate(append(peers, peer.Peer))
peer.Lock()
defer peer.Unlock()
if !peer.pendingTopologyUpdate {
peer.timer.Reset(deferTopologyUpdateDuration)
peer.pendingTopologyUpdate = true
}
for _, p := range peers {
peer.topologyUpdates[p.Name] = struct{}{}
}
}
}

Expand Down
9 changes: 3 additions & 6 deletions router.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,9 @@ func (router *Router) sendPendingGossip() bool {

// BroadcastTopologyUpdate is invoked whenever there is a change to the mesh
// topology, and broadcasts the new set of peers to the mesh.
func (router *Router) broadcastTopologyUpdate(update []*Peer) {
names := make(peerNameSet)
for _, p := range update {
names[p.Name] = struct{}{}
}
router.topologyGossip.GossipBroadcast(&topologyGossipData{peers: router.Peers, update: names})
func (router *Router) broadcastTopologyUpdate(update peerNameSet) {
gossipData := &topologyGossipData{peers: router.Peers, update: update}
router.topologyGossip.GossipBroadcast(gossipData)
}

// OnGossipUnicast implements Gossiper, but always returns an error, as a
Expand Down