Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix : issues with get messages API #878

Merged
merged 5 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions cmd/waku/server/rest/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ
} else {
topicToSubscribe = topic
}
_, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity))
_, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity))

if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err))
Expand Down Expand Up @@ -126,7 +126,14 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) {
}
var response []*pb.WakuMessage
select {
case msg := <-sub.Ch:
case msg, open := <-sub.Ch:
if !open {
r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", topic))

Check failure

Code scanning / CodeQL

Log entries created from user input

This log entry depends on a [user-provided value](1).
w.WriteHeader(http.StatusNotFound)
_, err = w.Write([]byte("consume channel is closed for subscription"))
r.log.Error("writing response", zap.Error(err))
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
return
}
response = append(response, msg.Message())
default:
break
Expand Down
19 changes: 14 additions & 5 deletions cmd/waku/server/rpc/relay.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rpc

import (
"errors"
"fmt"
"net/http"

Expand Down Expand Up @@ -93,7 +94,7 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs,
// Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics.
func (r *RelayService) PostV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {

_, err := r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", args.Topics...))
_, err := r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", args.Topics...), relay.WithCacheSize(uint(r.cacheCapacity)))
if err != nil {
r.log.Error("subscribing to topics", zap.Strings("topics", args.Topics), zap.Error(err))
return err
Expand Down Expand Up @@ -150,7 +151,11 @@ func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, rep
return err
}
select {
case msg := <-sub.Ch:
case msg, open := <-sub.Ch:
if !open {
r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", args.Topic))
return errors.New("consume channel is closed for subscription")
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
}
rpcMsg, err := ProtoToRPC(msg.Message())
if err != nil {
r.log.Warn("could not include message in response", logging.HexString("hash", msg.Hash()), zap.Error(err))
Expand All @@ -165,14 +170,13 @@ func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, rep

// PostV1Subscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_subscription method
func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()

for _, topic := range args.Topics {
var err error
if topic == "" {
topic = relay.DefaultWakuTopic
}
_, err = r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
_, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter(topic), relay.WithCacheSize(uint(r.cacheCapacity)))
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
return err
Expand Down Expand Up @@ -206,8 +210,13 @@ func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *
if err != nil {
return err
}
fmt.Println("subscription is ", sub)
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
select {
case msg := <-sub.Ch:
case msg, open := <-sub.Ch:
if !open {
r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", args.Topic))
return errors.New("consume channel is closed for subscription")
}
m, err := ProtoToRPC(msg.Message())
if err == nil {
*reply = append(*reply, m)
Expand Down
6 changes: 6 additions & 0 deletions waku/v2/protocol/relay/waku_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ func (w *WakuRelay) GetSubscriptionWithPubsubTopic(pubsubTopic string, contentTo
cSubs := w.contentSubs[pubsubTopic]
for _, sub := range cSubs {
if sub.contentFilter.Equals(contentFilter) {
if sub.noConsume { //This check is to ensure that default no-consumer subscription is not returned
continue
}
return sub, nil
}
}
Expand All @@ -308,6 +311,9 @@ func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error)
cSubs := w.contentSubs[pubsubTopic]
for _, sub := range cSubs {
if sub.contentFilter.Equals(contentFilter) {
if sub.noConsume { //This check is to ensure that default no-consumer subscription is not returned
continue
}
return sub, nil
}
}
Expand Down