Skip to content

Commit

Permalink
combine add and remove evts and use a single emitter
Browse files Browse the repository at this point in the history
to reduce likelihood of races
  • Loading branch information
vyzo committed Nov 20, 2020
1 parent 7aec500 commit ef44467
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 40 deletions.
29 changes: 10 additions & 19 deletions chain/exchange/peer_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
host "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"go.uber.org/fx"
"go.uber.org/multierr"

"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/lib/peermgr"
Expand Down Expand Up @@ -39,34 +38,26 @@ func newPeerTracker(lc fx.Lifecycle, h host.Host, pmgr *peermgr.PeerMgr) *bsPeer
pmgr: pmgr,
}

addSub, err := h.EventBus().Subscribe(new(peermgr.NewFilPeer))
evtSub, err := h.EventBus().Subscribe(new(peermgr.FilPeerEvt))
if err != nil {
panic(err)
}

go func() {
for newPeer := range addSub.Out() {
bsPt.addPeer(newPeer.(peermgr.NewFilPeer).Id)
}
}()

rmSub, err := h.EventBus().Subscribe(new(peermgr.RemoveFilPeer))
if err != nil {
panic(err)
}

go func() {
for rmPeer := range rmSub.Out() {
bsPt.removePeer(rmPeer.(peermgr.RemoveFilPeer).Id)
for evt := range evtSub.Out() {
pEvt := evt.(peermgr.FilPeerEvt)
switch pEvt.Type {
case peermgr.AddFilPeerEvt:
bsPt.addPeer(pEvt.ID)
case peermgr.RemoveFilPeerEvt:
bsPt.removePeer(pEvt.ID)
}
}
}()

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return multierr.Combine(
addSub.Close(),
rmSub.Close(),
)
return evtSub.Close()
},
})

Expand Down
38 changes: 17 additions & 21 deletions lib/peermgr/peermgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,23 @@ type PeerMgr struct {
h host.Host
dht *dht.IpfsDHT

notifee *net.NotifyBundle
addPeerEmitter event.Emitter
rmPeerEmitter event.Emitter
notifee *net.NotifyBundle
emitter event.Emitter

done chan struct{}
}

type NewFilPeer struct {
Id peer.ID
type FilPeerEvt struct {
Type FilPeerEvtType
ID peer.ID
}

type RemoveFilPeer struct {
Id peer.ID
}
type FilPeerEvtType int

const (
AddFilPeerEvt FilPeerEvtType = iota
RemoveFilPeerEvt
)

func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) (*PeerMgr, error) {
pm := &PeerMgr{
Expand All @@ -82,23 +85,16 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes

done: make(chan struct{}),
}
emitter, err := h.EventBus().Emitter(new(NewFilPeer))
if err != nil {
return nil, xerrors.Errorf("creating NewFilPeer emitter: %w", err)
}
pm.addPeerEmitter = emitter

emitter, err = h.EventBus().Emitter(new(RemoveFilPeer))
emitter, err := h.EventBus().Emitter(new(FilPeerEvt))
if err != nil {
return nil, xerrors.Errorf("creating RemoveFilPeer emitter: %w", err)
return nil, xerrors.Errorf("creating FilPeerEvt emitter: %w", err)
}
pm.rmPeerEmitter = emitter
pm.emitter = emitter

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return multierr.Combine(
pm.addPeerEmitter.Close(),
pm.rmPeerEmitter.Close(),
pm.emitter.Close(),
pm.Stop(ctx),
)
},
Expand All @@ -116,7 +112,7 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes
}

func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) {
_ = pmgr.addPeerEmitter.Emit(NewFilPeer{Id: p}) //nolint:errcheck
_ = pmgr.emitter.Emit(FilPeerEvt{Type: AddFilPeerEvt, ID: p}) //nolint:errcheck
pmgr.peersLk.Lock()
defer pmgr.peersLk.Unlock()
pmgr.peers[p] = time.Duration(0)
Expand Down Expand Up @@ -151,7 +147,7 @@ func (pmgr *PeerMgr) Disconnect(p peer.ID) {
}

if disconnected {
_ = pmgr.rmPeerEmitter.Emit(RemoveFilPeer{Id: p}) //nolint:errcheck
_ = pmgr.emitter.Emit(FilPeerEvt{Type: RemoveFilPeerEvt, ID: p}) //nolint:errcheck
}
}

Expand Down

0 comments on commit ef44467

Please sign in to comment.