Skip to content

Commit

Permalink
fix: return appropriate errors in filter unsubscribe (#941)
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem authored Dec 1, 2023
1 parent b7105f9 commit ac1a699
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 13 deletions.
41 changes: 29 additions & 12 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ import (
const FilterPushID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-push/2.0.0-beta1")

var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
ErrNoPeersAvailable = errors.New("no suitable remote peers")
ErrSubscriptionNotFound = errors.New("subscription not found")
)

type WakuFilterLightNode struct {
Expand Down Expand Up @@ -110,19 +111,21 @@ func (wf *WakuFilterLightNode) start() error {
func (wf *WakuFilterLightNode) Stop() {
wf.CommonService.Stop(func() {
wf.h.RemoveStreamHandler(FilterPushID_v20beta1)
res, err := wf.unsubscribeAll(wf.Context())
if err != nil {
wf.log.Warn("unsubscribing from full nodes", zap.Error(err))
}

for _, r := range res.Errors() {
if r.Err != nil {
wf.log.Warn("unsubscribing from full nodes", zap.Error(r.Err), logging.HostID("peerID", r.PeerID))
if wf.subscriptions.Count() > 0 {
res, err := wf.unsubscribeAll(wf.Context())
if err != nil {
wf.log.Warn("unsubscribing from full nodes", zap.Error(err))
}

for _, r := range res.Errors() {
if r.Err != nil {
wf.log.Warn("unsubscribing from full nodes", zap.Error(r.Err), logging.HostID("peerID", r.PeerID))
}

}
//
wf.subscriptions.Clear()
}
//
wf.subscriptions.Clear()
})
}

Expand Down Expand Up @@ -485,6 +488,13 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr

peers := make(map[peer.ID]struct{})
subs := wf.subscriptions.GetSubscription(params.selectedPeer, cFilter)
if len(subs) == 0 {
result.Add(WakuFilterPushError{
Err: ErrSubscriptionNotFound,
PeerID: params.selectedPeer,
})
continue
}
for _, sub := range subs {
sub.Remove(cTopics...)
peers[sub.PeerID] = struct{}{}
Expand Down Expand Up @@ -583,14 +593,21 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
if err != nil {
return nil, err
}
result := &WakuFilterPushResult{}

peers := make(map[peer.ID]struct{})
subs := wf.subscriptions.GetSubscription(params.selectedPeer, protocol.ContentFilter{})
if len(subs) == 0 && params.selectedPeer != "" {
result.Add(WakuFilterPushError{
Err: err,
PeerID: params.selectedPeer,
})
return result, ErrSubscriptionNotFound
}
for _, sub := range subs {
sub.Close()
peers[sub.PeerID] = struct{}{}
}
result := &WakuFilterPushResult{}
if params.wg != nil {
params.wg.Add(len(peers))
}
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (s *FilterTestSuite) waitForMessages(fn func(), subs []*subscription.Subscr
contentTopic: env.Message().GetContentTopic(),
payload: string(env.Message().GetPayload()),
}
s.log.Info("received message ", zap.String("pubSubTopic", received.pubSubTopic), zap.String("contentTopic", received.contentTopic), zap.String("payload", received.payload))
s.log.Debug("received message ", zap.String("pubSubTopic", received.pubSubTopic), zap.String("contentTopic", received.contentTopic), zap.String("payload", received.payload))
if matchOneOfManyMsg(received, expected) {
found++
}
Expand Down
6 changes: 6 additions & 0 deletions waku/v2/protocol/subscription/subscriptions_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ func NewSubscriptionMap(logger *zap.Logger) *SubscriptionsMap {
}
}

func (m *SubscriptionsMap) Count() int {
m.RLock()
defer m.RUnlock()
return len(m.items)
}

func (m *SubscriptionsMap) IsListening(pubsubTopic, contentTopic string) bool {
m.RLock()
defer m.RUnlock()
Expand Down

0 comments on commit ac1a699

Please sign in to comment.