From 7aec500384b58016245a20e22011f03630587c61 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 6 Nov 2020 18:26:14 +0200 Subject: [PATCH 1/2] emit event for peer disconnectionsa and act upon them in the blocksync peer tracker --- chain/exchange/peer_tracker.go | 21 ++++++++++++++++++--- lib/peermgr/peermgr.go | 33 +++++++++++++++++++++++++++------ 2 files changed, 45 insertions(+), 9 deletions(-) diff --git a/chain/exchange/peer_tracker.go b/chain/exchange/peer_tracker.go index 902baadcee0..cc8bd4be96f 100644 --- a/chain/exchange/peer_tracker.go +++ b/chain/exchange/peer_tracker.go @@ -11,6 +11,7 @@ import ( host "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "go.uber.org/fx" + "go.uber.org/multierr" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/lib/peermgr" @@ -38,20 +39,34 @@ func newPeerTracker(lc fx.Lifecycle, h host.Host, pmgr *peermgr.PeerMgr) *bsPeer pmgr: pmgr, } - sub, err := h.EventBus().Subscribe(new(peermgr.NewFilPeer)) + addSub, err := h.EventBus().Subscribe(new(peermgr.NewFilPeer)) if err != nil { panic(err) } go func() { - for newPeer := range sub.Out() { + for newPeer := range addSub.Out() { bsPt.addPeer(newPeer.(peermgr.NewFilPeer).Id) } }() + rmSub, err := h.EventBus().Subscribe(new(peermgr.RemoveFilPeer)) + if err != nil { + panic(err) + } + + go func() { + for rmPeer := range rmSub.Out() { + bsPt.removePeer(rmPeer.(peermgr.RemoveFilPeer).Id) + } + }() + lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { - return sub.Close() + return multierr.Combine( + addSub.Close(), + rmSub.Close(), + ) }, }) diff --git a/lib/peermgr/peermgr.go b/lib/peermgr/peermgr.go index 2fe54caeaf3..1847c11f803 100644 --- a/lib/peermgr/peermgr.go +++ b/lib/peermgr/peermgr.go @@ -54,7 +54,8 @@ type PeerMgr struct { dht *dht.IpfsDHT notifee *net.NotifyBundle - filPeerEmitter event.Emitter + addPeerEmitter event.Emitter + rmPeerEmitter event.Emitter done chan struct{} } @@ -63,6 +64,10 @@ type NewFilPeer struct { Id peer.ID } +type RemoveFilPeer struct { + Id peer.ID +} + func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) (*PeerMgr, error) { pm := &PeerMgr{ h: h, @@ -81,12 +86,19 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes if err != nil { return nil, xerrors.Errorf("creating NewFilPeer emitter: %w", err) } - pm.filPeerEmitter = emitter + pm.addPeerEmitter = emitter + + emitter, err = h.EventBus().Emitter(new(RemoveFilPeer)) + if err != nil { + return nil, xerrors.Errorf("creating RemoveFilPeer emitter: %w", err) + } + pm.rmPeerEmitter = emitter lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { return multierr.Combine( - pm.filPeerEmitter.Close(), + pm.addPeerEmitter.Close(), + pm.rmPeerEmitter.Close(), pm.Stop(ctx), ) }, @@ -104,7 +116,7 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes } func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) { - _ = pmgr.filPeerEmitter.Emit(NewFilPeer{Id: p}) //nolint:errcheck + _ = pmgr.addPeerEmitter.Emit(NewFilPeer{Id: p}) //nolint:errcheck pmgr.peersLk.Lock() defer pmgr.peersLk.Unlock() pmgr.peers[p] = time.Duration(0) @@ -127,10 +139,19 @@ func (pmgr *PeerMgr) SetPeerLatency(p peer.ID, latency time.Duration) { } func (pmgr *PeerMgr) Disconnect(p peer.ID) { + disconnected := false + if pmgr.h.Network().Connectedness(p) == net.NotConnected { pmgr.peersLk.Lock() - defer pmgr.peersLk.Unlock() - delete(pmgr.peers, p) + _, disconnected = pmgr.peers[p] + if disconnected { + delete(pmgr.peers, p) + } + pmgr.peersLk.Unlock() + } + + if disconnected { + _ = pmgr.rmPeerEmitter.Emit(RemoveFilPeer{Id: p}) //nolint:errcheck } } From ef444676c2ecd40cf8483c2a614227ae0a8d987f Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 20 Nov 2020 16:15:44 +0200 Subject: [PATCH 2/2] combine add and remove evts and use a single emitter to reduce likelihood of races --- chain/exchange/peer_tracker.go | 29 +++++++++----------------- lib/peermgr/peermgr.go | 38 +++++++++++++++------------------- 2 files changed, 27 insertions(+), 40 deletions(-) diff --git a/chain/exchange/peer_tracker.go b/chain/exchange/peer_tracker.go index cc8bd4be96f..835a5b8a479 100644 --- a/chain/exchange/peer_tracker.go +++ b/chain/exchange/peer_tracker.go @@ -11,7 +11,6 @@ import ( host "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "go.uber.org/fx" - "go.uber.org/multierr" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/lib/peermgr" @@ -39,34 +38,26 @@ func newPeerTracker(lc fx.Lifecycle, h host.Host, pmgr *peermgr.PeerMgr) *bsPeer pmgr: pmgr, } - addSub, err := h.EventBus().Subscribe(new(peermgr.NewFilPeer)) + evtSub, err := h.EventBus().Subscribe(new(peermgr.FilPeerEvt)) if err != nil { panic(err) } go func() { - for newPeer := range addSub.Out() { - bsPt.addPeer(newPeer.(peermgr.NewFilPeer).Id) - } - }() - - rmSub, err := h.EventBus().Subscribe(new(peermgr.RemoveFilPeer)) - if err != nil { - panic(err) - } - - go func() { - for rmPeer := range rmSub.Out() { - bsPt.removePeer(rmPeer.(peermgr.RemoveFilPeer).Id) + for evt := range evtSub.Out() { + pEvt := evt.(peermgr.FilPeerEvt) + switch pEvt.Type { + case peermgr.AddFilPeerEvt: + bsPt.addPeer(pEvt.ID) + case peermgr.RemoveFilPeerEvt: + bsPt.removePeer(pEvt.ID) + } } }() lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { - return multierr.Combine( - addSub.Close(), - rmSub.Close(), - ) + return evtSub.Close() }, }) diff --git a/lib/peermgr/peermgr.go b/lib/peermgr/peermgr.go index 1847c11f803..ee158cc040a 100644 --- a/lib/peermgr/peermgr.go +++ b/lib/peermgr/peermgr.go @@ -53,20 +53,23 @@ type PeerMgr struct { h host.Host dht *dht.IpfsDHT - notifee *net.NotifyBundle - addPeerEmitter event.Emitter - rmPeerEmitter event.Emitter + notifee *net.NotifyBundle + emitter event.Emitter done chan struct{} } -type NewFilPeer struct { - Id peer.ID +type FilPeerEvt struct { + Type FilPeerEvtType + ID peer.ID } -type RemoveFilPeer struct { - Id peer.ID -} +type FilPeerEvtType int + +const ( + AddFilPeerEvt FilPeerEvtType = iota + RemoveFilPeerEvt +) func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) (*PeerMgr, error) { pm := &PeerMgr{ @@ -82,23 +85,16 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes done: make(chan struct{}), } - emitter, err := h.EventBus().Emitter(new(NewFilPeer)) - if err != nil { - return nil, xerrors.Errorf("creating NewFilPeer emitter: %w", err) - } - pm.addPeerEmitter = emitter - - emitter, err = h.EventBus().Emitter(new(RemoveFilPeer)) + emitter, err := h.EventBus().Emitter(new(FilPeerEvt)) if err != nil { - return nil, xerrors.Errorf("creating RemoveFilPeer emitter: %w", err) + return nil, xerrors.Errorf("creating FilPeerEvt emitter: %w", err) } - pm.rmPeerEmitter = emitter + pm.emitter = emitter lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { return multierr.Combine( - pm.addPeerEmitter.Close(), - pm.rmPeerEmitter.Close(), + pm.emitter.Close(), pm.Stop(ctx), ) }, @@ -116,7 +112,7 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes } func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) { - _ = pmgr.addPeerEmitter.Emit(NewFilPeer{Id: p}) //nolint:errcheck + _ = pmgr.emitter.Emit(FilPeerEvt{Type: AddFilPeerEvt, ID: p}) //nolint:errcheck pmgr.peersLk.Lock() defer pmgr.peersLk.Unlock() pmgr.peers[p] = time.Duration(0) @@ -151,7 +147,7 @@ func (pmgr *PeerMgr) Disconnect(p peer.ID) { } if disconnected { - _ = pmgr.rmPeerEmitter.Emit(RemoveFilPeer{Id: p}) //nolint:errcheck + _ = pmgr.emitter.Emit(FilPeerEvt{Type: RemoveFilPeerEvt, ID: p}) //nolint:errcheck } }