Skip to content

Commit

Permalink
Merge pull request #755 from jbenet/fix-ratelimiter-use
Browse files Browse the repository at this point in the history
ratelimiter: fixing rate limiter use
  • Loading branch information
jbenet committed Feb 6, 2015
2 parents 8558838 + 91a79bc commit 03c910f
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 28 deletions.
7 changes: 3 additions & 4 deletions blockservice/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,10 @@ func (w *Worker) start(c Config) {
}
})

// reads from |workerChan| until process closes
w.process.Go(func(proc process.Process) {
// reads from |workerChan| until w.process closes
limiter := ratelimit.NewRateLimiter(w.process, c.NumWorkers)
limiter.Go(func(proc process.Process) {
ctx := waitable.Context(proc) // shut down in-progress HasBlock when time to die
limiter := ratelimit.NewRateLimiter(process.Background(), c.NumWorkers)
defer limiter.Close()
for {
select {
case <-proc.Closing():
Expand Down
29 changes: 16 additions & 13 deletions p2p/net/swarm/swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,20 +385,23 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
go func() {
// rate limiting just in case. at most 10 addrs at once.
limiter := ratelimit.NewRateLimiter(procctx.WithContext(ctx), 10)

// permute addrs so we try different sets first each time.
for _, i := range rand.Perm(len(remoteAddrs)) {
select {
case <-foundConn: // if one of them succeeded already
break
default:
limiter.Go(func(worker process.Process) {
// permute addrs so we try different sets first each time.
for _, i := range rand.Perm(len(remoteAddrs)) {
select {
case <-foundConn: // if one of them succeeded already
break
case <-worker.Closing(): // our context was cancelled
break
default:
}

workerAddr := remoteAddrs[i] // shadow variable to avoid race
limiter.LimitedGo(func(worker process.Process) {
dialSingleAddr(workerAddr)
})
}

workerAddr := remoteAddrs[i] // shadow variable to avoid race
limiter.Go(func(worker process.Process) {
dialSingleAddr(workerAddr)
})
}
})
}()

// wair fot the results.
Expand Down
31 changes: 20 additions & 11 deletions thirdparty/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,27 @@ func (n *Notifier) StopNotify(e Notifiee) {
// hooks into your object that block you accidentally.
func (n *Notifier) NotifyAll(notify func(Notifiee)) {
n.mu.Lock()
if n.nots != nil { // so that zero-value is ready to be used.
for notifiee := range n.nots {
defer n.mu.Unlock()

if n.nots == nil { // so that zero-value is ready to be used.
return
}

if n.lim == nil { // no rate limit
go notify(notifiee)
} else {
notifiee := notifiee // rebind for data races
n.lim.LimitedGo(func(worker process.Process) {
notify(notifiee)
})
}
// no rate limiting.
if n.lim == nil {
for notifiee := range n.nots {
go notify(notifiee)
}
return
}
n.mu.Unlock()

// with rate limiting.
n.lim.Go(func(worker process.Process) {
for notifiee := range n.nots {
notifiee := notifiee // rebind for loop data races
n.lim.LimitedGo(func(worker process.Process) {
notify(notifiee)
})
}
})
}

0 comments on commit 03c910f

Please sign in to comment.