Skip to content

Commit

Permalink
feat: add option to specify preferred peers for filter
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Oct 23, 2024
1 parent 8842d00 commit 103d26c
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 2 deletions.
7 changes: 7 additions & 0 deletions waku/v2/api/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Sub struct {
type subscribeParameters struct {
batchInterval time.Duration
multiplexChannelBuffer int
preferredPeers peer.IDSlice
}

type SubscribeOptions func(*subscribeParameters)
Expand All @@ -75,6 +76,12 @@ func defaultOptions() []SubscribeOptions {
}
}

func WithPreferredServiceNodes(peers peer.IDSlice) SubscribeOptions {
return func(params *subscribeParameters) {
params.preferredPeers = peers
}
}

// Subscribe
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) {
sub := new(Sub)
Expand Down
11 changes: 10 additions & 1 deletion waku/v2/api/filter/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package filter

import (
"context"
"math/rand"
"sync"
"time"

"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"

"go.uber.org/zap"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -61,7 +63,8 @@ type EnevelopeProcessor interface {
OnNewEnvelope(env *protocol.Envelope) error
}

func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int,
envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
// This fn is being mocked in test
mgr := new(FilterManager)
mgr.ctx = ctx
Expand Down Expand Up @@ -162,6 +165,12 @@ func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
defer utils.LogOnPanic()
ctx, cancel := context.WithCancel(mgr.ctx)
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
if len(mgr.params.preferredPeers) > 0 {
//use one peer which is from preferred peers.
randomIndex := rand.Intn(len(mgr.params.preferredPeers) - 1)
randomPreferredPeer := mgr.params.preferredPeers[randomIndex]
config.Peers = []peer.ID{randomPreferredPeer}
}
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params)
mgr.Lock()
mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/api/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *FilterApiTestSuite) TestSubscribe() {
s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic)
ctx, cancel := context.WithCancel(context.Background())
s.Log.Info("About to perform API Subscribe()")
params := subscribeParameters{300 * time.Second, 1024}
params := subscribeParameters{300 * time.Second, 1024, nil}
apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log, &params)
s.Require().NoError(err)
s.Require().Equal(apiSub.ContentFilter, contentFilter)
Expand Down

0 comments on commit 103d26c

Please sign in to comment.