diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index 120a112f6..a4dbd2c69 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -35,12 +35,17 @@ type PeerConnectionStrategy struct { paused atomic.Bool dialTimeout time.Duration *CommonDiscoveryService - subscriptions []<-chan PeerData + subscriptions []subscription backoff backoff.BackoffFactory logger *zap.Logger } +type subscription struct { + ctx context.Context + ch <-chan PeerData +} + // backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer func getBackOff() backoff.BackoffFactory { rngSrc := rand.NewSource(rand.Int63()) @@ -85,7 +90,7 @@ func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerDa // if not running yet, store the subscription and return if err := c.ErrOnNotRunning(); err != nil { c.mux.Lock() - c.subscriptions = append(c.subscriptions, ch) + c.subscriptions = append(c.subscriptions, subscription{ctx, ch}) c.mux.Unlock() return } @@ -93,16 +98,18 @@ func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerDa c.WaitGroup().Add(1) go func() { defer c.WaitGroup().Done() - c.consumeSubscription(ch) + c.consumeSubscription(subscription{ctx, ch}) }() } -func (c *PeerConnectionStrategy) consumeSubscription(ch <-chan PeerData) { +func (c *PeerConnectionStrategy) consumeSubscription(s subscription) { for { // for returning from the loop when peerConnector is paused. select { case <-c.Context().Done(): return + case <-s.ctx.Done(): + return default: } // @@ -110,7 +117,9 @@ func (c *PeerConnectionStrategy) consumeSubscription(ch <-chan PeerData) { select { case <-c.Context().Done(): return - case p, ok := <-ch: + case <-s.ctx.Done(): + return + case p, ok := <-s.ch: if !ok { return } @@ -166,7 +175,7 @@ func (c *PeerConnectionStrategy) isPaused() bool { func (c *PeerConnectionStrategy) consumeSubscriptions() { for _, subs := range c.subscriptions { c.WaitGroup().Add(1) - go func(s <-chan PeerData) { + go func(s subscription) { defer c.WaitGroup().Done() c.consumeSubscription(s) }(subs) diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index a99b2cfc2..7fbd722a3 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -339,7 +339,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p PeerData, connectNow bool) { } } if connectNow { - pm.peerConnector.PushToChan(p) + go pm.peerConnector.PushToChan(p) } }