From 91a79bc203218b76a9ac8c3e4ca6d56eb39875f5 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Fri, 6 Feb 2015 10:59:03 -0800 Subject: [PATCH] ratelimiter: fixing rate limiter use Use of the ratelimiter should be conscious of the ratelimiter's potential closing. Any loops that add work to the ratelimiter should (a) only do so if the rate limiter is not closed, or (b) prevent the limiter from closing while work is added (i.e. use limiter.Go(addWorkHere)) --- blockservice/worker/worker.go | 7 +++---- p2p/net/swarm/swarm_dial.go | 29 ++++++++++++++++------------- thirdparty/notifier/notifier.go | 31 ++++++++++++++++++++----------- 3 files changed, 39 insertions(+), 28 deletions(-) diff --git a/blockservice/worker/worker.go b/blockservice/worker/worker.go index 77097bef65b..ee45d32ad55 100644 --- a/blockservice/worker/worker.go +++ b/blockservice/worker/worker.go @@ -117,11 +117,10 @@ func (w *Worker) start(c Config) { } }) - // reads from |workerChan| until process closes - w.process.Go(func(proc process.Process) { + // reads from |workerChan| until w.process closes + limiter := ratelimit.NewRateLimiter(w.process, c.NumWorkers) + limiter.Go(func(proc process.Process) { ctx := waitable.Context(proc) // shut down in-progress HasBlock when time to die - limiter := ratelimit.NewRateLimiter(process.Background(), c.NumWorkers) - defer limiter.Close() for { select { case <-proc.Closing(): diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 39fd0297d44..696c903e93a 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -385,20 +385,23 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote go func() { // rate limiting just in case. at most 10 addrs at once. limiter := ratelimit.NewRateLimiter(procctx.WithContext(ctx), 10) - - // permute addrs so we try different sets first each time. 
- for _, i := range rand.Perm(len(remoteAddrs)) { - select { - case <-foundConn: // if one of them succeeded already - break - default: + limiter.Go(func(worker process.Process) { + // permute addrs so we try different sets first each time. + for _, i := range rand.Perm(len(remoteAddrs)) { + select { + case <-foundConn: // if one of them succeeded already + break + case <-worker.Closing(): // our context was cancelled + break + default: + } + + workerAddr := remoteAddrs[i] // shadow variable to avoid race + limiter.LimitedGo(func(worker process.Process) { + dialSingleAddr(workerAddr) + }) } - - workerAddr := remoteAddrs[i] // shadow variable to avoid race - limiter.Go(func(worker process.Process) { - dialSingleAddr(workerAddr) - }) - } + }) }() // wair fot the results. diff --git a/thirdparty/notifier/notifier.go b/thirdparty/notifier/notifier.go index c3721138180..aff69409356 100644 --- a/thirdparty/notifier/notifier.go +++ b/thirdparty/notifier/notifier.go @@ -120,18 +120,27 @@ func (n *Notifier) StopNotify(e Notifiee) { // hooks into your object that block you accidentally. func (n *Notifier) NotifyAll(notify func(Notifiee)) { n.mu.Lock() - if n.nots != nil { // so that zero-value is ready to be used. - for notifiee := range n.nots { + defer n.mu.Unlock() + + if n.nots == nil { // so that zero-value is ready to be used. + return + } - if n.lim == nil { // no rate limit - go notify(notifiee) - } else { - notifiee := notifiee // rebind for data races - n.lim.LimitedGo(func(worker process.Process) { - notify(notifiee) - }) - } + // no rate limiting. + if n.lim == nil { + for notifiee := range n.nots { + go notify(notifiee) } + return } - n.mu.Unlock() + + // with rate limiting. + n.lim.Go(func(worker process.Process) { + for notifiee := range n.nots { + notifiee := notifiee // rebind for loop data races + n.lim.LimitedGo(func(worker process.Process) { + notify(notifiee) + }) + } + }) }