Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Wait for lazy join to complete when getting current state #12872

Merged
merged 23 commits into from
Jun 1, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
78faa1a
Rename StateGroupStorage
erikjohnston May 25, 2022
334844d
Add fetching current state funcs to StateStorage
erikjohnston May 25, 2022
fe86915
Use StateStorage.get_current_state_ids
erikjohnston May 25, 2022
a2465b8
Use StateStorage.get_filtered_current_state_ids
erikjohnston May 25, 2022
a2945a5
Use StateStorage.get_current_state_deltas
erikjohnston May 25, 2022
e786f68
Rename to get_partial_current_state_ids
erikjohnston May 25, 2022
3f74b37
Rename to get_partial_filtered_current_state_ids
erikjohnston May 25, 2022
c068581
Rename to get_partial_current_state_deltas
erikjohnston May 25, 2022
6f386d1
Block calls to `get_current_state_ids` and co. when have partial state
erikjohnston May 25, 2022
2b674f8
Add test for `PartialCurrentStateTracker`
erikjohnston May 25, 2022
ecae768
Newsfile
erikjohnston May 25, 2022
d1a1d6a
Don't block sync when lazy joining a room
erikjohnston May 25, 2022
531955f
Fix type annotation
erikjohnston May 25, 2022
6cc7269
Apply suggestions from code review
erikjohnston May 26, 2022
3a20548
s/self.storage/self._storage/
erikjohnston May 27, 2022
6e7625e
s/is_room_got_partial_state/has_room_only_got_partial_state/
erikjohnston May 27, 2022
711ea44
Merge get_filtered and get_current_state_ids
erikjohnston May 27, 2022
1f99ce0
Merge remote-tracking branch 'origin/develop' into erikj/await_curren…
erikjohnston May 27, 2022
b59418f
Merge remote-tracking branch 'origin/develop' into erikj/await_curren…
erikjohnston May 31, 2022
c6fe132
Fix up naming
erikjohnston May 31, 2022
4757b00
Apply suggestions from code review
erikjohnston Jun 1, 2022
b6cb65f
Lint
erikjohnston Jun 1, 2022
c513c20
Rename has_room_only_got_partial_state
erikjohnston Jun 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12872.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
When querying the current state of the room wait for lazy joining of the room to finish.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
3 changes: 2 additions & 1 deletion synapse/events/third_party_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def __init__(self, hs: "HomeServer"):
self.third_party_rules = None

self.store = hs.get_datastores().main
self._storage = hs.get_storage()

self._check_event_allowed_callbacks: List[CHECK_EVENT_ALLOWED_CALLBACK] = []
self._on_create_room_callbacks: List[ON_CREATE_ROOM_CALLBACK] = []
Expand Down Expand Up @@ -463,7 +464,7 @@ async def _get_state_map_for_room(self, room_id: str) -> StateMap[EventBase]:
Returns:
A dict mapping (event type, state key) to state event.
"""
state_ids = await self.store.get_filtered_current_state_ids(room_id)
state_ids = await self._storage.state.get_current_state_ids(room_id)
room_state_events = await self.store.get_events(state_ids.values())

state_events = {}
Expand Down
2 changes: 1 addition & 1 deletion synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1207,7 +1207,7 @@ async def check_server_matches_acl(self, server_name: str, room_id: str) -> None
Raises:
AuthError if the server does not match the ACL
"""
state_ids = await self.store.get_current_state_ids(room_id)
state_ids = await self.storage.state.get_current_state_ids(room_id)
acl_event_id = state_ids.get((EventTypes.ServerACL, ""))

if not acl_event_id:
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ async def get_user_ids_changed(
possibly_changed = set(changed)
possibly_left = set()
for room_id in rooms_changed:
current_state_ids = await self.store.get_current_state_ids(room_id)
current_state_ids = await self.state_storage.get_current_state_ids(room_id)

# The user may have left the room
# TODO: Check if they actually did or if we were just invited.
Expand Down
5 changes: 4 additions & 1 deletion synapse/handlers/directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self, hs: "HomeServer"):
self.appservice_handler = hs.get_application_service_handler()
self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastores().main
self.storage = hs.get_storage()
self.config = hs.config
self.enable_room_list_search = hs.config.roomdirectory.enable_room_list_search
self.require_membership = hs.config.server.require_membership_for_aliases
Expand Down Expand Up @@ -463,7 +464,9 @@ async def edit_published_room_list(
making_public = visibility == "public"
if making_public:
room_aliases = await self.store.get_aliases_for_room(room_id)
canonical_alias = await self.store.get_canonical_alias_for_room(room_id)
canonical_alias = await self.storage.state.get_canonical_alias_for_room(
room_id
)
if canonical_alias:
room_aliases.append(canonical_alias)

Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ async def on_make_join_request(
# Note that this requires the /send_join request to come back to the
# same server.
if room_version.msc3083_join_rules:
state_ids = await self.store.get_current_state_ids(room_id)
state_ids = await self.state_storage.get_current_state_ids(room_id)
if await self._event_auth_handler.has_restricted_join_rules(
state_ids, room_version
):
Expand Down Expand Up @@ -1488,6 +1488,7 @@ async def _sync_partial_state_room(
success = await self.store.clear_partial_state_room(room_id)
if success:
logger.info("State resync complete for %s", room_id)
self.storage.state.notify_room_un_partial_stated(room_id)

# TODO(faster_joins) update room stats and user directory?
return
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ async def get_state_events(
)

if membership == Membership.JOIN:
state_ids = await self.store.get_filtered_current_state_ids(
state_ids = await self.state_storage.get_filtered_current_state_ids(
room_id, state_filter=state_filter
)
room_state = await self.store.get_events(state_ids.values())
Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class BasePresenceHandler(abc.ABC):
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.store = hs.get_datastores().main
self._storage = hs.get_storage()
self.presence_router = hs.get_presence_router()
self.state = hs.get_state_handler()
self.is_mine_id = hs.is_mine_id
Expand Down Expand Up @@ -1346,7 +1347,7 @@ async def _unsafe_process(self) -> None:
self._event_pos,
room_max_stream_ordering,
)
max_pos, deltas = await self.store.get_current_state_deltas(
max_pos, deltas = await self._storage.state.get_current_state_deltas(
self._event_pos, room_max_stream_ordering
)

Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class LoginDict(TypedDict):
class RegistrationHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self._storage = hs.get_storage()
self.clock = hs.get_clock()
self.hs = hs
self.auth = hs.get_auth()
Expand Down Expand Up @@ -528,7 +529,7 @@ async def _join_rooms(self, user_id: str) -> None:

if requires_invite:
# If the server is in the room, check if the room is public.
state = await self.store.get_filtered_current_state_ids(
state = await self._storage.state.get_filtered_current_state_ids(
room_id, StateFilter.from_types([(EventTypes.JoinRules, "")])
)

Expand Down
9 changes: 6 additions & 3 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class EventContext:
class RoomCreationHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self._storage = hs.get_storage()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.hs = hs
Expand Down Expand Up @@ -481,7 +482,7 @@ async def clone_existing_room(
if room_type == RoomTypes.SPACE:
types_to_copy.append((EventTypes.SpaceChild, None))

old_room_state_ids = await self.store.get_filtered_current_state_ids(
old_room_state_ids = await self._storage.state.get_filtered_current_state_ids(
old_room_id, StateFilter.from_types(types_to_copy)
)
# map from event_id to BaseEvent
Expand Down Expand Up @@ -559,8 +560,10 @@ async def clone_existing_room(
)

# Transfer membership events
old_room_member_state_ids = await self.store.get_filtered_current_state_ids(
old_room_id, StateFilter.from_types([(EventTypes.Member, None)])
old_room_member_state_ids = (
await self._storage.state.get_filtered_current_state_ids(
old_room_id, StateFilter.from_types([(EventTypes.Member, None)])
)
)

# map from event_id to BaseEvent
Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/room_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
class RoomListHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self._storage = hs.get_storage()
self.hs = hs
self.enable_room_list_search = hs.config.roomdirectory.enable_room_list_search
self.response_cache: ResponseCache[
Expand Down Expand Up @@ -274,7 +275,7 @@ async def generate_room_entry(
if aliases:
result["aliases"] = aliases

current_state_ids = await self.store.get_current_state_ids(
current_state_ids = await self._storage.state.get_current_state_ids(
room_id, on_invalidate=cache_context.invalidate
)

Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastores().main
self.storage = hs.get_storage()
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
self.auth = hs.get_auth()
self.state_handler = hs.get_state_handler()
self.config = hs.config
Expand Down Expand Up @@ -994,7 +995,7 @@ async def _should_perform_remote_join(
# If the host is in the room, but not one of the authorised hosts
# for restricted join rules, a remote join must be used.
room_version = await self.store.get_room_version(room_id)
current_state_ids = await self.store.get_current_state_ids(room_id)
current_state_ids = await self.storage.state.get_current_state_ids(room_id)

# If restricted join rules are not being used, a local join can always
# be used.
Expand Down
7 changes: 4 additions & 3 deletions synapse/handlers/room_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class RoomSummaryHandler:
def __init__(self, hs: "HomeServer"):
self._event_auth_handler = hs.get_event_auth_handler()
self._store = hs.get_datastores().main
self._storage = hs.get_storage()
richvdh marked this conversation as resolved.
Show resolved Hide resolved
self._event_serializer = hs.get_event_client_serializer()
self._server_name = hs.hostname
self._federation_client = hs.get_federation_client()
Expand Down Expand Up @@ -537,7 +538,7 @@ async def _is_local_room_accessible(
Returns:
True if the room is accessible to the requesting user or server.
"""
state_ids = await self._store.get_current_state_ids(room_id)
state_ids = await self._storage.state.get_current_state_ids(room_id)

# If there's no state for the room, it isn't known.
if not state_ids:
Expand Down Expand Up @@ -702,7 +703,7 @@ async def _build_room_entry(self, room_id: str, for_federation: bool) -> JsonDic
# there should always be an entry
assert stats is not None, "unable to retrieve stats for %s" % (room_id,)

current_state_ids = await self._store.get_current_state_ids(room_id)
current_state_ids = await self._storage.state.get_current_state_ids(room_id)
create_event = await self._store.get_event(
current_state_ids[(EventTypes.Create, "")]
)
Expand Down Expand Up @@ -763,7 +764,7 @@ async def _get_child_events(self, room_id: str) -> Iterable[EventBase]:
"""

# look for child rooms/spaces.
current_state_ids = await self._store.get_current_state_ids(room_id)
current_state_ids = await self._storage.state.get_current_state_ids(room_id)

events = await self._store.get_events_as_list(
[
Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class StatsHandler:
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastores().main
self._storage = hs.get_storage()
self.state = hs.get_state_handler()
self.server_name = hs.hostname
self.clock = hs.get_clock()
Expand Down Expand Up @@ -105,7 +106,7 @@ async def _unsafe_process(self) -> None:
logger.debug(
"Processing room stats %s->%s", self.pos, room_max_stream_ordering
)
max_pos, deltas = await self.store.get_current_state_deltas(
max_pos, deltas = await self._storage.state.get_current_state_deltas(
self.pos, room_max_stream_ordering
)

Expand Down
11 changes: 7 additions & 4 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,8 @@ async def _load_filtered_recents(
# ensure that we always include current state in the timeline
current_state_ids: FrozenSet[str] = frozenset()
if any(e.is_state() for e in recents):
current_state_ids_map = await self.store.get_current_state_ids(
room_id
current_state_ids_map = (
await self.state_storage.get_current_state_ids(room_id)
)
current_state_ids = frozenset(current_state_ids_map.values())

Expand Down Expand Up @@ -574,8 +574,11 @@ async def _load_filtered_recents(
# ensure that we always include current state in the timeline
current_state_ids = frozenset()
if any(e.is_state() for e in loaded_recents):
current_state_ids_map = await self.store.get_current_state_ids(
room_id
# FIXME(faster room joins): We use the partial state here as
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# we don't want to block `/sync` on finishing a lazy join.
# Is this the correct way of doing it?
current_state_ids_map = (
await self.store.get_partial_current_state_ids(room_id)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a check to see if the state event we're about to send down to clients is in the current state, and if so to always send it even if filter_events_for_client would filter it out. Not sure if using get_partial_current_state_ids is the right approach here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really following, but this is something we can return to.

)
current_state_ids = frozenset(current_state_ids_map.values())

Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self.store = hs.get_datastores().main
self._storage = hs.get_storage()
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
Expand Down Expand Up @@ -174,7 +175,7 @@ async def _unsafe_process(self) -> None:
logger.debug(
"Processing user stats %s->%s", self.pos, room_max_stream_ordering
)
max_pos, deltas = await self.store.get_current_state_deltas(
max_pos, deltas = await self._storage.state.get_current_state_deltas(
self.pos, room_max_stream_ordering
)

Expand Down
7 changes: 4 additions & 3 deletions synapse/module_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ def __init__(self, hs: "HomeServer", auth_handler: AuthHandler) -> None:
self._store: Union[
DataStore, "GenericWorkerSlavedStore"
] = hs.get_datastores().main
self._storage = hs.get_storage()
self._auth = hs.get_auth()
self._auth_handler = auth_handler
self._server_name = hs.hostname
Expand Down Expand Up @@ -913,7 +914,7 @@ def get_state_events_in_room(
The filtered state events in the room.
"""
state_ids = yield defer.ensureDeferred(
self._store.get_filtered_current_state_ids(
self._storage.state.get_filtered_current_state_ids(
room_id=room_id, state_filter=StateFilter.from_types(types)
)
)
Expand Down Expand Up @@ -1292,7 +1293,7 @@ async def get_room_state(
# If a filter was provided, turn it into a StateFilter and retrieve a filtered
# view of the state.
state_filter = StateFilter.from_types(event_filter)
state_ids = await self._store.get_filtered_current_state_ids(
state_ids = await self._storage.state.get_filtered_current_state_ids(
room_id,
state_filter,
)
Expand All @@ -1301,7 +1302,7 @@ async def get_room_state(
# to get_filtered_current_state_ids above, with `state_filter = StateFilter.all()`,
# but get_filtered_current_state_ids isn't cached and `get_current_state_ids`
# is, so using the latter when we can is better for perf.
state_ids = await self._store.get_current_state_ids(room_id)
state_ids = await self._storage.state.get_current_state_ids(room_id)

state_events = await self._store.get_events(state_ids.values())

Expand Down
2 changes: 1 addition & 1 deletion synapse/push/mailer.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ async def send_notification_mail(
user_display_name = user_id

async def _fetch_room_state(room_id: str) -> None:
room_state = await self.store.get_current_state_ids(room_id)
room_state = await self.state_storage.get_current_state_ids(room_id)
state_by_room[room_id] = room_state

# Run at most 3 of these at once: sync does 10 at a time but email
Expand Down
3 changes: 2 additions & 1 deletion synapse/rest/admin/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ class RoomStateRestServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self._storage = hs.get_storage()
self.clock = hs.get_clock()
self._event_serializer = hs.get_event_client_serializer()

Expand All @@ -430,7 +431,7 @@ async def on_GET(
if not ret:
raise NotFoundError("Room not found")

event_ids = await self.store.get_current_state_ids(room_id)
event_ids = await self._storage.state.get_current_state_ids(room_id)
events = await self.store.get_events(event_ids.values())
now = self.clock.time_msec()
room_state = self._event_serializer.serialize_events(events.values(), now)
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from synapse.storage.databases.main import DataStore
from synapse.storage.persist_events import EventsPersistenceStorage
from synapse.storage.purge_events import PurgeEventsStorage
from synapse.storage.state import StateGroupStorage
from synapse.storage.state import StateStorage

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand All @@ -51,7 +51,7 @@ def __init__(self, hs: "HomeServer", stores: Databases):
self.main = stores.main

self.purge_events = PurgeEventsStorage(hs, stores)
self.state = StateGroupStorage(hs, stores)
self.state = StateStorage(hs, stores)

self.persistence = None
if stores.persist_events:
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def _invalidate_state_caches(

# Purge other caches based on room state.
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,))
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this means a rolling restart is unwise. We should either document it, or arrange for it to be safe.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine, as we're actually not streaming the name of the caches over replication for these, we're just sending "invalidate all current state caches kthx" instead. (_attempt_to_invalidate_cache only invalidates things on the local process)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah. yes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this means a rolling restart is unwise. We should either document it, or arrange for it to be safe.


def _attempt_to_invalidate_cache(
self, cache_name: str, key: Optional[Collection[Any]]
Expand Down
13 changes: 13 additions & 0 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,19 @@ def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None
keyvalues={"room_id": room_id},
)

async def is_room_got_partial_state(self, room_id: str) -> bool:
richvdh marked this conversation as resolved.
Show resolved Hide resolved
"Whether the given room only has partial state stored"
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

entry = await self.db_pool.simple_select_one_onecol(
table="partial_state_rooms",
keyvalues={"room_id": room_id},
retcol="room_id",
allow_none=True,
desc="is_room_got_partial_state",
)

return entry is not None


class _BackgroundUpdates:
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
Expand Down
Loading