From ff4a4c78e9970cf583c5ac0bfed1735dedd9aff4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Aug 2024 17:57:24 +0100 Subject: [PATCH 1/4] SS: Reset connection if token is unrecognized This triggers the client to start a new sliding sync connection. If we don't do this and the client asks for the full range of rooms, we end up sending down all rooms and their state from scratch (which can be very slow) --- synapse/api/errors.py | 18 ++++++++++++ synapse/handlers/sliding_sync.py | 19 +++++++++++++ .../sliding_sync/test_rooms_required_state.py | 28 ++++++++----------- 3 files changed, 48 insertions(+), 17 deletions(-) diff --git a/synapse/api/errors.py b/synapse/api/errors.py index dd4a1ae706..76f0a08dac 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -128,6 +128,10 @@ class Codes(str, Enum): # MSC2677 DUPLICATE_ANNOTATION = "M_DUPLICATE_ANNOTATION" + # MSC3575 we are telling the client they need to reset their sliding sync + # connection. + UNKNOWN_POS = "M_UNKNOWN_POS" + class CodeMessageException(RuntimeError): """An exception with integer code, a message string attributes and optional headers. @@ -847,3 +851,17 @@ def __init__(self) -> None: msg=PartialStateConflictError.message(), errcode=Codes.UNKNOWN, ) + + +class SlidingSyncUnknownPosition(SynapseError): + """An error that Synapse can return to signal to the client to reset their + sliding sync connection (i.e. send a new request without a `?since=` + param). + """ + + def __init__(self) -> None: + super().__init__( + HTTPStatus.BAD_REQUEST, + msg="Unknown position", + errcode=Codes.UNKNOWN_POS, + ) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 8467766518..0bcafc5c5b 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -47,6 +47,7 @@ EventTypes, Membership, ) +from synapse.api.errors import SlidingSyncUnknownPosition from synapse.events import EventBase, StrippedStateEvent from synapse.events.utils import parse_stripped_state_event, strip_event from synapse.handlers.relations import BundledAggregations @@ -491,6 +492,14 @@ async def current_sync_for_user( # See https://github.com/matrix-org/matrix-doc/issues/1144 raise NotImplementedError() + if from_token: + # Check that we recognize the connection position, if not tell the + # clients that they need to start again. + if not await self.connection_store.is_valid_token( + sync_config, from_token.connection_position + ): + raise SlidingSyncUnknownPosition() + await self.connection_store.mark_token_seen( sync_config=sync_config, from_token=from_token, @@ -2821,6 +2830,16 @@ class SlidingSyncConnectionStore: attr.Factory(dict) ) + async def is_valid_token( + self, sync_config: SlidingSyncConfig, connection_token: int + ) -> bool: + """Return whether the connection token is valid/recognized""" + if connection_token == 0: + return True + + conn_key = self._get_connection_key(sync_config) + return connection_token in self._connections.get(conn_key, {}) + async def have_sent_room( self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str ) -> HaveSentRoom: diff --git a/tests/rest/client/sliding_sync/test_rooms_required_state.py b/tests/rest/client/sliding_sync/test_rooms_required_state.py index 03e36914ae..d96d2450d0 100644 --- a/tests/rest/client/sliding_sync/test_rooms_required_state.py +++ b/tests/rest/client/sliding_sync/test_rooms_required_state.py @@ -161,10 +161,10 @@ def test_rooms_required_state_incremental_sync(self) -> None: self.assertIsNone(response_body["rooms"][room_id1].get("required_state")) self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) - def test_rooms_required_state_incremental_sync_restart(self) -> None: + def test_rooms_incremental_sync_restart(self) -> None: """ - Test `rooms.required_state` returns requested state events in the room during an - incremental sync, after a restart (and so the in memory caches are reset). + Test that after a restart (and so the in memory caches are reset) that + we correctly return an `M_UNKNOWN_POST` """ user1_id = self.register_user("user1", "pass") @@ -195,22 +195,16 @@ def test_rooms_required_state_incremental_sync_restart(self) -> None: self.hs.get_sliding_sync_handler().connection_store._connections.clear() # Make the Sliding Sync request - response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) - - # If the cache has been cleared then we do expect the state to come down - state_map = self.get_success( - self.storage_controllers.state.get_current_state(room_id1) + channel = self.make_request( + method="POST", + path=self.sync_endpoint + f"?pos={from_token}", + content=sync_body, + access_token=user1_tok, ) - - self._assertRequiredStateIncludes( - response_body["rooms"][room_id1]["required_state"], - { - state_map[(EventTypes.Create, "")], - state_map[(EventTypes.RoomHistoryVisibility, "")], - }, - exact=True, + self.assertEqual(channel.code, 400, channel.json_body) + self.assertEqual( + channel.json_body["errcode"], "M_UNKNOWN_POS", channel.json_body ) - self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) def test_rooms_required_state_wildcard(self) -> None: """ From 4af7ecc2824aad2a5f9499adf11749ade4d62878 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Aug 2024 17:59:44 +0100 Subject: [PATCH 2/4] Newsfile --- changelog.d/17529.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17529.misc diff --git a/changelog.d/17529.misc b/changelog.d/17529.misc new file mode 100644 index 0000000000..37b2ee07a4 --- /dev/null +++ b/changelog.d/17529.misc @@ -0,0 +1 @@ +Reset the sliding sync connection if we don't recognize the per-connection state position. From bbce7b428325bff70a41ba8af813e02eb92dfa53 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Aug 2024 09:47:35 +0100 Subject: [PATCH 3/4] Update tests/rest/client/sliding_sync/test_rooms_required_state.py Co-authored-by: Eric Eastwood --- tests/rest/client/sliding_sync/test_rooms_required_state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/rest/client/sliding_sync/test_rooms_required_state.py b/tests/rest/client/sliding_sync/test_rooms_required_state.py index d96d2450d0..a13cad223f 100644 --- a/tests/rest/client/sliding_sync/test_rooms_required_state.py +++ b/tests/rest/client/sliding_sync/test_rooms_required_state.py @@ -164,7 +164,7 @@ def test_rooms_required_state_incremental_sync(self) -> None: def test_rooms_incremental_sync_restart(self) -> None: """ Test that after a restart (and so the in memory caches are reset) that - we correctly return an `M_UNKNOWN_POST` + we correctly return an `M_UNKNOWN_POS` """ user1_id = self.register_user("user1", "pass") From 1832c45c98d6911dfd9e176252ac926d96073d41 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Aug 2024 09:54:54 +0100 Subject: [PATCH 4/4] Update comments --- synapse/api/errors.py | 2 +- synapse/handlers/sliding_sync.py | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 76f0a08dac..99fc7eab54 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -854,7 +854,7 @@ def __init__(self) -> None: class SlidingSyncUnknownPosition(SynapseError): - """An error that Synapse can return to signal to the client to reset their + """An error that Synapse can return to signal to the client to expire their sliding sync connection (i.e. send a new request without a `?since=` param). """ diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 0bcafc5c5b..1936471345 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -495,6 +495,14 @@ async def current_sync_for_user( if from_token: # Check that we recognize the connection position, if not tell the # clients that they need to start again. + # + # If we don't do this and the client asks for the full range of + # rooms, we end up sending down all rooms and their state from + # scratch (which can be very slow). By expiring the connection we + # allow the client a chance to do an initial request with a smaller + # range of rooms to get them some results sooner but will end up + # taking the same amount of time (more with round-trips and + # re-processing) in the end to get everything again. if not await self.connection_store.is_valid_token( sync_config, from_token.connection_position ):