Skip to content

Commit

Permalink
implement relay RPC methods for autosharding
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Oct 18, 2023
1 parent 460716d commit a435055
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 18 deletions.
94 changes: 77 additions & 17 deletions cmd/waku/server/rpc/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,34 +120,94 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs,
return nil
}

// PostV1AutoSubscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_auto_subscription
// Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics.
func (r *RelayService) PostV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()

var err error
_, err = r.node.Relay().Subscribe(ctx, protocol.NewContentFilter("", args.Topics...))
if err != nil {
r.log.Error("subscribing to topics", zap.Strings("topics", args.Topics), zap.Error(err))
return err
}
//TODO: Handle partial errors.

*reply = true
return nil
}

// DeleteV1AutoSubscription is invoked when the json rpc request uses the delete_waku_v2_relay_v1_auto_subscription
// Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics.
func (r *RelayService) DeleteV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()

err := r.node.Relay().Unsubscribe(ctx, protocol.NewContentFilter("", args.Topics...))
if err != nil {
r.log.Error("unsubscribing from topics", zap.Strings("topic", args.Topics), zap.Error(err))
return err
}
//TODO: Handle partial errors.
return nil
}

// PostV1AutoMessage is invoked when the json rpc request uses the post_waku_v2_relay_v1_auto_message
func (r *RelayService) PostV1AutoMessage(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error {
var err error

msg := args.Message.toProto()
if msg.ContentTopic == "" {
err := fmt.Errorf("content-topic cannot be empty")
r.log.Error("publishing message", zap.Error(err))
return err
}
if err = server.AppendRLNProof(r.node, msg); err != nil {
return err
}

_, err = r.node.Relay().Publish(req.Context(), msg)
if err != nil {
r.log.Error("publishing message", zap.Error(err))
return err
}

*reply = true
return nil
}

// GetV1AutoMessages is invoked when the json rpc request uses the get_waku_v2_relay_v1_auto_messages method
// Note that this method takes contentTopic as an argument instead of pubSubtopic and uses autosharding.
func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, reply *MessagesReply) error {
sub, err := r.node.Relay().GetSubscription(args.Topic)
if err != nil {
return err
}
select {
case msg := <-sub.Ch:
*reply = append(*reply, ProtoToRPC(msg.Message()))
default:
break
}
return nil
}

// PostV1Subscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_subscription method
func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()

for _, topic := range args.Topics {
var err error
if topic == "" {
var sub *relay.Subscription
subs, err := r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
return err
}
sub = subs[0]
sub.Unsubscribe()
} else {
var sub *relay.Subscription
subs, err := r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
return err
}
sub = subs[0]
sub.Unsubscribe()
topic = relay.DefaultWakuTopic
}
var sub *relay.Subscription
subs, err := r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
return err
}
sub = subs[0]
sub.Unsubscribe()
r.messagesMutex.Lock()
r.messages[topic] = make([]*pb.WakuMessage, 0)
r.messagesMutex.Unlock()
Expand Down
17 changes: 16 additions & 1 deletion waku/v2/protocol/relay/waku_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,21 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage) ([]byt
return w.PublishToTopic(ctx, message, pubSubTopic)
}

func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error) {
pubSubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {
return nil, err
}
contentFilter := waku_proto.NewContentFilter(pubSubTopic, contentTopic)
cSubs := w.contentSubs[pubSubTopic]
for _, sub := range cSubs {
if sub.contentFilter.Equals(contentFilter) {
return sub, nil
}
}
return nil, fmt.Errorf("no subscription found for content topic")
}

// Stop unmounts the relay protocol and stops all subscriptions
func (w *WakuRelay) Stop() {
w.CommonService.Stop(func() {
Expand All @@ -417,7 +432,7 @@ func (w *WakuRelay) EnoughPeersToPublishToTopic(topic string) bool {
return len(w.PubSub().ListPeers(topic)) >= w.minPeersToPublish
}

// SubscribeToTopic returns a Subscription to receive messages from a pubsub topic
// subscribe returns list of Subscription to receive messages based on content filter
func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter) ([]*Subscription, error) {

var subscriptions []*Subscription
Expand Down

0 comments on commit a435055

Please sign in to comment.