From 24d694907314bfed553a2d8cbb849e60d36cb779 Mon Sep 17 00:00:00 2001 From: Vlad Date: Tue, 19 Sep 2023 22:40:35 +0800 Subject: [PATCH 1/6] discover peers by tag --- nodebuilder/share/config.go | 2 +- nodebuilder/share/constructors.go | 10 ++-- nodebuilder/share/module.go | 4 +- share/getters/shrex_test.go | 9 ---- share/p2p/discovery/discovery.go | 37 ++++++------- share/p2p/discovery/discovery_test.go | 78 +++++++++++++++++++++++---- share/p2p/discovery/metrics.go | 3 +- share/p2p/discovery/options.go | 41 +++++++++++++- share/p2p/peers/manager.go | 37 ++++++------- share/p2p/peers/manager_test.go | 65 +++++++++++----------- 10 files changed, 183 insertions(+), 103 deletions(-) diff --git a/nodebuilder/share/config.go b/nodebuilder/share/config.go index 7fd845a672..99ea4a5f2b 100644 --- a/nodebuilder/share/config.go +++ b/nodebuilder/share/config.go @@ -22,7 +22,7 @@ type Config struct { PeerManagerParams peers.Parameters LightAvailability light.Parameters `toml:",omitempty"` - Discovery discovery.Parameters + Discovery *discovery.Parameters } func DefaultConfig(tp node.Type) Config { diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go index 0e8d17f208..4fd85846d1 100644 --- a/nodebuilder/share/constructors.go +++ b/nodebuilder/share/constructors.go @@ -21,18 +21,22 @@ import ( "github.com/celestiaorg/celestia-node/share/getters" "github.com/celestiaorg/celestia-node/share/ipld" disc "github.com/celestiaorg/celestia-node/share/p2p/discovery" + "github.com/celestiaorg/celestia-node/share/p2p/peers" ) -func newDiscovery(cfg Config) func(routing.ContentRouting, host.Host) *disc.Discovery { +func newDiscovery(cfg *disc.Parameters, +) func(routing.ContentRouting, host.Host, *peers.Manager) *disc.Discovery { return func( r routing.ContentRouting, h host.Host, + manager *peers.Manager, ) *disc.Discovery { return disc.NewDiscovery( h, routingdisc.NewRoutingDiscovery(r), - disc.WithPeersLimit(cfg.Discovery.PeersLimit), - disc.WithAdvertiseInterval(cfg.Discovery.AdvertiseInterval), + disc.WithPeersLimit(cfg.PeersLimit), + disc.WithAdvertiseInterval(cfg.AdvertiseInterval), + disc.WithOnPeersUpdate(manager.UpdatedFullNodes), ) } } diff --git a/nodebuilder/share/module.go b/nodebuilder/share/module.go index 35f8813111..47bd67007d 100644 --- a/nodebuilder/share/module.go +++ b/nodebuilder/share/module.go @@ -37,7 +37,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option fx.Provide(newModule), fx.Invoke(func(disc *disc.Discovery) {}), fx.Provide(fx.Annotate( - newDiscovery(*cfg), + newDiscovery(cfg.Discovery), fx.OnStart(func(ctx context.Context, d *disc.Discovery) error { return d.Start(ctx) }), @@ -147,7 +147,6 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option fx.Provide( func( params peers.Parameters, - discovery *disc.Discovery, host host.Host, connGater *conngater.BasicConnectionGater, shrexSub *shrexsub.PubSub, @@ -158,7 +157,6 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option ) (*peers.Manager, error) { return peers.NewManager( params, - discovery, host, connGater, peers.WithShrexSubPools(shrexSub, headerSub), diff --git a/share/getters/shrex_test.go b/share/getters/shrex_test.go index 38611ed0ef..a29665f614 100644 --- a/share/getters/shrex_test.go +++ b/share/getters/shrex_test.go @@ -9,9 +9,7 @@ import ( "github.com/ipfs/go-datastore" ds_sync "github.com/ipfs/go-datastore/sync" - routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" "github.com/libp2p/go-libp2p/core/host" - routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing" "github.com/libp2p/go-libp2p/p2p/net/conngater" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" @@ -25,7 +23,6 @@ import ( "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/eds/edstest" - "github.com/celestiaorg/celestia-node/share/p2p/discovery" "github.com/celestiaorg/celestia-node/share/p2p/peers" "github.com/celestiaorg/celestia-node/share/p2p/shrexeds" "github.com/celestiaorg/celestia-node/share/p2p/shrexnd" @@ -211,18 +208,12 @@ func testManager( return nil, err } - disc := discovery.NewDiscovery(nil, - routingdisc.NewRoutingDiscovery(routinghelpers.Null{}), - discovery.WithPeersLimit(10), - discovery.WithAdvertiseInterval(time.Second), - ) connGater, err := conngater.NewBasicConnectionGater(ds_sync.MutexWrap(datastore.NewMapDatastore())) if err != nil { return nil, err } manager, err := peers.NewManager( peers.DefaultParameters(), - disc, host, connGater, peers.WithShrexSubPools(shrexSub, headerSub), diff --git a/share/p2p/discovery/discovery.go b/share/p2p/discovery/discovery.go index f24df2c88b..6026464c8c 100644 --- a/share/p2p/discovery/discovery.go +++ b/share/p2p/discovery/discovery.go @@ -45,6 +45,7 @@ var discoveryRetryTimeout = retryTimeout // Discovery combines advertise and discover services and allows to store discovered nodes. // TODO: The code here gets horribly hairy, so we should refactor this at some point type Discovery struct { + tag string set *limitedSet host host.Host disc discovery.Discovery @@ -58,11 +59,18 @@ type Discovery struct { cancel context.CancelFunc - params Parameters + params *Parameters } type OnUpdatedPeers func(peerID peer.ID, isAdded bool) +func (f OnUpdatedPeers) add(next OnUpdatedPeers) OnUpdatedPeers { + return func(peerID peer.ID, isAdded bool) { + f(peerID, isAdded) + next(peerID, isAdded) + } +} + // NewDiscovery constructs a new discovery. func NewDiscovery( h host.Host, @@ -72,15 +80,16 @@ func NewDiscovery( params := DefaultParameters() for _, opt := range opts { - opt(¶ms) + opt(params) } return &Discovery{ + tag: params.Tag, set: newLimitedSet(params.PeersLimit), host: h, disc: d, connector: newBackoffConnector(h, defaultBackoffFactory), - onUpdatedPeers: func(peer.ID, bool) {}, + onUpdatedPeers: params.onUpdatedPeers, params: params, triggerDisc: make(chan struct{}), } @@ -111,15 +120,6 @@ func (d *Discovery) Stop(context.Context) error { return nil } -// WithOnPeersUpdate chains OnPeersUpdate callbacks on every update of discovered peers list. -func (d *Discovery) WithOnPeersUpdate(f OnUpdatedPeers) { - prev := d.onUpdatedPeers - d.onUpdatedPeers = func(peerID peer.ID, isAdded bool) { - prev(peerID, isAdded) - f(peerID, isAdded) - } -} - // Peers provides a list of discovered peers in the "full" topic. // If Discovery hasn't found any peers, it blocks until at least one peer is found. func (d *Discovery) Peers(ctx context.Context) ([]peer.ID, error) { @@ -133,7 +133,7 @@ func (d *Discovery) Discard(id peer.ID) bool { return false } - d.host.ConnManager().Unprotect(id, rendezvousPoint) + d.host.ConnManager().Unprotect(id, d.tag) d.connector.Backoff(id) d.set.Remove(id) d.onUpdatedPeers(id, false) @@ -161,13 +161,14 @@ func (d *Discovery) Advertise(ctx context.Context) { timer := time.NewTimer(d.params.AdvertiseInterval) defer timer.Stop() for { - _, err := d.disc.Advertise(ctx, rendezvousPoint) + fmt.Println(d.tag) + _, err := d.disc.Advertise(ctx, d.tag) d.metrics.observeAdvertise(ctx, err) if err != nil { if ctx.Err() != nil { return } - log.Warnw("error advertising", "rendezvous", rendezvousPoint, "err", err) + log.Warnw("error advertising", "rendezvous", d.tag, "err", err) // we don't want retry indefinitely in busy loop // internal discovery mechanism may need some time before attempts @@ -280,7 +281,7 @@ func (d *Discovery) discover(ctx context.Context) bool { findCancel() }() - peers, err := d.disc.FindPeers(findCtx, rendezvousPoint) + peers, err := d.disc.FindPeers(findCtx, d.tag) if err != nil { log.Error("unable to start discovery", "err", err) return false @@ -371,11 +372,11 @@ func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo d.metrics.observeHandlePeer(ctx, handlePeerConnected) logger.Debug("added peer to set") - // tag to protect peer from being killed by ConnManager + // Tag to protect peer from being killed by ConnManager // NOTE: This is does not protect from remote killing the connection. // In the future, we should design a protocol that keeps bidirectional agreement on whether // connection should be kept or not, similar to mesh link in GossipSub. - d.host.ConnManager().Protect(peer.ID, rendezvousPoint) + d.host.ConnManager().Protect(peer.ID, d.tag) return true } diff --git a/share/p2p/discovery/discovery_test.go b/share/p2p/discovery/discovery_test.go index 06d88a9079..3720a0c22d 100644 --- a/share/p2p/discovery/discovery_test.go +++ b/share/p2p/discovery/discovery_test.go @@ -27,23 +27,29 @@ func TestDiscovery(t *testing.T) { tn := newTestnet(ctx, t) - peerA := tn.discovery( - WithPeersLimit(nodes), - WithAdvertiseInterval(-1), - ) - type peerUpdate struct { peerID peer.ID isAdded bool } updateCh := make(chan peerUpdate) - peerA.WithOnPeersUpdate(func(peerID peer.ID, isAdded bool) { + submit := func(peerID peer.ID, isAdded bool) { updateCh <- peerUpdate{peerID: peerID, isAdded: isAdded} - }) + } + + host, routingDisc := tn.peer() + peerA := tn.discovery( + host, routingDisc, + WithPeersLimit(nodes), + WithAdvertiseInterval(-1), + WithOnPeersUpdate(submit), + ) discs := make([]*Discovery, nodes) for i := range discs { - discs[i] = tn.discovery(WithPeersLimit(0), WithAdvertiseInterval(time.Millisecond*100)) + host, routingDisc := tn.peer() + discs[i] = tn.discovery(host, routingDisc, + WithPeersLimit(0), + WithAdvertiseInterval(time.Millisecond*100)) select { case res := <-updateCh: @@ -73,6 +79,51 @@ func TestDiscovery(t *testing.T) { assert.EqualValues(t, 0, peerA.set.Size()) } +func TestDiscoveryTagged(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + t.Cleanup(cancel) + + tn := newTestnet(ctx, t) + + // launch 2 peers, that advertise with different tags + adv1, routingDisc1 := tn.peer() + adv2, routingDisc2 := tn.peer() + + // sub will discover both peers, but on different tags + sub, routingDisc := tn.peer() + + // create 2 discovery services for sub, each with a different tag + done1 := make(chan struct{}) + tn.discovery(sub, routingDisc, + WithTag("tag1"), + WithOnPeersUpdate(checkPeer(t, adv1.ID(), done1))) + + done2 := make(chan struct{}) + tn.discovery(sub, routingDisc, + WithTag("tag2"), + WithOnPeersUpdate(checkPeer(t, adv2.ID(), done2))) + + // run discovery services for advertisers + tn.discovery(adv1, routingDisc1, + WithTag("tag1")) + + tn.discovery(adv2, routingDisc2, + WithTag("tag2")) + + // wait for discovery services to discover each other on different tags + select { + case <-done1: + case <-ctx.Done(): + t.Fatal("did not discover peer in time") + } + + select { + case <-done2: + case <-ctx.Done(): + t.Fatal("did not discover peer in time") + } +} + type testnet struct { ctx context.Context T *testing.T @@ -97,8 +148,7 @@ func newTestnet(ctx context.Context, t *testing.T) *testnet { return &testnet{ctx: ctx, T: t, bootstrapper: *host.InfoFromHost(hst)} } -func (t *testnet) discovery(opts ...Option) *Discovery { - hst, routingDisc := t.peer() +func (t *testnet) discovery(hst host.Host, routingDisc discovery.Discovery, opts ...Option) *Discovery { disc := NewDiscovery(hst, routingDisc, opts...) err := disc.Start(t.ctx) require.NoError(t.T, err) @@ -134,3 +184,11 @@ func (t *testnet) peer() (host.Host, discovery.Discovery) { return hst, routing.NewRoutingDiscovery(dht) } + +func checkPeer(t *testing.T, expected peer.ID, done chan struct{}) func(peerID peer.ID, isAdded bool) { + return func(peerID peer.ID, isAdded bool) { + defer close(done) + require.Equal(t, expected, peerID) + require.True(t, isAdded) + } +} diff --git a/share/p2p/discovery/metrics.go b/share/p2p/discovery/metrics.go index 99c9bb4548..d0be1c219d 100644 --- a/share/p2p/discovery/metrics.go +++ b/share/p2p/discovery/metrics.go @@ -15,7 +15,6 @@ const ( handlePeerResultKey = "result" handlePeerSkipSelf handlePeerResult = "skip_self" - handlePeerEmptyAddrs handlePeerResult = "skip_empty_addresses" handlePeerEnoughPeers handlePeerResult = "skip_enough_peers" handlePeerBackoff handlePeerResult = "skip_backoff" handlePeerConnected handlePeerResult = "connected" @@ -47,7 +46,7 @@ func (d *Discovery) WithMetrics() error { return fmt.Errorf("discovery: init metrics: %w", err) } d.metrics = metrics - d.WithOnPeersUpdate(metrics.observeOnPeersUpdate) + d.onUpdatedPeers = d.onUpdatedPeers.add(metrics.observeOnPeersUpdate) return nil } diff --git a/share/p2p/discovery/options.go b/share/p2p/discovery/options.go index 8a6a162e11..71f93cd6b3 100644 --- a/share/p2p/discovery/options.go +++ b/share/p2p/discovery/options.go @@ -3,6 +3,8 @@ package discovery import ( "fmt" "time" + + "github.com/libp2p/go-libp2p/core/peer" ) // Parameters is the set of Parameters that must be configured for the Discovery module @@ -14,6 +16,10 @@ type Parameters struct { // Set -1 to disable. // NOTE: only full and bridge can advertise themselves. AdvertiseInterval time.Duration + // onUpdatedPeers will be called on peer set changes + onUpdatedPeers OnUpdatedPeers + // Tag is used as rondezvous point for discovery service + Tag string } // Option is a function that configures Discovery Parameters @@ -21,11 +27,13 @@ type Option func(*Parameters) // DefaultParameters returns the default Parameters' configuration values // for the Discovery module -func DefaultParameters() Parameters { - return Parameters{ +func DefaultParameters() *Parameters { + return &Parameters{ PeersLimit: 5, // based on https://github.com/libp2p/go-libp2p-kad-dht/pull/793 AdvertiseInterval: time.Hour * 22, + Tag: rendezvousPoint, + onUpdatedPeers: func(peer.ID, bool) {}, } } @@ -39,6 +47,21 @@ func (p *Parameters) Validate() error { ) } + if p.PeersLimit <= 0 { + return fmt.Errorf( + "discovery: invalid option: value PeersLimit %s, %s", + "is negative.", + "value must be positive", + ) + } + + if p.Tag == "" { + return fmt.Errorf( + "discovery: invalid option: value Tag %s, %s", + "is empty.", + "value must be non-empty", + ) + } return nil } @@ -57,3 +80,17 @@ func WithAdvertiseInterval(advInterval time.Duration) Option { p.AdvertiseInterval = advInterval } } + +// WithOnPeersUpdate chains OnPeersUpdate callbacks on every update of discovered peers list. +func WithOnPeersUpdate(f OnUpdatedPeers) Option { + return func(p *Parameters) { + p.onUpdatedPeers = p.onUpdatedPeers.add(f) + } +} + +// WithTag is a functional option that sets the Tag for the discovery service +func WithTag(tag string) Option { + return func(p *Parameters) { + p.Tag = tag + } +} diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index caef242eec..fdb55dc896 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -22,7 +22,6 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/p2p/discovery" "github.com/celestiaorg/celestia-node/share/p2p/shrexsub" ) @@ -55,7 +54,6 @@ type Manager struct { // header subscription is necessary in order to Validate the inbound eds hash headerSub libhead.Subscriber[*header.ExtendedHeader] shrexSub *shrexsub.PubSub - disc *discovery.Discovery host host.Host connGater *conngater.BasicConnectionGater @@ -99,7 +97,6 @@ type syncPool struct { func NewManager( params Parameters, - discovery *discovery.Discovery, host host.Host, connGater *conngater.BasicConnectionGater, options ...Option, @@ -111,7 +108,6 @@ func NewManager( s := &Manager{ params: params, connGater: connGater, - disc: discovery, host: host, pools: make(map[string]*syncPool), blacklistedHashes: make(map[string]bool), @@ -127,23 +123,6 @@ func NewManager( } s.fullNodes = newPool(s.params.PeerCooldown) - - discovery.WithOnPeersUpdate( - func(peerID peer.ID, isAdded bool) { - if isAdded { - if s.isBlacklistedPeer(peerID) { - log.Debugw("got blacklisted peer from discovery", "peer", peerID.String()) - return - } - s.fullNodes.add(peerID) - log.Debugw("added to full nodes", "peer", peerID) - return - } - - log.Debugw("removing peer from discovered full nodes", "peer", peerID.String()) - s.fullNodes.remove(peerID) - }) - return s, nil } @@ -248,6 +227,22 @@ func (m *Manager) Peer( } } +// UpdatedFullNodes is called by discovery when new full node is discovered or removed +func (m *Manager) UpdatedFullNodes(peerID peer.ID, isAdded bool) { + if isAdded { + if m.isBlacklistedPeer(peerID) { + log.Debugw("got blacklisted peer from discovery", "peer", peerID.String()) + return + } + m.fullNodes.add(peerID) + log.Debugw("added to full nodes", "peer", peerID) + return + } + + log.Debugw("removing peer from discovered full nodes", "peer", peerID.String()) + m.fullNodes.remove(peerID) +} + func (m *Manager) newPeer( ctx context.Context, datahash share.DataHash, diff --git a/share/p2p/peers/manager_test.go b/share/p2p/peers/manager_test.go index c60a737baa..31496481ed 100644 --- a/share/p2p/peers/manager_test.go +++ b/share/p2p/peers/manager_test.go @@ -10,7 +10,6 @@ import ( "github.com/ipfs/go-datastore/sync" dht "github.com/libp2p/go-libp2p-kad-dht" pubsub "github.com/libp2p/go-libp2p-pubsub" - routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing" @@ -369,7 +368,7 @@ func TestIntegration(t *testing.T) { t.Cleanup(cancel) // set up bootstrapper - bsHost := nw.Hosts()[2] + bsHost := nw.Hosts()[0] bs := host.InfoFromHost(bsHost) opts := []dht.Option{ dht.Mode(dht.ModeAuto), @@ -387,62 +386,65 @@ func TestIntegration(t *testing.T) { require.NoError(t, bsRouter.Bootstrap(ctx)) // set up broadcaster node - bnHost := nw.Hosts()[0] - router1, err := dht.New(ctx, bnHost, opts...) + bnHost := nw.Hosts()[1] + bnRouter, err := dht.New(ctx, bnHost, opts...) require.NoError(t, err) bnDisc := discovery.NewDiscovery( - nw.Hosts()[0], - routingdisc.NewRoutingDiscovery(router1), + bnHost, + routingdisc.NewRoutingDiscovery(bnRouter), discovery.WithPeersLimit(0), discovery.WithAdvertiseInterval(time.Second), ) // set up full node / receiver node - fnHost := nw.Hosts()[0] - router2, err := dht.New(ctx, fnHost, opts...) + fnHost := nw.Hosts()[2] + fnRouter, err := dht.New(ctx, fnHost, opts...) require.NoError(t, err) - fnDisc := discovery.NewDiscovery( - nw.Hosts()[1], - routingdisc.NewRoutingDiscovery(router2), - discovery.WithPeersLimit(10), - discovery.WithAdvertiseInterval(time.Second), - ) - err = fnDisc.Start(ctx) - require.NoError(t, err) - t.Cleanup(func() { - err = fnDisc.Stop(ctx) - require.NoError(t, err) - }) - // hook peer manager to discovery + // init peer manager for full node connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore())) require.NoError(t, err) fnPeerManager, err := NewManager( DefaultParameters(), - fnDisc, nil, connGater, ) require.NoError(t, err) waitCh := make(chan struct{}) - fnDisc.WithOnPeersUpdate(func(peerID peer.ID, isAdded bool) { + checkDiscoveredPeer := func(peerID peer.ID, isAdded bool) { defer close(waitCh) - // check that obtained peer id is same as BN - require.Equal(t, nw.Hosts()[0].ID(), peerID) + // check that obtained peer id is BN + require.Equal(t, bnHost.ID(), peerID) + } + + // set up discovery for full node with hook to peer manager and check discovered peer + fnDisc := discovery.NewDiscovery( + fnHost, + routingdisc.NewRoutingDiscovery(fnRouter), + discovery.WithPeersLimit(10), + discovery.WithAdvertiseInterval(time.Second), + discovery.WithOnPeersUpdate(fnPeerManager.UpdatedFullNodes), + discovery.WithOnPeersUpdate(checkDiscoveredPeer), + ) + require.NoError(t, fnDisc.Start(ctx)) + t.Cleanup(func() { + err = fnDisc.Stop(ctx) + require.NoError(t, err) }) - require.NoError(t, router1.Bootstrap(ctx)) - require.NoError(t, router2.Bootstrap(ctx)) + require.NoError(t, bnRouter.Bootstrap(ctx)) + require.NoError(t, fnRouter.Bootstrap(ctx)) go bnDisc.Advertise(ctx) select { case <-waitCh: - require.Contains(t, fnPeerManager.fullNodes.peersList, fnHost.ID()) + require.Contains(t, fnPeerManager.fullNodes.peersList, bnHost.ID()) case <-ctx.Done(): require.NoError(t, ctx.Err()) } + }) } @@ -456,22 +458,17 @@ func testManager(ctx context.Context, headerSub libhead.Subscriber[*header.Exten return nil, err } - disc := discovery.NewDiscovery(nil, - routingdisc.NewRoutingDiscovery(routinghelpers.Null{}), - discovery.WithPeersLimit(0), - discovery.WithAdvertiseInterval(time.Second), - ) connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore())) if err != nil { return nil, err } manager, err := NewManager( DefaultParameters(), - disc, host, connGater, WithShrexSubPools(shrexSub, headerSub), ) + if err != nil { return nil, err } From c82c3048e735a081749675ec8c0367657809553a Mon Sep 17 00:00:00 2001 From: Vlad Date: Tue, 19 Sep 2023 23:16:53 +0800 Subject: [PATCH 2/6] remove rendezvousPoint --- share/p2p/discovery/discovery.go | 3 --- share/p2p/discovery/options.go | 10 ++++++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/share/p2p/discovery/discovery.go b/share/p2p/discovery/discovery.go index 6026464c8c..9d430fc115 100644 --- a/share/p2p/discovery/discovery.go +++ b/share/p2p/discovery/discovery.go @@ -19,9 +19,6 @@ import ( var log = logging.Logger("share/discovery") const ( - // rendezvousPoint is the namespace where peers advertise and discover each other. - rendezvousPoint = "full" - // eventbusBufSize is the size of the buffered channel to handle // events in libp2p. We specify a larger buffer size for the channel // to avoid overflowing and blocking subscription during disconnection bursts. diff --git a/share/p2p/discovery/options.go b/share/p2p/discovery/options.go index 71f93cd6b3..be049d77ab 100644 --- a/share/p2p/discovery/options.go +++ b/share/p2p/discovery/options.go @@ -7,6 +7,11 @@ import ( "github.com/libp2p/go-libp2p/core/peer" ) +const ( + // fullNodesTag is the namespace where full nodes advertise and discover each other. + fullNodesTag = "full" +) + // Parameters is the set of Parameters that must be configured for the Discovery module type Parameters struct { // PeersLimit defines the soft limit of FNs to connect to via discovery. @@ -32,8 +37,9 @@ func DefaultParameters() *Parameters { PeersLimit: 5, // based on https://github.com/libp2p/go-libp2p-kad-dht/pull/793 AdvertiseInterval: time.Hour * 22, - Tag: rendezvousPoint, - onUpdatedPeers: func(peer.ID, bool) {}, + //TODO: remove fullNodesTag default value once multiple tags are supported + Tag: fullNodesTag, + onUpdatedPeers: func(peer.ID, bool) {}, } } From fc879f785a95828b0112cf16142563c6f76695eb Mon Sep 17 00:00:00 2001 From: Vlad Date: Mon, 25 Sep 2023 22:34:38 +0800 Subject: [PATCH 3/6] rework options constructor pattern --- nodebuilder/share/constructors.go | 8 ++-- share/availability/full/testing.go | 9 ++-- share/p2p/discovery/discovery.go | 15 ++++--- share/p2p/discovery/discovery_test.go | 49 ++++++++++++++-------- share/p2p/discovery/options.go | 59 ++++++++------------------- share/p2p/peers/manager_test.go | 20 ++++++--- 6 files changed, 78 insertions(+), 82 deletions(-) diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go index d45fe6371f..2ef0596254 100644 --- a/nodebuilder/share/constructors.go +++ b/nodebuilder/share/constructors.go @@ -21,18 +21,16 @@ import ( ) func newDiscovery(cfg *disc.Parameters, -) func(routing.ContentRouting, host.Host, *peers.Manager) *disc.Discovery { +) func(routing.ContentRouting, host.Host, *peers.Manager) (*disc.Discovery, error) { return func( r routing.ContentRouting, h host.Host, manager *peers.Manager, - ) *disc.Discovery { + ) (*disc.Discovery, error) { return disc.NewDiscovery( + cfg, h, routingdisc.NewRoutingDiscovery(r), - disc.WithPeersLimit(cfg.PeersLimit), - disc.WithAdvertiseInterval(cfg.AdvertiseInterval), - disc.WithOnPeersUpdate(manager.UpdatedFullNodes), ) } } diff --git a/share/availability/full/testing.go b/share/availability/full/testing.go index a636b26ea6..deca8741c9 100644 --- a/share/availability/full/testing.go +++ b/share/availability/full/testing.go @@ -41,12 +41,15 @@ func Node(dn *availability_test.TestDagNet) *availability_test.TestNode { } func TestAvailability(t *testing.T, getter share.Getter) *ShareAvailability { - disc := discovery.NewDiscovery( + params := discovery.DefaultParameters() + params.AdvertiseInterval = time.Second + params.PeersLimit = 10 + disc, err := discovery.NewDiscovery( + params, nil, routing.NewRoutingDiscovery(routinghelpers.Null{}), - discovery.WithAdvertiseInterval(time.Second), - discovery.WithPeersLimit(10), ) + require.NoError(t, err) store, err := eds.NewStore(eds.DefaultParameters(), t.TempDir(), datastore.NewMapDatastore()) require.NoError(t, err) err = store.Start(context.Background()) diff --git a/share/p2p/discovery/discovery.go b/share/p2p/discovery/discovery.go index 9d430fc115..1b67cc4bf5 100644 --- a/share/p2p/discovery/discovery.go +++ b/share/p2p/discovery/discovery.go @@ -70,26 +70,26 @@ func (f OnUpdatedPeers) add(next OnUpdatedPeers) OnUpdatedPeers { // NewDiscovery constructs a new discovery. func NewDiscovery( + params *Parameters, h host.Host, d discovery.Discovery, opts ...Option, -) *Discovery { - params := DefaultParameters() - - for _, opt := range opts { - opt(params) +) (*Discovery, error) { + if err := params.Validate(); err != nil { + return nil, err } + o := newOptions(opts...) return &Discovery{ tag: params.Tag, set: newLimitedSet(params.PeersLimit), host: h, disc: d, connector: newBackoffConnector(h, defaultBackoffFactory), - onUpdatedPeers: params.onUpdatedPeers, + onUpdatedPeers: o.onUpdatedPeers, params: params, triggerDisc: make(chan struct{}), - } + }, nil } func (d *Discovery) Start(context.Context) error { @@ -158,7 +158,6 @@ func (d *Discovery) Advertise(ctx context.Context) { timer := time.NewTimer(d.params.AdvertiseInterval) defer timer.Stop() for { - fmt.Println(d.tag) _, err := d.disc.Advertise(ctx, d.tag) d.metrics.observeAdvertise(ctx, err) if err != nil { diff --git a/share/p2p/discovery/discovery_test.go b/share/p2p/discovery/discovery_test.go index 3720a0c22d..b9db19dd2f 100644 --- a/share/p2p/discovery/discovery_test.go +++ b/share/p2p/discovery/discovery_test.go @@ -37,19 +37,24 @@ func TestDiscovery(t *testing.T) { } host, routingDisc := tn.peer() - peerA := tn.discovery( - host, routingDisc, - WithPeersLimit(nodes), - WithAdvertiseInterval(-1), + params := DefaultParameters() + params.PeersLimit = nodes + params.AdvertiseInterval = -1 + params.Tag = "full" + + peerA := tn.discovery(params, host, routingDisc, WithOnPeersUpdate(submit), ) + params = &Parameters{ + PeersLimit: 0, + AdvertiseInterval: time.Millisecond * 100, + Tag: "full", + } discs := make([]*Discovery, nodes) for i := range discs { host, routingDisc := tn.peer() - discs[i] = tn.discovery(host, routingDisc, - WithPeersLimit(0), - WithAdvertiseInterval(time.Millisecond*100)) + discs[i] = tn.discovery(params, host, routingDisc) select { case res := <-updateCh: @@ -92,23 +97,25 @@ func TestDiscoveryTagged(t *testing.T) { // sub will discover both peers, but on different tags sub, routingDisc := tn.peer() + params := DefaultParameters() + // create 2 discovery services for sub, each with a different tag + params.Tag = "tag1" done1 := make(chan struct{}) - tn.discovery(sub, routingDisc, - WithTag("tag1"), + tn.discovery(params, sub, routingDisc, WithOnPeersUpdate(checkPeer(t, adv1.ID(), done1))) + params.Tag = "tag2" done2 := make(chan struct{}) - tn.discovery(sub, routingDisc, - WithTag("tag2"), + tn.discovery(params, sub, routingDisc, WithOnPeersUpdate(checkPeer(t, adv2.ID(), done2))) // run discovery services for advertisers - tn.discovery(adv1, routingDisc1, - WithTag("tag1")) + params.Tag = "tag1" + tn.discovery(params, adv1, routingDisc1) - tn.discovery(adv2, routingDisc2, - WithTag("tag2")) + params.Tag = "tag2" + tn.discovery(params, adv2, routingDisc2) // wait for discovery services to discover each other on different tags select { @@ -148,9 +155,15 @@ func newTestnet(ctx context.Context, t *testing.T) *testnet { return &testnet{ctx: ctx, T: t, bootstrapper: *host.InfoFromHost(hst)} } -func (t *testnet) discovery(hst host.Host, routingDisc discovery.Discovery, opts ...Option) *Discovery { - disc := NewDiscovery(hst, routingDisc, opts...) - err := disc.Start(t.ctx) +func (t *testnet) discovery( + params *Parameters, + hst host.Host, + routingDisc discovery.Discovery, + opts ...Option, +) *Discovery { + disc, err := NewDiscovery(params, hst, routingDisc, opts...) + require.NoError(t.T, err) + err = disc.Start(t.ctx) require.NoError(t.T, err) t.T.Cleanup(func() { err := disc.Stop(t.ctx) diff --git a/share/p2p/discovery/options.go b/share/p2p/discovery/options.go index 11189338a9..886f1850d2 100644 --- a/share/p2p/discovery/options.go +++ b/share/p2p/discovery/options.go @@ -21,14 +21,18 @@ type Parameters struct { // Set -1 to disable. // NOTE: only full and bridge can advertise themselves. AdvertiseInterval time.Duration - // onUpdatedPeers will be called on peer set changes - onUpdatedPeers OnUpdatedPeers // Tag is used as rondezvous point for discovery service Tag string } +// options is the set of options that can be configured for the Discovery module +type options struct { + // onUpdatedPeers will be called on peer set changes + onUpdatedPeers OnUpdatedPeers +} + // Option is a function that configures Discovery Parameters -type Option func(*Parameters) +type Option func(*options) // DefaultParameters returns the default Parameters' configuration values // for the Discovery module @@ -37,29 +41,12 @@ func DefaultParameters() *Parameters { PeersLimit: 5, AdvertiseInterval: time.Hour, //TODO: remove fullNodesTag default value once multiple tags are supported - Tag: fullNodesTag, - onUpdatedPeers: func(peer.ID, bool) {}, + Tag: fullNodesTag, } } // Validate validates the values in Parameters func (p *Parameters) Validate() error { - if p.AdvertiseInterval <= 0 { - return fmt.Errorf( - "discovery: invalid option: value AdvertiseInterval %s, %s", - "is 0 or negative.", - "value must be positive", - ) - } - - if p.PeersLimit <= 0 { - return fmt.Errorf( - "discovery: invalid option: value PeersLimit %s, %s", - "is negative.", - "value must be positive", - ) - } - if p.Tag == "" { return fmt.Errorf( "discovery: invalid option: value Tag %s, %s", @@ -70,32 +57,20 @@ func (p *Parameters) Validate() error { return nil } -// WithPeersLimit is a functional option that Discovery -// uses to set the PeersLimit configuration param -func WithPeersLimit(peersLimit uint) Option { - return func(p *Parameters) { - p.PeersLimit = peersLimit - } -} - -// WithAdvertiseInterval is a functional option that Discovery -// uses to set the AdvertiseInterval configuration param -func WithAdvertiseInterval(advInterval time.Duration) Option { - return func(p *Parameters) { - p.AdvertiseInterval = advInterval - } -} - // WithOnPeersUpdate chains OnPeersUpdate callbacks on every update of discovered peers list. func WithOnPeersUpdate(f OnUpdatedPeers) Option { - return func(p *Parameters) { + return func(p *options) { p.onUpdatedPeers = p.onUpdatedPeers.add(f) } } -// WithTag is a functional option that sets the Tag for the discovery service -func WithTag(tag string) Option { - return func(p *Parameters) { - p.Tag = tag +func newOptions(opts ...Option) *options { + defaults := &options{ + onUpdatedPeers: func(peer.ID, bool) {}, + } + + for _, opt := range opts { + opt(defaults) } + return defaults } diff --git a/share/p2p/peers/manager_test.go b/share/p2p/peers/manager_test.go index 31496481ed..a461534ac8 100644 --- a/share/p2p/peers/manager_test.go +++ b/share/p2p/peers/manager_test.go @@ -389,12 +389,17 @@ func TestIntegration(t *testing.T) { bnHost := nw.Hosts()[1] bnRouter, err := dht.New(ctx, bnHost, opts...) require.NoError(t, err) - bnDisc := discovery.NewDiscovery( + + params := discovery.DefaultParameters() + params.AdvertiseInterval = time.Second + params.PeersLimit = 0 + + bnDisc, err := discovery.NewDiscovery( + params, bnHost, routingdisc.NewRoutingDiscovery(bnRouter), - discovery.WithPeersLimit(0), - discovery.WithAdvertiseInterval(time.Second), ) + require.NoError(t, err) // set up full node / receiver node fnHost := nw.Hosts()[2] @@ -419,11 +424,14 @@ func TestIntegration(t *testing.T) { } // set up discovery for full node with hook to peer manager and check discovered peer - fnDisc := discovery.NewDiscovery( + params = discovery.DefaultParameters() + params.AdvertiseInterval = time.Second + params.PeersLimit = 10 + + fnDisc, err := discovery.NewDiscovery( + params, fnHost, routingdisc.NewRoutingDiscovery(fnRouter), - discovery.WithPeersLimit(10), - discovery.WithAdvertiseInterval(time.Second), discovery.WithOnPeersUpdate(fnPeerManager.UpdatedFullNodes), discovery.WithOnPeersUpdate(checkDiscoveredPeer), ) From 4a2bff3789802f4d60ee3072e3fd8f56eba95bc6 Mon Sep 17 00:00:00 2001 From: Vlad Date: Thu, 28 Sep 2023 16:22:43 +0800 Subject: [PATCH 4/6] provide peer-manager callback to discovery --- nodebuilder/share/constructors.go | 1 + 1 file changed, 1 insertion(+) diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go index 2ef0596254..c337657a55 100644 --- a/nodebuilder/share/constructors.go +++ b/nodebuilder/share/constructors.go @@ -31,6 +31,7 @@ func newDiscovery(cfg *disc.Parameters, cfg, h, routingdisc.NewRoutingDiscovery(r), + disc.WithOnPeersUpdate(manager.UpdatedFullNodes), ) } } From 40c4b6c5c7475a71d3e91bcfa8c370df545e0459 Mon Sep 17 00:00:00 2001 From: Vlad Date: Fri, 6 Oct 2023 11:26:00 +0400 Subject: [PATCH 5/6] apply review suggestions --- nodebuilder/share/constructors.go | 2 +- share/p2p/discovery/discovery_test.go | 4 ++-- share/p2p/peers/manager.go | 4 ++-- share/p2p/peers/manager_test.go | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go index c337657a55..c15c7f3cdc 100644 --- a/nodebuilder/share/constructors.go +++ b/nodebuilder/share/constructors.go @@ -31,7 +31,7 @@ func newDiscovery(cfg *disc.Parameters, cfg, h, routingdisc.NewRoutingDiscovery(r), - disc.WithOnPeersUpdate(manager.UpdatedFullNodes), + disc.WithOnPeersUpdate(manager.UpdateFullNodePool), ) } } diff --git a/share/p2p/discovery/discovery_test.go b/share/p2p/discovery/discovery_test.go index b9db19dd2f..61f65f470f 100644 --- a/share/p2p/discovery/discovery_test.go +++ b/share/p2p/discovery/discovery_test.go @@ -40,7 +40,7 @@ func TestDiscovery(t *testing.T) { params := DefaultParameters() params.PeersLimit = nodes params.AdvertiseInterval = -1 - params.Tag = "full" + params.Tag = fullNodesTag peerA := tn.discovery(params, host, routingDisc, WithOnPeersUpdate(submit), @@ -49,7 +49,7 @@ func TestDiscovery(t *testing.T) { params = &Parameters{ PeersLimit: 0, AdvertiseInterval: time.Millisecond * 100, - Tag: "full", + Tag: fullNodesTag, } discs := make([]*Discovery, nodes) for i := range discs { diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index fdb55dc896..b22be0534d 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -227,8 +227,8 @@ func (m *Manager) Peer( } } -// UpdatedFullNodes is called by discovery when new full node is discovered or removed -func (m *Manager) UpdatedFullNodes(peerID peer.ID, isAdded bool) { +// UpdateFullNodePool is called by discovery when new full node is discovered or removed +func (m *Manager) UpdateFullNodePool(peerID peer.ID, isAdded bool) { if isAdded { if m.isBlacklistedPeer(peerID) { log.Debugw("got blacklisted peer from discovery", "peer", peerID.String()) diff --git a/share/p2p/peers/manager_test.go b/share/p2p/peers/manager_test.go index a461534ac8..de9f159cc6 100644 --- a/share/p2p/peers/manager_test.go +++ b/share/p2p/peers/manager_test.go @@ -432,7 +432,7 @@ func TestIntegration(t *testing.T) { params, fnHost, routingdisc.NewRoutingDiscovery(fnRouter), - discovery.WithOnPeersUpdate(fnPeerManager.UpdatedFullNodes), + discovery.WithOnPeersUpdate(fnPeerManager.UpdateFullNodePool), discovery.WithOnPeersUpdate(checkDiscoveredPeer), ) require.NoError(t, fnDisc.Start(ctx)) From b1c3d2cd6a6632b79c91aa3bd5c8c395a87de17b Mon Sep 17 00:00:00 2001 From: Vlad Date: Mon, 9 Oct 2023 17:05:02 +0400 Subject: [PATCH 6/6] - move tag to constructor - fix params validation - update test to use Start/Advertise api instead of passing special values params --- nodebuilder/share/constructors.go | 6 ++++ nodebuilder/tests/p2p_test.go | 2 -- share/availability/full/testing.go | 1 + share/p2p/discovery/discovery.go | 17 ++++------ share/p2p/discovery/discovery_test.go | 45 ++++++++++++++------------- share/p2p/discovery/options.go | 21 ++++--------- share/p2p/peers/manager_test.go | 4 ++- 7 files changed, 45 insertions(+), 51 deletions(-) diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go index c15c7f3cdc..aa2ac5bec1 100644 --- a/nodebuilder/share/constructors.go +++ b/nodebuilder/share/constructors.go @@ -20,6 +20,11 @@ import ( "github.com/celestiaorg/celestia-node/share/p2p/peers" ) +const ( + // fullNodesTag is the tag used to identify full nodes in the discovery service. + fullNodesTag = "full" +) + func newDiscovery(cfg *disc.Parameters, ) func(routing.ContentRouting, host.Host, *peers.Manager) (*disc.Discovery, error) { return func( @@ -31,6 +36,7 @@ func newDiscovery(cfg *disc.Parameters, cfg, h, routingdisc.NewRoutingDiscovery(r), + fullNodesTag, disc.WithOnPeersUpdate(manager.UpdateFullNodePool), ) } diff --git a/nodebuilder/tests/p2p_test.go b/nodebuilder/tests/p2p_test.go index d05846f40c..9fe63fb931 100644 --- a/nodebuilder/tests/p2p_test.go +++ b/nodebuilder/tests/p2p_test.go @@ -186,9 +186,7 @@ func TestRestartNodeDiscovery(t *testing.T) { sw.Disconnect(t, nodes[0], nodes[1]) // create and start one more FN with disabled discovery - fullCfg.Share.Discovery.PeersLimit = 0 disabledDiscoveryFN := sw.NewNodeWithConfig(node.Full, fullCfg, nodesConfig) - err = disabledDiscoveryFN.Start(ctx) require.NoError(t, err) // ensure that the FN with disabled discovery is discovered by both of the diff --git a/share/availability/full/testing.go b/share/availability/full/testing.go index deca8741c9..dd21d398c2 100644 --- a/share/availability/full/testing.go +++ b/share/availability/full/testing.go @@ -48,6 +48,7 @@ func TestAvailability(t *testing.T, getter share.Getter) *ShareAvailability { params, nil, routing.NewRoutingDiscovery(routinghelpers.Null{}), + "full", ) require.NoError(t, err) store, err := eds.NewStore(eds.DefaultParameters(), t.TempDir(), datastore.NewMapDatastore()) diff --git a/share/p2p/discovery/discovery.go b/share/p2p/discovery/discovery.go index 1b67cc4bf5..0f44d42dbe 100644 --- a/share/p2p/discovery/discovery.go +++ b/share/p2p/discovery/discovery.go @@ -42,6 +42,7 @@ var discoveryRetryTimeout = retryTimeout // Discovery combines advertise and discover services and allows to store discovered nodes. // TODO: The code here gets horribly hairy, so we should refactor this at some point type Discovery struct { + // Tag is used as rondezvous point for discovery service tag string set *limitedSet host host.Host @@ -73,15 +74,19 @@ func NewDiscovery( params *Parameters, h host.Host, d discovery.Discovery, + tag string, opts ...Option, ) (*Discovery, error) { if err := params.Validate(); err != nil { return nil, err } + if tag == "" { + return nil, fmt.Errorf("discovery: tag cannot be empty") + } o := newOptions(opts...) return &Discovery{ - tag: params.Tag, + tag: tag, set: newLimitedSet(params.PeersLimit), host: h, disc: d, @@ -96,11 +101,6 @@ func (d *Discovery) Start(context.Context) error { ctx, cancel := context.WithCancel(context.Background()) d.cancel = cancel - if d.params.PeersLimit == 0 { - log.Warn("peers limit is set to 0. Skipping discovery...") - return nil - } - sub, err := d.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(eventbusBufSize)) if err != nil { return fmt.Errorf("subscribing for connection events: %w", err) @@ -150,11 +150,6 @@ func (d *Discovery) Discard(id peer.ID) bool { // Advertise is a utility function that persistently advertises a service through an Advertiser. // TODO: Start advertising only after the reachability is confirmed by AutoNAT func (d *Discovery) Advertise(ctx context.Context) { - if d.params.AdvertiseInterval == -1 { - log.Warn("AdvertiseInterval is set to -1. Skipping advertising...") - return - } - timer := time.NewTimer(d.params.AdvertiseInterval) defer timer.Stop() for { diff --git a/share/p2p/discovery/discovery_test.go b/share/p2p/discovery/discovery_test.go index 61f65f470f..c02931e1a4 100644 --- a/share/p2p/discovery/discovery_test.go +++ b/share/p2p/discovery/discovery_test.go @@ -17,12 +17,16 @@ import ( "github.com/stretchr/testify/require" ) +const ( + fullNodesTag = "full" +) + func TestDiscovery(t *testing.T) { const nodes = 10 // higher number brings higher coverage discoveryRetryTimeout = time.Millisecond * 100 // defined in discovery.go - ctx, cancel := context.WithTimeout(context.Background(), time.Minute*30) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) t.Cleanup(cancel) tn := newTestnet(ctx, t) @@ -39,22 +43,21 @@ func TestDiscovery(t *testing.T) { host, routingDisc := tn.peer() params := DefaultParameters() params.PeersLimit = nodes - params.AdvertiseInterval = -1 - params.Tag = fullNodesTag - peerA := tn.discovery(params, host, routingDisc, + // start discovery listener service for peerA + peerA := tn.startNewDiscovery(params, host, routingDisc, fullNodesTag, WithOnPeersUpdate(submit), ) - params = &Parameters{ - PeersLimit: 0, - AdvertiseInterval: time.Millisecond * 100, - Tag: fullNodesTag, - } + // start discovery advertisement services for other peers + params.AdvertiseInterval = time.Millisecond * 100 discs := make([]*Discovery, nodes) for i := range discs { host, routingDisc := tn.peer() - discs[i] = tn.discovery(params, host, routingDisc) + disc, err := NewDiscovery(params, host, routingDisc, fullNodesTag) + require.NoError(t, err) + go disc.Advertise(tn.ctx) + discs[i] = tn.startNewDiscovery(params, host, routingDisc, fullNodesTag) select { case res := <-updateCh: @@ -67,6 +70,7 @@ func TestDiscovery(t *testing.T) { assert.EqualValues(t, nodes, peerA.set.Size()) + // disconnect peerA from all peers and check that notifications are received on updateCh channel for _, disc := range discs { peerID := disc.host.ID() err := peerA.host.Network().ClosePeer(peerID) @@ -100,22 +104,20 @@ func TestDiscoveryTagged(t *testing.T) { params := DefaultParameters() // create 2 discovery services for sub, each with a different tag - params.Tag = "tag1" done1 := make(chan struct{}) - tn.discovery(params, sub, routingDisc, + tn.startNewDiscovery(params, sub, routingDisc, "tag1", WithOnPeersUpdate(checkPeer(t, adv1.ID(), done1))) - params.Tag = "tag2" done2 := make(chan struct{}) - tn.discovery(params, sub, routingDisc, + tn.startNewDiscovery(params, sub, routingDisc, "tag2", WithOnPeersUpdate(checkPeer(t, adv2.ID(), done2))) // run discovery services for advertisers - params.Tag = "tag1" - tn.discovery(params, adv1, routingDisc1) + ds1 := tn.startNewDiscovery(params, adv1, routingDisc1, "tag1") + go ds1.Advertise(tn.ctx) - params.Tag = "tag2" - tn.discovery(params, adv2, routingDisc2) + ds2 := tn.startNewDiscovery(params, adv2, routingDisc2, "tag2") + go ds2.Advertise(tn.ctx) // wait for discovery services to discover each other on different tags select { @@ -155,13 +157,14 @@ func newTestnet(ctx context.Context, t *testing.T) *testnet { return &testnet{ctx: ctx, T: t, bootstrapper: *host.InfoFromHost(hst)} } -func (t *testnet) discovery( +func (t *testnet) startNewDiscovery( params *Parameters, hst host.Host, routingDisc discovery.Discovery, + tag string, opts ...Option, ) *Discovery { - disc, err := NewDiscovery(params, hst, routingDisc, opts...) + disc, err := NewDiscovery(params, hst, routingDisc, tag, opts...) require.NoError(t.T, err) err = disc.Start(t.ctx) require.NoError(t.T, err) @@ -169,8 +172,6 @@ func (t *testnet) discovery( err := disc.Stop(t.ctx) require.NoError(t.T, err) }) - - go disc.Advertise(t.ctx) return disc } diff --git a/share/p2p/discovery/options.go b/share/p2p/discovery/options.go index 886f1850d2..de4b13a7db 100644 --- a/share/p2p/discovery/options.go +++ b/share/p2p/discovery/options.go @@ -7,11 +7,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" ) -const ( - // fullNodesTag is the namespace where full nodes advertise and discover each other. - fullNodesTag = "full" -) - // Parameters is the set of Parameters that must be configured for the Discovery module type Parameters struct { // PeersLimit defines the soft limit of FNs to connect to via discovery. @@ -21,8 +16,6 @@ type Parameters struct { // Set -1 to disable. // NOTE: only full and bridge can advertise themselves. AdvertiseInterval time.Duration - // Tag is used as rondezvous point for discovery service - Tag string } // options is the set of options that can be configured for the Discovery module @@ -40,19 +33,17 @@ func DefaultParameters() *Parameters { return &Parameters{ PeersLimit: 5, AdvertiseInterval: time.Hour, - //TODO: remove fullNodesTag default value once multiple tags are supported - Tag: fullNodesTag, } } // Validate validates the values in Parameters func (p *Parameters) Validate() error { - if p.Tag == "" { - return fmt.Errorf( - "discovery: invalid option: value Tag %s, %s", - "is empty.", - "value must be non-empty", - ) + if p.PeersLimit <= 0 { + return fmt.Errorf("discovery: peers limit cannot be zero or negative") + } + + if p.AdvertiseInterval <= 0 { + return fmt.Errorf("discovery: advertise interval cannot be zero or negative") } return nil } diff --git a/share/p2p/peers/manager_test.go b/share/p2p/peers/manager_test.go index 79abad53c5..94ec5d5ea2 100644 --- a/share/p2p/peers/manager_test.go +++ b/share/p2p/peers/manager_test.go @@ -363,6 +363,7 @@ func TestIntegration(t *testing.T) { }) t.Run("get peer from discovery", func(t *testing.T) { + fullNodesTag := "fullNodes" nw, err := mocknet.FullMeshConnected(3) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) @@ -393,12 +394,12 @@ func TestIntegration(t *testing.T) { params := discovery.DefaultParameters() params.AdvertiseInterval = time.Second - params.PeersLimit = 0 bnDisc, err := discovery.NewDiscovery( params, bnHost, routingdisc.NewRoutingDiscovery(bnRouter), + fullNodesTag, ) require.NoError(t, err) @@ -433,6 +434,7 @@ func TestIntegration(t *testing.T) { params, fnHost, routingdisc.NewRoutingDiscovery(fnRouter), + fullNodesTag, discovery.WithOnPeersUpdate(fnPeerManager.UpdateFullNodePool), discovery.WithOnPeersUpdate(checkDiscoveredPeer), )