diff --git a/chain/exchange/peer_tracker.go b/chain/exchange/peer_tracker.go index 902baadcee..835a5b8a47 100644 --- a/chain/exchange/peer_tracker.go +++ b/chain/exchange/peer_tracker.go @@ -38,20 +38,26 @@ func newPeerTracker(lc fx.Lifecycle, h host.Host, pmgr *peermgr.PeerMgr) *bsPeer pmgr: pmgr, } - sub, err := h.EventBus().Subscribe(new(peermgr.NewFilPeer)) + evtSub, err := h.EventBus().Subscribe(new(peermgr.FilPeerEvt)) if err != nil { panic(err) } go func() { - for newPeer := range sub.Out() { - bsPt.addPeer(newPeer.(peermgr.NewFilPeer).Id) + for evt := range evtSub.Out() { + pEvt := evt.(peermgr.FilPeerEvt) + switch pEvt.Type { + case peermgr.AddFilPeerEvt: + bsPt.addPeer(pEvt.ID) + case peermgr.RemoveFilPeerEvt: + bsPt.removePeer(pEvt.ID) + } } }() lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { - return sub.Close() + return evtSub.Close() }, }) diff --git a/lib/peermgr/peermgr.go b/lib/peermgr/peermgr.go index 2fe54caeaf..ee158cc040 100644 --- a/lib/peermgr/peermgr.go +++ b/lib/peermgr/peermgr.go @@ -53,16 +53,24 @@ type PeerMgr struct { h host.Host dht *dht.IpfsDHT - notifee *net.NotifyBundle - filPeerEmitter event.Emitter + notifee *net.NotifyBundle + emitter event.Emitter done chan struct{} } -type NewFilPeer struct { - Id peer.ID +type FilPeerEvt struct { + Type FilPeerEvtType + ID peer.ID } +type FilPeerEvtType int + +const ( + AddFilPeerEvt FilPeerEvtType = iota + RemoveFilPeerEvt +) + func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) (*PeerMgr, error) { pm := &PeerMgr{ h: h, @@ -77,16 +85,16 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes done: make(chan struct{}), } - emitter, err := h.EventBus().Emitter(new(NewFilPeer)) + emitter, err := h.EventBus().Emitter(new(FilPeerEvt)) if err != nil { - return nil, xerrors.Errorf("creating NewFilPeer emitter: %w", err) + return nil, xerrors.Errorf("creating FilPeerEvt emitter: %w", err) } - pm.filPeerEmitter = emitter + pm.emitter = emitter lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { return multierr.Combine( - pm.filPeerEmitter.Close(), + pm.emitter.Close(), pm.Stop(ctx), ) }, @@ -104,7 +112,7 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes } func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) { - _ = pmgr.filPeerEmitter.Emit(NewFilPeer{Id: p}) //nolint:errcheck + _ = pmgr.emitter.Emit(FilPeerEvt{Type: AddFilPeerEvt, ID: p}) //nolint:errcheck pmgr.peersLk.Lock() defer pmgr.peersLk.Unlock() pmgr.peers[p] = time.Duration(0) @@ -127,10 +135,19 @@ func (pmgr *PeerMgr) SetPeerLatency(p peer.ID, latency time.Duration) { } func (pmgr *PeerMgr) Disconnect(p peer.ID) { + disconnected := false + if pmgr.h.Network().Connectedness(p) == net.NotConnected { pmgr.peersLk.Lock() - defer pmgr.peersLk.Unlock() - delete(pmgr.peers, p) + _, disconnected = pmgr.peers[p] + if disconnected { + delete(pmgr.peers, p) + } + pmgr.peersLk.Unlock() + } + + if disconnected { + _ = pmgr.emitter.Emit(FilPeerEvt{Type: RemoveFilPeerEvt, ID: p}) //nolint:errcheck } }