Remove concept of a non-limited stream. #7011
Changes from all commits: d0a5571, f3edad7, 2d85553, 22de343, f3bc2d1, 80fc299, 7712f41, 9986cdb, 1410dab, 5c44939, ea26880, 6b79830, 326cdc8
```diff
@@ -0,0 +1 @@
+Remove concept of a non-limited stream.
```
```diff
@@ -166,11 +166,6 @@ async def _run_notifier_loop(self):
             self.pending_updates = False

             with Measure(self.clock, "repl.stream.get_updates"):
-                # First we tell the streams that they should update their
-                # current tokens.
-                for stream in self.streams:
-                    stream.advance_current_token()
-
                 all_streams = self.streams

                 if self._replication_torture_level is not None:
```

Review thread on the removed lines:

Comment: I'm assuming we didn't do this for the good of our health, but rather to combat race conditions between the streams. What has changed so that it is safe to get rid of it now?

Reply: So, I've been trying to get my head around what this is protecting, and I think I've understood a little bit here. A couple of potential reasons for having this:

1. It advances the current tokens of all streams in lockstep, so that a worker shouldn't process updates on one stream ahead of logically earlier updates on another.
2. Some streams share an ID generator, and taking their current tokens at different times could let related updates on the two streams be seen out of order.

Now, the slight problem here is that while we do ensure we update the tokens of all streams in lockstep, we then send down updates batched by stream (rather than trying to interleave the streams so that earlier updates go first). This means that the race conditions above can still happen anyway, AFAICT.

The fact that item two above is still racy sounds like a bug that needs to be fixed either way, probably easiest by merging the relevant streams so that there is a one-to-one mapping between streams and ID generators (I think it's only global vs room account data streams, and devices vs user signature streams).

For item one, I'm not convinced that updating the current token in lockstep actually helps that much. Ideally in each loop there will only be one or two updates to send, so the lockstep does nothing; conversely, if there are a lot of updates to send down in a loop, workers will see updates out of order between streams anyway, as we send down updates by stream.

I'm therefore minded to keep the removal of this code, simply because I think it makes things a bit clearer and easier to reason about. Thoughts welcome though, as I feel like I might still be missing something.

Comment: well, it sounds pretty plausible...
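To make the loop change concrete, here is a minimal sketch of the two loop shapes. The names here (`Stream`, `source`, `send_updates`) are assumptions for illustration, not Synapse's real API:

```python
# A minimal sketch contrasting the old lockstep loop with the new one.
# Stream, source, and send_updates are illustrative, not Synapse's API.


class Stream:
    def __init__(self, name, source):
        self.name = name
        self.source = source      # callable returning the live maximum token
        self.last_token = 0       # last token we sent out
        self.upto_token = 0       # snapshot used by the old lockstep scheme

    def advance_current_token(self):
        # Old scheme: snapshot the live token now; updates are then fetched
        # only up to this snapshot, taken for all streams in lockstep.
        self.upto_token = self.source()

    def current_token(self):
        # New scheme: read the live token at the point of use.
        return self.source()


def send_updates(stream, from_token, to_token):
    print(f"{stream.name}: sending updates in ({from_token}, {to_token}]")
    stream.last_token = to_token


def old_loop(streams):
    # First tell every stream to snapshot its current token, in lockstep...
    for stream in streams:
        stream.advance_current_token()
    # ...then send updates per stream, up to the snapshot.
    for stream in streams:
        if stream.last_token == stream.upto_token:
            continue
        send_updates(stream, stream.last_token, stream.upto_token)


def new_loop(streams):
    # Single pass: compare against the live token directly.
    for stream in streams:
        current = stream.current_token()
        if stream.last_token == current:
            continue
        send_updates(stream, stream.last_token, current)
```

In both shapes the updates still go out batched per stream, which is the crux of the reply above: the lockstep snapshot does not stop a worker from seeing one stream's updates before logically earlier updates on another.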
```diff
@@ -180,7 +175,7 @@ async def _run_notifier_loop(self):
                     random.shuffle(all_streams)

                 for stream in all_streams:
-                    if stream.last_token == stream.upto_token:
+                    if stream.last_token == stream.current_token():
                         continue

                     if self._replication_torture_level:
```
```diff
@@ -192,7 +187,7 @@ async def _run_notifier_loop(self):
                         "Getting stream: %s: %s -> %s",
                         stream.NAME,
                         stream.last_token,
-                        stream.upto_token,
+                        stream.current_token(),
                     )
                     try:
                         updates, current_token = await stream.get_updates()
```
Review thread on `stream.get_updates()`:

Comment: eyebrow raised: we're just dropping updates beyond the limit?

Reply: The current situation is that the replication sees that it is limited and panics. The next ~~episode~~ PR in the series changes it so that if the stream is limited, it gets the token from the last item in the returned list.

Comment: (ftr: the relevant PR is #7024)

Comment: conclusions from discussion elsewhere: this handling of `limit` and `current_id` is not unique to this function; all of the `update_function`s do the same thing (it just looks a bit different here because the typing stream is in-memory). See `update_functions` in synapse/replication/tcp/streams/_base.py, line 200 in 1410dab.

Comment: further discussion: ... but also because we're going to rely on the `update_function`s honouring their `limit` in a future PR, so we should bring this one in line now even though it's not technically necessary yet.
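Per the thread above, the follow-up PR (#7024) has the stream take its new token from the last item in the returned list when the update function is limited. A minimal sketch of that contract, assuming hypothetical names (`Row`, `rows_between`, and this `get_updates` signature are illustrative, not Synapse's real API):

```python
from typing import Awaitable, Callable, List, Tuple

# (token, payload) pairs, ordered by token; purely illustrative.
Row = Tuple[int, str]


async def get_updates(
    rows_between: Callable[[int, int], Awaitable[List[Row]]],
    from_token: int,
    upto_token: int,
    limit: int,
) -> Tuple[List[Row], int, bool]:
    """Fetch rows in (from_token, upto_token], honouring `limit`.

    Returns (rows, new_token, limited). If more than `limit` rows exist,
    only `limit` rows are returned and `new_token` is the token of the
    last returned row, so nothing beyond the limit is lost.
    """
    rows = await rows_between(from_token, upto_token)
    if len(rows) <= limit:
        # Not limited: we have caught up to upto_token.
        return rows, upto_token, False

    # Limited: truncate, and resume from the last returned row next time.
    rows = rows[:limit]
    return rows, rows[-1][0], True
```

The `limited` flag lets the caller loop again from the returned token instead of panicking or silently dropping the truncated rows.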