Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(discovery)!: discover peers by tag #2730

Merged
merged 8 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nodebuilder/share/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Config struct {
PeerManagerParams peers.Parameters

LightAvailability light.Parameters `toml:",omitempty"`
Discovery discovery.Parameters
Discovery *discovery.Parameters
}

func DefaultConfig(tp node.Type) Config {
Expand Down
10 changes: 6 additions & 4 deletions nodebuilder/share/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@ import (
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/ipld"
disc "github.com/celestiaorg/celestia-node/share/p2p/discovery"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
)

func newDiscovery(cfg Config) func(routing.ContentRouting, host.Host) *disc.Discovery {
func newDiscovery(cfg *disc.Parameters,
) func(routing.ContentRouting, host.Host, *peers.Manager) (*disc.Discovery, error) {
return func(
r routing.ContentRouting,
h host.Host,
) *disc.Discovery {
manager *peers.Manager,
) (*disc.Discovery, error) {
return disc.NewDiscovery(
cfg,
h,
routingdisc.NewRoutingDiscovery(r),
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
disc.WithPeersLimit(cfg.Discovery.PeersLimit),
disc.WithAdvertiseInterval(cfg.Discovery.AdvertiseInterval),
)
}
}
Expand Down
4 changes: 1 addition & 3 deletions nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
fx.Provide(newModule),
fx.Invoke(func(disc *disc.Discovery) {}),
fx.Provide(fx.Annotate(
newDiscovery(*cfg),
newDiscovery(cfg.Discovery),
fx.OnStart(func(ctx context.Context, d *disc.Discovery) error {
return d.Start(ctx)
}),
Expand Down Expand Up @@ -147,7 +147,6 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
fx.Provide(
func(
params peers.Parameters,
discovery *disc.Discovery,
host host.Host,
connGater *conngater.BasicConnectionGater,
shrexSub *shrexsub.PubSub,
Expand All @@ -158,7 +157,6 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
) (*peers.Manager, error) {
return peers.NewManager(
params,
discovery,
host,
connGater,
peers.WithShrexSubPools(shrexSub, headerSub),
Expand Down
9 changes: 6 additions & 3 deletions share/availability/full/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@ func Node(dn *availability_test.TestDagNet) *availability_test.TestNode {
}

func TestAvailability(t *testing.T, getter share.Getter) *ShareAvailability {
disc := discovery.NewDiscovery(
params := discovery.DefaultParameters()
params.AdvertiseInterval = time.Second
params.PeersLimit = 10
disc, err := discovery.NewDiscovery(
params,
nil,
routing.NewRoutingDiscovery(routinghelpers.Null{}),
discovery.WithAdvertiseInterval(time.Second),
discovery.WithPeersLimit(10),
)
require.NoError(t, err)
store, err := eds.NewStore(eds.DefaultParameters(), t.TempDir(), datastore.NewMapDatastore())
require.NoError(t, err)
err = store.Start(context.Background())
Expand Down
9 changes: 0 additions & 9 deletions share/getters/shrex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ import (

"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"github.com/libp2p/go-libp2p/core/host"
routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
Expand All @@ -25,7 +23,6 @@ import (
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/eds/edstest"
"github.com/celestiaorg/celestia-node/share/p2p/discovery"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
Expand Down Expand Up @@ -210,18 +207,12 @@ func testManager(
return nil, err
}

disc := discovery.NewDiscovery(nil,
routingdisc.NewRoutingDiscovery(routinghelpers.Null{}),
discovery.WithPeersLimit(10),
discovery.WithAdvertiseInterval(time.Second),
)
connGater, err := conngater.NewBasicConnectionGater(ds_sync.MutexWrap(datastore.NewMapDatastore()))
if err != nil {
return nil, err
}
manager, err := peers.NewManager(
peers.DefaultParameters(),
disc,
host,
connGater,
peers.WithShrexSubPools(shrexSub, headerSub),
Expand Down
49 changes: 23 additions & 26 deletions share/p2p/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ import (
var log = logging.Logger("share/discovery")

const (
// rendezvousPoint is the namespace where peers advertise and discover each other.
rendezvousPoint = "full"

// eventbusBufSize is the size of the buffered channel to handle
// events in libp2p. We specify a larger buffer size for the channel
// to avoid overflowing and blocking subscription during disconnection bursts.
Expand All @@ -45,6 +42,7 @@ var discoveryRetryTimeout = retryTimeout
// Discovery combines advertise and discover services and allows to store discovered nodes.
// TODO: The code here gets horribly hairy, so we should refactor this at some point
type Discovery struct {
tag string
set *limitedSet
host host.Host
disc discovery.Discovery
Expand All @@ -58,32 +56,40 @@ type Discovery struct {

cancel context.CancelFunc

params Parameters
params *Parameters
}

type OnUpdatedPeers func(peerID peer.ID, isAdded bool)

func (f OnUpdatedPeers) add(next OnUpdatedPeers) OnUpdatedPeers {
return func(peerID peer.ID, isAdded bool) {
f(peerID, isAdded)
next(peerID, isAdded)
}
}

// NewDiscovery constructs a new discovery.
func NewDiscovery(
params *Parameters,
h host.Host,
d discovery.Discovery,
opts ...Option,
) *Discovery {
params := DefaultParameters()

for _, opt := range opts {
opt(&params)
) (*Discovery, error) {
if err := params.Validate(); err != nil {
return nil, err
}

o := newOptions(opts...)
return &Discovery{
tag: params.Tag,
set: newLimitedSet(params.PeersLimit),
host: h,
disc: d,
connector: newBackoffConnector(h, defaultBackoffFactory),
onUpdatedPeers: func(peer.ID, bool) {},
onUpdatedPeers: o.onUpdatedPeers,
params: params,
triggerDisc: make(chan struct{}),
}
}, nil
}

func (d *Discovery) Start(context.Context) error {
Expand Down Expand Up @@ -111,15 +117,6 @@ func (d *Discovery) Stop(context.Context) error {
return nil
}

// WithOnPeersUpdate chains OnPeersUpdate callbacks on every update of discovered peers list.
func (d *Discovery) WithOnPeersUpdate(f OnUpdatedPeers) {
prev := d.onUpdatedPeers
d.onUpdatedPeers = func(peerID peer.ID, isAdded bool) {
prev(peerID, isAdded)
f(peerID, isAdded)
}
}

// Peers provides a list of discovered peers in the "full" topic.
// If Discovery hasn't found any peers, it blocks until at least one peer is found.
func (d *Discovery) Peers(ctx context.Context) ([]peer.ID, error) {
Expand All @@ -133,7 +130,7 @@ func (d *Discovery) Discard(id peer.ID) bool {
return false
}

d.host.ConnManager().Unprotect(id, rendezvousPoint)
walldiss marked this conversation as resolved.
Show resolved Hide resolved
d.host.ConnManager().Unprotect(id, d.tag)
d.connector.Backoff(id)
d.set.Remove(id)
d.onUpdatedPeers(id, false)
Expand Down Expand Up @@ -161,13 +158,13 @@ func (d *Discovery) Advertise(ctx context.Context) {
timer := time.NewTimer(d.params.AdvertiseInterval)
defer timer.Stop()
for {
_, err := d.disc.Advertise(ctx, rendezvousPoint)
_, err := d.disc.Advertise(ctx, d.tag)
d.metrics.observeAdvertise(ctx, err)
if err != nil {
if ctx.Err() != nil {
return
}
log.Warnw("error advertising", "rendezvous", rendezvousPoint, "err", err)
log.Warnw("error advertising", "rendezvous", d.tag, "err", err)

// we don't want retry indefinitely in busy loop
// internal discovery mechanism may need some time before attempts
Expand Down Expand Up @@ -280,7 +277,7 @@ func (d *Discovery) discover(ctx context.Context) bool {
findCancel()
}()

peers, err := d.disc.FindPeers(findCtx, rendezvousPoint)
peers, err := d.disc.FindPeers(findCtx, d.tag)
if err != nil {
log.Error("unable to start discovery", "err", err)
return false
Expand Down Expand Up @@ -371,11 +368,11 @@ func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo
d.metrics.observeHandlePeer(ctx, handlePeerConnected)
logger.Debug("added peer to set")

// tag to protect peer from being killed by ConnManager
// Tag to protect peer from being killed by ConnManager
// NOTE: This is does not protect from remote killing the connection.
// In the future, we should design a protocol that keeps bidirectional agreement on whether
// connection should be kept or not, similar to mesh link in GossipSub.
d.host.ConnManager().Protect(peer.ID, rendezvousPoint)
d.host.ConnManager().Protect(peer.ID, d.tag)
return true
}

Expand Down
95 changes: 83 additions & 12 deletions share/p2p/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,34 @@ func TestDiscovery(t *testing.T) {

tn := newTestnet(ctx, t)

peerA := tn.discovery(
WithPeersLimit(nodes),
WithAdvertiseInterval(-1),
)

type peerUpdate struct {
peerID peer.ID
isAdded bool
}
updateCh := make(chan peerUpdate)
peerA.WithOnPeersUpdate(func(peerID peer.ID, isAdded bool) {
submit := func(peerID peer.ID, isAdded bool) {
updateCh <- peerUpdate{peerID: peerID, isAdded: isAdded}
})
}

host, routingDisc := tn.peer()
params := DefaultParameters()
params.PeersLimit = nodes
params.AdvertiseInterval = -1
params.Tag = "full"

peerA := tn.discovery(params, host, routingDisc,
WithOnPeersUpdate(submit),
)

params = &Parameters{
PeersLimit: 0,
AdvertiseInterval: time.Millisecond * 100,
Tag: "full",
walldiss marked this conversation as resolved.
Show resolved Hide resolved
}
discs := make([]*Discovery, nodes)
for i := range discs {
discs[i] = tn.discovery(WithPeersLimit(0), WithAdvertiseInterval(time.Millisecond*100))
host, routingDisc := tn.peer()
discs[i] = tn.discovery(params, host, routingDisc)

select {
case res := <-updateCh:
Expand Down Expand Up @@ -73,6 +84,53 @@ func TestDiscovery(t *testing.T) {
assert.EqualValues(t, 0, peerA.set.Size())
}

func TestDiscoveryTagged(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
t.Cleanup(cancel)

tn := newTestnet(ctx, t)

// launch 2 peers, that advertise with different tags
adv1, routingDisc1 := tn.peer()
adv2, routingDisc2 := tn.peer()

// sub will discover both peers, but on different tags
sub, routingDisc := tn.peer()

params := DefaultParameters()

// create 2 discovery services for sub, each with a different tag
params.Tag = "tag1"
done1 := make(chan struct{})
tn.discovery(params, sub, routingDisc,
WithOnPeersUpdate(checkPeer(t, adv1.ID(), done1)))

params.Tag = "tag2"
done2 := make(chan struct{})
tn.discovery(params, sub, routingDisc,
WithOnPeersUpdate(checkPeer(t, adv2.ID(), done2)))

// run discovery services for advertisers
params.Tag = "tag1"
tn.discovery(params, adv1, routingDisc1)

params.Tag = "tag2"
tn.discovery(params, adv2, routingDisc2)

// wait for discovery services to discover each other on different tags
select {
case <-done1:
case <-ctx.Done():
t.Fatal("did not discover peer in time")
}

select {
case <-done2:
case <-ctx.Done():
t.Fatal("did not discover peer in time")
}
}
walldiss marked this conversation as resolved.
Show resolved Hide resolved

type testnet struct {
ctx context.Context
T *testing.T
Expand All @@ -97,10 +155,15 @@ func newTestnet(ctx context.Context, t *testing.T) *testnet {
return &testnet{ctx: ctx, T: t, bootstrapper: *host.InfoFromHost(hst)}
}

func (t *testnet) discovery(opts ...Option) *Discovery {
hst, routingDisc := t.peer()
disc := NewDiscovery(hst, routingDisc, opts...)
err := disc.Start(t.ctx)
func (t *testnet) discovery(
params *Parameters,
hst host.Host,
routingDisc discovery.Discovery,
opts ...Option,
) *Discovery {
disc, err := NewDiscovery(params, hst, routingDisc, opts...)
require.NoError(t.T, err)
err = disc.Start(t.ctx)
require.NoError(t.T, err)
t.T.Cleanup(func() {
err := disc.Stop(t.ctx)
Expand Down Expand Up @@ -134,3 +197,11 @@ func (t *testnet) peer() (host.Host, discovery.Discovery) {

return hst, routing.NewRoutingDiscovery(dht)
}

func checkPeer(t *testing.T, expected peer.ID, done chan struct{}) func(peerID peer.ID, isAdded bool) {
return func(peerID peer.ID, isAdded bool) {
defer close(done)
require.Equal(t, expected, peerID)
require.True(t, isAdded)
}
}
3 changes: 1 addition & 2 deletions share/p2p/discovery/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ const (

handlePeerResultKey = "result"
handlePeerSkipSelf handlePeerResult = "skip_self"
handlePeerEmptyAddrs handlePeerResult = "skip_empty_addresses"
handlePeerEnoughPeers handlePeerResult = "skip_enough_peers"
handlePeerBackoff handlePeerResult = "skip_backoff"
handlePeerConnected handlePeerResult = "connected"
Expand Down Expand Up @@ -47,7 +46,7 @@ func (d *Discovery) WithMetrics() error {
return fmt.Errorf("discovery: init metrics: %w", err)
}
d.metrics = metrics
d.WithOnPeersUpdate(metrics.observeOnPeersUpdate)
d.onUpdatedPeers = d.onUpdatedPeers.add(metrics.observeOnPeersUpdate)
return nil
}

Expand Down
Loading
Loading