Skip to content

Commit

Permalink
fix: race condition in peer connector / manager interaction (#837)
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos authored Oct 26, 2023
1 parent 0ba8b2c commit c58d0f5
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
21 changes: 15 additions & 6 deletions waku/v2/peermanager/peer_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@ type PeerConnectionStrategy struct {
paused atomic.Bool
dialTimeout time.Duration
*CommonDiscoveryService
subscriptions []<-chan PeerData
subscriptions []subscription

backoff backoff.BackoffFactory
logger *zap.Logger
}

type subscription struct {
ctx context.Context
ch <-chan PeerData
}

// backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer
func getBackOff() backoff.BackoffFactory {
rngSrc := rand.NewSource(rand.Int63())
Expand Down Expand Up @@ -85,32 +90,36 @@ func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerDa
// if not running yet, store the subscription and return
if err := c.ErrOnNotRunning(); err != nil {
c.mux.Lock()
c.subscriptions = append(c.subscriptions, ch)
c.subscriptions = append(c.subscriptions, subscription{ctx, ch})
c.mux.Unlock()
return
}
// if running start a goroutine to consume the subscription
c.WaitGroup().Add(1)
go func() {
defer c.WaitGroup().Done()
c.consumeSubscription(ch)
c.consumeSubscription(subscription{ctx, ch})
}()
}

func (c *PeerConnectionStrategy) consumeSubscription(ch <-chan PeerData) {
func (c *PeerConnectionStrategy) consumeSubscription(s subscription) {
for {
// for returning from the loop when peerConnector is paused.
select {
case <-c.Context().Done():
return
case <-s.ctx.Done():
return
default:
}
//
if !c.isPaused() {
select {
case <-c.Context().Done():
return
case p, ok := <-ch:
case <-s.ctx.Done():
return
case p, ok := <-s.ch:
if !ok {
return
}
Expand Down Expand Up @@ -166,7 +175,7 @@ func (c *PeerConnectionStrategy) isPaused() bool {
func (c *PeerConnectionStrategy) consumeSubscriptions() {
for _, subs := range c.subscriptions {
c.WaitGroup().Add(1)
go func(s <-chan PeerData) {
go func(s subscription) {
defer c.WaitGroup().Done()
c.consumeSubscription(s)
}(subs)
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p PeerData, connectNow bool) {
}
}
if connectNow {
pm.peerConnector.PushToChan(p)
go pm.peerConnector.PushToChan(p)
}
}

Expand Down

0 comments on commit c58d0f5

Please sign in to comment.