Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Don't wait for streams when asking for stream updates
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Jan 11, 2023
1 parent b134a67 commit 3dca06c
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
16 changes: 11 additions & 5 deletions synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
a connection error is received.
RETRY_ON_CONNECT_ERROR_ATTEMPTS (int): Number of attempts to retry when
receiving connection errors, each will backoff exponentially longer.
WAIT_FOR_STREAMS (bool): Whether to wait for replication streams to
catch up before processing the request and/or response. Defaults to
True.
"""

NAME: str = abc.abstractproperty() # type: ignore
Expand All @@ -108,6 +111,8 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
RETRY_ON_CONNECT_ERROR = True
RETRY_ON_CONNECT_ERROR_ATTEMPTS = 5 # =63s (2^6-1)

WAIT_FOR_STREAMS = True

def __init__(self, hs: "HomeServer"):
if self.CACHE:
self.response_cache: ResponseCache[str] = ResponseCache(
Expand Down Expand Up @@ -231,7 +236,7 @@ async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any:

data = await cls._serialize_payload(**kwargs)

if cls.METHOD != "GET":
if cls.METHOD != "GET" and cls.WAIT_FOR_STREAMS:
# Include the current stream positions that we write to. We
# don't do this for GETs as they don't have a body, and we
# generally assume that a GET won't rely on data we have
Expand Down Expand Up @@ -436,9 +441,10 @@ async def _check_auth_and_handle(
if _STREAM_POSITION_KEY in response:
raise Exception("data to send contains %r key", _STREAM_POSITION_KEY)

response[_STREAM_POSITION_KEY] = {
stream.NAME: stream.current_token(self._instance_name)
for stream in self._streams
}
if self.WAIT_FOR_STREAMS:
response[_STREAM_POSITION_KEY] = {
stream.NAME: stream.current_token(self._instance_name)
for stream in self._streams
}

return code, response
4 changes: 4 additions & 0 deletions synapse/replication/http/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
PATH_ARGS = ("stream_name",)
METHOD = "GET"

# We don't want to wait for replication streams to catch up, as this gets
# called in the process of catching replication streams up.
WAIT_FOR_STREAMS = False

def __init__(self, hs: "HomeServer"):
super().__init__(hs)

Expand Down
1 change: 1 addition & 0 deletions tests/replication/http/test__base.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class UncancellableReplicationEndpoint(ReplicationEndpoint):
NAME = "uncancellable_sleep"
PATH_ARGS = ()
CACHE = False
WAIT_FOR_STREAMS = False

def __init__(self, hs: HomeServer):
super().__init__(hs)
Expand Down

0 comments on commit 3dca06c

Please sign in to comment.