This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove concept of a non-limited stream. #7011

Merged (13 commits, Mar 20, 2020)
synapse/replication/tcp/resource.py — 9 changes: 2 additions & 7 deletions

@@ -166,11 +166,6 @@ async def _run_notifier_loop(self):
                self.pending_updates = False

                with Measure(self.clock, "repl.stream.get_updates"):
-                   # First we tell the streams that they should update their
Member:

I'm assuming we didn't do this for the good of our health, but rather to combat race conditions between the streams. What has changed so that it is safe to get rid of it now?

Member Author:

So, I've been trying to get my head around what this is protecting, and I think I understand a little better now. A couple of potential reasons for having this:

  1. Helps preserve a bit of ordering across different streams, e.g. if there's a typing notification and then a message, the synchrotrons are likely to get told about the typing notification and then the message.
  2. Some streams share the same ID generator/token (e.g. the two tag streams). We want to make sure that we send down the updates for both streams up to the same token so that any stream change caches get correctly updated before anything queries for changes up to the new current token.
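The shared-ID-generator race in item two can be sketched as follows (a hypothetical minimal model; the class and stream names are illustrative, not Synapse's real code):

```python
# Two streams draw tokens from one shared, monotonically increasing
# ID generator.  A consumer that has seen one stream's updates up to
# the current token may wrongly assume it has seen the other's too.

class SharedIdGenerator:
    """Single token sequence shared by several streams."""

    def __init__(self):
        self._current = 0

    def next(self):
        self._current += 1
        return self._current

    def current(self):
        return self._current


gen = SharedIdGenerator()
room_tag_updates = {}    # token -> update, for one tag stream
global_tag_updates = {}  # token -> update, for the other tag stream

room_tag_updates[gen.next()] = "room tag change"      # token 1
global_tag_updates[gen.next()] = "global tag change"  # token 2

# A worker that has processed room_tag_updates "up to gen.current()"
# (== 2) has still not seen the global tag change at token 2:
assert gen.current() == 2
assert 2 in global_tag_updates and 2 not in room_tag_updates
```

This is why the updates for both streams need to be sent up to the same token before anything queries for changes up to the new current token.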

Now the slight problem here is that while we do ensure we update the tokens of all streams in lockstep, we then send down updates batched by stream (rather than trying to interleave the streams so that earlier updates go first). This means that the race conditions above can still happen anyway, AFAICT.
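The batching problem can be illustrated with a small sketch (stream names and updates are made up): even with tokens advanced in lockstep, sending updates batched per stream can emit a token-2 update before a token-1 update from another stream, whereas interleaving by token would preserve cross-stream order.

```python
# Pending updates per stream, as (token, update) pairs.
pending = {
    "events": [(2, "alice's message")],  # arrived second, token 2
    "typing": [(1, "alice is typing")],  # arrived first, token 1
}

# Batched by stream: everything from one stream, then the next.
batched = [
    (name, token, update)
    for name, updates in pending.items()
    for token, update in updates
]

# Interleaved by token: earlier updates go first across streams.
interleaved = sorted(batched, key=lambda item: item[1])

assert batched[0][1] == 2      # message goes out before the typing notice
assert interleaved[0][1] == 1  # interleaving restores the order
```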

The fact that item two above is still racy sounds like a bug that needs to be fixed either way; probably easiest by merging the relevant streams so that there is a one-to-one mapping between streams and ID generators (I think it's only the global vs room account data streams, and the devices and user signature streams).

For item one, I'm not convinced that updating the current token in lockstep actually helps that much. Ideally, in each loop there will only be one or two updates to send, so the lockstep does nothing; conversely, if there are a lot of updates to send down, the workers will see updates out of order between streams anyway, as we send down updates by stream.

I'm therefore minded to keep the removal of this code simply because I think it makes things a bit clearer and easier to reason about.

Thoughts welcome though, as I feel like I might still be missing something.

Member:

well, it sounds pretty plausible...

-                   # current tokens.
-                   for stream in self.streams:
-                       stream.advance_current_token()
-
                    all_streams = self.streams

                    if self._replication_torture_level is not None:
@@ -180,7 +175,7 @@ async def _run_notifier_loop(self):
                        random.shuffle(all_streams)

                    for stream in all_streams:
-                       if stream.last_token == stream.upto_token:
+                       if stream.last_token == stream.current_token():
                            continue

                        if self._replication_torture_level:
@@ -192,7 +187,7 @@ async def _run_notifier_loop(self):
                            "Getting stream: %s: %s -> %s",
                            stream.NAME,
                            stream.last_token,
-                           stream.upto_token,
+                           stream.current_token(),
                        )
                        try:
                            updates, current_token = await stream.get_updates()
synapse/replication/tcp/streams/_base.py — 22 changes: 5 additions & 17 deletions

@@ -119,7 +119,7 @@ class Stream(object):
    """Base class for the streams.

    Provides a `get_updates()` function that returns new updates since the last
-   time it was called up until the point `advance_current_token` was called.
+   time it was called.

(erikjohnston marked this conversation as resolved.)

    """

    NAME = None  # type: str  # The name of the stream
Expand All @@ -146,26 +146,15 @@ def __init__(self, hs):
        # The token from which we last asked for updates
        self.last_token = self.current_token()

-       # The token that we will get updates up to
-       self.upto_token = self.current_token()
-
-   def advance_current_token(self):
-       """Updates `upto_token` to "now", which updates up until which point
-       get_updates[_since] will fetch rows till.
-       """
-       self.upto_token = self.current_token()
-
    def discard_updates_and_advance(self):
        """Called when the stream should advance but the updates would be discarded,
        e.g. when there are no currently connected workers.
        """
-       self.upto_token = self.current_token()
-       self.last_token = self.upto_token
+       self.last_token = self.current_token()

    async def get_updates(self):
        """Gets all updates since the last time this function was called (or
-       since the stream was constructed if it hadn't been called before),
-       until the `upto_token`
+       since the stream was constructed if it hadn't been called before).

        Returns:
            Deferred[Tuple[List[Tuple[int, Any]], int]:
Expand All @@ -189,16 +178,15 @@ async def get_updates_since(self, from_token):
        sent over the replication steam.
        """
        if from_token in ("NOW", "now"):
-           return [], self.upto_token
+           return [], self.current_token()

-       current_token = self.upto_token
+       current_token = self.current_token()

        from_token = int(from_token)

        if from_token == current_token:
            return [], current_token

-       logger.info("get_updates_since: %s", self.__class__)
        if self._LIMITED:
            rows = await self.update_function(
                from_token, current_token, limit=MAX_EVENTS_BEHIND + 1