Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat : autoshard relay api #807

Merged
merged 30 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
5476cfd
chore: remove duplicate import
chaitanyaprem Oct 13, 2023
71cbbe1
fix: using relay without bcaster should consume and drop messages
chaitanyaprem Oct 13, 2023
e0e4ccd
update relay api usage
chaitanyaprem Oct 16, 2023
c15da7e
Merge branch 'master' into feat/relay-api-autoshard
chaitanyaprem Oct 16, 2023
5fa8512
move subscription to broadcaster
chaitanyaprem Oct 16, 2023
f4ffbbc
move filter logic under subscription
chaitanyaprem Oct 16, 2023
fc35d3a
fix error with test
chaitanyaprem Oct 16, 2023
de83bbf
fix lint error
chaitanyaprem Oct 16, 2023
5285e55
fix examples
chaitanyaprem Oct 16, 2023
06dd50a
fix rln test
chaitanyaprem Oct 16, 2023
b4b9e3d
remove notification from outside broadcaster in relay
chaitanyaprem Oct 16, 2023
7a4db68
fix: handle context deadline exceed error
chaitanyaprem Oct 16, 2023
4bcd91f
handle unregister for pubSubTopic in broadcaster
chaitanyaprem Oct 16, 2023
d732304
clarify TODO's and rename SubSimulation
chaitanyaprem Oct 17, 2023
83adec1
Merge branch 'master' into feat/relay-api-autoshard
chaitanyaprem Oct 17, 2023
84cbd85
Merge branch 'master' into feat/relay-api-autoshard
chaitanyaprem Oct 17, 2023
6009fae
address review comments
chaitanyaprem Oct 17, 2023
6dc99b0
Support more than 1 relay subscription for a pubSubTopic
chaitanyaprem Oct 18, 2023
460716d
modify relay Publish API to derive pubSubTopic based on autosharding
chaitanyaprem Oct 18, 2023
a435055
implement relay RPC methods for autosharding
chaitanyaprem Oct 18, 2023
edf5cd2
remove relay msgChannel and relay on pubsub buffersize for subscription
chaitanyaprem Oct 18, 2023
cf0c1a3
handle relay topic subscriptions during node init via cmd
chaitanyaprem Oct 18, 2023
4fc5134
fix: issues with autosharding relay RPC API
chaitanyaprem Oct 18, 2023
b4426b2
added unit tests for relay autoshard changes
chaitanyaprem Oct 19, 2023
d7610b9
Merge branch 'master' into feat/relay-api-autoshard
chaitanyaprem Oct 19, 2023
4c22e60
Apply suggestions from code review
chaitanyaprem Oct 20, 2023
9b3a0b8
handle relay subscribe with noConsumer and address issue reported in …
chaitanyaprem Oct 20, 2023
b1f1b0b
address issue with relay test code
chaitanyaprem Oct 20, 2023
277721a
chore: reorg relay code
chaitanyaprem Oct 20, 2023
7f9eb84
chore: address codeclimate issue
chaitanyaprem Oct 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/waku/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,10 @@ func Execute(options NodeOptions) {
if options.Relay.Enable {
for nodeTopic := range pubSubTopicMap {
nodeTopic := nodeTopic
sub, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic)
sub, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(nodeTopic))
failOnErr(err, "Error subscring to topic")
sub.Unsubscribe()

//Calling unsubscribe only closes broadcaster sub and not the underlying pubSub subscription.
sub[0].Unsubscribe()
if len(options.Rendezvous.Nodes) != 0 {
// Register the node in rendezvous point
iter := rendezvous.NewRendezvousPointIterator(options.Rendezvous.Nodes)
Expand Down
8 changes: 5 additions & 3 deletions cmd/waku/server/rest/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Re

var err error
for _, topic := range topics {
err = r.node.Relay().Unsubscribe(req.Context(), topic)
err = r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err))
} else {
Expand All @@ -129,18 +129,20 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ

var err error
var sub *relay.Subscription
var subs []*relay.Subscription
var topicToSubscribe string
for _, topic := range topics {
if topic == "" {
sub, err = r.node.Relay().Subscribe(req.Context())
subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(relay.DefaultWakuTopic))
topicToSubscribe = relay.DefaultWakuTopic
} else {
sub, err = r.node.Relay().SubscribeToTopic(req.Context(), topic)
subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topic))
topicToSubscribe = topic
}
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err))
} else {
sub = subs[0]
sub.Unsubscribe()
r.messagesMutex.Lock()
r.messages[topic] = []*pb.WakuMessage{}
Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/server/rest/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Adder func(msg *protocol.Envelope)

type runnerService struct {
broadcaster relay.Broadcaster
sub relay.Subscription
sub *relay.Subscription
cancel context.CancelFunc
adder Adder
}
Expand Down
10 changes: 6 additions & 4 deletions cmd/waku/server/rpc/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
Expand All @@ -37,9 +38,9 @@ func makeFilterService(t *testing.T, isFullNode bool) *FilterService {
require.NoError(t, err)

if isFullNode {
sub, err := n.Relay().SubscribeToTopic(context.Background(), testTopic)
sub, err := n.Relay().Subscribe(context.Background(), protocol.NewContentFilter(testTopic))
go func() {
for range sub.Ch {
for range sub[0].Ch {
}
}()
require.NoError(t, err)
Expand All @@ -62,14 +63,15 @@ func TestFilterSubscription(t *testing.T) {
err = node.Start(context.Background())
require.NoError(t, err)

_, err = node.SubscribeToTopic(context.Background(), testTopic)
_, err = node.Subscribe(context.Background(), protocol.NewContentFilter(testTopic))
require.NoError(t, err)

b2 := relay.NewBroadcaster(10)
require.NoError(t, b2.Start(context.Background()))
f := legacy_filter.NewWakuFilter(b2, false, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
f.SetHost(host)
err = f.Start(context.Background(), relay.NoopSubscription())
sub := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic))
err = f.Start(context.Background(), sub)
require.NoError(t, err)

d := makeFilterService(t, true)
Expand Down
12 changes: 9 additions & 3 deletions cmd/waku/server/rpc/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,21 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r
var err error
if topic == "" {
var sub *relay.Subscription
sub, err = r.node.Relay().Subscribe(ctx)
subs, err := r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
return err
}
sub = subs[0]
sub.Unsubscribe()
} else {
var sub *relay.Subscription
sub, err = r.node.Relay().SubscribeToTopic(ctx, topic)
subs, err := r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
return err
}
sub = subs[0]
sub.Unsubscribe()
}
if err != nil {
Expand All @@ -155,7 +161,7 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r
func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()
for _, topic := range args.Topics {
err := r.node.Relay().Unsubscribe(ctx, topic)
err := r.node.Relay().Unsubscribe(ctx, protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("unsubscribing from topic", zap.String("topic", topic), zap.Error(err))
return err
Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/server/rpc/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type Adder func(msg *protocol.Envelope)

type runnerService struct {
broadcaster relay.Broadcaster
sub relay.Subscription
sub *relay.Subscription
adder Adder
}

Expand Down
5 changes: 3 additions & 2 deletions examples/basic2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -116,13 +117,13 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode, contentTopic string
}

func readLoop(ctx context.Context, wakuNode *node.WakuNode, contentTopic string) {
sub, err := wakuNode.Relay().Subscribe(ctx)
sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
if err != nil {
log.Error("Could not subscribe", zap.Error(err))
return
}

for envelope := range sub.Ch {
for envelope := range sub[0].Ch {
if envelope.Message().ContentTopic != contentTopic {
continue
}
Expand Down
12 changes: 2 additions & 10 deletions examples/chat2/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"chat2/pb"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
Expand All @@ -24,7 +23,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/go-zerokit-rln/rln"
"golang.org/x/crypto/pbkdf2"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -84,13 +82,13 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
} else {

for _, topic := range topics {
sub, err := node.Relay().SubscribeToTopic(ctx, topic)
sub, err := node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
if err != nil {
chat.ui.ErrorMessage(err)
} else {
chat.C = make(chan *protocol.Envelope)
go func() {
for e := range sub.Ch {
for e := range sub[0].Ch {
chat.C <- e
}
}()
Expand Down Expand Up @@ -356,12 +354,6 @@ func decodeMessage(contentTopic string, wakumsg *wpb.WakuMessage) (*pb.Chat2Mess
return msg, nil
}

func generateSymKey(password string) []byte {
// AesKeyLength represents the length (in bytes) of an private key
AESKeyLength := 256 / 8
return pbkdf2.Key([]byte(password), nil, 65356, AESKeyLength, sha256.New)
}

func (c *Chat) retrieveHistory(connectionWg *sync.WaitGroup) {
defer c.wg.Done()

Expand Down
4 changes: 2 additions & 2 deletions examples/filter2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,13 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) {

func readLoop(ctx context.Context, wakuNode *node.WakuNode) {
pubsubTopic := pubSubTopic.String()
sub, err := wakuNode.Relay().SubscribeToTopic(ctx, pubsubTopic)
sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(pubsubTopic))
if err != nil {
log.Error("Could not subscribe: ", err)
return
}

for value := range sub.Ch {
for value := range sub[0].Ch {
payload, err := payload.DecodePayload(value.Message(), &payload.KeyInfo{Kind: payload.None})
if err != nil {
fmt.Println(err)
Expand Down
4 changes: 2 additions & 2 deletions examples/rln/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,13 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) {
}

func readLoop(ctx context.Context, wakuNode *node.WakuNode) {
sub, err := wakuNode.Relay().SubscribeToTopic(ctx, pubsubTopic.String())
sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(pubsubTopic.String()))
if err != nil {
log.Error("Could not subscribe", zap.Error(err))
return
}

for envelope := range sub.Ch {
for envelope := range sub[0].Ch {
if envelope.Message().ContentTopic != contentTopic.String() {
continue
}
Expand Down
8 changes: 4 additions & 4 deletions library/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,18 @@ func relaySubscribe(topic string) error {
return nil
}

subscription, err := wakuState.node.Relay().SubscribeToTopic(context.Background(), topicToSubscribe)
subscription, err := wakuState.node.Relay().Subscribe(context.Background(), protocol.NewContentFilter(topicToSubscribe))
if err != nil {
return err
}

relaySubscriptions[topicToSubscribe] = subscription
relaySubscriptions[topicToSubscribe] = subscription[0]

go func(subscription *relay.Subscription) {
for envelope := range subscription.Ch {
send("message", toSubscriptionMessage(envelope))
}
}(subscription)
}(subscription[0])

return nil
}
Expand Down Expand Up @@ -123,5 +123,5 @@ func RelayUnsubscribe(topic string) error {

delete(relaySubscriptions, topicToUnsubscribe)

return wakuState.node.Relay().Unsubscribe(context.Background(), topicToUnsubscribe)
return wakuState.node.Relay().Unsubscribe(context.Background(), protocol.NewContentFilter(topicToUnsubscribe))
}
6 changes: 4 additions & 2 deletions tests/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (

"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
)

Expand Down Expand Up @@ -39,10 +41,10 @@ func TestBasicSendingReceiving(t *testing.T) {

require.NoError(t, write(ctx, wakuNode, "test"))

sub, err := wakuNode.Relay().Subscribe(ctx)
sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
require.NoError(t, err)

value := <-sub.Ch
value := <-sub[0].Ch
payload, err := payload.DecodePayload(value.Message(), &payload.KeyInfo{Kind: payload.None})
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion waku/v2/node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ type Service interface {
type ReceptorService interface {
SetHost(h host.Host)
Stop()
Start(context.Context, relay.Subscription) error
Start(context.Context, *relay.Subscription) error
}
2 changes: 1 addition & 1 deletion waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ func (w *WakuNode) mountDiscV5() error {
return err
}

func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error {
func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) error {
err := w.store.Start(ctx, sub)
if err != nil {
w.log.Error("starting store", zap.Error(err))
Expand Down
17 changes: 9 additions & 8 deletions waku/v2/node/wakunode2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/waku-org/go-waku/waku/persistence/sqlite"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
Expand Down Expand Up @@ -54,11 +55,11 @@ func TestWakuNode2(t *testing.T) {
err = wakuNode.Start(ctx)
require.NoError(t, err)

_, err = wakuNode.Relay().SubscribeToTopic(ctx, "waku/rs/1/1")
_, err = wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter("waku/rs/1/1"))
require.NoError(t, err)
time.Sleep(time.Second * 1)

err = wakuNode.Relay().Unsubscribe(ctx, "waku/rs/1/1")
err = wakuNode.Relay().Unsubscribe(ctx, protocol.NewContentFilter("waku/rs/1/1"))
require.NoError(t, err)

defer wakuNode.Stop()
Expand Down Expand Up @@ -151,9 +152,9 @@ func Test500(t *testing.T) {

time.Sleep(2 * time.Second)

sub1, err := wakuNode1.Relay().Subscribe(ctx)
sub1, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
require.NoError(t, err)
sub2, err := wakuNode1.Relay().Subscribe(ctx)
sub2, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
require.NoError(t, err)

wg := sync.WaitGroup{}
Expand All @@ -168,7 +169,7 @@ func Test500(t *testing.T) {
select {
case <-ticker.C:
require.Fail(t, "Timeout Sub1")
case msg := <-sub1.Ch:
case msg := <-sub1[0].Ch:
if msg == nil {
return
}
Expand All @@ -189,7 +190,7 @@ func Test500(t *testing.T) {
select {
case <-ticker.C:
require.Fail(t, "Timeout Sub2")
case msg := <-sub2.Ch:
case msg := <-sub2[0].Ch:
if msg == nil {
return
}
Expand Down Expand Up @@ -234,9 +235,9 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
require.NoError(t, err)
defer wakuNode1.Stop()

subs, err := wakuNode1.Relay().Subscribe(ctx)
subs, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
require.NoError(t, err)
subs.Unsubscribe()
defer subs[0].Unsubscribe()

// NODE2: Filter Client/Store
db, err := sqlite.NewDB(":memory:", false, utils.Logger())
Expand Down
3 changes: 1 addition & 2 deletions waku/v2/node/wakuoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport"
"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
Expand Down Expand Up @@ -232,7 +231,7 @@ func WithHostAddress(hostAddr *net.TCPAddr) WakuNodeOption {
}

// WithAdvertiseAddresses is a WakuNodeOption that allows overriding the address used in the waku node with custom value
func WithAdvertiseAddresses(advertiseAddrs ...ma.Multiaddr) WakuNodeOption {
func WithAdvertiseAddresses(advertiseAddrs ...multiaddr.Multiaddr) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.advertiseAddrs = advertiseAddrs
return WithMultiaddress(advertiseAddrs...)(params)
Expand Down
Loading