Skip to content

Commit

Permalink
addressing review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
iurii-ssv committed Jan 9, 2025
1 parent 2da5ad7 commit 88a752c
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 73 deletions.
2 changes: 1 addition & 1 deletion network/discovery/dv5_bootnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,5 @@ func createBootnodeDiscovery(ctx context.Context, logger *zap.Logger, networkCfg
Bootnodes: []string{},
},
}
return newDiscV5Service(ctx, logger, false, nil, discOpts)
return newDiscV5Service(ctx, logger, discOpts)
}
2 changes: 1 addition & 1 deletion network/discovery/dv5_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (dvs *DiscV5Service) FindPeers(ctx context.Context, ns string, opt ...disco
}
cn := make(chan peer.AddrInfo, 32)

dvs.discover(ctx, logger, func(e PeerEvent) {
dvs.discover(ctx, func(e PeerEvent) {
cn <- e.AddrInfo
}, time.Millisecond, dvs.ssvNodeFilter(logger), dvs.badNodeFilter(logger), dvs.subnetFilter(subnet), dvs.alreadyConnectedFilter(), dvs.recentlyTrimmedFilter())

Expand Down
70 changes: 11 additions & 59 deletions network/discovery/dv5_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"fmt"
"net"
"strconv"
"time"

"github.com/ethereum/go-ethereum/p2p/discover"
Expand All @@ -17,7 +16,6 @@ import (
"github.com/ssvlabs/ssv/network/commons"
"github.com/ssvlabs/ssv/network/peers"
"github.com/ssvlabs/ssv/network/records"
"github.com/ssvlabs/ssv/network/topics"
"github.com/ssvlabs/ssv/networkconfig"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -57,14 +55,6 @@ type DiscV5Service struct {
dv5Listener Listener
bootnodes []*enode.Node

// withExtraFiltering specifies whether we want to additionally filter discovered peers,
// since operator node and boot node are using the same discovery code (`DiscV5Service`) we
// want to do this extra filtering for operator node only
withExtraFiltering bool
// topicsCtrl must be non-nil when withExtraFiltering is set to true since it will be used
// to help us decide which peers need to be filtered out
topicsCtrl topics.Controller

conns peers.ConnectionIndex
subnetsIdx peers.SubnetsIndex

Expand All @@ -77,24 +67,16 @@ type DiscV5Service struct {
publishLock chan struct{}
}

func newDiscV5Service(
pctx context.Context,
logger *zap.Logger,
withExtraFiltering bool,
topicsController topics.Controller,
opts *Options,
) (Service, error) {
func newDiscV5Service(pctx context.Context, logger *zap.Logger, opts *Options) (Service, error) {
ctx, cancel := context.WithCancel(pctx)
dvs := DiscV5Service{
ctx: ctx,
cancel: cancel,
withExtraFiltering: withExtraFiltering,
topicsCtrl: topicsController,
conns: opts.ConnIndex,
subnetsIdx: opts.SubnetsIdx,
networkConfig: opts.NetworkConfig,
subnets: opts.DiscV5Opts.Subnets,
publishLock: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
conns: opts.ConnIndex,
subnetsIdx: opts.SubnetsIdx,
networkConfig: opts.NetworkConfig,
subnets: opts.DiscV5Opts.Subnets,
publishLock: make(chan struct{}, 1),
}

logger.Debug(
Expand Down Expand Up @@ -165,7 +147,7 @@ func (dvs *DiscV5Service) Bootstrap(logger *zap.Logger, handler HandleNewPeer) e
const logFrequency = 10
var skippedPeers uint64 = 0

dvs.discover(dvs.ctx, logger, func(e PeerEvent) {
dvs.discover(dvs.ctx, func(e PeerEvent) {
logger := logger.With(
fields.ENR(e.Node),
fields.PeerID(e.AddrInfo.ID),
Expand Down Expand Up @@ -219,36 +201,6 @@ func (dvs *DiscV5Service) checkPeer(logger *zap.Logger, e PeerEvent) error {
return errors.New("no shared subnets")
}

if dvs.withExtraFiltering {
helpfulPeer := false // whether this peer helps us with getting rid of dead/solo subnets
subscribedTopics := dvs.topicsCtrl.Topics()
for _, topic := range subscribedTopics {
topicPeers, err := dvs.topicsCtrl.Peers(topic)
if err != nil {
return errors.Wrap(err, "could not get subscribed topic peers")
}

if len(topicPeers) >= 2 {
continue // this topic has enough peers
}

// we've got a dead subnet here, see if this peer can help with that
subnet, err := strconv.Atoi(topic)
if err != nil {
return errors.Wrap(err, "could not convert topic name to subnet id")
}
peerSubnet := peerSubnets[subnet]
if peerSubnet != 1 {
continue // peer doesn't have this subnet either, lets check other dead subnets we have
}
helpfulPeer = true // this peer helps with at least 1 dead subnet for us
break
}
if !helpfulPeer {
return errors.New("this peer doesn't help with dead subnets")
}
}

metricFoundNodes.Inc()
return nil
}
Expand Down Expand Up @@ -334,7 +286,7 @@ func (dvs *DiscV5Service) initDiscV5Listener(logger *zap.Logger, discOpts *Optio
// interval enables to control the rate of new nodes that we find.
// filters will be applied on each new node before the handler is called,
// enabling to apply custom access control for different scenarios.
func (dvs *DiscV5Service) discover(ctx context.Context, logger *zap.Logger, handler HandleNewPeer, interval time.Duration, filters ...NodeFilter) {
func (dvs *DiscV5Service) discover(ctx context.Context, handler HandleNewPeer, interval time.Duration, filters ...NodeFilter) {
iterator := dvs.dv5Listener.RandomNodes()
for _, f := range filters {
iterator = enode.Filter(iterator, f)
Expand Down Expand Up @@ -444,7 +396,7 @@ func (dvs *DiscV5Service) PublishENR(logger *zap.Logger) {
peerIDs := map[peer.ID]struct{}{}

// Publish ENR.
dvs.discover(ctx, logger, func(e PeerEvent) {
dvs.discover(ctx, func(e PeerEvent) {
metricPublishEnrPings.Inc()
err := dvs.dv5Listener.Ping(e.Node)
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions network/discovery/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/ssvlabs/ssv/network/peers"
"github.com/ssvlabs/ssv/network/topics"
"github.com/ssvlabs/ssv/networkconfig"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -54,9 +53,9 @@ type Service interface {
}

// NewService creates new discovery.Service
func NewService(ctx context.Context, logger *zap.Logger, topicsController topics.Controller, opts Options) (Service, error) {
func NewService(ctx context.Context, logger *zap.Logger, opts Options) (Service, error) {
if opts.DiscV5Opts == nil {
return NewLocalDiscovery(ctx, logger, opts.Host)
}
return newDiscV5Service(ctx, logger, false, topicsController, &opts)
return newDiscV5Service(ctx, logger, &opts)
}
4 changes: 2 additions & 2 deletions network/discovery/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestDiscV5Service_PublishENR(t *testing.T) {
defer cancel()

opts := testingDiscoveryOptions(t, testNetConfig)
service, err := newDiscV5Service(ctx, testLogger, false, nil, opts)
service, err := newDiscV5Service(ctx, testLogger, opts)
require.NoError(t, err)
dvs := service.(*DiscV5Service)

Expand Down Expand Up @@ -173,7 +173,7 @@ func TestDiscV5Service_Bootstrap(t *testing.T) {

opts := testingDiscoveryOptions(t, testNetConfig)

service, err := newDiscV5Service(testCtx, testLogger, false, nil, opts)
service, err := newDiscV5Service(testCtx, testLogger, opts)
require.NoError(t, err)

dvs := service.(*DiscV5Service)
Expand Down
3 changes: 2 additions & 1 deletion network/discovery/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func testingDiscoveryOptions(t *testing.T, networkConfig networkconfig.NetworkCo
// Testing discovery with a given NetworkConfig
func testingDiscoveryWithNetworkConfig(t *testing.T, netConfig networkconfig.NetworkConfig) *DiscV5Service {
opts := testingDiscoveryOptions(t, netConfig)
service, err := newDiscV5Service(testCtx, testLogger, false, nil, opts)
service, err := newDiscV5Service(testCtx, testLogger, opts)
require.NoError(t, err)
require.NotNil(t, service)

Expand Down Expand Up @@ -191,6 +191,7 @@ func CustomNode(t *testing.T,
record := enr.Record{}

// Set entries
record.Set(enr.WithEntry("ssv", true)) // marks node as SSV-related (we filter out SSV-unrelated ones)
record.Set(enr.IP(net.IPv4(127, 0, 0, 1)))
record.Set(enr.UDP(12000))
record.Set(enr.TCP(13000))
Expand Down
2 changes: 1 addition & 1 deletion network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func (n *p2pNetwork) peersTrimming(logger *zap.Logger) func() {
// we don't want to trim incoming connections as often as outgoing connections (since trimming
// outgoing connections often helps us discover valuable peers, while it's not really the case
// for with incoming connections - only slightly so), hence we'll only do it 1/5 of the times
if rand.Intn(5) > 0 {
if rand.Intn(5) > 0 { // nolint: gosec
return // skip trim iteration
}
}
Expand Down
10 changes: 5 additions & 5 deletions network/p2p/p2p_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const (
connectorQueueSize = 256
// inboundLimitRatio is the ratio of inbound connections to the total connections
// we allow (both inbound and outbound).
inboundLimitRatio = float64(0.6)
inboundLimitRatio = float64(0.5)
)

// Setup is used to setup the network
Expand Down Expand Up @@ -159,14 +159,14 @@ func (n *p2pNetwork) SetupServices(logger *zap.Logger) error {
if err := n.setupStreamCtrl(logger); err != nil {
return errors.Wrap(err, "could not setup stream controller")
}
topicsController, err := n.setupPubsub(logger)
_, err := n.setupPubsub(logger)
if err != nil {
return errors.Wrap(err, "could not setup topic controller")
}
if err := n.setupPeerServices(logger); err != nil {
return errors.Wrap(err, "could not setup peer services")
}
if err := n.setupDiscovery(logger, topicsController); err != nil {
if err := n.setupDiscovery(logger); err != nil {
return errors.Wrap(err, "could not setup discovery service")
}

Expand Down Expand Up @@ -245,7 +245,7 @@ func (n *p2pNetwork) ActiveSubnets() records.Subnets {
return n.activeSubnets
}

func (n *p2pNetwork) setupDiscovery(logger *zap.Logger, topicsController topics.Controller) error {
func (n *p2pNetwork) setupDiscovery(logger *zap.Logger) error {
ipAddr, err := p2pcommons.IPAddr()
if err != nil {
return errors.Wrap(err, "could not get ip addr")
Expand Down Expand Up @@ -280,7 +280,7 @@ func (n *p2pNetwork) setupDiscovery(logger *zap.Logger, topicsController topics.
HostDNS: n.cfg.HostDNS,
NetworkConfig: n.cfg.Network,
}
disc, err := discovery.NewService(n.ctx, logger, topicsController, discOpts)
disc, err := discovery.NewService(n.ctx, logger, discOpts)
if err != nil {
return err
}
Expand Down

0 comments on commit 88a752c

Please sign in to comment.