Skip to content

Commit

Permalink
Merge branch 'master' into fix/race-condition
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos authored Oct 26, 2023
2 parents d760931 + 0ba8b2c commit a5683c2
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 3 deletions.
17 changes: 14 additions & 3 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,14 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
cFilter.PubsubTopic = pubSubTopic
cFilter.ContentTopics = protocol.NewContentTopicSet(cTopics...)

err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter)
paramsCopy := params.Copy()
paramsCopy.selectedPeer = selectedPeer
err := wf.request(
ctx,
paramsCopy,
pb.FilterSubscribeRequest_SUBSCRIBE,
cFilter,
)
if err != nil {
wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
Expand Down Expand Up @@ -532,7 +539,9 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context,

if !wf.subscriptions.Has(sub.PeerID, sub.ContentFilter) {
// Last sub for this [peer, contentFilter] pair
err = wf.unsubscribeFromServer(ctx, &FilterSubscribeParameters{selectedPeer: sub.PeerID, requestID: params.requestID}, sub.ContentFilter)
paramsCopy := params.Copy()
paramsCopy.selectedPeer = sub.PeerID
err = wf.unsubscribeFromServer(ctx, paramsCopy, sub.ContentFilter)
resultChan <- WakuFilterPushResult{
Err: err,
PeerID: sub.PeerID,
Expand Down Expand Up @@ -586,9 +595,11 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
}
}()

paramsCopy := params.Copy()
paramsCopy.selectedPeer = peerID
err := wf.request(
ctx,
&FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID},
params,
pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL,
protocol.ContentFilter{})
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions waku/v2/protocol/filter/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import (
"go.uber.org/zap"
)

func (old *FilterSubscribeParameters) Copy() *FilterSubscribeParameters {
return &FilterSubscribeParameters{
selectedPeer: old.selectedPeer,
requestID: old.requestID,
}
}

type (
FilterSubscribeParameters struct {
selectedPeer peer.ID
Expand Down

0 comments on commit a5683c2

Please sign in to comment.