-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Discard RDATA from already seen positions #7648
Conversation
@erikjohnston Hope it is OK to set you directly as the reviewer, but since we've already talked about it a bunch that seemed to make the most sense. This approach seems to work fine for streams that reset, with our assumption that a Please let me know how this looks! |
synapse/app/generic_worker.py
Outdated
if stream_name == TypingStream.NAME: | ||
self.typing_handler.process_replication_rows(token, []) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will need to be expanded to include additional streams, if it is the right fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think this makes sense. Though I wonder if we should just call on_rdata
so that we don't duplicate things
@erikjohnston Can you take a look at this again? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good! The gut wrenching in the typing test is a bit sad, but I don't suggest we try and clean it up now
synapse/app/generic_worker.py
Outdated
if stream_name == TypingStream.NAME: | ||
self.typing_handler.process_replication_rows(token, []) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think this makes sense. Though I wonder if we should just call on_rdata
so that we don't duplicate things
Co-authored-by: Erik Johnston <erik@matrix.org>
@@ -738,6 +738,10 @@ async def _process_and_notify(self, stream_name, instance_name, token, rows): | |||
except Exception: | |||
logger.exception("Error processing replication") | |||
|
|||
async def on_position(self, stream_name: str, instance_name: str, token: int): | |||
await super().on_position(stream_name, instance_name, token) | |||
await self.on_rdata(stream_name, instance_name, token, []) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This probably needs a comment, but I'm not really sure what to suggest...
Also call
on_rdata
to ensure that stream positions are properly reset.
Or something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, something like that. I don't have any better suggestions at least :/
* commit '03619324f': Create a ListenerConfig object (#7681) Fix changelog wording 1.15.1 Wrap register_device coroutine in an ensureDeferred (#7684) Ensure the body is a string before comparing push rules. (#7701) Ensure etag is a string for GET room_keys/version response (#7691) Update m.id.phone to use 'phone' instead of 'number' (#7687) Fix "There was no active span when trying to log." error (#7698) Enable 3PID add/bind/unbind endpoints on r0 routes Discard RDATA from already seen positions. (#7648) Replace iteritems/itervalues/iterkeys with native versions. (#7692) Fix warnings about losing log context during UI auth. (#7688) Fix a typo when comparing the URI & method during UI Auth. (#7689) Remove "user_id" from GET /presence. (#7606) Increase the default SAML session expirary time to 15 minutes. (#7664) fix typo in sample_config.yaml (#7652) Take out a lock before modifying _CACHES (#7663) Add option to enable encryption by default for new rooms (#7639) Clean-up the fallback login code. (#7657)
When processing
RDATA
commands via replication, discard anyRDATA
that has already been seen. This is calculated via asking the stream to get the "current position" and comparing to the incoming token.Fixes #7360
To Do
typing
andfederation
).