Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ratelimiter: fixing rate limiter use #755

Merged
merged 1 commit into from
Feb 6, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions blockservice/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,10 @@ func (w *Worker) start(c Config) {
}
})

// reads from |workerChan| until process closes
w.process.Go(func(proc process.Process) {
// reads from |workerChan| until w.process closes
limiter := ratelimit.NewRateLimiter(w.process, c.NumWorkers)
limiter.Go(func(proc process.Process) {
ctx := waitable.Context(proc) // shut down in-progress HasBlock when time to die
limiter := ratelimit.NewRateLimiter(process.Background(), c.NumWorkers)
defer limiter.Close()
for {
select {
case <-proc.Closing():
Expand Down
29 changes: 16 additions & 13 deletions p2p/net/swarm/swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,20 +385,23 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
go func() {
// rate limiting just in case. at most 10 addrs at once.
limiter := ratelimit.NewRateLimiter(procctx.WithContext(ctx), 10)

// permute addrs so we try different sets first each time.
for _, i := range rand.Perm(len(remoteAddrs)) {
select {
case <-foundConn: // if one of them succeeded already
break
default:
limiter.Go(func(worker process.Process) {
// permute addrs so we try different sets first each time.
for _, i := range rand.Perm(len(remoteAddrs)) {
select {
case <-foundConn: // if one of them succeeded already
break
case <-worker.Closing(): // our context was cancelled
break
default:
}

workerAddr := remoteAddrs[i] // shadow variable to avoid race
limiter.LimitedGo(func(worker process.Process) {
dialSingleAddr(workerAddr)
})
}

workerAddr := remoteAddrs[i] // shadow variable to avoid race
limiter.Go(func(worker process.Process) {
dialSingleAddr(workerAddr)
})
}
})
}()

// wair fot the results.
Expand Down
31 changes: 20 additions & 11 deletions thirdparty/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,27 @@ func (n *Notifier) StopNotify(e Notifiee) {
// hooks into your object that block you accidentally.
func (n *Notifier) NotifyAll(notify func(Notifiee)) {
n.mu.Lock()
if n.nots != nil { // so that zero-value is ready to be used.
for notifiee := range n.nots {
defer n.mu.Unlock()

if n.nots == nil { // so that zero-value is ready to be used.
return
}

if n.lim == nil { // no rate limit
go notify(notifiee)
} else {
notifiee := notifiee // rebind for data races
n.lim.LimitedGo(func(worker process.Process) {
notify(notifiee)
})
}
// no rate limiting.
if n.lim == nil {
for notifiee := range n.nots {
go notify(notifiee)
}
return
}
n.mu.Unlock()

// with rate limiting.
n.lim.Go(func(worker process.Process) {
for notifiee := range n.nots {
notifiee := notifiee // rebind for loop data races
n.lim.LimitedGo(func(worker process.Process) {
notify(notifiee)
})
}
})
}