Skip to content

Commit

Permalink
fix(notifications) prevent deadlock when context cancelled early
Browse files Browse the repository at this point in the history
+ test(notifications)

cc @whyrusleeping @jbenet

License: MIT
Signed-off-by: Brian Tiger Chow <brian@perfmode.com>
  • Loading branch information
Brian Tiger Chow committed Nov 26, 2014
1 parent ffb10e9 commit 11bb58e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
8 changes: 4 additions & 4 deletions exchange/bitswap/notifications/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ func (ps *impl) Shutdown() {
func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block {

blocksCh := make(chan *blocks.Block, len(keys))
valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
if len(keys) == 0 {
close(blocksCh)
return blocksCh
}
valuesCh := ps.wrapped.SubOnceEach(toStrings(keys)...)
ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...)
go func() {
defer func() {
close(blocksCh)
}()
defer close(blocksCh)
defer ps.wrapped.Unsub(valuesCh) // with a len(keys) buffer, this is an optimization
for {
select {
case <-ctx.Done():
Expand Down
30 changes: 30 additions & 0 deletions exchange/bitswap/notifications/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
blocks "github.com/jbenet/go-ipfs/blocks"
blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
"github.com/jbenet/go-ipfs/util"
)

func TestDuplicates(t *testing.T) {
Expand Down Expand Up @@ -96,6 +98,34 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) {
assertBlockChannelNil(t, blockChannel)
}

func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {

g := blocksutil.NewBlockGenerator()
ctx, cancel := context.WithCancel(context.Background())
n := New()
defer n.Shutdown()

t.Log("generate a large number of blocks. exceed default buffer")
bs := g.Blocks(1000)
ks := func() []util.Key {
var keys []util.Key
for _, b := range bs {
keys = append(keys, b.Key())
}
return keys
}()

_ = n.Subscribe(ctx, ks...) // ignore received channel

t.Log("cancel context before any blocks published")
cancel()
for _, b := range bs {
n.Publish(b)
}

t.Log("publishing the large number of blocks to the ignored channel must not deadlock")
}

func assertBlockChannelNil(t *testing.T, blockChannel <-chan *blocks.Block) {
_, ok := <-blockChannel
if ok {
Expand Down

0 comments on commit 11bb58e

Please sign in to comment.