From c68a035d71db0eb2c430393c47875db9a5eefc53 Mon Sep 17 00:00:00 2001 From: sukun Date: Thu, 2 May 2024 19:56:00 +0530 Subject: [PATCH] simplify connectedness events --- p2p/net/swarm/swarm.go | 104 +++++++++++++---------------------------- 1 file changed, 33 insertions(+), 71 deletions(-) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 20006fbb90..3242bf3076 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -158,9 +158,8 @@ type Swarm struct { conns struct { sync.RWMutex - m map[peer.ID][]*Conn - connectednessEventQueue map[peer.ID][]network.Connectedness - lastConnectednessEvent map[peer.ID]network.Connectedness + m map[peer.ID][]*Conn + connectednessEvents chan peer.ID } listeners struct { @@ -240,8 +239,7 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts } s.conns.m = make(map[peer.ID][]*Conn) - s.conns.connectednessEventQueue = make(map[peer.ID][]network.Connectedness) - s.conns.lastConnectednessEvent = make(map[peer.ID]network.Connectedness) + s.conns.connectednessEvents = make(chan peer.ID, 32) s.listeners.m = make(map[transport.Listener]struct{}) s.transports.m = make(map[int]transport.Transport) s.notifs.m = make(map[network.Notifiee]struct{}) @@ -308,17 +306,10 @@ func (s *Swarm) close() { // Wait for everything to finish. s.refs.Wait() - close(s.connectednessEventCh) + close(s.conns.connectednessEvents) <-s.connectednessEmitterDone s.emitter.Close() - // Remove the connectedness map only after we have closed the connection and sent all the disconnection - // events - s.conns.Lock() - s.conns.connectednessEventQueue = nil - s.conns.lastConnectednessEvent = nil - s.conns.Unlock() - // Now close out any transports (if necessary). Do this after closing // all connections/listeners. s.transports.Lock() @@ -402,8 +393,6 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, c.streams.m = make(map[*Stream]struct{}) s.conns.m[p] = append(s.conns.m[p], c) - s.maybeEnqueueConnectednessUnlocked(p) - // Add two swarm refs: // * One will be decremented after the close notifications fire in Conn.doClose // * The other will be decremented when Conn.start exits. @@ -414,6 +403,13 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, c.notifyLk.Lock() s.conns.Unlock() + // Block this goroutine till this request is enqueued. + // This ensures that there are only a finite number of goroutines that are waiting to send + // the connectedness event on the disconnection side in swarm.removeConn. + // This is so because the goroutine to enqueue disconnection event can only be started + // from either a subscriber or a notifier or after calling c.start + s.conns.connectednessEvents <- p + if !isLimited { // Notify goroutines waiting for a direct connection // @@ -790,68 +786,34 @@ func (s *Swarm) removeConn(c *Conn) { } } } - s.maybeEnqueueConnectednessUnlocked(p) s.conns.Unlock() -} - -func (s *Swarm) lastConnectednessEventUnlocked(p peer.ID) network.Connectedness { - events := s.conns.connectednessEventQueue[p] - if len(events) > 0 { - return events[len(events)-1] - } - return s.conns.lastConnectednessEvent[p] -} - -func (s *Swarm) maybeEnqueueConnectednessUnlocked(p peer.ID) { - oldState := s.lastConnectednessEventUnlocked(p) - newState := s.connectednessUnlocked(p) - if oldState != newState { - if s.conns.connectednessEventQueue != nil { - s.conns.connectednessEventQueue[p] = append(s.conns.connectednessEventQueue[p], newState) - select { - case s.connectednessEventCh <- struct{}{}: - default: - } - } else { - log.Errorf("SWARM BUG: nil connectedness map") - } - } + // Do this in a separate go routine to not block the caller. + // This ensures that if a event subscriber closes the connection from the subscription goroutine + // this doesn't deadlock + s.refs.Add(1) + go func() { + defer s.refs.Done() + s.conns.connectednessEvents <- p + }() } func (s *Swarm) connectednessEventEmitter() { defer close(s.connectednessEmitterDone) - for range s.connectednessEventCh { - for { - var c network.Connectedness - var peer peer.ID - s.conns.Lock() - for p, v := range s.conns.connectednessEventQueue { - if len(v) == 0 { - // this shouldn't happen - delete(s.conns.connectednessEventQueue, p) - log.Errorf("SWARM BUG: empty connectedness event slice %v %v", p, v) - continue - } - c = v[0] - peer = p - s.conns.connectednessEventQueue[p] = v[1:] - if len(s.conns.connectednessEventQueue[p]) == 0 { - delete(s.conns.connectednessEventQueue, p) - } - if c == network.NotConnected { - delete(s.conns.lastConnectednessEvent, p) - } else { - s.conns.lastConnectednessEvent[p] = c - } - break - } - s.conns.Unlock() - if peer == "" { - break - } + lastConnectednessEvents := make(map[peer.ID]network.Connectedness) + for p := range s.conns.connectednessEvents { + s.conns.Lock() + oldState := lastConnectednessEvents[p] + newState := s.connectednessUnlocked(p) + if newState != network.NotConnected { + lastConnectednessEvents[p] = newState + } else { + delete(lastConnectednessEvents, p) + } + s.conns.Unlock() + if newState != oldState { s.emitter.Emit(event.EvtPeerConnectednessChanged{ - Peer: peer, - Connectedness: c, + Peer: p, + Connectedness: newState, }) } }