diff --git a/nodebuilder/share/config.go b/nodebuilder/share/config.go
index 5d66ea7691..1d984b6dca 100644
--- a/nodebuilder/share/config.go
+++ b/nodebuilder/share/config.go
@@ -26,7 +26,7 @@ type Config struct {
 	PeerManagerParams peers.Parameters
 
 	LightAvailability light.Parameters `toml:",omitempty"`
-	Discovery         discovery.Parameters
+	Discovery         *discovery.Parameters
 }
 
 func DefaultConfig(tp node.Type) Config {
diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go
index 944b7f2d23..aa2ac5bec1 100644
--- a/nodebuilder/share/constructors.go
+++ b/nodebuilder/share/constructors.go
@@ -17,18 +17,27 @@ import (
 	"github.com/celestiaorg/celestia-node/share/getters"
 	"github.com/celestiaorg/celestia-node/share/ipld"
 	disc "github.com/celestiaorg/celestia-node/share/p2p/discovery"
+	"github.com/celestiaorg/celestia-node/share/p2p/peers"
 )
 
-func newDiscovery(cfg Config) func(routing.ContentRouting, host.Host) *disc.Discovery {
+const (
+	// fullNodesTag is the tag used to identify full nodes in the discovery service.
+	fullNodesTag = "full"
+)
+
+func newDiscovery(cfg *disc.Parameters,
+) func(routing.ContentRouting, host.Host, *peers.Manager) (*disc.Discovery, error) {
 	return func(
 		r routing.ContentRouting,
 		h host.Host,
-	) *disc.Discovery {
+		manager *peers.Manager,
+	) (*disc.Discovery, error) {
 		return disc.NewDiscovery(
+			cfg,
 			h,
 			routingdisc.NewRoutingDiscovery(r),
-			disc.WithPeersLimit(cfg.Discovery.PeersLimit),
-			disc.WithAdvertiseInterval(cfg.Discovery.AdvertiseInterval),
+			fullNodesTag,
+			disc.WithOnPeersUpdate(manager.UpdateFullNodePool),
 		)
 	}
 }
diff --git a/nodebuilder/share/module.go b/nodebuilder/share/module.go
index b2e6eed4e5..9fbd262a32 100644
--- a/nodebuilder/share/module.go
+++ b/nodebuilder/share/module.go
@@ -37,7 +37,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
 		fx.Provide(newModule),
 		fx.Invoke(func(disc *disc.Discovery) {}),
 		fx.Provide(fx.Annotate(
-			newDiscovery(*cfg),
+			newDiscovery(cfg.Discovery),
 			fx.OnStart(func(ctx context.Context, d *disc.Discovery) error {
 				return d.Start(ctx)
 			}),
@@ -147,7 +147,6 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
 		fx.Provide(
 			func(
 				params peers.Parameters,
-				discovery *disc.Discovery,
 				host host.Host,
 				connGater *conngater.BasicConnectionGater,
 				shrexSub *shrexsub.PubSub,
@@ -158,7 +157,6 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
 			) (*peers.Manager, error) {
 				return peers.NewManager(
 					params,
-					discovery,
 					host,
 					connGater,
 					peers.WithShrexSubPools(shrexSub, headerSub),
diff --git a/nodebuilder/tests/p2p_test.go b/nodebuilder/tests/p2p_test.go
index d05846f40c..9fe63fb931 100644
--- a/nodebuilder/tests/p2p_test.go
+++ b/nodebuilder/tests/p2p_test.go
@@ -186,9 +186,7 @@ func TestRestartNodeDiscovery(t *testing.T) {
 	sw.Disconnect(t, nodes[0], nodes[1])
 
 	// create and start one more FN with disabled discovery
-	fullCfg.Share.Discovery.PeersLimit = 0
 	disabledDiscoveryFN := sw.NewNodeWithConfig(node.Full, fullCfg, nodesConfig)
-	err = disabledDiscoveryFN.Start(ctx)
 	require.NoError(t, err)
 
 	// ensure that the FN with disabled discovery is discovered by both of the
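For orientation, here is roughly what the reworked provider wires together, written out by hand. This is a minimal sketch against the API introduced in this diff; host, contentRouting, manager, and ctx are hypothetical stand-ins for the values fx injects:

    // Manual equivalent of the fx provider: build discovery from config,
    // feed discovered peers into the peer manager, then start it.
    discSvc, err := disc.NewDiscovery(
        cfg.Discovery, // *disc.Parameters — now a pointer on Config
        host,
        routingdisc.NewRoutingDiscovery(contentRouting),
        fullNodesTag, // rendezvous tag, replacing the removed package-level rendezvousPoint
        disc.WithOnPeersUpdate(manager.UpdateFullNodePool),
    )
    if err != nil {
        return err
    }
    if err := discSvc.Start(ctx); err != nil { // done by the fx.OnStart hook above
        return err
    }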
diff --git a/share/availability/full/testing.go b/share/availability/full/testing.go
index a636b26ea6..dd21d398c2 100644
--- a/share/availability/full/testing.go
+++ b/share/availability/full/testing.go
@@ -41,12 +41,16 @@ func Node(dn *availability_test.TestDagNet) *availability_test.TestNode {
 }
 
 func TestAvailability(t *testing.T, getter share.Getter) *ShareAvailability {
-	disc := discovery.NewDiscovery(
+	params := discovery.DefaultParameters()
+	params.AdvertiseInterval = time.Second
+	params.PeersLimit = 10
+	disc, err := discovery.NewDiscovery(
+		params,
 		nil,
 		routing.NewRoutingDiscovery(routinghelpers.Null{}),
-		discovery.WithAdvertiseInterval(time.Second),
-		discovery.WithPeersLimit(10),
+		"full",
 	)
+	require.NoError(t, err)
 	store, err := eds.NewStore(eds.DefaultParameters(), t.TempDir(), datastore.NewMapDatastore())
 	require.NoError(t, err)
 	err = store.Start(context.Background())
diff --git a/share/getters/shrex_test.go b/share/getters/shrex_test.go
index 9b97a4dd5a..85878b204a 100644
--- a/share/getters/shrex_test.go
+++ b/share/getters/shrex_test.go
@@ -9,9 +9,7 @@ import (
 	"github.com/ipfs/go-datastore"
 	ds_sync "github.com/ipfs/go-datastore/sync"
-	routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
 	"github.com/libp2p/go-libp2p/core/host"
-	routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"
 	"github.com/libp2p/go-libp2p/p2p/net/conngater"
 	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
 	"github.com/stretchr/testify/require"
@@ -25,7 +23,6 @@ import (
 	"github.com/celestiaorg/celestia-node/share"
 	"github.com/celestiaorg/celestia-node/share/eds"
 	"github.com/celestiaorg/celestia-node/share/eds/edstest"
-	"github.com/celestiaorg/celestia-node/share/p2p/discovery"
 	"github.com/celestiaorg/celestia-node/share/p2p/peers"
 	"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
 	"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
@@ -210,18 +207,12 @@ func testManager(
 		return nil, err
 	}
 
-	disc := discovery.NewDiscovery(nil,
-		routingdisc.NewRoutingDiscovery(routinghelpers.Null{}),
-		discovery.WithPeersLimit(10),
-		discovery.WithAdvertiseInterval(time.Second),
-	)
 	connGater, err := conngater.NewBasicConnectionGater(ds_sync.MutexWrap(datastore.NewMapDatastore()))
 	if err != nil {
 		return nil, err
 	}
 	manager, err := peers.NewManager(
 		peers.DefaultParameters(),
-		disc,
 		host,
 		connGater,
 		peers.WithShrexSubPools(shrexSub, headerSub),
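The test updates above all follow the same migration: tuning knobs moved from functional options into the Parameters struct, and the constructor now validates them and returns an error. A minimal before/after sketch, where h and r stand in for an existing host and routing discovery:

    // Before: functional options mutated Parameters internally.
    // d := discovery.NewDiscovery(h, r,
    //     discovery.WithPeersLimit(10),
    //     discovery.WithAdvertiseInterval(time.Second),
    // )

    // After: Parameters are set explicitly and validated by NewDiscovery,
    // which also takes the rendezvous tag as an argument.
    params := discovery.DefaultParameters()
    params.PeersLimit = 10
    params.AdvertiseInterval = time.Second
    d, err := discovery.NewDiscovery(params, h, r, "full")
    if err != nil {
        // invalid Parameters or an empty tag
    }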
diff --git a/share/p2p/discovery/discovery.go b/share/p2p/discovery/discovery.go
index f24df2c88b..0f44d42dbe 100644
--- a/share/p2p/discovery/discovery.go
+++ b/share/p2p/discovery/discovery.go
@@ -19,9 +19,6 @@ import (
 var log = logging.Logger("share/discovery")
 
 const (
-	// rendezvousPoint is the namespace where peers advertise and discover each other.
-	rendezvousPoint = "full"
-
 	// eventbusBufSize is the size of the buffered channel to handle
 	// events in libp2p. We specify a larger buffer size for the channel
 	// to avoid overflowing and blocking subscription during disconnection bursts.
@@ -45,6 +42,8 @@ var discoveryRetryTimeout = retryTimeout
 // Discovery combines advertise and discover services and allows to store discovered nodes.
 // TODO: The code here gets horribly hairy, so we should refactor this at some point
 type Discovery struct {
+	// tag is used as the rendezvous point for the discovery service
+	tag  string
 	set  *limitedSet
 	host host.Host
 	disc discovery.Discovery
@@ -58,43 +57,50 @@ type Discovery struct {
 
 	cancel context.CancelFunc
 
-	params Parameters
+	params *Parameters
 }
 
 type OnUpdatedPeers func(peerID peer.ID, isAdded bool)
 
+func (f OnUpdatedPeers) add(next OnUpdatedPeers) OnUpdatedPeers {
+	return func(peerID peer.ID, isAdded bool) {
+		f(peerID, isAdded)
+		next(peerID, isAdded)
+	}
+}
+
 // NewDiscovery constructs a new discovery.
 func NewDiscovery(
+	params *Parameters,
 	h host.Host,
 	d discovery.Discovery,
+	tag string,
 	opts ...Option,
-) *Discovery {
-	params := DefaultParameters()
-
-	for _, opt := range opts {
-		opt(&params)
+) (*Discovery, error) {
+	if err := params.Validate(); err != nil {
+		return nil, err
 	}
+	if tag == "" {
+		return nil, fmt.Errorf("discovery: tag cannot be empty")
+	}
+	o := newOptions(opts...)
 	return &Discovery{
+		tag:            tag,
 		set:            newLimitedSet(params.PeersLimit),
 		host:           h,
 		disc:           d,
 		connector:      newBackoffConnector(h, defaultBackoffFactory),
-		onUpdatedPeers: func(peer.ID, bool) {},
+		onUpdatedPeers: o.onUpdatedPeers,
 		params:         params,
 		triggerDisc:    make(chan struct{}),
-	}
+	}, nil
 }
 
 func (d *Discovery) Start(context.Context) error {
 	ctx, cancel := context.WithCancel(context.Background())
 	d.cancel = cancel
 
-	if d.params.PeersLimit == 0 {
-		log.Warn("peers limit is set to 0. Skipping discovery...")
-		return nil
-	}
-
 	sub, err := d.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(eventbusBufSize))
 	if err != nil {
 		return fmt.Errorf("subscribing for connection events: %w", err)
@@ -111,15 +117,6 @@ func (d *Discovery) Stop(context.Context) error {
 	return nil
 }
 
-// WithOnPeersUpdate chains OnPeersUpdate callbacks on every update of discovered peers list.
-func (d *Discovery) WithOnPeersUpdate(f OnUpdatedPeers) {
-	prev := d.onUpdatedPeers
-	d.onUpdatedPeers = func(peerID peer.ID, isAdded bool) {
-		prev(peerID, isAdded)
-		f(peerID, isAdded)
-	}
-}
-
 // Peers provides a list of discovered peers in the "full" topic.
 // If Discovery hasn't found any peers, it blocks until at least one peer is found.
 func (d *Discovery) Peers(ctx context.Context) ([]peer.ID, error) {
@@ -133,7 +130,7 @@ func (d *Discovery) Discard(id peer.ID) bool {
 		return false
 	}
 
-	d.host.ConnManager().Unprotect(id, rendezvousPoint)
+	d.host.ConnManager().Unprotect(id, d.tag)
 	d.connector.Backoff(id)
 	d.set.Remove(id)
 	d.onUpdatedPeers(id, false)
@@ -153,21 +150,16 @@ func (d *Discovery) Discard(id peer.ID) bool {
 // Advertise is a utility function that persistently advertises a service through an Advertiser.
 // TODO: Start advertising only after the reachability is confirmed by AutoNAT
 func (d *Discovery) Advertise(ctx context.Context) {
-	if d.params.AdvertiseInterval == -1 {
-		log.Warn("AdvertiseInterval is set to -1. Skipping advertising...")
-		return
-	}
-
 	timer := time.NewTimer(d.params.AdvertiseInterval)
 	defer timer.Stop()
 	for {
-		_, err := d.disc.Advertise(ctx, rendezvousPoint)
+		_, err := d.disc.Advertise(ctx, d.tag)
 		d.metrics.observeAdvertise(ctx, err)
 		if err != nil {
 			if ctx.Err() != nil {
 				return
 			}
-			log.Warnw("error advertising", "rendezvous", rendezvousPoint, "err", err)
+			log.Warnw("error advertising", "rendezvous", d.tag, "err", err)
 
 			// we don't want to retry indefinitely in a busy loop;
 			// the internal discovery mechanism may need some time before attempts
@@ -280,7 +272,7 @@ func (d *Discovery) discover(ctx context.Context) bool {
 		findCancel()
 	}()
 
-	peers, err := d.disc.FindPeers(findCtx, rendezvousPoint)
+	peers, err := d.disc.FindPeers(findCtx, d.tag)
 	if err != nil {
 		log.Error("unable to start discovery", "err", err)
 		return false
@@ -371,11 +363,11 @@ func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo) bool {
 	d.metrics.observeHandlePeer(ctx, handlePeerConnected)
 	logger.Debug("added peer to set")
 
-	// tag to protect peer from being killed by ConnManager
+	// Tag the peer to protect it from being killed by the ConnManager.
 	// NOTE: This does not protect from the remote side killing the connection.
 	// In the future, we should design a protocol that keeps bidirectional agreement on whether
 	// a connection should be kept or not, similar to a mesh link in GossipSub.
-	d.host.ConnManager().Protect(peer.ID, rendezvousPoint)
+	d.host.ConnManager().Protect(peer.ID, d.tag)
 	return true
 }
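The new add combinator replaces the removed WithOnPeersUpdate method: callbacks are now composed at construction time (via newOptions) into a single OnUpdatedPeers that fans out in registration order. Roughly, with hypothetical callbacks cb1 and cb2:

    var cb OnUpdatedPeers = func(peer.ID, bool) {} // no-op base, as in newOptions
    cb = cb.add(cb1)
    cb = cb.add(cb2)
    cb(id, true) // calls cb1(id, true), then cb2(id, true)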
diff --git a/share/p2p/discovery/discovery_test.go b/share/p2p/discovery/discovery_test.go
index 06d88a9079..c02931e1a4 100644
--- a/share/p2p/discovery/discovery_test.go
+++ b/share/p2p/discovery/discovery_test.go
@@ -17,33 +17,47 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+const (
+	fullNodesTag = "full"
+)
+
 func TestDiscovery(t *testing.T) {
 	const nodes = 10 // higher number brings higher coverage
 
 	discoveryRetryTimeout = time.Millisecond * 100 // defined in discovery.go
 
-	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*30)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
 	t.Cleanup(cancel)
 
 	tn := newTestnet(ctx, t)
 
-	peerA := tn.discovery(
-		WithPeersLimit(nodes),
-		WithAdvertiseInterval(-1),
-	)
-
 	type peerUpdate struct {
 		peerID  peer.ID
 		isAdded bool
 	}
 	updateCh := make(chan peerUpdate)
-	peerA.WithOnPeersUpdate(func(peerID peer.ID, isAdded bool) {
+	submit := func(peerID peer.ID, isAdded bool) {
 		updateCh <- peerUpdate{peerID: peerID, isAdded: isAdded}
-	})
+	}
+
+	host, routingDisc := tn.peer()
+	params := DefaultParameters()
+	params.PeersLimit = nodes
+
+	// start discovery listener service for peerA
+	peerA := tn.startNewDiscovery(params, host, routingDisc, fullNodesTag,
+		WithOnPeersUpdate(submit),
+	)
+
+	// start discovery advertisement services for other peers
+	params.AdvertiseInterval = time.Millisecond * 100
 	discs := make([]*Discovery, nodes)
 	for i := range discs {
-		discs[i] = tn.discovery(WithPeersLimit(0), WithAdvertiseInterval(time.Millisecond*100))
+		host, routingDisc := tn.peer()
+		disc, err := NewDiscovery(params, host, routingDisc, fullNodesTag)
+		require.NoError(t, err)
+		go disc.Advertise(tn.ctx)
+		discs[i] = tn.startNewDiscovery(params, host, routingDisc, fullNodesTag)
 
 		select {
 		case res := <-updateCh:
@@ -56,6 +70,7 @@ func TestDiscovery(t *testing.T) {
 
 	assert.EqualValues(t, nodes, peerA.set.Size())
 
+	// disconnect peerA from all peers and check that notifications are received on the updateCh channel
 	for _, disc := range discs {
 		peerID := disc.host.ID()
 		err := peerA.host.Network().ClosePeer(peerID)
@@ -73,6 +88,51 @@ func TestDiscovery(t *testing.T) {
 	assert.EqualValues(t, 0, peerA.set.Size())
 }
 
+func TestDiscoveryTagged(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+	t.Cleanup(cancel)
+
+	tn := newTestnet(ctx, t)
+
+	// launch 2 peers that advertise with different tags
+	adv1, routingDisc1 := tn.peer()
+	adv2, routingDisc2 := tn.peer()
+
+	// sub will discover both peers, but on different tags
+	sub, routingDisc := tn.peer()
+
+	params := DefaultParameters()
+
+	// create 2 discovery services for sub, each with a different tag
+	done1 := make(chan struct{})
+	tn.startNewDiscovery(params, sub, routingDisc, "tag1",
+		WithOnPeersUpdate(checkPeer(t, adv1.ID(), done1)))
+
+	done2 := make(chan struct{})
+	tn.startNewDiscovery(params, sub, routingDisc, "tag2",
+		WithOnPeersUpdate(checkPeer(t, adv2.ID(), done2)))
+
+	// run discovery services for the advertisers
+	ds1 := tn.startNewDiscovery(params, adv1, routingDisc1, "tag1")
+	go ds1.Advertise(tn.ctx)
+
+	ds2 := tn.startNewDiscovery(params, adv2, routingDisc2, "tag2")
+	go ds2.Advertise(tn.ctx)
+
+	// wait for the discovery services to discover each other on different tags
+	select {
+	case <-done1:
+	case <-ctx.Done():
+		t.Fatal("did not discover peer in time")
+	}
+
+	select {
+	case <-done2:
+	case <-ctx.Done():
+		t.Fatal("did not discover peer in time")
+	}
+}
+
 type testnet struct {
 	ctx context.Context
 	T   *testing.T
@@ -97,17 +157,21 @@ func newTestnet(ctx context.Context, t *testing.T) *testnet {
 	return &testnet{ctx: ctx, T: t, bootstrapper: *host.InfoFromHost(hst)}
 }
 
-func (t *testnet) discovery(opts ...Option) *Discovery {
-	hst, routingDisc := t.peer()
-	disc := NewDiscovery(hst, routingDisc, opts...)
-	err := disc.Start(t.ctx)
+func (t *testnet) startNewDiscovery(
+	params *Parameters,
+	hst host.Host,
+	routingDisc discovery.Discovery,
+	tag string,
+	opts ...Option,
+) *Discovery {
+	disc, err := NewDiscovery(params, hst, routingDisc, tag, opts...)
+	require.NoError(t.T, err)
+	err = disc.Start(t.ctx)
 	require.NoError(t.T, err)
 	t.T.Cleanup(func() {
 		err := disc.Stop(t.ctx)
 		require.NoError(t.T, err)
 	})
-
-	go disc.Advertise(t.ctx)
 	return disc
 }
 
@@ -134,3 +198,11 @@ func (t *testnet) peer() (host.Host, discovery.Discovery) {
 
 	return hst, routing.NewRoutingDiscovery(dht)
 }
+
+func checkPeer(t *testing.T, expected peer.ID, done chan struct{}) func(peerID peer.ID, isAdded bool) {
+	return func(peerID peer.ID, isAdded bool) {
+		defer close(done)
+		require.Equal(t, expected, peerID)
+		require.True(t, isAdded)
+	}
+}
diff --git a/share/p2p/discovery/metrics.go b/share/p2p/discovery/metrics.go
index 99c9bb4548..d0be1c219d 100644
--- a/share/p2p/discovery/metrics.go
+++ b/share/p2p/discovery/metrics.go
@@ -15,7 +15,6 @@ const (
 	handlePeerResultKey                    = "result"
 	handlePeerSkipSelf    handlePeerResult = "skip_self"
-	handlePeerEmptyAddrs  handlePeerResult = "skip_empty_addresses"
 	handlePeerEnoughPeers handlePeerResult = "skip_enough_peers"
 	handlePeerBackoff     handlePeerResult = "skip_backoff"
 	handlePeerConnected   handlePeerResult = "connected"
@@ -47,7 +46,7 @@ func (d *Discovery) WithMetrics() error {
 		return fmt.Errorf("discovery: init metrics: %w", err)
 	}
 	d.metrics = metrics
-	d.WithOnPeersUpdate(metrics.observeOnPeersUpdate)
+	d.onUpdatedPeers = d.onUpdatedPeers.add(metrics.observeOnPeersUpdate)
 	return nil
 }
diff --git a/share/p2p/discovery/options.go b/share/p2p/discovery/options.go
index 9d2735c50d..de4b13a7db 100644
--- a/share/p2p/discovery/options.go
+++ b/share/p2p/discovery/options.go
@@ -3,6 +3,8 @@ package discovery
 import (
 	"fmt"
 	"time"
+
+	"github.com/libp2p/go-libp2p/core/peer"
 )
 
 // Parameters is the set of Parameters that must be configured for the Discovery module
@@ -16,13 +18,19 @@ type Parameters struct {
 	AdvertiseInterval time.Duration
 }
 
+// options is the set of options that can be configured for the Discovery module
+type options struct {
+	// onUpdatedPeers will be called on peer set changes
+	onUpdatedPeers OnUpdatedPeers
+}
+
 // Option is a function that configures Discovery Parameters
-type Option func(*Parameters)
+type Option func(*options)
 
 // DefaultParameters returns the default Parameters' configuration values
 // for the Discovery module
-func DefaultParameters() Parameters {
-	return Parameters{
+func DefaultParameters() *Parameters {
+	return &Parameters{
 		PeersLimit:        5,
 		AdvertiseInterval: time.Hour,
 	}
@@ -30,29 +38,30 @@ func DefaultParameters() Parameters {
 
 // Validate validates the values in Parameters
 func (p *Parameters) Validate() error {
-	if p.AdvertiseInterval <= 0 {
-		return fmt.Errorf(
-			"discovery: invalid option: value AdvertiseInterval %s, %s",
-			"is 0 or negative.",
-			"value must be positive",
-		)
+	if p.PeersLimit <= 0 {
+		return fmt.Errorf("discovery: peers limit cannot be zero or negative")
 	}
+	if p.AdvertiseInterval <= 0 {
+		return fmt.Errorf("discovery: advertise interval cannot be zero or negative")
+	}
 	return nil
 }
 
-// WithPeersLimit is a functional option that Discovery
-// uses to set the PeersLimit configuration param
-func WithPeersLimit(peersLimit uint) Option {
-	return func(p *Parameters) {
-		p.PeersLimit = peersLimit
+// WithOnPeersUpdate chains OnPeersUpdate callbacks on every update of the discovered peers list.
+func WithOnPeersUpdate(f OnUpdatedPeers) Option {
+	return func(p *options) {
+		p.onUpdatedPeers = p.onUpdatedPeers.add(f)
 	}
 }
 
-// WithAdvertiseInterval is a functional option that Discovery
-// uses to set the AdvertiseInterval configuration param
-func WithAdvertiseInterval(advInterval time.Duration) Option {
-	return func(p *Parameters) {
-		p.AdvertiseInterval = advInterval
+func newOptions(opts ...Option) *options {
+	defaults := &options{
+		onUpdatedPeers: func(peer.ID, bool) {},
+	}
+
+	for _, opt := range opts {
+		opt(defaults)
 	}
+	return defaults
 }
diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go
index 4935db2774..e39a181150 100644
--- a/share/p2p/peers/manager.go
+++ b/share/p2p/peers/manager.go
@@ -21,7 +21,6 @@ import (
 	"github.com/celestiaorg/celestia-node/header"
 	"github.com/celestiaorg/celestia-node/share"
-	"github.com/celestiaorg/celestia-node/share/p2p/discovery"
 	"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
 )
 
@@ -54,7 +53,6 @@ type Manager struct {
 	// header subscription is necessary in order to Validate the inbound eds hash
 	headerSub libhead.Subscriber[*header.ExtendedHeader]
 	shrexSub  *shrexsub.PubSub
-	disc      *discovery.Discovery
 	host      host.Host
 	connGater *conngater.BasicConnectionGater
 
@@ -98,7 +96,6 @@ type syncPool struct {
 
 func NewManager(
 	params Parameters,
-	discovery *discovery.Discovery,
 	host host.Host,
 	connGater *conngater.BasicConnectionGater,
 	options ...Option,
@@ -110,7 +107,6 @@ func NewManager(
 	s := &Manager{
 		params:            params,
 		connGater:         connGater,
-		disc:              discovery,
 		host:              host,
 		pools:             make(map[string]*syncPool),
 		blacklistedHashes: make(map[string]bool),
@@ -126,23 +122,6 @@ func NewManager(
 	}
 
 	s.fullNodes = newPool(s.params.PeerCooldown)
-
-	discovery.WithOnPeersUpdate(
-		func(peerID peer.ID, isAdded bool) {
-			if isAdded {
-				if s.isBlacklistedPeer(peerID) {
-					log.Debugw("got blacklisted peer from discovery", "peer", peerID.String())
-					return
-				}
-				s.fullNodes.add(peerID)
-				log.Debugw("added to full nodes", "peer", peerID)
-				return
-			}
-
-			log.Debugw("removing peer from discovered full nodes", "peer", peerID.String())
-			s.fullNodes.remove(peerID)
-		})
-
 	return s, nil
 }
 
@@ -247,6 +226,22 @@ func (m *Manager) Peer(
 	}
 }
 
+// UpdateFullNodePool is called by discovery when a full node is discovered or removed.
+func (m *Manager) UpdateFullNodePool(peerID peer.ID, isAdded bool) {
+	if isAdded {
+		if m.isBlacklistedPeer(peerID) {
+			log.Debugw("got blacklisted peer from discovery", "peer", peerID.String())
+			return
+		}
+		m.fullNodes.add(peerID)
+		log.Debugw("added to full nodes", "peer", peerID)
+		return
+	}
+
+	log.Debugw("removing peer from discovered full nodes", "peer", peerID.String())
+	m.fullNodes.remove(peerID)
+}
+
 func (m *Manager) newPeer(
 	ctx context.Context,
 	datahash share.DataHash,
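With the constructor-time hook gone, the dependency now points the other way: the Manager no longer holds a *discovery.Discovery; instead the caller registers Manager.UpdateFullNodePool as an OnUpdatedPeers callback (as constructors.go does above). The pool semantics are unchanged:

    manager.UpdateFullNodePool(peerID, true)  // add peerID to the fullNodes pool, unless blacklisted
    manager.UpdateFullNodePool(peerID, false) // remove peerID from the fullNodes pool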
"github.com/ipfs/go-datastore/sync" dht "github.com/libp2p/go-libp2p-kad-dht" pubsub "github.com/libp2p/go-libp2p-pubsub" - routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing" @@ -364,13 +363,14 @@ func TestIntegration(t *testing.T) { }) t.Run("get peer from discovery", func(t *testing.T) { + fullNodesTag := "fullNodes" nw, err := mocknet.FullMeshConnected(3) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) t.Cleanup(cancel) // set up bootstrapper - bsHost := nw.Hosts()[2] + bsHost := nw.Hosts()[0] bs := host.InfoFromHost(bsHost) opts := []dht.Option{ dht.Mode(dht.ModeAuto), @@ -388,62 +388,74 @@ func TestIntegration(t *testing.T) { require.NoError(t, bsRouter.Bootstrap(ctx)) // set up broadcaster node - bnHost := nw.Hosts()[0] - router1, err := dht.New(ctx, bnHost, opts...) + bnHost := nw.Hosts()[1] + bnRouter, err := dht.New(ctx, bnHost, opts...) require.NoError(t, err) - bnDisc := discovery.NewDiscovery( - nw.Hosts()[0], - routingdisc.NewRoutingDiscovery(router1), - discovery.WithPeersLimit(0), - discovery.WithAdvertiseInterval(time.Second), + + params := discovery.DefaultParameters() + params.AdvertiseInterval = time.Second + + bnDisc, err := discovery.NewDiscovery( + params, + bnHost, + routingdisc.NewRoutingDiscovery(bnRouter), + fullNodesTag, ) + require.NoError(t, err) // set up full node / receiver node - fnHost := nw.Hosts()[0] - router2, err := dht.New(ctx, fnHost, opts...) - require.NoError(t, err) - fnDisc := discovery.NewDiscovery( - nw.Hosts()[1], - routingdisc.NewRoutingDiscovery(router2), - discovery.WithPeersLimit(10), - discovery.WithAdvertiseInterval(time.Second), - ) - err = fnDisc.Start(ctx) + fnHost := nw.Hosts()[2] + fnRouter, err := dht.New(ctx, fnHost, opts...) 
require.NoError(t, err) - t.Cleanup(func() { - err = fnDisc.Stop(ctx) - require.NoError(t, err) - }) - // hook peer manager to discovery + // init peer manager for full node connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore())) require.NoError(t, err) fnPeerManager, err := NewManager( DefaultParameters(), - fnDisc, nil, connGater, ) require.NoError(t, err) waitCh := make(chan struct{}) - fnDisc.WithOnPeersUpdate(func(peerID peer.ID, isAdded bool) { + checkDiscoveredPeer := func(peerID peer.ID, isAdded bool) { defer close(waitCh) - // check that obtained peer id is same as BN - require.Equal(t, nw.Hosts()[0].ID(), peerID) + // check that obtained peer id is BN + require.Equal(t, bnHost.ID(), peerID) + } + + // set up discovery for full node with hook to peer manager and check discovered peer + params = discovery.DefaultParameters() + params.AdvertiseInterval = time.Second + params.PeersLimit = 10 + + fnDisc, err := discovery.NewDiscovery( + params, + fnHost, + routingdisc.NewRoutingDiscovery(fnRouter), + fullNodesTag, + discovery.WithOnPeersUpdate(fnPeerManager.UpdateFullNodePool), + discovery.WithOnPeersUpdate(checkDiscoveredPeer), + ) + require.NoError(t, fnDisc.Start(ctx)) + t.Cleanup(func() { + err = fnDisc.Stop(ctx) + require.NoError(t, err) }) - require.NoError(t, router1.Bootstrap(ctx)) - require.NoError(t, router2.Bootstrap(ctx)) + require.NoError(t, bnRouter.Bootstrap(ctx)) + require.NoError(t, fnRouter.Bootstrap(ctx)) go bnDisc.Advertise(ctx) select { case <-waitCh: - require.Contains(t, fnPeerManager.fullNodes.peersList, fnHost.ID()) + require.Contains(t, fnPeerManager.fullNodes.peersList, bnHost.ID()) case <-ctx.Done(): require.NoError(t, ctx.Err()) } + }) } @@ -457,22 +469,17 @@ func testManager(ctx context.Context, headerSub libhead.Subscriber[*header.Exten return nil, err } - disc := discovery.NewDiscovery(nil, - routingdisc.NewRoutingDiscovery(routinghelpers.Null{}), - discovery.WithPeersLimit(0), - discovery.WithAdvertiseInterval(time.Second), - ) connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore())) if err != nil { return nil, err } manager, err := NewManager( DefaultParameters(), - disc, host, connGater, WithShrexSubPools(shrexSub, headerSub), ) + if err != nil { return nil, err }
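One behavioral consequence worth noting, which the test changes above appear to reflect: PeersLimit = 0 and AdvertiseInterval = -1 previously meant "discovery disabled" and "advertising disabled", but both values now fail Validate, so discovery can no longer be switched off through its parameters. A sketch of the new constructor-time failures, with h and r as before:

    params := discovery.DefaultParameters()
    params.PeersLimit = 0
    _, err := discovery.NewDiscovery(params, h, r, "full")
    // err: "discovery: peers limit cannot be zero or negative"

    _, err = discovery.NewDiscovery(discovery.DefaultParameters(), h, r, "")
    // err: "discovery: tag cannot be empty"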