
fix: duplicate message forwarding in filter service #2842

Merged · 7 commits · merged into master on Jun 25, 2024

Conversation

@darshankabariya (Contributor) commented on Jun 24, 2024

Closes #2320

PR Description:

We identified duplicate message forwarding in the filter node service, which is resolved in this fix.

github-actions bot commented on Jun 24, 2024

You can find the image built from this PR at

quay.io/wakuorg/nwaku-pr:2842

Built from 2ad6e43

@gabrielmer (Contributor) left a comment

Looks great! Thanks so much!

@NagyZoltanPeter (Contributor) left a comment

I don't think an ever-growing lookup table is a good solution for this problem.
Please evaluate whether TimedCache or a similar approach can do it.
Note that gossipsub does something very similar to filter duplicates out of re-publishing.

  • I don't think we need to track this per peer: a message is pushed instantly to all subscribed peers, so if it is a duplicate, it is certainly a duplicate for all of them (see the sketch below).
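
A minimal sketch of the shared (not per-peer) dedup idea, assuming the TimedCache type that gossipsub uses; the import path and the FilterDedup/isDuplicate names are illustrative, not the actual nwaku code:

```nim
import chronos
import libp2p/protocols/pubsub/timedcache

type FilterDedup* = object
  seen: TimedCache[string] # one shared cache of message hashes for all peers

proc init*(T: type FilterDedup, timeout = 2.minutes): T =
  T(seen: init(TimedCache[string], timeout))

proc isDuplicate*(d: var FilterDedup, msgHash: string): bool =
  # put() returns true when the hash is already cached (and refreshes its
  # expiry); expired entries are pruned as part of the call.
  d.seen.put(msgHash)
```

Because the cache is keyed only by message hash, a message pushed to all subscribers is checked exactly once, and memory stays bounded by the timeout window.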

@NagyZoltanPeter (Contributor) commented

  • A consideration upon re-reading the original problem:

In Florin's test there was no relay involved. He was sending messages via the REST API to be relayed, but the same node was also serving filter subscriptions. In this case filter gets the message before relay, hence the message push.
I don't know if it is a generic case to prepare for, because from waku_relay no such case can happen, since duplicates are filtered there. In the publish case, unfortunately, filterHandler is called before any checks: gossipsub's publish calls the pubsub ancestor's publish method ahead of anything else, and that is what actually calls the data handlers.

So it may be a bit costly to maintain a dup-check database for this case only.
Also, the case described here is a bit strange: if there is a new message (published again) in the system, why should we prevent our subscribers from getting it?

@darshankabariya (Contributor, Author) commented on Jun 24, 2024

Hi @NagyZoltanPeter, I want to clarify that when I said 'relay message', I meant 'send message', not using Waku relay. Let's set aside dynamic subscriptions for now and focus on the original issue. Regarding your suggestion about not needing to store messages per peer, I'm considering a shared msg_hash pool using TimedCache. One question: how long should we cache these messages? The default is set to just 10 milliseconds.

@NagyZoltanPeter (Contributor) commented

> Hi @NagyZoltanPeter, I want to clarify that when I said 'relay message', I meant 'send message', not using Waku relay. Let's set aside dynamic subscriptions for now and focus on the original issue. Regarding your suggestion about not needing to store messages per peer, I'm considering a shared msg_hash pool using TimedCache. One question: how long should we cache these messages? Currently, the default is set to just 10 milliseconds.

Ok, I see. Sorry, maybe I was not very clear.
The filter protocol pushes messages to its subscribed clients (filter clients, not REST users here). That can happen from two sources: our node relaying a message, or publish (when our node introduces a message to the network).
Florin's case was the latter.
In the first case, filter only sees messages from relay if they are valid and not duplicates. For the publish case there is currently no such check, because of the architecture of the pubsub and gossipsub classes.

When a message comes in, filter collects the subscribed peers and sends them the message right away. There is no message cache involved here.
There is a message cache, but only for the REST API, where the REST client must poll for filtered messages. Even there, it is a single message cache for the one local REST client; multi-tenant queries are not supported.

Back to your question: gossipsub retains its seen-message lookup for 2 minutes. I don't think any such solution, if applied, should hold entries for longer. But it is still a question of requirements: what exactly do we want to prevent with this solution?

> Currently, the default is set to just 10 milliseconds.

What exactly is that, and where?

@darshankabariya (Contributor, Author) commented on Jun 24, 2024

Thanks, @NagyZoltanPeter, it is clearer to me now. If we create and publish a message, and then publish the same message again, why should we prevent our subscribers from seeing it a second time? I assume there might be scenarios where we publish the same message with the same timestamp. If the payload is the same but the timestamp is different, our fix will not stop the second message from being pushed. However, let's ask Florin for further clarification.

Regarding the timeout variable, it is a default, so we can adjust it as needed. That's why I'm asking what value we should use for our solution. (Default Timeout)

For the sake of implementation, I will proceed with the timed-cache logic to better understand it. If, as you mentioned, this concern turns out to be unnecessary, we will not merge the PR.

Hi @fbarbu15, could you please help us with our confusion?

@NagyZoltanPeter (Contributor) commented

> Thanks, @NagyZoltanPeter, it is clearer to me now. If we create and publish a message, and then publish the same message again, why should we prevent our subscribers from seeing it a second time? I assume there might be scenarios where we publish the same message with the same timestamp. If the payload is the same but the timestamp is different, our fix will not stop the second message from being pushed. However, let's ask Florin for further clarification.
>
> Regarding the timeout variable, it is a default, so we can adjust it as needed. That's why I'm asking what value we should use for our solution. (Default Timeout)

I see, thank you. In this case it is not the default that is used, but this.

> For the sake of implementation, I will proceed with the timed-cache logic to better understand it. If, as you mentioned, this concern turns out to be unnecessary, we will not merge the PR.
>
> Hi @fbarbu15, could you please help us with our confusion?

I think this kind of duplicate is possible if:

  • the filter service is on the same node that is
    • used for lightpush, or
    • used with the REST API relay POST endpoint

The other, more generic case is when the filter service pushes messages received from relay.
That does not mean it cannot be a problem. We might consider other ways to do it.

But all in all, a message-hash dup check with a timed cache is a possibly good solution, precisely to avoid an uncontrolled message-hash collection. We expect nodes to run for a long time and to handle an enormous number of messages, and an unbounded collection would eventually use up all memory.

@NagyZoltanPeter (Contributor) left a comment

Seems much, much better!
I added some short suggestions; please also consider adding unit tests to check the dup cases.

Comment on lines 181 to 182
wf.messageCache.expire(Moment.now())
if wf.messageCache.contains(msgHash):

Suggested change
wf.messageCache.expire(Moment.now())
if wf.messageCache.contains(msgHash):
if wf.messageCache.put(msgHash):

I think this is simpler and does exactly the same thing: it will call expire and will also refresh the timestamp.
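
For clarity, a small self-contained sketch of the put-based check, using only the TimedCache calls that appear in this diff (the import path is an assumption):

```nim
import chronos
import libp2p/protocols/pubsub/timedcache

var cache = init(TimedCache[string], timeout = 2.minutes)

# Two-step form from the original change: prune expired entries, then test membership.
cache.expire(Moment.now())
if cache.contains("msg-hash"):
  echo "duplicate, skip the push"

# Suggested single call: put() prunes expired entries, inserts (or refreshes)
# the key, and returns true when the key was already present.
if cache.put("msg-hash"):
  echo "duplicate, skip the push"
else:
  echo "first time seen"
```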

Comment on lines 203 to 204
## expiration refreshed that's why update cache every time, even if it has a value.
discard wf.messageCache.put(msgHash, Moment.now())

Suggested change
## expiration refreshed that's why update cache every time, even if it has a value.
discard wf.messageCache.put(msgHash, Moment.now())

This is not needed if my previous suggestion above is applied.

@@ -295,6 +307,7 @@ proc new*(
subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer
),
peerManager: peerManager,
messageCache: init(TimedCache[string], timeout = 2.minutes),

I would personally add the cache timeout as an input argument to WakuFilter's new, because it enables configurability even if in most cases the default is chosen.
It also makes it very easy to create unit tests that automatically check the correctness of the feature.
In a unit test you could then apply a 1.seconds timeout, which is more than enough to check such dup cases (see the sketch below).
WDYT?
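
A rough sketch of the kind of unit test this enables; it exercises only the TimedCache behaviour used by this PR with a short timeout (the test framework import and exact expiry semantics are assumptions, and a real test would construct the WakuFilter with the configurable timeout instead):

```nim
import unittest2
import chronos
import libp2p/protocols/pubsub/timedcache

suite "filter duplicate-message cache":
  test "a message hash expires after the configured timeout":
    # 1.seconds keeps the test fast, as suggested above
    var cache = init(TimedCache[string], timeout = 1.seconds)

    check cache.put("msg-hash") == false  # first sight: not a duplicate
    check cache.put("msg-hash") == true   # right away: duplicate

    waitFor sleepAsync(1500.milliseconds)  # wait past the timeout

    check cache.put("msg-hash") == false  # entry expired, treated as new
```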

@darshankabariya (Contributor, Author) commented on Jun 25, 2024

Yeah, I agree, this feels hard-coded. Let's set 2 minutes as the default and allow the WakuFilter constructor to override it as needed. Thanks for mentioning the unit tests; I'm happy to add a test case.

@darshankabariya merged commit 99149ea into master on Jun 25, 2024
8 of 10 checks passed
@darshankabariya deleted the bug_2320 branch on June 25, 2024 at 19:35
Successfully merging this pull request may close these issues.

bug: Filter relay/v1/messages GET returns duplicate messages