From 6dc99b0520626600851dfddc1d5a1dfd00c24e9a Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 17 Oct 2023 19:12:06 -0700 Subject: [PATCH] Support more than 1 relay subscription for a pubSubTopic --- waku/v2/protocol/content_filter.go | 14 ++++++ waku/v2/protocol/relay/waku_relay.go | 68 ++++++++++++---------------- 2 files changed, 43 insertions(+), 39 deletions(-) diff --git a/waku/v2/protocol/content_filter.go b/waku/v2/protocol/content_filter.go index 67af8b79e..ef9e1bdfd 100644 --- a/waku/v2/protocol/content_filter.go +++ b/waku/v2/protocol/content_filter.go @@ -32,6 +32,20 @@ func NewContentFilter(pubsubTopic string, contentTopics ...string) ContentFilter return ContentFilter{pubsubTopic, NewContentTopicSet(contentTopics...)} } +func (cf ContentFilter) Equals(cf1 ContentFilter) bool { + if cf.PubsubTopic != cf1.PubsubTopic || + len(cf.ContentTopics) != len(cf1.ContentTopics) { + return false + } + for topic := range cf.ContentTopics { + _, ok := cf1.ContentTopics[topic] + if !ok { + return false + } + } + return true +} + // This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics func ContentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[PubsubTopicStr][]ContentTopicStr, error) { pubSubTopicMap := make(map[string][]string) diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 7696d1eaf..e282e65b5 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -65,7 +65,7 @@ type WakuRelay struct { EvtRelayUnsubscribed event.Emitter EvtPeerTopic event.Emitter } - contentSubs map[string]*Subscription + contentSubs map[string]map[int]*Subscription *waku_proto.CommonService } @@ -201,7 +201,7 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou pubsub.WithPeerScore(w.peerScoreParams, w.peerScoreThresholds), pubsub.WithPeerScoreInspect(w.peerScoreInspector, 6*time.Second), }, opts...) - w.contentSubs = make(map[string]*Subscription) + w.contentSubs = make(map[string]map[int]*Subscription) return w } @@ -430,37 +430,26 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont if !w.IsSubscribed(cFilter.PubsubTopic) { _, err := w.subscribeToPubsubTopic(contentFilter.PubsubTopic) if err != nil { + //TODO: Handle partial errors. return nil, err } - - subscription := w.bcaster.Register(contentFilter, 1024) - - // Create Content subscription - w.topicsMutex.RLock() - w.contentSubs[pubSubTopic] = subscription - w.topicsMutex.RUnlock() - subscriptions = append(subscriptions, subscription) - go func() { - <-ctx.Done() - subscription.Unsubscribe() - }() - } else { - w.topicsMutex.RLock() - defer w.topicsMutex.RUnlock() - - subscription := w.contentSubs[pubSubTopic] - if subscription.subType == SpecificContentTopics { - //Update existing content Subscription if subType is SpecificContentTopics - //Add new contentTopics if not already part of filter - for cTopic := range cFilter.ContentTopics { - if _, ok := subscription.contentFilter.ContentTopics[cTopic]; !ok { - subscription.contentFilter.ContentTopics[cTopic] = struct{}{} - } - } - } - subscriptions = append(subscriptions, subscription) } + subscription := w.bcaster.Register(contentFilter, 1024) + + // Create Content subscription + w.topicsMutex.RLock() + if _, ok := w.contentSubs[pubSubTopic]; !ok { + w.contentSubs[pubSubTopic] = map[int]*Subscription{} + } + w.contentSubs[pubSubTopic][subscription.ID] = subscription + + w.topicsMutex.RUnlock() + subscriptions = append(subscriptions, subscription) + go func() { + <-ctx.Done() + subscription.Unsubscribe() + }() } return subscriptions, nil @@ -483,23 +472,24 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co } for pubSubTopic, cTopics := range pubSubTopicMap { + cfTemp := waku_proto.NewContentFilter(pubSubTopic, cTopics...) pubsubUnsubscribe := false sub, ok := w.relaySubs[pubSubTopic] if !ok { return fmt.Errorf("not subscribed to topic") } - cSub := w.contentSubs[pubSubTopic] - if cSub != nil { - if cSub.subType == AllContentTopics { - pubsubUnsubscribe = true - } else { - for _, cTopic := range cTopics { - delete(cSub.contentFilter.ContentTopics, cTopic) - } - if len(cSub.contentFilter.ContentTopics) == 0 { - pubsubUnsubscribe = true + cSubs := w.contentSubs[pubSubTopic] + if cSubs != nil { + //Remove relevant subscription + for subID, sub := range cSubs { + if sub.contentFilter.Equals(cfTemp) { + sub.Unsubscribe() + delete(cSubs, subID) } } + if len(cSubs) == 0 { + pubsubUnsubscribe = true + } } else { //Should not land here ideally w.log.Error("pubsub subscriptions exists, but contentSubscription doesn't for contentFilter",