Skip to content

Commit

Permalink
rework options constructor pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss committed Sep 27, 2023
1 parent 7e79b4c commit fc879f7
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 82 deletions.
8 changes: 3 additions & 5 deletions nodebuilder/share/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,16 @@ import (
)

func newDiscovery(cfg *disc.Parameters,
) func(routing.ContentRouting, host.Host, *peers.Manager) *disc.Discovery {
) func(routing.ContentRouting, host.Host, *peers.Manager) (*disc.Discovery, error) {
return func(
r routing.ContentRouting,
h host.Host,
manager *peers.Manager,
) *disc.Discovery {
) (*disc.Discovery, error) {
return disc.NewDiscovery(
cfg,
h,
routingdisc.NewRoutingDiscovery(r),
disc.WithPeersLimit(cfg.PeersLimit),
disc.WithAdvertiseInterval(cfg.AdvertiseInterval),
disc.WithOnPeersUpdate(manager.UpdatedFullNodes),
)
}
}
Expand Down
9 changes: 6 additions & 3 deletions share/availability/full/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@ func Node(dn *availability_test.TestDagNet) *availability_test.TestNode {
}

func TestAvailability(t *testing.T, getter share.Getter) *ShareAvailability {
disc := discovery.NewDiscovery(
params := discovery.DefaultParameters()
params.AdvertiseInterval = time.Second
params.PeersLimit = 10
disc, err := discovery.NewDiscovery(
params,
nil,
routing.NewRoutingDiscovery(routinghelpers.Null{}),
discovery.WithAdvertiseInterval(time.Second),
discovery.WithPeersLimit(10),
)
require.NoError(t, err)
store, err := eds.NewStore(eds.DefaultParameters(), t.TempDir(), datastore.NewMapDatastore())
require.NoError(t, err)
err = store.Start(context.Background())
Expand Down
15 changes: 7 additions & 8 deletions share/p2p/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,26 +70,26 @@ func (f OnUpdatedPeers) add(next OnUpdatedPeers) OnUpdatedPeers {

// NewDiscovery constructs a new discovery.
func NewDiscovery(
params *Parameters,
h host.Host,
d discovery.Discovery,
opts ...Option,
) *Discovery {
params := DefaultParameters()

for _, opt := range opts {
opt(params)
) (*Discovery, error) {
if err := params.Validate(); err != nil {
return nil, err
}

o := newOptions(opts...)
return &Discovery{
tag: params.Tag,
set: newLimitedSet(params.PeersLimit),
host: h,
disc: d,
connector: newBackoffConnector(h, defaultBackoffFactory),
onUpdatedPeers: params.onUpdatedPeers,
onUpdatedPeers: o.onUpdatedPeers,
params: params,
triggerDisc: make(chan struct{}),
}
}, nil
}

func (d *Discovery) Start(context.Context) error {
Expand Down Expand Up @@ -158,7 +158,6 @@ func (d *Discovery) Advertise(ctx context.Context) {
timer := time.NewTimer(d.params.AdvertiseInterval)
defer timer.Stop()
for {
fmt.Println(d.tag)
_, err := d.disc.Advertise(ctx, d.tag)
d.metrics.observeAdvertise(ctx, err)
if err != nil {
Expand Down
49 changes: 31 additions & 18 deletions share/p2p/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,24 @@ func TestDiscovery(t *testing.T) {
}

host, routingDisc := tn.peer()
peerA := tn.discovery(
host, routingDisc,
WithPeersLimit(nodes),
WithAdvertiseInterval(-1),
params := DefaultParameters()
params.PeersLimit = nodes
params.AdvertiseInterval = -1
params.Tag = "full"

peerA := tn.discovery(params, host, routingDisc,
WithOnPeersUpdate(submit),
)

params = &Parameters{
PeersLimit: 0,
AdvertiseInterval: time.Millisecond * 100,
Tag: "full",
}
discs := make([]*Discovery, nodes)
for i := range discs {
host, routingDisc := tn.peer()
discs[i] = tn.discovery(host, routingDisc,
WithPeersLimit(0),
WithAdvertiseInterval(time.Millisecond*100))
discs[i] = tn.discovery(params, host, routingDisc)

select {
case res := <-updateCh:
Expand Down Expand Up @@ -92,23 +97,25 @@ func TestDiscoveryTagged(t *testing.T) {
// sub will discover both peers, but on different tags
sub, routingDisc := tn.peer()

params := DefaultParameters()

// create 2 discovery services for sub, each with a different tag
params.Tag = "tag1"
done1 := make(chan struct{})
tn.discovery(sub, routingDisc,
WithTag("tag1"),
tn.discovery(params, sub, routingDisc,
WithOnPeersUpdate(checkPeer(t, adv1.ID(), done1)))

params.Tag = "tag2"
done2 := make(chan struct{})
tn.discovery(sub, routingDisc,
WithTag("tag2"),
tn.discovery(params, sub, routingDisc,
WithOnPeersUpdate(checkPeer(t, adv2.ID(), done2)))

// run discovery services for advertisers
tn.discovery(adv1, routingDisc1,
WithTag("tag1"))
params.Tag = "tag1"
tn.discovery(params, adv1, routingDisc1)

tn.discovery(adv2, routingDisc2,
WithTag("tag2"))
params.Tag = "tag2"
tn.discovery(params, adv2, routingDisc2)

// wait for discovery services to discover each other on different tags
select {
Expand Down Expand Up @@ -148,9 +155,15 @@ func newTestnet(ctx context.Context, t *testing.T) *testnet {
return &testnet{ctx: ctx, T: t, bootstrapper: *host.InfoFromHost(hst)}
}

func (t *testnet) discovery(hst host.Host, routingDisc discovery.Discovery, opts ...Option) *Discovery {
disc := NewDiscovery(hst, routingDisc, opts...)
err := disc.Start(t.ctx)
func (t *testnet) discovery(
params *Parameters,
hst host.Host,
routingDisc discovery.Discovery,
opts ...Option,
) *Discovery {
disc, err := NewDiscovery(params, hst, routingDisc, opts...)
require.NoError(t.T, err)
err = disc.Start(t.ctx)
require.NoError(t.T, err)
t.T.Cleanup(func() {
err := disc.Stop(t.ctx)
Expand Down
59 changes: 17 additions & 42 deletions share/p2p/discovery/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@ type Parameters struct {
// Set -1 to disable.
// NOTE: only full and bridge can advertise themselves.
AdvertiseInterval time.Duration
// onUpdatedPeers will be called on peer set changes
onUpdatedPeers OnUpdatedPeers
// Tag is used as rondezvous point for discovery service
Tag string
}

// options is the set of options that can be configured for the Discovery module
type options struct {
// onUpdatedPeers will be called on peer set changes
onUpdatedPeers OnUpdatedPeers
}

// Option is a function that configures Discovery Parameters
type Option func(*Parameters)
type Option func(*options)

// DefaultParameters returns the default Parameters' configuration values
// for the Discovery module
Expand All @@ -37,29 +41,12 @@ func DefaultParameters() *Parameters {
PeersLimit: 5,
AdvertiseInterval: time.Hour,
//TODO: remove fullNodesTag default value once multiple tags are supported
Tag: fullNodesTag,
onUpdatedPeers: func(peer.ID, bool) {},
Tag: fullNodesTag,
}
}

// Validate validates the values in Parameters
func (p *Parameters) Validate() error {
if p.AdvertiseInterval <= 0 {
return fmt.Errorf(
"discovery: invalid option: value AdvertiseInterval %s, %s",
"is 0 or negative.",
"value must be positive",
)
}

if p.PeersLimit <= 0 {
return fmt.Errorf(
"discovery: invalid option: value PeersLimit %s, %s",
"is negative.",
"value must be positive",
)
}

if p.Tag == "" {
return fmt.Errorf(
"discovery: invalid option: value Tag %s, %s",
Expand All @@ -70,32 +57,20 @@ func (p *Parameters) Validate() error {
return nil
}

// WithPeersLimit is a functional option that Discovery
// uses to set the PeersLimit configuration param
func WithPeersLimit(peersLimit uint) Option {
return func(p *Parameters) {
p.PeersLimit = peersLimit
}
}

// WithAdvertiseInterval is a functional option that Discovery
// uses to set the AdvertiseInterval configuration param
func WithAdvertiseInterval(advInterval time.Duration) Option {
return func(p *Parameters) {
p.AdvertiseInterval = advInterval
}
}

// WithOnPeersUpdate chains OnPeersUpdate callbacks on every update of discovered peers list.
func WithOnPeersUpdate(f OnUpdatedPeers) Option {
return func(p *Parameters) {
return func(p *options) {
p.onUpdatedPeers = p.onUpdatedPeers.add(f)
}
}

// WithTag is a functional option that sets the Tag for the discovery service
func WithTag(tag string) Option {
return func(p *Parameters) {
p.Tag = tag
func newOptions(opts ...Option) *options {
defaults := &options{
onUpdatedPeers: func(peer.ID, bool) {},
}

for _, opt := range opts {
opt(defaults)
}
return defaults
}
20 changes: 14 additions & 6 deletions share/p2p/peers/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,17 @@ func TestIntegration(t *testing.T) {
bnHost := nw.Hosts()[1]
bnRouter, err := dht.New(ctx, bnHost, opts...)
require.NoError(t, err)
bnDisc := discovery.NewDiscovery(

params := discovery.DefaultParameters()
params.AdvertiseInterval = time.Second
params.PeersLimit = 0

bnDisc, err := discovery.NewDiscovery(
params,
bnHost,
routingdisc.NewRoutingDiscovery(bnRouter),
discovery.WithPeersLimit(0),
discovery.WithAdvertiseInterval(time.Second),
)
require.NoError(t, err)

// set up full node / receiver node
fnHost := nw.Hosts()[2]
Expand All @@ -419,11 +424,14 @@ func TestIntegration(t *testing.T) {
}

// set up discovery for full node with hook to peer manager and check discovered peer
fnDisc := discovery.NewDiscovery(
params = discovery.DefaultParameters()
params.AdvertiseInterval = time.Second
params.PeersLimit = 10

fnDisc, err := discovery.NewDiscovery(
params,
fnHost,
routingdisc.NewRoutingDiscovery(fnRouter),
discovery.WithPeersLimit(10),
discovery.WithAdvertiseInterval(time.Second),
discovery.WithOnPeersUpdate(fnPeerManager.UpdatedFullNodes),
discovery.WithOnPeersUpdate(checkDiscoveredPeer),
)
Expand Down

0 comments on commit fc879f7

Please sign in to comment.