From 59314d5e411a5510b52d59c8240c9c048603ef50 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Jul 2024 15:41:23 +0100 Subject: [PATCH 01/10] Fix bug in sliding sync when using old DB. We don't necessarily have `instance_name` for old events (before we support multiple event persisters). We treat those as if the `instance_name` was "master". --- synapse/storage/databases/main/roommember.py | 6 +++++- synapse/storage/databases/main/stream.py | 10 +++++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index d8b54dc4e3..1255690cd9 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -494,7 +494,11 @@ def _get_rooms_for_local_user_where_membership_is_txn( sender=sender, membership=membership, event_id=event_id, - event_pos=PersistedEventPosition(instance_name, stream_ordering), + event_pos=PersistedEventPosition( + # If instance_name is null we default to "master" + instance_name or "master", + stream_ordering, + ), room_version_id=room_version, ) for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index d34376b8df..c635dbf96a 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -936,7 +936,8 @@ def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]: # Event event_id=event_id, event_pos=PersistedEventPosition( - instance_name=instance_name, + # If instance_name is null we default to "master" + instance_name=instance_name or "master", stream=stream_ordering, ), # When `s.event_id = null`, we won't be able to get respective @@ -952,7 +953,8 @@ def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]: prev_event_id=prev_event_id, prev_event_pos=( PersistedEventPosition( - instance_name=prev_instance_name, + # If instance_name is null we default to "master" + instance_name=prev_instance_name or "master", stream=prev_stream_ordering, ) if ( @@ -1257,7 +1259,9 @@ def get_last_event_pos_in_room_before_stream_ordering_txn( stream_ordering=stream_ordering, ): return event_id, PersistedEventPosition( - instance_name, stream_ordering + # If instance_name is null we default to "master" + instance_name or "master", + stream_ordering, ) return None From c742b747d7c907a11d5e7a5738b45cf37c7ef56b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Jul 2024 15:42:42 +0100 Subject: [PATCH 02/10] Newsfile --- changelog.d/17398.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17398.bugfix diff --git a/changelog.d/17398.bugfix b/changelog.d/17398.bugfix new file mode 100644 index 0000000000..c09c8f3d82 --- /dev/null +++ b/changelog.d/17398.bugfix @@ -0,0 +1 @@ +Fix bug in experimenntal sliding sync support when using an old database. From b2880b6d60ec6a27023d63c06706f2b28d65e9c3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Jul 2024 17:05:50 +0100 Subject: [PATCH 03/10] Update changelog.d/17398.bugfix Co-authored-by: Eric Eastwood --- changelog.d/17398.bugfix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/17398.bugfix b/changelog.d/17398.bugfix index c09c8f3d82..7931c431ef 100644 --- a/changelog.d/17398.bugfix +++ b/changelog.d/17398.bugfix @@ -1 +1 @@ -Fix bug in experimenntal sliding sync support when using an old database. +Fix bug in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint when using an old database. From b1773077a7bf7bfe5e7c4ef2207f68e4160231e9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Jul 2024 17:15:36 +0100 Subject: [PATCH 04/10] Also fixup internal metadata --- synapse/storage/databases/main/events_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index a5acea8c3b..761e393695 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1457,7 +1457,7 @@ def _fetch_event_rows( event_dict[event_id] = _EventRow( event_id=event_id, stream_ordering=row[1], - instance_name=row[2], + instance_name=row[2] or "master", internal_metadata=row[3], json=row[4], format_version=row[5], From 5a47fede6eac1f14abb77ae076f6c8019ae558bb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Jul 2024 17:19:19 +0100 Subject: [PATCH 05/10] Remove unused function --- synapse/storage/_base.py | 6 - synapse/storage/databases/main/cache.py | 10 -- synapse/storage/databases/main/roommember.py | 55 --------- tests/handlers/test_sync.py | 1 - tests/replication/storage/test_events.py | 123 +------------------ 5 files changed, 1 insertion(+), 194 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b127289d8d..881888fa93 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -119,9 +119,6 @@ def _invalidate_state_caches( self._attempt_to_invalidate_cache( "get_user_in_room_with_profile", (room_id, user_id) ) - self._attempt_to_invalidate_cache( - "get_rooms_for_user_with_stream_ordering", (user_id,) - ) self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,)) # Purge other caches based on room state. @@ -148,9 +145,6 @@ def _invalidate_state_caches_all(self, room_id: str) -> None: self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,)) self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None) self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None) - self._attempt_to_invalidate_cache( - "get_rooms_for_user_with_stream_ordering", None - ) self._attempt_to_invalidate_cache("get_rooms_for_user", None) self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index c6787faea0..2d6b75e47e 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -268,16 +268,12 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined] if data.type == EventTypes.Member: - self.get_rooms_for_user_with_stream_ordering.invalidate( # type: ignore[attr-defined] - (data.state_key,) - ) self.get_rooms_for_user.invalidate((data.state_key,)) # type: ignore[attr-defined] elif row.type == EventsStreamAllStateRow.TypeId: assert isinstance(data, EventsStreamAllStateRow) # Similar to the above, but the entire caches are invalidated. This is # unfortunate for the membership caches, but should recover quickly. self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined] - self.get_rooms_for_user_with_stream_ordering.invalidate_all() # type: ignore[attr-defined] self.get_rooms_for_user.invalidate_all() # type: ignore[attr-defined] else: raise Exception("Unknown events stream row type %s" % (row.type,)) @@ -334,9 +330,6 @@ def _invalidate_caches_for_event( self._attempt_to_invalidate_cache( "get_invited_rooms_for_local_user", (state_key,) ) - self._attempt_to_invalidate_cache( - "get_rooms_for_user_with_stream_ordering", (state_key,) - ) self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,)) self._attempt_to_invalidate_cache( @@ -399,9 +392,6 @@ def _invalidate_caches_for_room_events(self, room_id: str) -> None: self._attempt_to_invalidate_cache("get_thread_id", None) self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None) self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None) - self._attempt_to_invalidate_cache( - "get_rooms_for_user_with_stream_ordering", None - ) self._attempt_to_invalidate_cache("get_rooms_for_user", None) self._attempt_to_invalidate_cache("did_forget", None) self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 1255690cd9..fc8a37e295 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -51,7 +51,6 @@ from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import Sqlite3Engine from synapse.storage.roommember import ( - GetRoomsForUserWithStreamOrdering, MemberSummary, ProfileInfo, RoomsForUser, @@ -610,53 +609,6 @@ async def get_local_current_membership_for_user_in_room( return results - @cached(max_entries=500000, iterable=True) - async def get_rooms_for_user_with_stream_ordering( - self, user_id: str - ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]: - """Returns a set of room_ids the user is currently joined to. - - If a remote user only returns rooms this server is currently - participating in. - - Args: - user_id - - Returns: - Returns the rooms the user is in currently, along with the stream - ordering of the most recent join for that user and room, along with - the room version of the room. - """ - return await self.db_pool.runInteraction( - "get_rooms_for_user_with_stream_ordering", - self._get_rooms_for_user_with_stream_ordering_txn, - user_id, - ) - - def _get_rooms_for_user_with_stream_ordering_txn( - self, txn: LoggingTransaction, user_id: str - ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]: - # We use `current_state_events` here and not `local_current_membership` - # as a) this gets called with remote users and b) this only gets called - # for rooms the server is participating in. - sql = """ - SELECT room_id, e.instance_name, e.stream_ordering - FROM current_state_events AS c - INNER JOIN events AS e USING (room_id, event_id) - WHERE - c.type = 'm.room.member' - AND c.state_key = ? - AND c.membership = ? - """ - - txn.execute(sql, (user_id, Membership.JOIN)) - return frozenset( - GetRoomsForUserWithStreamOrdering( - room_id, PersistedEventPosition(instance, stream_id) - ) - for room_id, instance, stream_id in txn - ) - async def get_users_server_still_shares_room_with( self, user_ids: Collection[str] ) -> Set[str]: @@ -705,13 +657,6 @@ async def get_rooms_for_user(self, user_id: str) -> FrozenSet[str]: If a remote user only returns rooms this server is currently participating in. """ - rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate( - (user_id,), - None, - update_metrics=False, - ) - if rooms: - return frozenset(r.room_id for r in rooms) room_ids = await self.db_pool.simple_select_onecol( table="current_state_events", diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 674dd4fb54..77aafa492e 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -210,7 +210,6 @@ def test_unknown_room_version(self) -> None: ) # Blow away caches (supported room versions can only change due to a restart). - self.store.get_rooms_for_user_with_stream_ordering.invalidate_all() self.store.get_rooms_for_user.invalidate_all() self.store._get_event_cache.clear() self.store._event_ref.clear() diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py index a56f1e2d5d..5426255924 100644 --- a/tests/replication/storage/test_events.py +++ b/tests/replication/storage/test_events.py @@ -30,18 +30,16 @@ from synapse.api.room_versions import RoomVersions from synapse.events import EventBase, make_event_from_dict from synapse.events.snapshot import EventContext -from synapse.handlers.room import RoomEventSource from synapse.server import HomeServer from synapse.storage.databases.main.event_push_actions import ( NotifCounts, RoomNotifCounts, ) from synapse.storage.databases.main.events_worker import EventsWorkerStore -from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser +from synapse.storage.roommember import RoomsForUser from synapse.types import PersistedEventPosition from synapse.util import Clock -from tests.server import FakeTransport from ._base import BaseWorkerStoreTestCase @@ -221,125 +219,6 @@ def test_push_actions_for_user(self, send_receipt: bool) -> None: ), ) - def test_get_rooms_for_user_with_stream_ordering(self) -> None: - """Check that the cache on get_rooms_for_user_with_stream_ordering is invalidated - by rows in the events stream - """ - self.persist(type="m.room.create", key="", creator=USER_ID) - self.persist(type="m.room.member", key=USER_ID, membership="join") - self.replicate() - self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set()) - - j2 = self.persist( - type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join" - ) - assert j2.internal_metadata.instance_name is not None - assert j2.internal_metadata.stream_ordering is not None - self.replicate() - - expected_pos = PersistedEventPosition( - j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering - ) - self.check( - "get_rooms_for_user_with_stream_ordering", - (USER_ID_2,), - {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)}, - ) - - def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist( - self, - ) -> None: - """Check that current_state invalidation happens correctly with multiple events - in the persistence batch. - - This test attempts to reproduce a race condition between the event persistence - loop and a worker-based Sync handler. - - The problem occurred when the master persisted several events in one batch. It - only updates the current_state at the end of each batch, so the obvious thing - to do is then to issue a current_state_delta stream update corresponding to the - last stream_id in the batch. - - However, that raises the possibility that a worker will see the replication - notification for a join event before the current_state caches are invalidated. - - The test involves: - * creating a join and a message event for a user, and persisting them in the - same batch - - * controlling the replication stream so that updates are sent gradually - - * between each bunch of replication updates, check that we see a consistent - snapshot of the state. - """ - self.persist(type="m.room.create", key="", creator=USER_ID) - self.persist(type="m.room.member", key=USER_ID, membership="join") - self.replicate() - self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set()) - - # limit the replication rate - repl_transport = self._server_transport - assert isinstance(repl_transport, FakeTransport) - repl_transport.autoflush = False - - # build the join and message events and persist them in the same batch. - logger.info("----- build test events ------") - j2, j2ctx = self.build_event( - type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join" - ) - msg, msgctx = self.build_event() - self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)])) - self.replicate() - assert j2.internal_metadata.instance_name is not None - assert j2.internal_metadata.stream_ordering is not None - - event_source = RoomEventSource(self.hs) - event_source.store = self.worker_store - current_token = event_source.get_current_key() - - # gradually stream out the replication - while repl_transport.buffer: - logger.info("------ flush ------") - repl_transport.flush(30) - self.pump(0) - - prev_token = current_token - current_token = event_source.get_current_key() - - # attempt to replicate the behaviour of the sync handler. - # - # First, we get a list of the rooms we are joined to - joined_rooms = self.get_success( - self.worker_store.get_rooms_for_user_with_stream_ordering(USER_ID_2) - ) - - # Then, we get a list of the events since the last sync - membership_changes = self.get_success( - self.worker_store.get_membership_changes_for_user( - USER_ID_2, prev_token, current_token - ) - ) - - logger.info( - "%s->%s: joined_rooms=%r membership_changes=%r", - prev_token, - current_token, - joined_rooms, - membership_changes, - ) - - # the membership change is only any use to us if the room is in the - # joined_rooms list. - if membership_changes: - expected_pos = PersistedEventPosition( - j2.internal_metadata.instance_name, - j2.internal_metadata.stream_ordering, - ) - self.assertEqual( - joined_rooms, - {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)}, - ) - event_id = 0 def persist(self, backfill: bool = False, **kwargs: Any) -> EventBase: From 24444a4762391f29eeacaffbbdca0906d64f21b1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Jul 2024 17:25:24 +0100 Subject: [PATCH 06/10] Lint --- synapse/storage/databases/main/roommember.py | 6 +----- tests/replication/storage/test_events.py | 1 - 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index fc8a37e295..5d2fd08495 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -50,11 +50,7 @@ from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import Sqlite3Engine -from synapse.storage.roommember import ( - MemberSummary, - ProfileInfo, - RoomsForUser, -) +from synapse.storage.roommember import MemberSummary, ProfileInfo, RoomsForUser from synapse.types import ( JsonDict, PersistedEventPosition, diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py index 5426255924..1afe523d02 100644 --- a/tests/replication/storage/test_events.py +++ b/tests/replication/storage/test_events.py @@ -40,7 +40,6 @@ from synapse.types import PersistedEventPosition from synapse.util import Clock - from ._base import BaseWorkerStoreTestCase USER_ID = "@feeling:test" From d15d8ab8825f9f733d58cab7d399827ea24dd5fd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Jul 2024 09:36:32 +0100 Subject: [PATCH 07/10] Have _filter_results handle null instance_name --- synapse/storage/databases/main/stream.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index c635dbf96a..1c7af434c0 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -371,7 +371,7 @@ def _make_generic_sql_bound( def _filter_results( lower_token: Optional[RoomStreamToken], upper_token: Optional[RoomStreamToken], - instance_name: str, + instance_name: Optional[str], topological_ordering: int, stream_ordering: int, ) -> bool: @@ -384,8 +384,14 @@ def _filter_results( position maps, which we handle by fetching more than necessary from the DB and then filtering (rather than attempting to construct a complicated SQL query). + + The `instance_name` arg is optional to handle historic rows, and is + interpreted as if it was "master". """ + if instance_name is None: + instance_name = "master" + event_historical_tuple = ( topological_ordering, stream_ordering, From 5f1612eb06c38c8be5996fa45cfc9e2a78828f66 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Jul 2024 09:43:48 +0100 Subject: [PATCH 08/10] Have _filter_results_by_stream handle null instance_name --- synapse/storage/databases/main/stream.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 1c7af434c0..f7a9a26c8d 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -426,7 +426,7 @@ def _filter_results( def _filter_results_by_stream( lower_token: Optional[RoomStreamToken], upper_token: Optional[RoomStreamToken], - instance_name: str, + instance_name: Optional[str], stream_ordering: int, ) -> bool: """ @@ -442,7 +442,14 @@ def _filter_results_by_stream( position maps, which we handle by fetching more than necessary from the DB and then filtering (rather than attempting to construct a complicated SQL query). + + The `instance_name` arg is optional to handle historic rows, and is + interpreted as if it was "master". """ + + if instance_name is None: + instance_name = "master" + if lower_token: assert lower_token.topological is None From fb2fc478a0242b8157fe813ed827279461676721 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Jul 2024 09:46:07 +0100 Subject: [PATCH 09/10] Update synapse/storage/databases/main/events_worker.py Co-authored-by: Eric Eastwood --- synapse/storage/databases/main/events_worker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 761e393695..4d4877c4c3 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1457,6 +1457,7 @@ def _fetch_event_rows( event_dict[event_id] = _EventRow( event_id=event_id, stream_ordering=row[1], + # If instance_name is null we default to "master" instance_name=row[2] or "master", internal_metadata=row[3], json=row[4], From c8f1411da3803beb8bb9df25288f3a851700a770 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Jul 2024 16:46:13 +0100 Subject: [PATCH 10/10] Remove assertions --- synapse/storage/databases/main/stream.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index f7a9a26c8d..f2544dcc1f 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -925,7 +925,6 @@ def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]: prev_sender, ) in txn: assert room_id is not None - assert instance_name is not None assert stream_ordering is not None if _filter_results_by_stream( @@ -970,10 +969,7 @@ def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]: instance_name=prev_instance_name or "master", stream=prev_stream_ordering, ) - if ( - prev_instance_name is not None - and prev_stream_ordering is not None - ) + if (prev_stream_ordering is not None) else None ), prev_membership=prev_membership,