From 78faa1a6343f47feab1d58ce60898e0cac8794d6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 May 2022 11:14:14 +0100 Subject: [PATCH 01/21] Rename StateGroupStorage --- synapse/storage/__init__.py | 4 ++-- synapse/storage/state.py | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 105e4e1fec1b..3dba1fee8dfc 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -32,7 +32,7 @@ from synapse.storage.databases.main import DataStore from synapse.storage.persist_events import EventsPersistenceStorage from synapse.storage.purge_events import PurgeEventsStorage -from synapse.storage.state import StateGroupStorage +from synapse.storage.state import StateStorage if TYPE_CHECKING: from synapse.server import HomeServer @@ -51,7 +51,7 @@ def __init__(self, hs: "HomeServer", stores: Databases): self.main = stores.main self.purge_events = PurgeEventsStorage(hs, stores) - self.state = StateGroupStorage(hs, stores) + self.state = StateStorage(hs, stores) self.persistence = None if stores.persist_events: diff --git a/synapse/storage/state.py b/synapse/storage/state.py index ab630953ac93..0bd0b87dec32 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -580,8 +580,10 @@ def must_await_full_state(self, is_mine_id: Callable[[str], bool]) -> bool: _NONE_STATE_FILTER = StateFilter(types=frozendict(), include_others=False) -class StateGroupStorage: - """High level interface to fetching state for event.""" +class StateStorage: + """High level interface to fetching state for an event, or the current state + in a room. + """ def __init__(self, hs: "HomeServer", stores: "Databases"): self._is_mine_id = hs.is_mine_id From 334844d430487d29e46c9bad245afff8d6a1400e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 May 2022 11:19:00 +0100 Subject: [PATCH 02/21] Add fetching current state funcs to StateStorage --- synapse/handlers/directory.py | 5 +- synapse/storage/databases/main/state.py | 24 ------- synapse/storage/state.py | 87 +++++++++++++++++++++++++ 3 files changed, 91 insertions(+), 25 deletions(-) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 4aa33df884ac..001878be240e 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -45,6 +45,7 @@ def __init__(self, hs: "HomeServer"): self.appservice_handler = hs.get_application_service_handler() self.event_creation_handler = hs.get_event_creation_handler() self.store = hs.get_datastores().main + self.storage = hs.get_storage() self.config = hs.config self.enable_room_list_search = hs.config.roomdirectory.enable_room_list_search self.require_membership = hs.config.server.require_membership_for_aliases @@ -463,7 +464,9 @@ async def edit_published_room_list( making_public = visibility == "public" if making_public: room_aliases = await self.store.get_aliases_for_room(room_id) - canonical_alias = await self.store.get_canonical_alias_for_room(room_id) + canonical_alias = await self.storage.state.get_canonical_alias_for_room( + room_id + ) if canonical_alias: room_aliases.append(canonical_alias) diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index 18ae8aee295d..628ab64dc769 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -269,30 +269,6 @@ def _get_filtered_current_state_ids_txn( "get_filtered_current_state_ids", _get_filtered_current_state_ids_txn ) - async def get_canonical_alias_for_room(self, room_id: str) -> Optional[str]: - """Get canonical alias for room, if any - - Args: - room_id: The room ID - - Returns: - The canonical alias, if any - """ - - state = await self.get_filtered_current_state_ids( - room_id, StateFilter.from_types([(EventTypes.CanonicalAlias, "")]) - ) - - event_id = state.get((EventTypes.CanonicalAlias, "")) - if not event_id: - return None - - event = await self.get_event(event_id, allow_none=True) - if not event: - return None - - return event.content.get("canonical_alias") - @cached(max_entries=50000) async def _get_state_group_for_event(self, event_id: str) -> Optional[int]: return await self.db_pool.simple_select_one_onecol( diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 0bd0b87dec32..55d255ac2551 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -15,6 +15,7 @@ import logging from typing import ( TYPE_CHECKING, + Any, Awaitable, Callable, Collection, @@ -895,3 +896,89 @@ async def store_state_group( return await self.stores.state.store_state_group( event_id, room_id, prev_group, delta_ids, current_state_ids ) + + async def get_current_state_ids(self, room_id: str) -> StateMap[str]: + """Get the current state event ids for a room based on the + current_state_events table. + + Args: + room_id: The room to get the state IDs of. + + Returns: + The current state of the room. + """ + + return await self.stores.main.get_current_state_ids(room_id) + + async def get_filtered_current_state_ids( + self, room_id: str, state_filter: Optional[StateFilter] = None + ) -> StateMap[str]: + """Get the current state event of a given type for a room based on the + current_state_events table. This may not be as up-to-date as the result + of doing a fresh state resolution as per state_handler.get_current_state + + Args: + room_id + state_filter: The state filter used to fetch state + from the database. + + Returns: + Map from type/state_key to event ID. + """ + return await self.stores.main.get_filtered_current_state_ids( + room_id, state_filter + ) + + async def get_canonical_alias_for_room(self, room_id: str) -> Optional[str]: + """Get canonical alias for room, if any + + Args: + room_id: The room ID + + Returns: + The canonical alias, if any + """ + + state = await self.get_filtered_current_state_ids( + room_id, StateFilter.from_types([(EventTypes.CanonicalAlias, "")]) + ) + + event_id = state.get((EventTypes.CanonicalAlias, "")) + if not event_id: + return None + + event = await self.stores.main.get_event(event_id, allow_none=True) + if not event: + return None + + return event.content.get("canonical_alias") + + async def get_current_state_deltas( + self, prev_stream_id: int, max_stream_id: int + ) -> Tuple[int, List[Dict[str, Any]]]: + """Fetch a list of room state changes since the given stream id + + Each entry in the result contains the following fields: + - stream_id (int) + - room_id (str) + - type (str): event type + - state_key (str): + - event_id (str|None): new event_id for this state key. None if the + state has been deleted. + - prev_event_id (str|None): previous event_id for this state key. None + if it's new state. + + Args: + prev_stream_id: point to get changes since (exclusive) + max_stream_id: the point that we know has been correctly persisted + - ie, an upper limit to return changes from. + + Returns: + A tuple consisting of: + - the stream id which these results go up to + - list of current_state_delta_stream rows. If it is empty, we are + up to date. + """ + return await self.stores.main.get_current_state_deltas( + prev_stream_id, max_stream_id + ) From fe8691517b4915a5af64cba4bab7ba34259fc92d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 May 2022 11:27:01 +0100 Subject: [PATCH 03/21] Use StateStorage.get_current_state_ids --- synapse/federation/federation_server.py | 2 +- synapse/handlers/device.py | 2 +- synapse/handlers/federation.py | 2 +- synapse/handlers/room_list.py | 3 ++- synapse/handlers/room_member.py | 3 ++- synapse/handlers/room_summary.py | 7 ++++--- synapse/handlers/sync.py | 8 ++++---- synapse/module_api/__init__.py | 3 ++- synapse/push/mailer.py | 2 +- synapse/rest/admin/rooms.py | 3 ++- synapse/storage/state.py | 10 ++++++++-- 11 files changed, 28 insertions(+), 17 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index b8232e5257d2..21747794651f 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -1207,7 +1207,7 @@ async def check_server_matches_acl(self, server_name: str, room_id: str) -> None Raises: AuthError if the server does not match the ACL """ - state_ids = await self.store.get_current_state_ids(room_id) + state_ids = await self.storage.state.get_current_state_ids(room_id) acl_event_id = state_ids.get((EventTypes.ServerACL, "")) if not acl_event_id: diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index b21e46986543..1c7b758abc4c 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -165,7 +165,7 @@ async def get_user_ids_changed( possibly_changed = set(changed) possibly_left = set() for room_id in rooms_changed: - current_state_ids = await self.store.get_current_state_ids(room_id) + current_state_ids = await self.state_storage.get_current_state_ids(room_id) # The user may have left the room # TODO: Check if they actually did or if we were just invited. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c8233270d72c..de4527b2517d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -731,7 +731,7 @@ async def on_make_join_request( # Note that this requires the /send_join request to come back to the # same server. if room_version.msc3083_join_rules: - state_ids = await self.store.get_current_state_ids(room_id) + state_ids = await self.state_storage.get_current_state_ids(room_id) if await self._event_auth_handler.has_restricted_join_rules( state_ids, room_version ): diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index f3577b5d5aec..4246c9be3f5b 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -50,6 +50,7 @@ class RoomListHandler: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main + self._storage = hs.get_storage() self.hs = hs self.enable_room_list_search = hs.config.roomdirectory.enable_room_list_search self.response_cache: ResponseCache[ @@ -274,7 +275,7 @@ async def generate_room_entry( if aliases: result["aliases"] = aliases - current_state_ids = await self.store.get_current_state_ids( + current_state_ids = await self._storage.state.get_current_state_ids( room_id, on_invalidate=cache_context.invalidate ) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 00662dc96114..d41a291610e2 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -68,6 +68,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): def __init__(self, hs: "HomeServer"): self.hs = hs self.store = hs.get_datastores().main + self.storage = hs.get_storage() self.auth = hs.get_auth() self.state_handler = hs.get_state_handler() self.config = hs.config @@ -994,7 +995,7 @@ async def _should_perform_remote_join( # If the host is in the room, but not one of the authorised hosts # for restricted join rules, a remote join must be used. room_version = await self.store.get_room_version(room_id) - current_state_ids = await self.store.get_current_state_ids(room_id) + current_state_ids = await self.storage.state.get_current_state_ids(room_id) # If restricted join rules are not being used, a local join can always # be used. diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py index 1dd74912fa95..f512f1447d53 100644 --- a/synapse/handlers/room_summary.py +++ b/synapse/handlers/room_summary.py @@ -90,6 +90,7 @@ class RoomSummaryHandler: def __init__(self, hs: "HomeServer"): self._event_auth_handler = hs.get_event_auth_handler() self._store = hs.get_datastores().main + self._storage = hs.get_storage() self._event_serializer = hs.get_event_client_serializer() self._server_name = hs.hostname self._federation_client = hs.get_federation_client() @@ -537,7 +538,7 @@ async def _is_local_room_accessible( Returns: True if the room is accessible to the requesting user or server. """ - state_ids = await self._store.get_current_state_ids(room_id) + state_ids = await self._storage.state.get_current_state_ids(room_id) # If there's no state for the room, it isn't known. if not state_ids: @@ -702,7 +703,7 @@ async def _build_room_entry(self, room_id: str, for_federation: bool) -> JsonDic # there should always be an entry assert stats is not None, "unable to retrieve stats for %s" % (room_id,) - current_state_ids = await self._store.get_current_state_ids(room_id) + current_state_ids = await self._storage.state.get_current_state_ids(room_id) create_event = await self._store.get_event( current_state_ids[(EventTypes.Create, "")] ) @@ -763,7 +764,7 @@ async def _get_child_events(self, room_id: str) -> Iterable[EventBase]: """ # look for child rooms/spaces. - current_state_ids = await self._store.get_current_state_ids(room_id) + current_state_ids = await self._storage.state.get_current_state_ids(room_id) events = await self._store.get_events_as_list( [ diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c5c538e0c35e..bf8b9efe035d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -506,8 +506,8 @@ async def _load_filtered_recents( # ensure that we always include current state in the timeline current_state_ids: FrozenSet[str] = frozenset() if any(e.is_state() for e in recents): - current_state_ids_map = await self.store.get_current_state_ids( - room_id + current_state_ids_map = ( + await self.state_storage.get_current_state_ids(room_id) ) current_state_ids = frozenset(current_state_ids_map.values()) @@ -574,8 +574,8 @@ async def _load_filtered_recents( # ensure that we always include current state in the timeline current_state_ids = frozenset() if any(e.is_state() for e in loaded_recents): - current_state_ids_map = await self.store.get_current_state_ids( - room_id + current_state_ids_map = ( + await self.state_storage.get_current_state_ids(room_id) ) current_state_ids = frozenset(current_state_ids_map.values()) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 95f3b2792793..884faac14c7f 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -196,6 +196,7 @@ def __init__(self, hs: "HomeServer", auth_handler: AuthHandler) -> None: self._store: Union[ DataStore, "GenericWorkerSlavedStore" ] = hs.get_datastores().main + self._storage = hs.get_storage() self._auth = hs.get_auth() self._auth_handler = auth_handler self._server_name = hs.hostname @@ -1301,7 +1302,7 @@ async def get_room_state( # to get_filtered_current_state_ids above, with `state_filter = StateFilter.all()`, # but get_filtered_current_state_ids isn't cached and `get_current_state_ids` # is, so using the latter when we can is better for perf. - state_ids = await self._store.get_current_state_ids(room_id) + state_ids = await self._storage.state.get_current_state_ids(room_id) state_events = await self._store.get_events(state_ids.values()) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 84124af96527..929aafb81517 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -255,7 +255,7 @@ async def send_notification_mail( user_display_name = user_id async def _fetch_room_state(room_id: str) -> None: - room_state = await self.store.get_current_state_ids(room_id) + room_state = await self.state_storage.get_current_state_ids(room_id) state_by_room[room_id] = room_state # Run at most 3 of these at once: sync does 10 at a time but email diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 356d6f74d7ef..4447a543c911 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -418,6 +418,7 @@ class RoomStateRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): self.auth = hs.get_auth() self.store = hs.get_datastores().main + self._storage = hs.get_storage() self.clock = hs.get_clock() self._event_serializer = hs.get_event_client_serializer() @@ -430,7 +431,7 @@ async def on_GET( if not ret: raise NotFoundError("Room not found") - event_ids = await self.store.get_current_state_ids(room_id) + event_ids = await self._storage.state.get_current_state_ids(room_id) events = await self.store.get_events(event_ids.values()) now = self.clock.time_msec() room_state = self._event_serializer.serialize_events(events.values(), now) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 55d255ac2551..1cd6aacc39b1 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -897,18 +897,24 @@ async def store_state_group( event_id, room_id, prev_group, delta_ids, current_state_ids ) - async def get_current_state_ids(self, room_id: str) -> StateMap[str]: + async def get_current_state_ids( + self, room_id: str, on_invalidate: Optional[Callable[[], None]] = None + ) -> StateMap[str]: """Get the current state event ids for a room based on the current_state_events table. Args: room_id: The room to get the state IDs of. + on_invalidate: Callback for when the `get_current_state_ids` cache + for the room gets invalidated. Returns: The current state of the room. """ - return await self.stores.main.get_current_state_ids(room_id) + return await self.stores.main.get_current_state_ids( + room_id, on_invalidate=on_invalidate + ) async def get_filtered_current_state_ids( self, room_id: str, state_filter: Optional[StateFilter] = None From a2465b8139ea372026e539d471b6db382ee6dbd7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 May 2022 11:36:27 +0100 Subject: [PATCH 04/21] Use StateStorage.get_filtered_current_state_ids --- synapse/events/third_party_rules.py | 3 ++- synapse/handlers/message.py | 2 +- synapse/handlers/register.py | 3 ++- synapse/handlers/room.py | 9 ++++++--- synapse/module_api/__init__.py | 4 ++-- 5 files changed, 13 insertions(+), 8 deletions(-) diff --git a/synapse/events/third_party_rules.py b/synapse/events/third_party_rules.py index 9f4ff9799c00..82e171b5b5f1 100644 --- a/synapse/events/third_party_rules.py +++ b/synapse/events/third_party_rules.py @@ -152,6 +152,7 @@ def __init__(self, hs: "HomeServer"): self.third_party_rules = None self.store = hs.get_datastores().main + self._storage = hs.get_storage() self._check_event_allowed_callbacks: List[CHECK_EVENT_ALLOWED_CALLBACK] = [] self._on_create_room_callbacks: List[ON_CREATE_ROOM_CALLBACK] = [] @@ -463,7 +464,7 @@ async def _get_state_map_for_room(self, room_id: str) -> StateMap[EventBase]: Returns: A dict mapping (event type, state key) to state event. """ - state_ids = await self.store.get_filtered_current_state_ids(room_id) + state_ids = await self._storage.state.get_current_state_ids(room_id) room_state_events = await self.store.get_events(state_ids.values()) state_events = {} diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 9501e7f1b7de..fec983944556 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -228,7 +228,7 @@ async def get_state_events( ) if membership == Membership.JOIN: - state_ids = await self.store.get_filtered_current_state_ids( + state_ids = await self.state_storage.get_filtered_current_state_ids( room_id, state_filter=state_filter ) room_state = await self.store.get_events(state_ids.values()) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 05bb1e0225c3..e7acf787f203 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -87,6 +87,7 @@ class LoginDict(TypedDict): class RegistrationHandler: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main + self._storage = hs.get_storage() self.clock = hs.get_clock() self.hs = hs self.auth = hs.get_auth() @@ -528,7 +529,7 @@ async def _join_rooms(self, user_id: str) -> None: if requires_invite: # If the server is in the room, check if the room is public. - state = await self.store.get_filtered_current_state_ids( + state = await self._storage.state.get_filtered_current_state_ids( room_id, StateFilter.from_types([(EventTypes.JoinRules, "")]) ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index e2775b34f10b..787c564a3094 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -107,6 +107,7 @@ class EventContext: class RoomCreationHandler: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main + self._storage = hs.get_storage() self.auth = hs.get_auth() self.clock = hs.get_clock() self.hs = hs @@ -481,7 +482,7 @@ async def clone_existing_room( if room_type == RoomTypes.SPACE: types_to_copy.append((EventTypes.SpaceChild, None)) - old_room_state_ids = await self.store.get_filtered_current_state_ids( + old_room_state_ids = await self._storage.state.get_filtered_current_state_ids( old_room_id, StateFilter.from_types(types_to_copy) ) # map from event_id to BaseEvent @@ -559,8 +560,10 @@ async def clone_existing_room( ) # Transfer membership events - old_room_member_state_ids = await self.store.get_filtered_current_state_ids( - old_room_id, StateFilter.from_types([(EventTypes.Member, None)]) + old_room_member_state_ids = ( + await self._storage.state.get_filtered_current_state_ids( + old_room_id, StateFilter.from_types([(EventTypes.Member, None)]) + ) ) # map from event_id to BaseEvent diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 884faac14c7f..0da627b7a7a9 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -914,7 +914,7 @@ def get_state_events_in_room( The filtered state events in the room. """ state_ids = yield defer.ensureDeferred( - self._store.get_filtered_current_state_ids( + self._storage.state.get_filtered_current_state_ids( room_id=room_id, state_filter=StateFilter.from_types(types) ) ) @@ -1293,7 +1293,7 @@ async def get_room_state( # If a filter was provided, turn it into a StateFilter and retrieve a filtered # view of the state. state_filter = StateFilter.from_types(event_filter) - state_ids = await self._store.get_filtered_current_state_ids( + state_ids = await self._storage.state.get_filtered_current_state_ids( room_id, state_filter, ) From a2945a591a5b530fe05f0687c1a0ffb965b19306 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 May 2022 11:52:32 +0100 Subject: [PATCH 05/21] Use StateStorage.get_current_state_deltas --- synapse/handlers/presence.py | 3 ++- synapse/handlers/stats.py | 3 ++- synapse/handlers/user_directory.py | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index dd84e6c88b6c..5b4c6dde3907 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -134,6 +134,7 @@ class BasePresenceHandler(abc.ABC): def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() self.store = hs.get_datastores().main + self._storage = hs.get_storage() self.presence_router = hs.get_presence_router() self.state = hs.get_state_handler() self.is_mine_id = hs.is_mine_id @@ -1346,7 +1347,7 @@ async def _unsafe_process(self) -> None: self._event_pos, room_max_stream_ordering, ) - max_pos, deltas = await self.store.get_current_state_deltas( + max_pos, deltas = await self._storage.state.get_current_state_deltas( self._event_pos, room_max_stream_ordering ) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 436cd971cedd..2950f9fc30e7 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -40,6 +40,7 @@ class StatsHandler: def __init__(self, hs: "HomeServer"): self.hs = hs self.store = hs.get_datastores().main + self._storage = hs.get_storage() self.state = hs.get_state_handler() self.server_name = hs.hostname self.clock = hs.get_clock() @@ -105,7 +106,7 @@ async def _unsafe_process(self) -> None: logger.debug( "Processing room stats %s->%s", self.pos, room_max_stream_ordering ) - max_pos, deltas = await self.store.get_current_state_deltas( + max_pos, deltas = await self._storage.state.get_current_state_deltas( self.pos, room_max_stream_ordering ) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 74f7fdfe6ce5..6629697c3b5e 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -56,6 +56,7 @@ def __init__(self, hs: "HomeServer"): super().__init__(hs) self.store = hs.get_datastores().main + self._storage = hs.get_storage() self.server_name = hs.hostname self.clock = hs.get_clock() self.notifier = hs.get_notifier() @@ -174,7 +175,7 @@ async def _unsafe_process(self) -> None: logger.debug( "Processing user stats %s->%s", self.pos, room_max_stream_ordering ) - max_pos, deltas = await self.store.get_current_state_deltas( + max_pos, deltas = await self._storage.state.get_current_state_deltas( self.pos, room_max_stream_ordering ) From e786f683adebfea350c4a7d942199298ed4903d8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 May 2022 11:54:55 +0100 Subject: [PATCH 06/21] Rename to get_partial_current_state_ids --- synapse/storage/_base.py | 2 +- synapse/storage/databases/main/state.py | 10 ++++++---- synapse/storage/persist_events.py | 4 ++-- synapse/storage/state.py | 2 +- tests/handlers/test_federation.py | 6 ++++-- tests/handlers/test_federation_event.py | 4 +++- tests/rest/client/test_upgrade_room.py | 8 ++++++-- 7 files changed, 23 insertions(+), 13 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 8df80664a27e..57bd74700e0c 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -77,7 +77,7 @@ def _invalidate_state_caches( # Purge other caches based on room state. self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) - self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,)) + self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,)) def _attempt_to_invalidate_cache( self, cache_name: str, key: Optional[Collection[Any]] diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index 628ab64dc769..8d22cc9bc0d7 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -177,7 +177,7 @@ async def get_create_event_for_room(self, room_id: str) -> EventBase: Raises: NotFoundError if the room is unknown """ - state_ids = await self.get_current_state_ids(room_id) + state_ids = await self.get_partial_current_state_ids(room_id) if not state_ids: raise NotFoundError(f"Current state for room {room_id} is empty") @@ -193,10 +193,12 @@ async def get_create_event_for_room(self, room_id: str) -> EventBase: return create_event @cached(max_entries=100000, iterable=True) - async def get_current_state_ids(self, room_id: str) -> StateMap[str]: + async def get_partial_current_state_ids(self, room_id: str) -> StateMap[str]: """Get the current state event ids for a room based on the current_state_events table. + This may be the partial state if we're lazy joining the room. + Args: room_id: The room to get the state IDs of. @@ -215,7 +217,7 @@ def _get_current_state_ids_txn(txn: LoggingTransaction) -> StateMap[str]: return {(intern_string(r[0]), intern_string(r[1])): r[2] for r in txn} return await self.db_pool.runInteraction( - "get_current_state_ids", _get_current_state_ids_txn + "get_partial_current_state_ids", _get_current_state_ids_txn ) # FIXME: how should this be cached? @@ -241,7 +243,7 @@ async def get_filtered_current_state_ids( if not where_clause: # We delegate to the cached version - return await self.get_current_state_ids(room_id) + return await self.get_partial_current_state_ids(room_id) def _get_filtered_current_state_ids_txn( txn: LoggingTransaction, diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 0fc282866bc5..a5e423d901dd 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -994,7 +994,7 @@ async def _calculate_state_delta( Assumes that we are only persisting events for one room at a time. """ - existing_state = await self.main_store.get_current_state_ids(room_id) + existing_state = await self.main_store.get_partial_current_state_ids(room_id) to_delete = [key for key in existing_state if key not in current_state] @@ -1083,7 +1083,7 @@ async def _is_server_still_joined( # The server will leave the room, so we go and find out which remote # users will still be joined when we leave. if current_state is None: - current_state = await self.main_store.get_current_state_ids(room_id) + current_state = await self.main_store.get_partial_current_state_ids(room_id) current_state = dict(current_state) for key in delta.to_delete: current_state.pop(key, None) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 1cd6aacc39b1..4f1384078756 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -912,7 +912,7 @@ async def get_current_state_ids( The current state of the room. """ - return await self.stores.main.get_current_state_ids( + return await self.stores.main.get_partial_current_state_ids( room_id, on_invalidate=on_invalidate ) diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index bef6c2b77609..3fc43a71d0cf 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -237,7 +237,9 @@ def test_backfill_with_many_backward_extremities(self) -> None: ) current_state = self.get_success( self.store.get_events_as_list( - (self.get_success(self.store.get_current_state_ids(room_id))).values() + ( + self.get_success(self.store.get_partial_current_state_ids(room_id)) + ).values() ) ) @@ -506,7 +508,7 @@ def _build_and_send_join_event( self.get_success(d) # sanity-check: the room should show that the new user is a member - r = self.get_success(self.store.get_current_state_ids(room_id)) + r = self.get_success(self.store.get_partial_current_state_ids(room_id)) self.assertEqual(r[(EventTypes.Member, other_user)], join_event.event_id) return join_event diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py index e64b28f28b86..32fa04abc983 100644 --- a/tests/handlers/test_federation_event.py +++ b/tests/handlers/test_federation_event.py @@ -91,7 +91,9 @@ def _test_process_pulled_event_with_missing_state( event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join") ) - initial_state_map = self.get_success(main_store.get_current_state_ids(room_id)) + initial_state_map = self.get_success( + main_store.get_partial_current_state_ids(room_id) + ) auth_event_ids = [ initial_state_map[("m.room.create", "")], diff --git a/tests/rest/client/test_upgrade_room.py b/tests/rest/client/test_upgrade_room.py index a21cbe9fa874..98c1039d33a7 100644 --- a/tests/rest/client/test_upgrade_room.py +++ b/tests/rest/client/test_upgrade_room.py @@ -249,7 +249,9 @@ def test_space(self) -> None: new_space_id = channel.json_body["replacement_room"] - state_ids = self.get_success(self.store.get_current_state_ids(new_space_id)) + state_ids = self.get_success( + self.store.get_partial_current_state_ids(new_space_id) + ) # Ensure the new room is still a space. create_event = self.get_success( @@ -284,7 +286,9 @@ def test_custom_room_type(self) -> None: new_room_id = channel.json_body["replacement_room"] - state_ids = self.get_success(self.store.get_current_state_ids(new_room_id)) + state_ids = self.get_success( + self.store.get_partial_current_state_ids(new_room_id) + ) # Ensure the new room is the same type as the old room. create_event = self.get_success( From 3f74b378ddb46a26ec66482437d9fc1222d8e30e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 May 2022 11:58:01 +0100 Subject: [PATCH 07/21] Rename to get_partial_filtered_current_state_ids --- synapse/storage/databases/main/state.py | 4 +++- synapse/storage/databases/main/user_directory.py | 4 +++- synapse/storage/state.py | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index 8d22cc9bc0d7..755a18409d2d 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -221,13 +221,15 @@ def _get_current_state_ids_txn(txn: LoggingTransaction) -> StateMap[str]: ) # FIXME: how should this be cached? - async def get_filtered_current_state_ids( + async def get_partial_filtered_current_state_ids( self, room_id: str, state_filter: Optional[StateFilter] = None ) -> StateMap[str]: """Get the current state event of a given type for a room based on the current_state_events table. This may not be as up-to-date as the result of doing a fresh state resolution as per state_handler.get_current_state + This may be the partial state if we're lazy joining the room. + Args: room_id state_filter: The state filter used to fetch state diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 028db69af301..b649816c27fb 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -441,7 +441,9 @@ async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> boo (EventTypes.RoomHistoryVisibility, ""), ) - current_state_ids = await self.get_filtered_current_state_ids( # type: ignore[attr-defined] + # Getting the partial state is fine, as we're not looking at membership + # events. + current_state_ids = await self.get_partial_filtered_current_state_ids( # type: ignore[attr-defined] room_id, StateFilter.from_types(types_to_filter) ) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 4f1384078756..4b6ee9b39f30 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -931,7 +931,7 @@ async def get_filtered_current_state_ids( Returns: Map from type/state_key to event ID. """ - return await self.stores.main.get_filtered_current_state_ids( + return await self.stores.main.get_partial_filtered_current_state_ids( room_id, state_filter ) From c068581aba42697d7eba400171f7e1a2ef286803 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 May 2022 12:00:26 +0100 Subject: [PATCH 08/21] Rename to get_partial_current_state_deltas --- synapse/storage/databases/main/state_deltas.py | 4 +++- synapse/storage/state.py | 2 +- tests/handlers/test_typing.py | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py index 188afec332dd..445213e12aaf 100644 --- a/synapse/storage/databases/main/state_deltas.py +++ b/synapse/storage/databases/main/state_deltas.py @@ -27,7 +27,7 @@ class StateDeltasStore(SQLBaseStore): # attribute. TODO: can we get static analysis to enforce this? _curr_state_delta_stream_cache: StreamChangeCache - async def get_current_state_deltas( + async def get_partial_current_state_deltas( self, prev_stream_id: int, max_stream_id: int ) -> Tuple[int, List[Dict[str, Any]]]: """Fetch a list of room state changes since the given stream id @@ -42,6 +42,8 @@ async def get_current_state_deltas( - prev_event_id (str|None): previous event_id for this state key. None if it's new state. + This may be the partial state if we're lazy joining the room. + Args: prev_stream_id: point to get changes since (exclusive) max_stream_id: the point that we know has been correctly persisted diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 4b6ee9b39f30..516de7231c4d 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -985,6 +985,6 @@ async def get_current_state_deltas( - list of current_state_delta_stream rows. If it is empty, we are up to date. """ - return await self.stores.main.get_current_state_deltas( + return await self.stores.main.get_partial_current_state_deltas( prev_stream_id, max_stream_id ) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 5f2e26a5fce7..c7d9cf7930d0 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -145,7 +145,7 @@ async def get_users_in_room(room_id: str): ) ) - self.datastore.get_current_state_deltas = Mock(return_value=(0, None)) + self.datastore.get_partial_current_state_deltas = Mock(return_value=(0, None)) self.datastore.get_to_device_stream_token = lambda: 0 self.datastore.get_new_device_msgs_for_remote = ( From 6f386d1513ee717c3bd6a8bbfaf01214a176876c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 May 2022 13:43:20 +0100 Subject: [PATCH 09/21] Block calls to `get_current_state_ids` and co. when have partial state --- synapse/handlers/federation.py | 1 + synapse/storage/databases/main/room.py | 13 ++++ synapse/storage/state.py | 19 +++++- .../util/partial_state_events_tracker.py | 60 +++++++++++++++++++ 4 files changed, 92 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index de4527b2517d..2afb24fa8703 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1488,6 +1488,7 @@ async def _sync_partial_state_room( success = await self.store.clear_partial_state_room(room_id) if success: logger.info("State resync complete for %s", room_id) + self.storage.state.notify_room_un_partial_stated(room_id) # TODO(faster_joins) update room stats and user directory? return diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 10f2ceb50b95..4fca1388b8d7 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1112,6 +1112,19 @@ def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None keyvalues={"room_id": room_id}, ) + async def is_room_got_partial_state(self, room_id: str) -> bool: + "Whether the given room only has partial state stored" + + entry = await self.db_pool.simple_select_one_onecol( + table="partial_state_rooms", + keyvalues={"room_id": room_id}, + retcol="room_id", + allow_none=True, + desc="is_room_got_partial_state", + ) + + return entry is not None + class _BackgroundUpdates: REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory" diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 516de7231c4d..55cd334e3c98 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -34,7 +34,10 @@ from synapse.api.constants import EventTypes from synapse.events import EventBase -from synapse.storage.util.partial_state_events_tracker import PartialStateEventsTracker +from synapse.storage.util.partial_state_events_tracker import ( + PartialCurrentStateTracker, + PartialStateEventsTracker, +) from synapse.types import MutableStateMap, StateKey, StateMap if TYPE_CHECKING: @@ -590,10 +593,18 @@ def __init__(self, hs: "HomeServer", stores: "Databases"): self._is_mine_id = hs.is_mine_id self.stores = stores self._partial_state_events_tracker = PartialStateEventsTracker(stores.main) + self._partial_state_room_tracker = PartialCurrentStateTracker(stores.main) def notify_event_un_partial_stated(self, event_id: str) -> None: self._partial_state_events_tracker.notify_un_partial_stated(event_id) + def notify_room_un_partial_stated(self, room_id: str) -> None: + """Notify that the room no longer has any partial state. + + Must be called after `clear_partial_state_room` + """ + self._partial_state_room_tracker.notify_un_partial_stated(room_id) + async def get_state_group_delta( self, state_group: int ) -> Tuple[Optional[int], Optional[StateMap[str]]]: @@ -911,6 +922,7 @@ async def get_current_state_ids( Returns: The current state of the room. """ + await self._partial_state_room_tracker.await_full_state(room_id) return await self.stores.main.get_partial_current_state_ids( room_id, on_invalidate=on_invalidate @@ -931,6 +943,9 @@ async def get_filtered_current_state_ids( Returns: Map from type/state_key to event ID. """ + if not state_filter or state_filter.must_await_full_state(self._is_mine_id): + await self._partial_state_room_tracker.await_full_state(room_id) + return await self.stores.main.get_partial_filtered_current_state_ids( room_id, state_filter ) @@ -985,6 +1000,8 @@ async def get_current_state_deltas( - list of current_state_delta_stream rows. If it is empty, we are up to date. """ + # FIXME(faster room joins): what do we do here? + return await self.stores.main.get_partial_current_state_deltas( prev_stream_id, max_stream_id ) diff --git a/synapse/storage/util/partial_state_events_tracker.py b/synapse/storage/util/partial_state_events_tracker.py index a61a951ef0c5..d89b8fc6da7c 100644 --- a/synapse/storage/util/partial_state_events_tracker.py +++ b/synapse/storage/util/partial_state_events_tracker.py @@ -21,6 +21,7 @@ from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.storage.databases.main.events_worker import EventsWorkerStore +from synapse.storage.databases.main.room import RoomWorkerStore from synapse.util import unwrapFirstError logger = logging.getLogger(__name__) @@ -118,3 +119,62 @@ async def await_full_state(self, event_ids: Collection[str]) -> None: observer_set.discard(observer) if not observer_set: del self._observers[event_id] + + +class PartialCurrentStateTracker: + """Keeps track of which rooms have partial state, after partial-state joins""" + + def __init__(self, store: RoomWorkerStore): + self._store = store + + # a map from room id to a set of Deferreds which are waiting for that room to be + # un-partial-stated. + self._observers: Dict[str, Set[Deferred[None]]] = defaultdict(set) + + def notify_un_partial_stated(self, room_id: str) -> None: + """Notify that we now have full current state for a given room + + Unblocks any callers to await_full_state() for that room. + + Args: + room_id: the room that now has full current state. + """ + observers = self._observers.pop(room_id, None) + if not observers: + return + logger.info( + "Notifying %i things waiting for un-partial-stating of room %s", + len(observers), + room_id, + ) + with PreserveLoggingContext(): + for o in observers: + o.callback(None) + + async def await_full_state(self, room_id: str) -> None: + # We add the deferred immediately so that the DB call to check for + # partial state doesn't race when we unpartial the room. + d = Deferred[None]() + self._observers.setdefault(room_id, set()).add(d) + + try: + # Check if the room has partial current state or not. + has_partial_state = await self._store.is_room_got_partial_state(room_id) + if not has_partial_state: + return + + logger.info( + "Awaiting un-partial-stating of room %s", + room_id, + ) + + await make_deferred_yieldable(d) + + logger.info("Room has un-partial-stated") + finally: + # Remove the added observer, and remove the room entry if its empty. + ds = self._observers.get(room_id) + if ds is not None: + ds.discard(d) + if not ds: + self._observers.pop(room_id, None) From 2b674f8f9a33e202397563119730e03fb5bb5c8a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 May 2022 13:56:04 +0100 Subject: [PATCH 10/21] Add test for `PartialCurrentStateTracker` --- .../util/test_partial_state_events_tracker.py | 61 ++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/tests/storage/util/test_partial_state_events_tracker.py b/tests/storage/util/test_partial_state_events_tracker.py index 303e190b6cc2..d868db410876 100644 --- a/tests/storage/util/test_partial_state_events_tracker.py +++ b/tests/storage/util/test_partial_state_events_tracker.py @@ -17,8 +17,12 @@ from twisted.internet.defer import CancelledError, ensureDeferred -from synapse.storage.util.partial_state_events_tracker import PartialStateEventsTracker +from synapse.storage.util.partial_state_events_tracker import ( + PartialCurrentStateTracker, + PartialStateEventsTracker, +) +from tests.test_utils import make_awaitable from tests.unittest import TestCase @@ -115,3 +119,58 @@ def test_cancellation(self): self.tracker.notify_un_partial_stated("event1") self.successResultOf(d2) + + +class PartialCurrentStateTrackerTestCase(TestCase): + def setUp(self) -> None: + self.mock_store = mock.Mock(spec_set=["is_room_got_partial_state"]) + + self.tracker = PartialCurrentStateTracker(self.mock_store) + + def test_does_not_block_for_full_state_rooms(self): + self.mock_store.is_room_got_partial_state.return_value = make_awaitable(False) + + self.successResultOf(ensureDeferred(self.tracker.await_full_state("room_id"))) + + def test_blocks_for_partial_room_state(self): + self.mock_store.is_room_got_partial_state.return_value = make_awaitable(True) + + d = ensureDeferred(self.tracker.await_full_state("room_id")) + + # there should be no result yet + self.assertNoResult(d) + + # notifying that the room has been de-partial-stated should unblock + self.tracker.notify_un_partial_stated("room_id") + self.successResultOf(d) + + def test_un_partial_state_race(self): + # We should correctly handle race between awaiting the state and us + # un-partialling the state + async def is_room_got_partial_state(events): + self.tracker.notify_un_partial_stated("room_id") + return True + + self.mock_store.is_room_got_partial_state.side_effect = ( + is_room_got_partial_state + ) + + self.successResultOf(ensureDeferred(self.tracker.await_full_state("room_id"))) + + def test_cancellation(self): + self.mock_store.is_room_got_partial_state.return_value = make_awaitable(True) + + d1 = ensureDeferred(self.tracker.await_full_state("room_id")) + self.assertNoResult(d1) + + d2 = ensureDeferred(self.tracker.await_full_state("room_id")) + self.assertNoResult(d2) + + d1.cancel() + self.assertFailure(d1, CancelledError) + + # d2 should still be waiting! + self.assertNoResult(d2) + + self.tracker.notify_un_partial_stated("room_id") + self.successResultOf(d2) From ecae768a79de4d367378465fc2773c53549d7872 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 May 2022 13:57:17 +0100 Subject: [PATCH 11/21] Newsfile --- changelog.d/12872.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12872.misc diff --git a/changelog.d/12872.misc b/changelog.d/12872.misc new file mode 100644 index 000000000000..b944bac1053b --- /dev/null +++ b/changelog.d/12872.misc @@ -0,0 +1 @@ +When querying the current state of the room wait for lazy joining of the room to finish. From d1a1d6a079904658fce6cd353898184d267d2fcd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 May 2022 15:23:38 +0100 Subject: [PATCH 12/21] Don't block sync when lazy joining a room --- synapse/handlers/sync.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index bf8b9efe035d..140919805255 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -574,8 +574,11 @@ async def _load_filtered_recents( # ensure that we always include current state in the timeline current_state_ids = frozenset() if any(e.is_state() for e in loaded_recents): + # FIXME(faster room joins): We use the partial state here as + # we don't want to block `/sync` on finishing a lazy join. + # Is this the correct way of doing it? current_state_ids_map = ( - await self.state_storage.get_current_state_ids(room_id) + await self.store.get_partial_current_state_ids(room_id) ) current_state_ids = frozenset(current_state_ids_map.values()) From 531955f7234965582cf56a42f53bffdc985e4be1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 May 2022 15:59:05 +0100 Subject: [PATCH 13/21] Fix type annotation --- synapse/storage/util/partial_state_events_tracker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/util/partial_state_events_tracker.py b/synapse/storage/util/partial_state_events_tracker.py index d89b8fc6da7c..62e051d88306 100644 --- a/synapse/storage/util/partial_state_events_tracker.py +++ b/synapse/storage/util/partial_state_events_tracker.py @@ -154,7 +154,7 @@ def notify_un_partial_stated(self, room_id: str) -> None: async def await_full_state(self, room_id: str) -> None: # We add the deferred immediately so that the DB call to check for # partial state doesn't race when we unpartial the room. - d = Deferred[None]() + d: Deferred[None] = Deferred() self._observers.setdefault(room_id, set()).add(d) try: From 6cc7269590d931513fb2321456f1a2cb40a2e18c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 May 2022 17:45:42 +0100 Subject: [PATCH 14/21] Apply suggestions from code review Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/handlers/sync.py | 2 +- synapse/storage/state.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 140919805255..d5d4556301b4 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -574,7 +574,7 @@ async def _load_filtered_recents( # ensure that we always include current state in the timeline current_state_ids = frozenset() if any(e.is_state() for e in loaded_recents): - # FIXME(faster room joins): We use the partial state here as + # FIXME(faster_joins): We use the partial state here as # we don't want to block `/sync` on finishing a lazy join. # Is this the correct way of doing it? current_state_ids_map = ( diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 55cd334e3c98..38570d00125d 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -601,7 +601,7 @@ def notify_event_un_partial_stated(self, event_id: str) -> None: def notify_room_un_partial_stated(self, room_id: str) -> None: """Notify that the room no longer has any partial state. - Must be called after `clear_partial_state_room` + Must be called after `DataStore.clear_partial_state_room` """ self._partial_state_room_tracker.notify_un_partial_stated(room_id) @@ -1000,7 +1000,7 @@ async def get_current_state_deltas( - list of current_state_delta_stream rows. If it is empty, we are up to date. """ - # FIXME(faster room joins): what do we do here? + # FIXME(faster_joins): what do we do here? return await self.stores.main.get_partial_current_state_deltas( prev_stream_id, max_stream_id From 3a20548a1dc2cebda74c24d002a6d2faa731c139 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 May 2022 09:50:43 +0100 Subject: [PATCH 15/21] s/self.storage/self._storage/ --- synapse/handlers/room_member.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index d41a291610e2..f0941f54a42e 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -68,7 +68,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): def __init__(self, hs: "HomeServer"): self.hs = hs self.store = hs.get_datastores().main - self.storage = hs.get_storage() + self._storage = hs.get_storage() self.auth = hs.get_auth() self.state_handler = hs.get_state_handler() self.config = hs.config @@ -995,7 +995,7 @@ async def _should_perform_remote_join( # If the host is in the room, but not one of the authorised hosts # for restricted join rules, a remote join must be used. room_version = await self.store.get_room_version(room_id) - current_state_ids = await self.storage.state.get_current_state_ids(room_id) + current_state_ids = await self._storage.state.get_current_state_ids(room_id) # If restricted join rules are not being used, a local join can always # be used. From 6e7625eef0c334f60610ea46200558346267cca6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 May 2022 09:52:20 +0100 Subject: [PATCH 16/21] s/is_room_got_partial_state/has_room_only_got_partial_state/ --- synapse/storage/databases/main/room.py | 4 ++-- .../util/partial_state_events_tracker.py | 4 +++- .../util/test_partial_state_events_tracker.py | 20 ++++++++++++------- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 4fca1388b8d7..11349c6d7a9d 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1112,7 +1112,7 @@ def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None keyvalues={"room_id": room_id}, ) - async def is_room_got_partial_state(self, room_id: str) -> bool: + async def has_room_only_got_partial_state(self, room_id: str) -> bool: "Whether the given room only has partial state stored" entry = await self.db_pool.simple_select_one_onecol( @@ -1120,7 +1120,7 @@ async def is_room_got_partial_state(self, room_id: str) -> bool: keyvalues={"room_id": room_id}, retcol="room_id", allow_none=True, - desc="is_room_got_partial_state", + desc="has_room_only_got_partial_state", ) return entry is not None diff --git a/synapse/storage/util/partial_state_events_tracker.py b/synapse/storage/util/partial_state_events_tracker.py index 62e051d88306..e8406b8c7f31 100644 --- a/synapse/storage/util/partial_state_events_tracker.py +++ b/synapse/storage/util/partial_state_events_tracker.py @@ -159,7 +159,9 @@ async def await_full_state(self, room_id: str) -> None: try: # Check if the room has partial current state or not. - has_partial_state = await self._store.is_room_got_partial_state(room_id) + has_partial_state = await self._store.has_room_only_got_partial_state( + room_id + ) if not has_partial_state: return diff --git a/tests/storage/util/test_partial_state_events_tracker.py b/tests/storage/util/test_partial_state_events_tracker.py index d868db410876..2a1a5d4176f7 100644 --- a/tests/storage/util/test_partial_state_events_tracker.py +++ b/tests/storage/util/test_partial_state_events_tracker.py @@ -123,17 +123,21 @@ def test_cancellation(self): class PartialCurrentStateTrackerTestCase(TestCase): def setUp(self) -> None: - self.mock_store = mock.Mock(spec_set=["is_room_got_partial_state"]) + self.mock_store = mock.Mock(spec_set=["has_room_only_got_partial_state"]) self.tracker = PartialCurrentStateTracker(self.mock_store) def test_does_not_block_for_full_state_rooms(self): - self.mock_store.is_room_got_partial_state.return_value = make_awaitable(False) + self.mock_store.has_room_only_got_partial_state.return_value = make_awaitable( + False + ) self.successResultOf(ensureDeferred(self.tracker.await_full_state("room_id"))) def test_blocks_for_partial_room_state(self): - self.mock_store.is_room_got_partial_state.return_value = make_awaitable(True) + self.mock_store.has_room_only_got_partial_state.return_value = make_awaitable( + True + ) d = ensureDeferred(self.tracker.await_full_state("room_id")) @@ -147,18 +151,20 @@ def test_blocks_for_partial_room_state(self): def test_un_partial_state_race(self): # We should correctly handle race between awaiting the state and us # un-partialling the state - async def is_room_got_partial_state(events): + async def has_room_only_got_partial_state(events): self.tracker.notify_un_partial_stated("room_id") return True - self.mock_store.is_room_got_partial_state.side_effect = ( - is_room_got_partial_state + self.mock_store.has_room_only_got_partial_state.side_effect = ( + has_room_only_got_partial_state ) self.successResultOf(ensureDeferred(self.tracker.await_full_state("room_id"))) def test_cancellation(self): - self.mock_store.is_room_got_partial_state.return_value = make_awaitable(True) + self.mock_store.has_room_only_got_partial_state.return_value = make_awaitable( + True + ) d1 = ensureDeferred(self.tracker.await_full_state("room_id")) self.assertNoResult(d1) From 711ea44e6ce573c41e7d4c7f3d09ee2304e39e17 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 May 2022 10:01:14 +0100 Subject: [PATCH 17/21] Merge get_filtered and get_current_state_ids --- synapse/handlers/message.py | 2 +- synapse/handlers/register.py | 2 +- synapse/handlers/room.py | 8 +++--- synapse/module_api/__init__.py | 18 ++++++------- synapse/storage/state.py | 46 ++++++++++++++-------------------- 5 files changed, 31 insertions(+), 45 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index fec983944556..167b3f546b61 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -228,7 +228,7 @@ async def get_state_events( ) if membership == Membership.JOIN: - state_ids = await self.state_storage.get_filtered_current_state_ids( + state_ids = await self.state_storage.get_current_state_ids( room_id, state_filter=state_filter ) room_state = await self.store.get_events(state_ids.values()) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index e7acf787f203..d5c69499b21f 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -529,7 +529,7 @@ async def _join_rooms(self, user_id: str) -> None: if requires_invite: # If the server is in the room, check if the room is public. - state = await self._storage.state.get_filtered_current_state_ids( + state = await self._storage.state.get_current_state_ids( room_id, StateFilter.from_types([(EventTypes.JoinRules, "")]) ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 787c564a3094..b6827cc6b8ce 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -482,7 +482,7 @@ async def clone_existing_room( if room_type == RoomTypes.SPACE: types_to_copy.append((EventTypes.SpaceChild, None)) - old_room_state_ids = await self._storage.state.get_filtered_current_state_ids( + old_room_state_ids = await self._storage.state.get_current_state_ids( old_room_id, StateFilter.from_types(types_to_copy) ) # map from event_id to BaseEvent @@ -560,10 +560,8 @@ async def clone_existing_room( ) # Transfer membership events - old_room_member_state_ids = ( - await self._storage.state.get_filtered_current_state_ids( - old_room_id, StateFilter.from_types([(EventTypes.Member, None)]) - ) + old_room_member_state_ids = await self._storage.state.get_current_state_ids( + old_room_id, StateFilter.from_types([(EventTypes.Member, None)]) ) # map from event_id to BaseEvent diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 0da627b7a7a9..1ceeb885a429 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -914,7 +914,7 @@ def get_state_events_in_room( The filtered state events in the room. """ state_ids = yield defer.ensureDeferred( - self._storage.state.get_filtered_current_state_ids( + self._storage.state.get_current_state_ids( room_id=room_id, state_filter=StateFilter.from_types(types) ) ) @@ -1289,20 +1289,16 @@ async def get_room_state( # regardless of their state key ] """ + state_filter = None if event_filter: # If a filter was provided, turn it into a StateFilter and retrieve a filtered # view of the state. state_filter = StateFilter.from_types(event_filter) - state_ids = await self._storage.state.get_filtered_current_state_ids( - room_id, - state_filter, - ) - else: - # If no filter was provided, get the whole state. We could also reuse the call - # to get_filtered_current_state_ids above, with `state_filter = StateFilter.all()`, - # but get_filtered_current_state_ids isn't cached and `get_current_state_ids` - # is, so using the latter when we can is better for perf. - state_ids = await self._storage.state.get_current_state_ids(room_id) + + state_ids = await self._storage.state.get_current_state_ids( + room_id, + state_filter, + ) state_events = await self._store.get_events(state_ids.values()) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 38570d00125d..afff2017c44e 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -909,46 +909,38 @@ async def store_state_group( ) async def get_current_state_ids( - self, room_id: str, on_invalidate: Optional[Callable[[], None]] = None + self, + room_id: str, + state_filter: Optional[StateFilter] = None, + on_invalidate: Optional[Callable[[], None]] = None, ) -> StateMap[str]: """Get the current state event ids for a room based on the current_state_events table. + If a state filter is given (that is not `StateFilter.all()`) the query + result is *not* cached. + Args: - room_id: The room to get the state IDs of. + room_id: The room to get the state IDs of. state_filter: The state + filter used to fetch state from the + database. on_invalidate: Callback for when the `get_current_state_ids` cache for the room gets invalidated. Returns: The current state of the room. """ - await self._partial_state_room_tracker.await_full_state(room_id) - - return await self.stores.main.get_partial_current_state_ids( - room_id, on_invalidate=on_invalidate - ) - - async def get_filtered_current_state_ids( - self, room_id: str, state_filter: Optional[StateFilter] = None - ) -> StateMap[str]: - """Get the current state event of a given type for a room based on the - current_state_events table. This may not be as up-to-date as the result - of doing a fresh state resolution as per state_handler.get_current_state - - Args: - room_id - state_filter: The state filter used to fetch state - from the database. - - Returns: - Map from type/state_key to event ID. - """ if not state_filter or state_filter.must_await_full_state(self._is_mine_id): await self._partial_state_room_tracker.await_full_state(room_id) - return await self.stores.main.get_partial_filtered_current_state_ids( - room_id, state_filter - ) + if state_filter and not state_filter.is_full(): + return await self.stores.main.get_partial_filtered_current_state_ids( + room_id, state_filter + ) + else: + return await self.stores.main.get_partial_current_state_ids( + room_id, on_invalidate=on_invalidate + ) async def get_canonical_alias_for_room(self, room_id: str) -> Optional[str]: """Get canonical alias for room, if any @@ -960,7 +952,7 @@ async def get_canonical_alias_for_room(self, room_id: str) -> Optional[str]: The canonical alias, if any """ - state = await self.get_filtered_current_state_ids( + state = await self.get_current_state_ids( room_id, StateFilter.from_types([(EventTypes.CanonicalAlias, "")]) ) From c6fe1322790a038e4d055b41117da446a028cbe1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 31 May 2022 15:56:13 +0100 Subject: [PATCH 18/21] Fix up naming --- synapse/events/third_party_rules.py | 4 ++-- synapse/handlers/directory.py | 8 +++++--- synapse/handlers/presence.py | 7 +++++-- synapse/handlers/register.py | 4 ++-- synapse/handlers/room.py | 14 +++++++++----- synapse/handlers/room_list.py | 4 ++-- synapse/handlers/room_member.py | 6 ++++-- synapse/handlers/room_summary.py | 12 ++++++++---- synapse/handlers/stats.py | 7 +++++-- synapse/handlers/user_directory.py | 7 +++++-- synapse/module_api/__init__.py | 6 +++--- synapse/rest/admin/rooms.py | 4 ++-- 12 files changed, 52 insertions(+), 31 deletions(-) diff --git a/synapse/events/third_party_rules.py b/synapse/events/third_party_rules.py index 82e171b5b5f1..35f3f3690f8c 100644 --- a/synapse/events/third_party_rules.py +++ b/synapse/events/third_party_rules.py @@ -152,7 +152,7 @@ def __init__(self, hs: "HomeServer"): self.third_party_rules = None self.store = hs.get_datastores().main - self._storage = hs.get_storage() + self._storage_controllers = hs.get_storage_controllers() self._check_event_allowed_callbacks: List[CHECK_EVENT_ALLOWED_CALLBACK] = [] self._on_create_room_callbacks: List[ON_CREATE_ROOM_CALLBACK] = [] @@ -464,7 +464,7 @@ async def _get_state_map_for_room(self, room_id: str) -> StateMap[EventBase]: Returns: A dict mapping (event type, state key) to state event. """ - state_ids = await self._storage.state.get_current_state_ids(room_id) + state_ids = await self._storage_controllers.state.get_current_state_ids(room_id) room_state_events = await self.store.get_events(state_ids.values()) state_events = {} diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 001878be240e..44e84698c48e 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -45,7 +45,7 @@ def __init__(self, hs: "HomeServer"): self.appservice_handler = hs.get_application_service_handler() self.event_creation_handler = hs.get_event_creation_handler() self.store = hs.get_datastores().main - self.storage = hs.get_storage() + self._storage_controllers = hs.get_storage_controllers() self.config = hs.config self.enable_room_list_search = hs.config.roomdirectory.enable_room_list_search self.require_membership = hs.config.server.require_membership_for_aliases @@ -464,8 +464,10 @@ async def edit_published_room_list( making_public = visibility == "public" if making_public: room_aliases = await self.store.get_aliases_for_room(room_id) - canonical_alias = await self.storage.state.get_canonical_alias_for_room( - room_id + canonical_alias = ( + await self._storage_controllers.state.get_canonical_alias_for_room( + room_id + ) ) if canonical_alias: room_aliases.append(canonical_alias) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 76c27fabc420..895ea63ed33b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -134,7 +134,7 @@ class BasePresenceHandler(abc.ABC): def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() self.store = hs.get_datastores().main - self._storage = hs.get_storage() + self._storage_controllers = hs.get_storage_controllers() self.presence_router = hs.get_presence_router() self.state = hs.get_state_handler() self.is_mine_id = hs.is_mine_id @@ -1349,7 +1349,10 @@ async def _unsafe_process(self) -> None: self._event_pos, room_max_stream_ordering, ) - max_pos, deltas = await self._storage.state.get_current_state_deltas( + ( + max_pos, + deltas, + ) = await self._storage_controllers.state.get_current_state_deltas( self._event_pos, room_max_stream_ordering ) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index d5c69499b21f..338204287f66 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -87,7 +87,7 @@ class LoginDict(TypedDict): class RegistrationHandler: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main - self._storage = hs.get_storage() + self._storage_controllers = hs.get_storage_controllers() self.clock = hs.get_clock() self.hs = hs self.auth = hs.get_auth() @@ -529,7 +529,7 @@ async def _join_rooms(self, user_id: str) -> None: if requires_invite: # If the server is in the room, check if the room is public. - state = await self._storage.state.get_current_state_ids( + state = await self._storage_controllers.state.get_current_state_ids( room_id, StateFilter.from_types([(EventTypes.JoinRules, "")]) ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 8a376404370d..9b49e577bd79 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -107,7 +107,7 @@ class EventContext: class RoomCreationHandler: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main - self._storage = hs.get_storage() + self._storage_controllers = hs.get_storage_controllers() self.auth = hs.get_auth() self.clock = hs.get_clock() self.hs = hs @@ -482,8 +482,10 @@ async def clone_existing_room( if room_type == RoomTypes.SPACE: types_to_copy.append((EventTypes.SpaceChild, None)) - old_room_state_ids = await self._storage.state.get_current_state_ids( - old_room_id, StateFilter.from_types(types_to_copy) + old_room_state_ids = ( + await self._storage_controllers.state.get_current_state_ids( + old_room_id, StateFilter.from_types(types_to_copy) + ) ) # map from event_id to BaseEvent old_room_state_events = await self.store.get_events(old_room_state_ids.values()) @@ -560,8 +562,10 @@ async def clone_existing_room( ) # Transfer membership events - old_room_member_state_ids = await self._storage.state.get_current_state_ids( - old_room_id, StateFilter.from_types([(EventTypes.Member, None)]) + old_room_member_state_ids = ( + await self._storage_controllers.state.get_current_state_ids( + old_room_id, StateFilter.from_types([(EventTypes.Member, None)]) + ) ) # map from event_id to BaseEvent diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 4246c9be3f5b..183d4ae3c4fe 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -50,7 +50,7 @@ class RoomListHandler: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main - self._storage = hs.get_storage() + self._storage_controllers = hs.get_storage_controllers() self.hs = hs self.enable_room_list_search = hs.config.roomdirectory.enable_room_list_search self.response_cache: ResponseCache[ @@ -275,7 +275,7 @@ async def generate_room_entry( if aliases: result["aliases"] = aliases - current_state_ids = await self._storage.state.get_current_state_ids( + current_state_ids = await self._storage_controllers.state.get_current_state_ids( room_id, on_invalidate=cache_context.invalidate ) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index f0941f54a42e..70c674ff8e25 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -68,7 +68,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): def __init__(self, hs: "HomeServer"): self.hs = hs self.store = hs.get_datastores().main - self._storage = hs.get_storage() + self._storage_controllers = hs.get_storage_controllers() self.auth = hs.get_auth() self.state_handler = hs.get_state_handler() self.config = hs.config @@ -995,7 +995,9 @@ async def _should_perform_remote_join( # If the host is in the room, but not one of the authorised hosts # for restricted join rules, a remote join must be used. room_version = await self.store.get_room_version(room_id) - current_state_ids = await self._storage.state.get_current_state_ids(room_id) + current_state_ids = await self._storage_controllers.state.get_current_state_ids( + room_id + ) # If restricted join rules are not being used, a local join can always # be used. diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py index 1350c3bd63e2..13098f56ed26 100644 --- a/synapse/handlers/room_summary.py +++ b/synapse/handlers/room_summary.py @@ -90,7 +90,7 @@ class RoomSummaryHandler: def __init__(self, hs: "HomeServer"): self._event_auth_handler = hs.get_event_auth_handler() self._store = hs.get_datastores().main - self._storage = hs.get_storage() + self._storage_controllers = hs.get_storage_controllers() self._event_serializer = hs.get_event_client_serializer() self._server_name = hs.hostname self._federation_client = hs.get_federation_client() @@ -538,7 +538,7 @@ async def _is_local_room_accessible( Returns: True if the room is accessible to the requesting user or server. """ - state_ids = await self._storage.state.get_current_state_ids(room_id) + state_ids = await self._storage_controllers.state.get_current_state_ids(room_id) # If there's no state for the room, it isn't known. if not state_ids: @@ -703,7 +703,9 @@ async def _build_room_entry(self, room_id: str, for_federation: bool) -> JsonDic # there should always be an entry assert stats is not None, "unable to retrieve stats for %s" % (room_id,) - current_state_ids = await self._storage.state.get_current_state_ids(room_id) + current_state_ids = await self._storage_controllers.state.get_current_state_ids( + room_id + ) create_event = await self._store.get_event( current_state_ids[(EventTypes.Create, "")] ) @@ -761,7 +763,9 @@ async def _get_child_events(self, room_id: str) -> Iterable[EventBase]: """ # look for child rooms/spaces. - current_state_ids = await self._storage.state.get_current_state_ids(room_id) + current_state_ids = await self._storage_controllers.state.get_current_state_ids( + room_id + ) events = await self._store.get_events_as_list( [ diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 2950f9fc30e7..f45e06eb0e08 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -40,7 +40,7 @@ class StatsHandler: def __init__(self, hs: "HomeServer"): self.hs = hs self.store = hs.get_datastores().main - self._storage = hs.get_storage() + self._storage_controllers = hs.get_storage_controllers() self.state = hs.get_state_handler() self.server_name = hs.hostname self.clock = hs.get_clock() @@ -106,7 +106,10 @@ async def _unsafe_process(self) -> None: logger.debug( "Processing room stats %s->%s", self.pos, room_max_stream_ordering ) - max_pos, deltas = await self._storage.state.get_current_state_deltas( + ( + max_pos, + deltas, + ) = await self._storage_controllers.state.get_current_state_deltas( self.pos, room_max_stream_ordering ) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 6629697c3b5e..8c3c52e1caa6 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -56,7 +56,7 @@ def __init__(self, hs: "HomeServer"): super().__init__(hs) self.store = hs.get_datastores().main - self._storage = hs.get_storage() + self._storage_controllers = hs.get_storage_controllers() self.server_name = hs.hostname self.clock = hs.get_clock() self.notifier = hs.get_notifier() @@ -175,7 +175,10 @@ async def _unsafe_process(self) -> None: logger.debug( "Processing user stats %s->%s", self.pos, room_max_stream_ordering ) - max_pos, deltas = await self._storage.state.get_current_state_deltas( + ( + max_pos, + deltas, + ) = await self._storage_controllers.state.get_current_state_deltas( self.pos, room_max_stream_ordering ) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 72c039686a64..a8ad575fcd96 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -194,7 +194,7 @@ def __init__(self, hs: "HomeServer", auth_handler: AuthHandler) -> None: self._store: Union[ DataStore, "GenericWorkerSlavedStore" ] = hs.get_datastores().main - self._storage = hs.get_storage() + self._storage_controllers = hs.get_storage_controllers() self._auth = hs.get_auth() self._auth_handler = auth_handler self._server_name = hs.hostname @@ -912,7 +912,7 @@ def get_state_events_in_room( The filtered state events in the room. """ state_ids = yield defer.ensureDeferred( - self._storage.state.get_current_state_ids( + self._storage_controllers.state.get_current_state_ids( room_id=room_id, state_filter=StateFilter.from_types(types) ) ) @@ -1296,7 +1296,7 @@ async def get_room_state( # view of the state. state_filter = StateFilter.from_types(event_filter) - state_ids = await self._storage.state.get_current_state_ids( + state_ids = await self._storage_controllers.state.get_current_state_ids( room_id, state_filter, ) diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 4447a543c911..1cacd1a4f075 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -418,7 +418,7 @@ class RoomStateRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): self.auth = hs.get_auth() self.store = hs.get_datastores().main - self._storage = hs.get_storage() + self._storage_controllers = hs.get_storage_controllers() self.clock = hs.get_clock() self._event_serializer = hs.get_event_client_serializer() @@ -431,7 +431,7 @@ async def on_GET( if not ret: raise NotFoundError("Room not found") - event_ids = await self._storage.state.get_current_state_ids(room_id) + event_ids = await self._storage_controllers.state.get_current_state_ids(room_id) events = await self.store.get_events(event_ids.values()) now = self.clock.time_msec() room_state = self._event_serializer.serialize_events(events.values(), now) From 4757b0094139f8213fbc22bbb591290333544bf9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Jun 2022 14:59:00 +0100 Subject: [PATCH 19/21] Apply suggestions from code review Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- changelog.d/12872.misc | 2 +- synapse/storage/databases/main/room.py | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/changelog.d/12872.misc b/changelog.d/12872.misc index b944bac1053b..f60a756f2110 100644 --- a/changelog.d/12872.misc +++ b/changelog.d/12872.misc @@ -1 +1 @@ -When querying the current state of the room wait for lazy joining of the room to finish. +Faster room joins: when querying the current state of the room, wait for state to be populated. diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 11349c6d7a9d..ed25503ceb30 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1113,7 +1113,12 @@ def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None ) async def has_room_only_got_partial_state(self, room_id: str) -> bool: - "Whether the given room only has partial state stored" + """Checks if this room has partial state. + + Returns true if this is a "partial-state" room, which means that the state + at events in the room, and `current_state_events`, may not yet be + complete. + """ entry = await self.db_pool.simple_select_one_onecol( table="partial_state_rooms", From b6cb65fedba5e90a5134cee2d13ee71bc3323ce2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Jun 2022 15:03:47 +0100 Subject: [PATCH 20/21] Lint --- synapse/storage/databases/main/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index ed25503ceb30..32a6b264e3af 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1114,7 +1114,7 @@ def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None async def has_room_only_got_partial_state(self, room_id: str) -> bool: """Checks if this room has partial state. - + Returns true if this is a "partial-state" room, which means that the state at events in the room, and `current_state_events`, may not yet be complete. From c513c20ce940b6cf527903bf3f0c6cc7df47d2b6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Jun 2022 15:05:20 +0100 Subject: [PATCH 21/21] Rename has_room_only_got_partial_state --- synapse/storage/databases/main/room.py | 4 ++-- .../util/partial_state_events_tracker.py | 4 +--- .../util/test_partial_state_events_tracker.py | 20 ++++++------------- 3 files changed, 9 insertions(+), 19 deletions(-) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 32a6b264e3af..2b0e819078a0 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1112,7 +1112,7 @@ def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None keyvalues={"room_id": room_id}, ) - async def has_room_only_got_partial_state(self, room_id: str) -> bool: + async def is_partial_state_room(self, room_id: str) -> bool: """Checks if this room has partial state. Returns true if this is a "partial-state" room, which means that the state @@ -1125,7 +1125,7 @@ async def has_room_only_got_partial_state(self, room_id: str) -> bool: keyvalues={"room_id": room_id}, retcol="room_id", allow_none=True, - desc="has_room_only_got_partial_state", + desc="is_partial_state_room", ) return entry is not None diff --git a/synapse/storage/util/partial_state_events_tracker.py b/synapse/storage/util/partial_state_events_tracker.py index e8406b8c7f31..211437cfaa99 100644 --- a/synapse/storage/util/partial_state_events_tracker.py +++ b/synapse/storage/util/partial_state_events_tracker.py @@ -159,9 +159,7 @@ async def await_full_state(self, room_id: str) -> None: try: # Check if the room has partial current state or not. - has_partial_state = await self._store.has_room_only_got_partial_state( - room_id - ) + has_partial_state = await self._store.is_partial_state_room(room_id) if not has_partial_state: return diff --git a/tests/storage/util/test_partial_state_events_tracker.py b/tests/storage/util/test_partial_state_events_tracker.py index 2a1a5d4176f7..cae14151c0fa 100644 --- a/tests/storage/util/test_partial_state_events_tracker.py +++ b/tests/storage/util/test_partial_state_events_tracker.py @@ -123,21 +123,17 @@ def test_cancellation(self): class PartialCurrentStateTrackerTestCase(TestCase): def setUp(self) -> None: - self.mock_store = mock.Mock(spec_set=["has_room_only_got_partial_state"]) + self.mock_store = mock.Mock(spec_set=["is_partial_state_room"]) self.tracker = PartialCurrentStateTracker(self.mock_store) def test_does_not_block_for_full_state_rooms(self): - self.mock_store.has_room_only_got_partial_state.return_value = make_awaitable( - False - ) + self.mock_store.is_partial_state_room.return_value = make_awaitable(False) self.successResultOf(ensureDeferred(self.tracker.await_full_state("room_id"))) def test_blocks_for_partial_room_state(self): - self.mock_store.has_room_only_got_partial_state.return_value = make_awaitable( - True - ) + self.mock_store.is_partial_state_room.return_value = make_awaitable(True) d = ensureDeferred(self.tracker.await_full_state("room_id")) @@ -151,20 +147,16 @@ def test_blocks_for_partial_room_state(self): def test_un_partial_state_race(self): # We should correctly handle race between awaiting the state and us # un-partialling the state - async def has_room_only_got_partial_state(events): + async def is_partial_state_room(events): self.tracker.notify_un_partial_stated("room_id") return True - self.mock_store.has_room_only_got_partial_state.side_effect = ( - has_room_only_got_partial_state - ) + self.mock_store.is_partial_state_room.side_effect = is_partial_state_room self.successResultOf(ensureDeferred(self.tracker.await_full_state("room_id"))) def test_cancellation(self): - self.mock_store.has_room_only_got_partial_state.return_value = make_awaitable( - True - ) + self.mock_store.is_partial_state_room.return_value = make_awaitable(True) d1 = ensureDeferred(self.tracker.await_full_state("room_id")) self.assertNoResult(d1)