Skip to content

Commit

Permalink
Merge pull request #1858 from ipfs/fix/bitswap-limiter
Browse files Browse the repository at this point in the history
fix panic in bitswap working limit spawning
  • Loading branch information
jbenet committed Oct 19, 2015
2 parents 9ca0be3 + 22f0b87 commit b910d8a
Showing 1 changed file with 26 additions and 22 deletions.
48 changes: 26 additions & 22 deletions exchange/bitswap/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"

key "github.com/ipfs/go-ipfs/blocks/key"
Expand Down Expand Up @@ -74,43 +73,48 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {

func (bs *Bitswap) provideWorker(px process.Process) {

limiter := ratelimit.NewRateLimiter(px, provideWorkerMax)
limit := make(chan struct{}, provideWorkerMax)

limitedGoProvide := func(k key.Key, wid int) {
defer func() {
// replace token when done
<-limit
}()
ev := logging.LoggableMap{"ID": wid}
limiter.LimitedGo(func(px process.Process) {

ctx := procctx.OnClosingContext(px) // derive ctx from px
defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done()
ctx := procctx.OnClosingContext(px) // derive ctx from px
defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done()

ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
defer cancel()
ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
defer cancel()

if err := bs.network.Provide(ctx, k); err != nil {
log.Error(err)
}
})
if err := bs.network.Provide(ctx, k); err != nil {
log.Error(err)
}
}

// worker spawner, reads from bs.provideKeys until it closes, spawning a
// _ratelimited_ number of workers to handle each key.
limiter.Go(func(px process.Process) {
for wid := 2; ; wid++ {
ev := logging.LoggableMap{"ID": 1}
log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev)
for wid := 2; ; wid++ {
ev := logging.LoggableMap{"ID": 1}
log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev)

select {
case <-px.Closing():
return
case k, ok := <-bs.provideKeys:
if !ok {
log.Debug("provideKeys channel closed")
return
}
select {
case <-px.Closing():
return
case k, ok := <-bs.provideKeys:
if !ok {
log.Debug("provideKeys channel closed")
return
}
limitedGoProvide(k, wid)
case limit <- struct{}{}:
go limitedGoProvide(k, wid)
}
}
})
}
}

func (bs *Bitswap) provideCollector(ctx context.Context) {
Expand Down

0 comments on commit b910d8a

Please sign in to comment.