From e4c5794ae8398f1aa3ae84a02f0440e4605cc7a9 Mon Sep 17 00:00:00 2001 From: Jeff Thompson Date: Fri, 7 Jul 2023 11:10:52 +0200 Subject: [PATCH] fix: split monitoring from add current peers Signed-off-by: Jeff Thompson --- notify.go | 9 +++------ pubsub.go | 11 ++++++++--- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/notify.go b/notify.go index a88ad2f7..9fa7b446 100644 --- a/notify.go +++ b/notify.go @@ -20,9 +20,6 @@ func (p *PubSubNotif) startMonitoring() error { return fmt.Errorf("unable to subscribe to EventBus: %w", err) } - // add current peers - p.addPeers(p.host.Network().Peers()...) - go func() { defer sub.Close() @@ -38,7 +35,7 @@ func (p *PubSubNotif) startMonitoring() error { case event.EvtPeerConnectednessChanged: // send record to connected peer only if evt.Connectedness == network.Connected { - go p.addPeers(evt.Peer) + go p.AddPeers(evt.Peer) } case event.EvtPeerProtocolsUpdated: supportedProtocols := p.rt.Protocols() @@ -47,7 +44,7 @@ func (p *PubSubNotif) startMonitoring() error { for _, addedProtocol := range evt.Added { for _, wantedProtocol := range supportedProtocols { if wantedProtocol == addedProtocol { - go p.addPeers(evt.Peer) + go p.AddPeers(evt.Peer) break protocol_loop } } @@ -69,7 +66,7 @@ func (p *PubSubNotif) isTransient(pid peer.ID) bool { return true } -func (p *PubSubNotif) addPeers(peers ...peer.ID) { +func (p *PubSubNotif) AddPeers(peers ...peer.ID) { p.newPeersPrioLk.RLock() p.newPeersMx.Lock() diff --git a/pubsub.go b/pubsub.go index 9eb30c2b..497c4891 100644 --- a/pubsub.go +++ b/pubsub.go @@ -328,13 +328,18 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option } } + // start monitoring for new peers + notify := (*PubSubNotif)(ps) + if err := notify.startMonitoring(); err != nil { + return nil, fmt.Errorf("unable to start pubsub monitorin: %w", err) + } + ps.val.Start(ps) go ps.processLoop(ctx) - if err := (*PubSubNotif)(ps).startMonitoring(); err != nil { - return nil, fmt.Errorf("unable to start pubsub monitoring: %w", err) - } + // add current peers to notify system + notify.AddPeers(h.Network().Peers()...) return ps, nil }