Skip to content

Commit

Permalink
Merge pull request ipfs/go-bitswap#72 from ipfs/bugs/racy-wantlist-ha…
Browse files Browse the repository at this point in the history
…ndling

fix(wantlist): remove races on setup

This commit was moved from ipfs/go-bitswap@472a8ab
  • Loading branch information
Stebalien authored Feb 20, 2019
2 parents 75d4243 + 341c218 commit 386036d
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 201 deletions.
6 changes: 2 additions & 4 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
}

bs.wm.SetDelegate(bs.pm)
bs.pm.Startup()
bs.wm.Startup()
bs.pqm.Startup()
network.SetDelegate(bs)
Expand Down Expand Up @@ -361,14 +360,13 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {

// Connected/Disconnected warns bitswap about peer connections.
func (bs *Bitswap) PeerConnected(p peer.ID) {
initialWants := bs.wm.CurrentBroadcastWants()
bs.pm.Connected(p, initialWants)
bs.wm.Connected(p)
bs.engine.PeerConnected(p)
}

// Connected/Disconnected warns bitswap about peer connections.
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.pm.Disconnected(p)
bs.wm.Disconnected(p)
bs.engine.PeerDisconnected(p)
}

Expand Down
46 changes: 20 additions & 26 deletions bitswap/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (

var log = logging.Logger("bitswap")

const maxRetries = 10

// MessageNetwork is any network that can connect peers and generate a message
// sender.
type MessageNetwork interface {
Expand All @@ -32,8 +34,6 @@ type MessageQueue struct {

sender bsnet.MessageSender

refcnt int

work chan struct{}
done chan struct{}
}
Expand All @@ -46,22 +46,9 @@ func New(p peer.ID, network MessageNetwork) *MessageQueue {
wl: wantlist.NewThreadSafe(),
network: network,
p: p,
refcnt: 1,
}
}

// RefIncrement increments the refcount for a message queue.
func (mq *MessageQueue) RefIncrement() {
mq.refcnt++
}

// RefDecrement decrements the refcount for a message queue and returns true
// if the refcount is now 0.
func (mq *MessageQueue) RefDecrement() bool {
mq.refcnt--
return mq.refcnt > 0
}

// AddMessage adds new entries to an outgoing message for a given session.
func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) {
if !mq.addEntries(entries, ses) {
Expand All @@ -73,24 +60,31 @@ func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) {
}
}

// Startup starts the processing of messages, and creates an initial message
// based on the given initial wantlist.
func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.Entry) {

// new peer, we will want to give them our full wantlist
// AddWantlist adds a complete session tracked want list to a message queue
func (mq *MessageQueue) AddWantlist(initialEntries []*wantlist.Entry) {
if len(initialEntries) > 0 {
fullwantlist := bsmsg.New(true)
if mq.out == nil {
mq.out = bsmsg.New(false)
}

for _, e := range initialEntries {
for k := range e.SesTrk {
mq.wl.AddEntry(e, k)
}
fullwantlist.AddEntry(e.Cid, e.Priority)
mq.out.AddEntry(e.Cid, e.Priority)
}

select {
case mq.work <- struct{}{}:
default:
}
mq.out = fullwantlist
mq.work <- struct{}{}
}
go mq.runQueue(ctx)
}

// Startup starts the processing of messages, and creates an initial message
// based on the given initial wantlist.
func (mq *MessageQueue) Startup(ctx context.Context) {
go mq.runQueue(ctx)
}

// Shutdown stops the processing of messages for a message queue.
Expand Down Expand Up @@ -162,7 +156,7 @@ func (mq *MessageQueue) doWork(ctx context.Context) {
}

// send wantlist updates
for { // try to send this message until we fail.
for i := 0; i < maxRetries; i++ { // try to send this message until we fail.
if mq.attemptSendAndRecovery(ctx, wlm) {
return
}
Expand Down
12 changes: 6 additions & 6 deletions bitswap/messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ func (fmn *fakeMessageNetwork) ConnectTo(context.Context, peer.ID) error {
func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error) {
if fmn.messageSenderError == nil {
return fmn.messageSender, nil
} else {
return nil, fmn.messageSenderError
}
return nil, fmn.messageSenderError

}

type fakeMessageSender struct {
Expand Down Expand Up @@ -81,8 +81,8 @@ func TestStartupAndShutdown(t *testing.T) {
ses := testutil.GenerateSessionID()
wl := testutil.GenerateWantlist(10, ses)

messageQueue.Startup(ctx, wl.Entries())

messageQueue.Startup(ctx)
messageQueue.AddWantlist(wl.Entries())
messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
if len(messages) != 1 {
t.Fatal("wrong number of messages were sent for initial wants")
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestSendingMessagesDeduped(t *testing.T) {
ses1 := testutil.GenerateSessionID()
ses2 := testutil.GenerateSessionID()
entries := testutil.GenerateMessageEntries(10, false)
messageQueue.Startup(ctx, nil)
messageQueue.Startup(ctx)

messageQueue.AddMessage(entries, ses1)
messageQueue.AddMessage(entries, ses2)
Expand All @@ -148,7 +148,7 @@ func TestSendingMessagesPartialDupe(t *testing.T) {
entries := testutil.GenerateMessageEntries(10, false)
moreEntries := testutil.GenerateMessageEntries(5, false)
secondEntries := append(entries[5:], moreEntries...)
messageQueue.Startup(ctx, nil)
messageQueue.Startup(ctx)

messageQueue.AddMessage(entries, ses1)
messageQueue.AddMessage(secondEntries, ses2)
Expand Down
Loading

0 comments on commit 386036d

Please sign in to comment.