Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

fix(wantlist): remove races on setup #72

Merged
merged 9 commits into from
Feb 20, 2019
6 changes: 2 additions & 4 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
}

bs.wm.SetDelegate(bs.pm)
bs.pm.Startup()
bs.wm.Startup()
bs.pqm.Startup()
network.SetDelegate(bs)
Expand Down Expand Up @@ -361,14 +360,13 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {

// Connected/Disconnected warns bitswap about peer connections.
func (bs *Bitswap) PeerConnected(p peer.ID) {
initialWants := bs.wm.CurrentBroadcastWants()
bs.pm.Connected(p, initialWants)
bs.wm.Connected(p)
bs.engine.PeerConnected(p)
}

// Connected/Disconnected warns bitswap about peer connections.
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.pm.Disconnected(p)
bs.wm.Disconnected(p)
bs.engine.PeerDisconnected(p)
}

Expand Down
46 changes: 20 additions & 26 deletions messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (

var log = logging.Logger("bitswap")

const maxRetries = 10

// MessageNetwork is any network that can connect peers and generate a message
// sender.
type MessageNetwork interface {
Expand All @@ -32,8 +34,6 @@ type MessageQueue struct {

sender bsnet.MessageSender

refcnt int

work chan struct{}
done chan struct{}
}
Expand All @@ -46,22 +46,9 @@ func New(p peer.ID, network MessageNetwork) *MessageQueue {
wl: wantlist.NewThreadSafe(),
network: network,
p: p,
refcnt: 1,
}
}

// RefIncrement increments the refcount for a message queue.
func (mq *MessageQueue) RefIncrement() {
mq.refcnt++
}

// RefDecrement decrements the refcount for a message queue and returns true
// if the refcount is now 0.
func (mq *MessageQueue) RefDecrement() bool {
mq.refcnt--
return mq.refcnt > 0
}

// AddMessage adds new entries to an outgoing message for a given session.
func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) {
if !mq.addEntries(entries, ses) {
Expand All @@ -73,24 +60,31 @@ func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) {
}
}

// Startup starts the processing of messages, and creates an initial message
// based on the given initial wantlist.
func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.Entry) {

// new peer, we will want to give them our full wantlist
// AddWantlist adds a complete session tracked want list to a message queue
func (mq *MessageQueue) AddWantlist(initialEntries []*wantlist.Entry) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO (maybe later): It would be kind of nice to deduplicate this with AddMessage. As far as I can tell, they do approximately the same thing; they just take different inputs.

if len(initialEntries) > 0 {
fullwantlist := bsmsg.New(true)
if mq.out == nil {
mq.out = bsmsg.New(false)
}

for _, e := range initialEntries {
for k := range e.SesTrk {
mq.wl.AddEntry(e, k)
}
fullwantlist.AddEntry(e.Cid, e.Priority)
mq.out.AddEntry(e.Cid, e.Priority)
}

select {
case mq.work <- struct{}{}:
hannahhoward marked this conversation as resolved.
Show resolved Hide resolved
default:
}
mq.out = fullwantlist
mq.work <- struct{}{}
}
go mq.runQueue(ctx)
}

// Startup starts the processing of messages, and creates an initial message
// based on the given initial wantlist.
func (mq *MessageQueue) Startup(ctx context.Context) {
go mq.runQueue(ctx)
}

// Shutdown stops the processing of messages for a message queue.
Expand Down Expand Up @@ -162,7 +156,7 @@ func (mq *MessageQueue) doWork(ctx context.Context) {
}

// send wantlist updates
for { // try to send this message until we fail.
for i := 0; i < maxRetries; i++ { // try to send this message until we fail.
if mq.attemptSendAndRecovery(ctx, wlm) {
return
}
Expand Down
12 changes: 6 additions & 6 deletions messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ func (fmn *fakeMessageNetwork) ConnectTo(context.Context, peer.ID) error {
func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error) {
if fmn.messageSenderError == nil {
return fmn.messageSender, nil
} else {
return nil, fmn.messageSenderError
}
return nil, fmn.messageSenderError

}

type fakeMessageSender struct {
Expand Down Expand Up @@ -81,8 +81,8 @@ func TestStartupAndShutdown(t *testing.T) {
ses := testutil.GenerateSessionID()
wl := testutil.GenerateWantlist(10, ses)

messageQueue.Startup(ctx, wl.Entries())

messageQueue.Startup(ctx)
messageQueue.AddWantlist(wl.Entries())
messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
if len(messages) != 1 {
t.Fatal("wrong number of messages were sent for initial wants")
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestSendingMessagesDeduped(t *testing.T) {
ses1 := testutil.GenerateSessionID()
ses2 := testutil.GenerateSessionID()
entries := testutil.GenerateMessageEntries(10, false)
messageQueue.Startup(ctx, nil)
messageQueue.Startup(ctx)

messageQueue.AddMessage(entries, ses1)
messageQueue.AddMessage(entries, ses2)
Expand All @@ -148,7 +148,7 @@ func TestSendingMessagesPartialDupe(t *testing.T) {
entries := testutil.GenerateMessageEntries(10, false)
moreEntries := testutil.GenerateMessageEntries(5, false)
secondEntries := append(entries[5:], moreEntries...)
messageQueue.Startup(ctx, nil)
messageQueue.Startup(ctx)

messageQueue.AddMessage(entries, ses1)
messageQueue.AddMessage(secondEntries, ses2)
Expand Down
Loading