diff --git a/changelog.d/14831.misc b/changelog.d/14831.misc new file mode 100644 index 000000000000..72d6463f254e --- /dev/null +++ b/changelog.d/14831.misc @@ -0,0 +1 @@ +Non lazy loading sync not blocking during fast join. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 78d488f2b1cb..9cf1f29de139 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1817,11 +1817,34 @@ async def _generate_sync_entry_for_rooms( ) sync_result_builder.now_token = now_token + # Retrieve rooms that got un partial stated in the meantime, only useful in case + # of a non lazy-loading-members sync. + un_partial_stated_rooms = set() + if not sync_result_builder.sync_config.filter_collection.lazy_load_members(): + un_partial_state_rooms_since = 0 + if sync_result_builder.since_token is not None: + un_partial_state_rooms_since = int( + sync_result_builder.since_token.un_partial_state_rooms_key + ) + + un_partial_state_rooms_now = int( + sync_result_builder.now_token.un_partial_state_rooms_key + ) + if un_partial_state_rooms_since != un_partial_state_rooms_now: + un_partial_stated_rooms = ( + await self.store.get_un_partial_stated_rooms_between( + un_partial_state_rooms_since, + un_partial_state_rooms_now, + ) + ) + # 2. We check up front if anything has changed, if it hasn't then there is # no point in going further. if not sync_result_builder.full_state: if since_token and not ephemeral_by_room and not account_data_by_room: - have_changed = await self._have_rooms_changed(sync_result_builder) + have_changed = await self._have_rooms_changed( + sync_result_builder, un_partial_stated_rooms + ) log_kv({"rooms_have_changed": have_changed}) if not have_changed: tags_by_room = await self.store.get_updated_tags( @@ -1835,7 +1858,7 @@ async def _generate_sync_entry_for_rooms( ignored_users = await self.store.ignored_users(user_id) if since_token: room_changes = await self._get_rooms_changed( - sync_result_builder, ignored_users + sync_result_builder, ignored_users, un_partial_stated_rooms ) tags_by_room = await self.store.get_updated_tags( user_id, since_token.account_data_key @@ -1888,7 +1911,9 @@ async def handle_room_entries(room_entry: "RoomSyncResultBuilder") -> None: ) async def _have_rooms_changed( - self, sync_result_builder: "SyncResultBuilder" + self, + sync_result_builder: "SyncResultBuilder", + un_partial_stated_rooms: Set[str], ) -> bool: """Returns whether there may be any new events that should be sent down the sync. Returns True if there are. @@ -1905,6 +1930,11 @@ async def _have_rooms_changed( stream_id = since_token.room_key.stream for room_id in sync_result_builder.joined_room_ids: + # If a room has been un partial stated in the meantime, + # let's consider it has changes and deal with it accordingly + # in _get_rooms_changed. + if room_id in un_partial_stated_rooms: + return True if self.store.has_room_changed_since(room_id, stream_id): return True return False @@ -1913,6 +1943,7 @@ async def _get_rooms_changed( self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str], + un_partial_stated_rooms: Set[str], ) -> _RoomChanges: """Determine the changes in rooms to report to the user. @@ -2116,7 +2147,24 @@ async def _get_rooms_changed( room_entry = room_to_events.get(room_id, None) newly_joined = room_id in newly_joined_rooms - if room_entry: + + # In case of a non lazy-loading-members sync we want to include + # rooms that got un partial stated in the meantime, and we need + # to include the full state of them. + if ( + not sync_config.filter_collection.lazy_load_members() + and room_id in un_partial_stated_rooms + ): + entry = RoomSyncResultBuilder( + room_id=room_id, + rtype="joined", + events=None, + newly_joined=True, + full_state=True, + since_token=None, + upto_token=now_token, + ) + elif room_entry: events, start_key = room_entry prev_batch_token = now_token.copy_and_replace( @@ -2186,6 +2234,13 @@ async def _get_all_rooms( knocked = [] for event in room_list: + # Do not include rooms that we don't have the full state yet + # in case of non lazy-loading-members sync. + if ( + not sync_config.filter_collection.lazy_load_members() + ) and await self.store.is_partial_state_room(event.room_id): + continue + if event.room_version_id not in KNOWN_ROOM_VERSIONS: continue diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index aea96e9d2478..95787c2cfd8c 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -292,6 +292,7 @@ def _get_recent_references_for_event_txn( to_device_key=0, device_list_key=0, groups_key=0, + un_partial_state_rooms_key=0, ) return events[:limit], next_token diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 78906a5e1d9e..c614eda0760e 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -26,6 +26,7 @@ Mapping, Optional, Sequence, + Set, Tuple, Union, cast, @@ -1285,10 +1286,39 @@ def get_un_partial_stated_rooms_token(self) -> int: # explanation.) return self._un_partial_stated_rooms_stream_id_gen.get_current_token() + async def get_un_partial_stated_rooms_between( + self, last_id: int, current_id: int + ) -> Set[str]: + """Get all rooms that got un partial stated between `last_id` exclusive and + `current_id` inclusive. + + Returns: + The list of rooms. + """ + + if last_id == current_id: + return set() + + def _get_un_partial_stated_rooms_between_txn( + txn: LoggingTransaction, + ) -> Set[str]: + sql = """ + SELECT DISTINCT room_id FROM un_partial_stated_room_stream + WHERE ? < stream_id AND stream_id <= ? + """ + txn.execute(sql, (last_id, current_id)) + + return {r[0] for r in txn} + + return await self.db_pool.runInteraction( + "get_un_partial_stated_rooms_between", + _get_un_partial_stated_rooms_between_txn, + ) + async def get_un_partial_stated_rooms_from_stream( self, instance_name: str, last_id: int, current_id: int, limit: int ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]: - """Get updates for caches replication stream. + """Get updates for un partial stated rooms replication stream. Args: instance_name: The writer we want to fetch updates from. Unused diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 619eb7f601de..7e7bd160b59c 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -58,6 +58,7 @@ def get_current_token(self) -> StreamToken: push_rules_key = self.store.get_max_push_rules_stream_id() to_device_key = self.store.get_to_device_stream_token() device_list_key = self.store.get_device_stream_token() + un_partial_state_rooms_key = self.store.get_un_partial_stated_rooms_token() token = StreamToken( room_key=self.sources.room.get_current_key(), @@ -70,6 +71,7 @@ def get_current_token(self) -> StreamToken: device_list_key=device_list_key, # Groups key is unused. groups_key=0, + un_partial_state_rooms_key=un_partial_state_rooms_key, ) return token @@ -107,5 +109,6 @@ async def get_current_token_for_pagination(self, room_id: str) -> StreamToken: to_device_key=0, device_list_key=0, groups_key=0, + un_partial_state_rooms_key=0, ) return token diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 0c725eb9677d..d378c39ec2eb 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -646,12 +646,13 @@ class StreamToken: 7. `to_device_key`: `274711` 8. `device_list_key`: `265584` 9. `groups_key`: `1` (note that this key is now unused) + 10. `un_partial_state_rooms_key`: `379` You can see how many of these keys correspond to the various fields in a "/sync" response: ```json { - "next_batch": "s12_4_0_1_1_1_1_4_1", + "next_batch": "s12_4_0_1_1_1_1_4_1_1", "presence": { "events": [] }, @@ -663,7 +664,7 @@ class StreamToken: "!QrZlfIDQLNLdZHqTnt:hs1": { "timeline": { "events": [], - "prev_batch": "s10_4_0_1_1_1_1_4_1", + "prev_batch": "s10_4_0_1_1_1_1_4_1_1", "limited": false }, "state": { @@ -699,6 +700,7 @@ class StreamToken: device_list_key: int # Note that the groups key is no longer used and may have bogus values. groups_key: int + un_partial_state_rooms_key: int _SEPARATOR = "_" START: ClassVar["StreamToken"] @@ -737,6 +739,7 @@ async def to_string(self, store: "DataStore") -> str: # serialized so that there will not be confusion in the future # if additional tokens are added. str(self.groups_key), + str(self.un_partial_state_rooms_key), ] ) @@ -769,7 +772,7 @@ def copy_and_replace(self, key: str, new_value: Any) -> "StreamToken": return attr.evolve(self, **{key: new_value}) -StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0) +StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0) @attr.s(slots=True, frozen=True, auto_attribs=True) diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index e0f5d54abab0..453a6e979c02 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -1831,7 +1831,7 @@ def test_timestamp_to_event(self) -> None: def test_topo_token_is_accepted(self) -> None: """Test Topo Token is accepted.""" - token = "t1-0_0_0_0_0_0_0_0_0" + token = "t1-0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token), @@ -1845,7 +1845,7 @@ def test_topo_token_is_accepted(self) -> None: def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None: """Test that stream token is accepted for forward pagination.""" - token = "s0_0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token), diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index b4daace55617..9222cab19801 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -1987,7 +1987,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.room_id = self.helper.create_room_as(self.user_id) def test_topo_token_is_accepted(self) -> None: - token = "t1-0_0_0_0_0_0_0_0_0" + token = "t1-0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token) ) @@ -1998,7 +1998,7 @@ def test_topo_token_is_accepted(self) -> None: self.assertTrue("end" in channel.json_body) def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None: - token = "s0_0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token) ) @@ -2728,7 +2728,7 @@ def test_messages_filter_labels(self) -> None: """Test that we can filter by a label on a /messages request.""" self._send_labelled_messages_in_room() - token = "s0_0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/rooms/%s/messages?access_token=%s&from=%s&filter=%s" @@ -2745,7 +2745,7 @@ def test_messages_filter_not_labels(self) -> None: """Test that we can filter by the absence of a label on a /messages request.""" self._send_labelled_messages_in_room() - token = "s0_0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/rooms/%s/messages?access_token=%s&from=%s&filter=%s" @@ -2768,7 +2768,7 @@ def test_messages_filter_labels_not_labels(self) -> None: """ self._send_labelled_messages_in_room() - token = "s0_0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"