Skip to content

Commit

Permalink
remove relay msgChannel and relay on pubsub buffersize for subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Oct 18, 2023
1 parent a435055 commit edf5cd2
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 44 deletions.
2 changes: 1 addition & 1 deletion waku/v2/protocol/content_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
// DefaultContentTopic is the default content topic used in Waku network if no content topic is specified.
const DefaultContentTopic = "/waku/2/default-content/proto"

var ErrInvalidFormat = errors.New("invalid format")
var ErrInvalidFormat = errors.New("invalid content topic format")
var ErrMissingGeneration = errors.New("missing part: generation")
var ErrInvalidGeneration = errors.New("generation should be a number")

Expand Down
65 changes: 22 additions & 43 deletions waku/v2/protocol/relay/waku_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,14 @@ func (w *WakuRelay) subscribeToPubsubTopic(topic string) (subs *pubsub.Subscript
return nil, err
}

sub, err = pubSubTopic.Subscribe()
sub, err = pubSubTopic.Subscribe(pubsub.WithBufferSize(1024))
if err != nil {
return nil, err
}

w.WaitGroup().Add(1)
go w.pubsubTopicMsgHandler(topic, sub)

evtHandler, err := w.addPeerTopicEventListener(pubSubTopic)
if err != nil {
return nil, err
Expand All @@ -341,9 +344,6 @@ func (w *WakuRelay) subscribeToPubsubTopic(topic string) (subs *pubsub.Subscript
return nil, err
}

w.WaitGroup().Add(1)
go w.topicMsgHandler(topic, sub)

w.log.Info("subscribing to topic", zap.String("topic", sub.Topic()))
}

Expand Down Expand Up @@ -442,13 +442,14 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
}

for pubSubTopic, cTopics := range pubSubTopicMap {
w.log.Info("subscribing to ", zap.String("pubsubTopic", pubSubTopic), zap.Strings("content-topics", cTopics))
var cFilter waku_proto.ContentFilter
cFilter.PubsubTopic = pubSubTopic
cFilter.ContentTopics = waku_proto.NewContentTopicSet(cTopics...)

//Check if gossipsub subscription already exists for pubSubTopic
if !w.IsSubscribed(pubSubTopic) {
_, err := w.subscribeToPubsubTopic(contentFilter.PubsubTopic)
_, err := w.subscribeToPubsubTopic(cFilter.PubsubTopic)
if err != nil {
//TODO: Handle partial errors.
return nil, err
Expand Down Expand Up @@ -549,50 +550,28 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co
return nil
}

func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <-chan *pubsub.Message {
msgChannel := make(chan *pubsub.Message, 1024)
go func() {
defer close(msgChannel)
for {
msg, err := sub.Next(ctx)
if err != nil {
if !errors.Is(err, context.Canceled) {
w.log.Error("getting message from subscription", zap.Error(err))
}
sub.Cancel()
return
}
msgChannel <- msg
}
}()
return msgChannel
}

func (w *WakuRelay) topicMsgHandler(pubsubTopic string, sub *pubsub.Subscription) {
func (w *WakuRelay) pubsubTopicMsgHandler(pubsubTopic string, sub *pubsub.Subscription) {
defer w.WaitGroup().Done()

subChannel := w.nextMessage(w.Context(), sub)
for {
select {
case <-w.Context().Done():
return
// TODO: if there are no more relay subscriptions, close the pubsub subscription
case msg, ok := <-subChannel:
if !ok {
return
}
wakuMessage := &pb.WakuMessage{}
if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {
w.log.Error("decoding message", zap.Error(err))
return
msg, err := sub.Next(w.Context())
if err != nil {
if !errors.Is(err, context.Canceled) {
w.log.Error("getting message from subscription", zap.Error(err))
}
sub.Cancel()
return
}
wakuMessage := &pb.WakuMessage{}
if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {
w.log.Error("decoding message", zap.Error(err))
return
}
envelope := waku_proto.NewEnvelope(wakuMessage, w.timesource.Now().UnixNano(), pubsubTopic)

envelope := waku_proto.NewEnvelope(wakuMessage, w.timesource.Now().UnixNano(), pubsubTopic)

w.metrics.RecordMessage(envelope)
w.metrics.RecordMessage(envelope)

w.bcaster.Submit(envelope)
}
w.bcaster.Submit(envelope)
}

}
Expand Down

0 comments on commit edf5cd2

Please sign in to comment.