Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
feat(peermanager): move refcnt
Browse files Browse the repository at this point in the history
Move refcnt tracking from the messagequeue to the peermanager, where it's relevant
  • Loading branch information
hannahhoward committed Feb 20, 2019
1 parent 434e0f4 commit d4191c4
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 49 deletions.
20 changes: 0 additions & 20 deletions messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ type MessageQueue struct {

sender bsnet.MessageSender

refcnt int

work chan struct{}
done chan struct{}
}
Expand All @@ -48,27 +46,9 @@ func New(p peer.ID, network MessageNetwork) *MessageQueue {
wl: wantlist.NewThreadSafe(),
network: network,
p: p,
refcnt: 0,
}
}

// RefCount returns the number of open connections for this queue.
func (mq *MessageQueue) RefCount() int {
return mq.refcnt
}

// RefIncrement increments the refcount for a message queue.
func (mq *MessageQueue) RefIncrement() {
mq.refcnt++
}

// RefDecrement decrements the refcount for a message queue and returns true
// if the refcount is now 0.
func (mq *MessageQueue) RefDecrement() bool {
mq.refcnt--
return mq.refcnt > 0
}

// AddMessage adds new entries to an outgoing message for a given session.
func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) {
if !mq.addEntries(entries, ses) {
Expand Down
56 changes: 35 additions & 21 deletions peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ var (

// PeerQueue provides a queer of messages to be sent for a single peer.
type PeerQueue interface {
RefIncrement()
RefDecrement() bool
RefCount() int
AddMessage(entries []*bsmsg.Entry, ses uint64)
Startup(ctx context.Context)
AddWantlist(initialEntries []*wantlist.Entry)
Expand All @@ -35,10 +32,15 @@ type peerMessage interface {
handle(pm *PeerManager)
}

type peerQueueInstance struct {
refcnt int
pq PeerQueue
}

// PeerManager manages a pool of peers and sends messages to peers in the pool.
type PeerManager struct {
// peerQueues -- interact through internal utility functions get/set/remove/iterate
peerQueues map[peer.ID]PeerQueue
peerQueues map[peer.ID]*peerQueueInstance
peerQueuesLk sync.RWMutex

createPeerQueue PeerQueueFactory
Expand All @@ -48,7 +50,7 @@ type PeerManager struct {
// New creates a new PeerManager, given a context and a peerQueueFactory.
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
return &PeerManager{
peerQueues: make(map[peer.ID]PeerQueue),
peerQueues: make(map[peer.ID]*peerQueueInstance),
createPeerQueue: createPeerQueue,
ctx: ctx,
}
Expand All @@ -68,28 +70,39 @@ func (pm *PeerManager) ConnectedPeers() []peer.ID {
// Connected is called to add a new peer to the pool, and send it an initial set
// of wants.
func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) {
mq := pm.getOrCreate(p)
pm.peerQueuesLk.Lock()

pq := pm.getOrCreate(p)

if mq.RefCount() == 0 {
mq.AddWantlist(initialEntries)
if pq.refcnt == 0 {
pq.pq.AddWantlist(initialEntries)
}
mq.RefIncrement()

pq.refcnt++

pm.peerQueuesLk.Unlock()
}

// Disconnected is called to remove a peer from the pool.
func (pm *PeerManager) Disconnected(p peer.ID) {
pm.peerQueuesLk.Lock()
pq, ok := pm.peerQueues[p]

if !ok || pq.RefDecrement() {
if !ok {
pm.peerQueuesLk.Unlock()
return
}

pq.refcnt--
if pq.refcnt > 0 {
pm.peerQueuesLk.Unlock()
return
}

delete(pm.peerQueues, p)
pm.peerQueuesLk.Unlock()

pq.Shutdown()
pq.pq.Shutdown()

}

Expand All @@ -99,25 +112,26 @@ func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, fr
if len(targets) == 0 {
pm.peerQueuesLk.RLock()
for _, p := range pm.peerQueues {
p.AddMessage(entries, from)
p.pq.AddMessage(entries, from)
}
pm.peerQueuesLk.RUnlock()
} else {
for _, t := range targets {
p := pm.getOrCreate(t)
p.AddMessage(entries, from)
pm.peerQueuesLk.Lock()
pqi := pm.getOrCreate(t)
pm.peerQueuesLk.Unlock()
pqi.pq.AddMessage(entries, from)
}
}
}

func (pm *PeerManager) getOrCreate(p peer.ID) PeerQueue {
pm.peerQueuesLk.Lock()
pq, ok := pm.peerQueues[p]
func (pm *PeerManager) getOrCreate(p peer.ID) *peerQueueInstance {
pqi, ok := pm.peerQueues[p]
if !ok {
pq = pm.createPeerQueue(p)
pq := pm.createPeerQueue(p)
pq.Startup(pm.ctx)
pm.peerQueues[p] = pq
pqi = &peerQueueInstance{0, pq}
pm.peerQueues[p] = pqi
}
pm.peerQueuesLk.Unlock()
return pq
return pqi
}
9 changes: 1 addition & 8 deletions peermanager/peermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,13 @@ type messageSent struct {
}

type fakePeer struct {
refcnt int
p peer.ID
messagesSent chan messageSent
}

func (fp *fakePeer) Startup(ctx context.Context) {}
func (fp *fakePeer) Shutdown() {}
func (fp *fakePeer) RefCount() int { return fp.refcnt }
func (fp *fakePeer) RefIncrement() { fp.refcnt++ }
func (fp *fakePeer) RefDecrement() bool {
fp.refcnt--
return fp.refcnt > 0
}

func (fp *fakePeer) AddMessage(entries []*bsmsg.Entry, ses uint64) {
fp.messagesSent <- messageSent{fp.p, entries, ses}
}
Expand All @@ -41,7 +35,6 @@ func makePeerQueueFactory(messagesSent chan messageSent) PeerQueueFactory {
return func(p peer.ID) PeerQueue {
return &fakePeer{
p: p,
refcnt: 0,
messagesSent: messagesSent,
}
}
Expand Down

0 comments on commit d4191c4

Please sign in to comment.