Skip to content

Commit

Permalink
Support more than 1 relay subscription for a pubSubTopic
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Oct 18, 2023
1 parent 6009fae commit 6dc99b0
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 39 deletions.
14 changes: 14 additions & 0 deletions waku/v2/protocol/content_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ func NewContentFilter(pubsubTopic string, contentTopics ...string) ContentFilter
return ContentFilter{pubsubTopic, NewContentTopicSet(contentTopics...)}
}

func (cf ContentFilter) Equals(cf1 ContentFilter) bool {
if cf.PubsubTopic != cf1.PubsubTopic ||
len(cf.ContentTopics) != len(cf1.ContentTopics) {
return false
}
for topic := range cf.ContentTopics {
_, ok := cf1.ContentTopics[topic]
if !ok {
return false
}
}
return true
}

// This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics
func ContentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[PubsubTopicStr][]ContentTopicStr, error) {
pubSubTopicMap := make(map[string][]string)
Expand Down
68 changes: 29 additions & 39 deletions waku/v2/protocol/relay/waku_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type WakuRelay struct {
EvtRelayUnsubscribed event.Emitter
EvtPeerTopic event.Emitter
}
contentSubs map[string]*Subscription
contentSubs map[string]map[int]*Subscription
*waku_proto.CommonService
}

Expand Down Expand Up @@ -201,7 +201,7 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou
pubsub.WithPeerScore(w.peerScoreParams, w.peerScoreThresholds),
pubsub.WithPeerScoreInspect(w.peerScoreInspector, 6*time.Second),
}, opts...)
w.contentSubs = make(map[string]*Subscription)
w.contentSubs = make(map[string]map[int]*Subscription)
return w
}

Expand Down Expand Up @@ -430,37 +430,26 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
if !w.IsSubscribed(cFilter.PubsubTopic) {
_, err := w.subscribeToPubsubTopic(contentFilter.PubsubTopic)
if err != nil {
//TODO: Handle partial errors.
return nil, err
}

subscription := w.bcaster.Register(contentFilter, 1024)

// Create Content subscription
w.topicsMutex.RLock()
w.contentSubs[pubSubTopic] = subscription
w.topicsMutex.RUnlock()
subscriptions = append(subscriptions, subscription)
go func() {
<-ctx.Done()
subscription.Unsubscribe()
}()
} else {
w.topicsMutex.RLock()
defer w.topicsMutex.RUnlock()

subscription := w.contentSubs[pubSubTopic]
if subscription.subType == SpecificContentTopics {
//Update existing content Subscription if subType is SpecificContentTopics
//Add new contentTopics if not already part of filter
for cTopic := range cFilter.ContentTopics {
if _, ok := subscription.contentFilter.ContentTopics[cTopic]; !ok {
subscription.contentFilter.ContentTopics[cTopic] = struct{}{}
}
}
}
subscriptions = append(subscriptions, subscription)
}

subscription := w.bcaster.Register(contentFilter, 1024)

// Create Content subscription
w.topicsMutex.RLock()
if _, ok := w.contentSubs[pubSubTopic]; !ok {
w.contentSubs[pubSubTopic] = map[int]*Subscription{}
}
w.contentSubs[pubSubTopic][subscription.ID] = subscription

w.topicsMutex.RUnlock()
subscriptions = append(subscriptions, subscription)
go func() {
<-ctx.Done()
subscription.Unsubscribe()
}()
}

return subscriptions, nil
Expand All @@ -483,23 +472,24 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co
}

for pubSubTopic, cTopics := range pubSubTopicMap {
cfTemp := waku_proto.NewContentFilter(pubSubTopic, cTopics...)
pubsubUnsubscribe := false
sub, ok := w.relaySubs[pubSubTopic]
if !ok {
return fmt.Errorf("not subscribed to topic")
}
cSub := w.contentSubs[pubSubTopic]
if cSub != nil {
if cSub.subType == AllContentTopics {
pubsubUnsubscribe = true
} else {
for _, cTopic := range cTopics {
delete(cSub.contentFilter.ContentTopics, cTopic)
}
if len(cSub.contentFilter.ContentTopics) == 0 {
pubsubUnsubscribe = true
cSubs := w.contentSubs[pubSubTopic]
if cSubs != nil {
//Remove relevant subscription
for subID, sub := range cSubs {
if sub.contentFilter.Equals(cfTemp) {
sub.Unsubscribe()
delete(cSubs, subID)
}
}
if len(cSubs) == 0 {
pubsubUnsubscribe = true
}
} else {
//Should not land here ideally
w.log.Error("pubsub subscriptions exists, but contentSubscription doesn't for contentFilter",
Expand Down

0 comments on commit 6dc99b0

Please sign in to comment.