feat(discovery): discover peers by tag (#2730)
Allow running multiple discovery subcomponents, where each can find new
peers based on a preset `tag` and notify its subscribers. This enables any
sort of separate discovery subcomponent (versioned, pruned/archived,
etc.). Essentially, it allows discovery to be abstracted away and act as a
single-function component.

Involves some minor refactoring here and there to clean things up.

Resolves #2578
walldiss authored Oct 9, 2023
1 parent 1ba92c3 commit ec4472a
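For illustration, here is a minimal sketch of what the tag-based API enables: two discovery subcomponents sharing one host, each advertising and finding peers under its own tag. The `newTaggedDiscoveries` helper and the "archival" tag are hypothetical and not part of this commit, which only wires up the "full" tag (see nodebuilder/share/constructors.go below); the `NewDiscovery` signature and `WithOnPeersUpdate` option mirror the diff.

```go
// Illustrative sketch only. The "archival" tag and this helper are not part
// of the commit; they show how a second tagged subcomponent could be wired.
package share

import (
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/routing"
	routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"

	disc "github.com/celestiaorg/celestia-node/share/p2p/discovery"
	"github.com/celestiaorg/celestia-node/share/p2p/peers"
)

func newTaggedDiscoveries(
	h host.Host,
	r routing.ContentRouting,
	manager *peers.Manager,
) (*disc.Discovery, *disc.Discovery, error) {
	params := disc.DefaultParameters()

	// Full-node discovery: discovered peers are pushed into the peer
	// manager's full-node pool, as wired in nodebuilder/share below.
	full, err := disc.NewDiscovery(
		params,
		h,
		routingdisc.NewRoutingDiscovery(r),
		"full",
		disc.WithOnPeersUpdate(manager.UpdateFullNodePool),
	)
	if err != nil {
		return nil, nil, err
	}

	// Hypothetical second subcomponent advertising under its own tag,
	// e.g. for archival/pruned peers, with its own subscribers.
	archival, err := disc.NewDiscovery(
		params,
		h,
		routingdisc.NewRoutingDiscovery(r),
		"archival",
	)
	if err != nil {
		return nil, nil, err
	}
	return full, archival, nil
}
```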
Showing 12 changed files with 226 additions and 152 deletions.
2 changes: 1 addition & 1 deletion nodebuilder/share/config.go
@@ -26,7 +26,7 @@ type Config struct {
PeerManagerParams peers.Parameters

LightAvailability light.Parameters `toml:",omitempty"`
Discovery discovery.Parameters
Discovery *discovery.Parameters
}

func DefaultConfig(tp node.Type) Config {
17 changes: 13 additions & 4 deletions nodebuilder/share/constructors.go
@@ -17,18 +17,27 @@ import (
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/ipld"
disc "github.com/celestiaorg/celestia-node/share/p2p/discovery"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
)

func newDiscovery(cfg Config) func(routing.ContentRouting, host.Host) *disc.Discovery {
const (
// fullNodesTag is the tag used to identify full nodes in the discovery service.
fullNodesTag = "full"
)

func newDiscovery(cfg *disc.Parameters,
) func(routing.ContentRouting, host.Host, *peers.Manager) (*disc.Discovery, error) {
return func(
r routing.ContentRouting,
h host.Host,
) *disc.Discovery {
manager *peers.Manager,
) (*disc.Discovery, error) {
return disc.NewDiscovery(
cfg,
h,
routingdisc.NewRoutingDiscovery(r),
disc.WithPeersLimit(cfg.Discovery.PeersLimit),
disc.WithAdvertiseInterval(cfg.Discovery.AdvertiseInterval),
fullNodesTag,
disc.WithOnPeersUpdate(manager.UpdateFullNodePool),
)
}
}
4 changes: 1 addition & 3 deletions nodebuilder/share/module.go
@@ -37,7 +37,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
fx.Provide(newModule),
fx.Invoke(func(disc *disc.Discovery) {}),
fx.Provide(fx.Annotate(
newDiscovery(*cfg),
newDiscovery(cfg.Discovery),
fx.OnStart(func(ctx context.Context, d *disc.Discovery) error {
return d.Start(ctx)
}),
@@ -147,7 +147,6 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
fx.Provide(
func(
params peers.Parameters,
discovery *disc.Discovery,
host host.Host,
connGater *conngater.BasicConnectionGater,
shrexSub *shrexsub.PubSub,
@@ -158,7 +157,6 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
) (*peers.Manager, error) {
return peers.NewManager(
params,
discovery,
host,
connGater,
peers.WithShrexSubPools(shrexSub, headerSub),
2 changes: 0 additions & 2 deletions nodebuilder/tests/p2p_test.go
@@ -186,9 +186,7 @@ func TestRestartNodeDiscovery(t *testing.T) {
sw.Disconnect(t, nodes[0], nodes[1])

// create and start one more FN with disabled discovery
fullCfg.Share.Discovery.PeersLimit = 0
disabledDiscoveryFN := sw.NewNodeWithConfig(node.Full, fullCfg, nodesConfig)
err = disabledDiscoveryFN.Start(ctx)
require.NoError(t, err)

// ensure that the FN with disabled discovery is discovered by both of the
10 changes: 7 additions & 3 deletions share/availability/full/testing.go
@@ -41,12 +41,16 @@ func Node(dn *availability_test.TestDagNet) *availability_test.TestNode {
}

func TestAvailability(t *testing.T, getter share.Getter) *ShareAvailability {
disc := discovery.NewDiscovery(
params := discovery.DefaultParameters()
params.AdvertiseInterval = time.Second
params.PeersLimit = 10
disc, err := discovery.NewDiscovery(
params,
nil,
routing.NewRoutingDiscovery(routinghelpers.Null{}),
discovery.WithAdvertiseInterval(time.Second),
discovery.WithPeersLimit(10),
"full",
)
require.NoError(t, err)
store, err := eds.NewStore(eds.DefaultParameters(), t.TempDir(), datastore.NewMapDatastore())
require.NoError(t, err)
err = store.Start(context.Background())
9 changes: 0 additions & 9 deletions share/getters/shrex_test.go
@@ -9,9 +9,7 @@ import (

"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"github.com/libp2p/go-libp2p/core/host"
routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
@@ -25,7 +23,6 @@ import (
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/eds/edstest"
"github.com/celestiaorg/celestia-node/share/p2p/discovery"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
@@ -210,18 +207,12 @@ func testManager(
return nil, err
}

disc := discovery.NewDiscovery(nil,
routingdisc.NewRoutingDiscovery(routinghelpers.Null{}),
discovery.WithPeersLimit(10),
discovery.WithAdvertiseInterval(time.Second),
)
connGater, err := conngater.NewBasicConnectionGater(ds_sync.MutexWrap(datastore.NewMapDatastore()))
if err != nil {
return nil, err
}
manager, err := peers.NewManager(
peers.DefaultParameters(),
disc,
host,
connGater,
peers.WithShrexSubPools(shrexSub, headerSub),
64 changes: 28 additions & 36 deletions share/p2p/discovery/discovery.go
@@ -19,9 +19,6 @@ import (
var log = logging.Logger("share/discovery")

const (
// rendezvousPoint is the namespace where peers advertise and discover each other.
rendezvousPoint = "full"

// eventbusBufSize is the size of the buffered channel to handle
// events in libp2p. We specify a larger buffer size for the channel
// to avoid overflowing and blocking subscription during disconnection bursts.
@@ -45,6 +42,8 @@ var discoveryRetryTimeout = retryTimeout
// Discovery combines advertise and discover services and allows to store discovered nodes.
// TODO: The code here gets horribly hairy, so we should refactor this at some point
type Discovery struct {
// Tag is used as rendezvous point for the discovery service
tag string
set *limitedSet
host host.Host
disc discovery.Discovery
@@ -58,43 +57,50 @@ type Discovery struct {

cancel context.CancelFunc

params Parameters
params *Parameters
}

type OnUpdatedPeers func(peerID peer.ID, isAdded bool)

func (f OnUpdatedPeers) add(next OnUpdatedPeers) OnUpdatedPeers {
return func(peerID peer.ID, isAdded bool) {
f(peerID, isAdded)
next(peerID, isAdded)
}
}

// NewDiscovery constructs a new discovery.
func NewDiscovery(
params *Parameters,
h host.Host,
d discovery.Discovery,
tag string,
opts ...Option,
) *Discovery {
params := DefaultParameters()

for _, opt := range opts {
opt(&params)
) (*Discovery, error) {
if err := params.Validate(); err != nil {
return nil, err
}

if tag == "" {
return nil, fmt.Errorf("discovery: tag cannot be empty")
}
o := newOptions(opts...)
return &Discovery{
tag: tag,
set: newLimitedSet(params.PeersLimit),
host: h,
disc: d,
connector: newBackoffConnector(h, defaultBackoffFactory),
onUpdatedPeers: func(peer.ID, bool) {},
onUpdatedPeers: o.onUpdatedPeers,
params: params,
triggerDisc: make(chan struct{}),
}
}, nil
}

func (d *Discovery) Start(context.Context) error {
ctx, cancel := context.WithCancel(context.Background())
d.cancel = cancel

if d.params.PeersLimit == 0 {
log.Warn("peers limit is set to 0. Skipping discovery...")
return nil
}

sub, err := d.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(eventbusBufSize))
if err != nil {
return fmt.Errorf("subscribing for connection events: %w", err)
@@ -111,15 +117,6 @@ func (d *Discovery) Stop(context.Context) error {
return nil
}

// WithOnPeersUpdate chains OnPeersUpdate callbacks on every update of discovered peers list.
func (d *Discovery) WithOnPeersUpdate(f OnUpdatedPeers) {
prev := d.onUpdatedPeers
d.onUpdatedPeers = func(peerID peer.ID, isAdded bool) {
prev(peerID, isAdded)
f(peerID, isAdded)
}
}

// Peers provides a list of discovered peers in the "full" topic.
// If Discovery hasn't found any peers, it blocks until at least one peer is found.
func (d *Discovery) Peers(ctx context.Context) ([]peer.ID, error) {
@@ -133,7 +130,7 @@ func (d *Discovery) Discard(id peer.ID) bool {
return false
}

d.host.ConnManager().Unprotect(id, rendezvousPoint)
d.host.ConnManager().Unprotect(id, d.tag)
d.connector.Backoff(id)
d.set.Remove(id)
d.onUpdatedPeers(id, false)
@@ -153,21 +150,16 @@ func (d *Discovery) Discard(id peer.ID) bool {
// Advertise is a utility function that persistently advertises a service through an Advertiser.
// TODO: Start advertising only after the reachability is confirmed by AutoNAT
func (d *Discovery) Advertise(ctx context.Context) {
if d.params.AdvertiseInterval == -1 {
log.Warn("AdvertiseInterval is set to -1. Skipping advertising...")
return
}

timer := time.NewTimer(d.params.AdvertiseInterval)
defer timer.Stop()
for {
_, err := d.disc.Advertise(ctx, rendezvousPoint)
_, err := d.disc.Advertise(ctx, d.tag)
d.metrics.observeAdvertise(ctx, err)
if err != nil {
if ctx.Err() != nil {
return
}
log.Warnw("error advertising", "rendezvous", rendezvousPoint, "err", err)
log.Warnw("error advertising", "rendezvous", d.tag, "err", err)

// we don't want to retry indefinitely in a busy loop
// internal discovery mechanism may need some time before attempts
@@ -280,7 +272,7 @@ func (d *Discovery) discover(ctx context.Context) bool {
findCancel()
}()

peers, err := d.disc.FindPeers(findCtx, rendezvousPoint)
peers, err := d.disc.FindPeers(findCtx, d.tag)
if err != nil {
log.Error("unable to start discovery", "err", err)
return false
@@ -371,11 +363,11 @@ func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo
d.metrics.observeHandlePeer(ctx, handlePeerConnected)
logger.Debug("added peer to set")

// tag to protect peer from being killed by ConnManager
// Tag to protect peer from being killed by ConnManager
// NOTE: This does not protect from remote killing the connection.
// In the future, we should design a protocol that keeps bidirectional agreement on whether
// connection should be kept or not, similar to mesh link in GossipSub.
d.host.ConnManager().Protect(peer.ID, rendezvousPoint)
d.host.ConnManager().Protect(peer.ID, d.tag)
return true
}
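The removed `WithOnPeersUpdate` method is replaced by a constructor option, with callbacks chained through `OnUpdatedPeers.add` as shown above. A plausible sketch of how such an option could be implemented is below; the `Option` and `options` types are assumptions, since the actual options.go is not among the hunks shown here.

```go
// Sketch only: newOptions folds every WithOnPeersUpdate callback into a
// single OnUpdatedPeers chain, so each subscriber is notified on every
// peer addition or removal.
type Option func(*options)

type options struct {
	onUpdatedPeers OnUpdatedPeers
}

func newOptions(opts ...Option) *options {
	o := &options{
		// no-op default so Discovery can always call onUpdatedPeers safely
		onUpdatedPeers: func(peer.ID, bool) {},
	}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

// WithOnPeersUpdate subscribes a callback to peer add/remove events.
func WithOnPeersUpdate(f OnUpdatedPeers) Option {
	return func(o *options) {
		o.onUpdatedPeers = o.onUpdatedPeers.add(f)
	}
}
```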
