Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use get_current_users_in_room from store and not StateHandler #9910

Merged
merged 8 commits into from
May 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9910.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug where user directory could get out of sync if room visibility and membership changed in quick succession.
1 change: 1 addition & 0 deletions changelog.d/9910.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance after joining a large room when presence is enabled.
4 changes: 2 additions & 2 deletions synapse/handlers/directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async def _create_association(
# TODO(erikj): Add transactions.
# TODO(erikj): Check if there is a current association.
if not servers:
users = await self.state.get_current_users_in_room(room_id)
users = await self.store.get_users_in_room(room_id)
servers = {get_domain_from_id(u) for u in users}

if not servers:
Expand Down Expand Up @@ -270,7 +270,7 @@ async def get_association(self, room_alias: RoomAlias) -> JsonDict:
Codes.NOT_FOUND,
)

users = await self.state.get_current_users_in_room(room_id)
users = await self.store.get_users_in_room(room_id)
extra_servers = {get_domain_from_id(u) for u in users}
servers = set(extra_servers) | set(servers)

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async def get_stream(
# Send down presence.
if event.state_key == auth_user_id:
# Send down presence for everyone in the room.
users = await self.state.get_current_users_in_room(
users = await self.store.get_users_in_room(
event.room_id
) # type: Iterable[str]
else:
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ async def get_joined_members(self, requester: Requester, room_id: str) -> dict:
"Getting joined members after leaving is not implemented"
)

users_with_profile = await self.state.get_current_users_in_room(room_id)
users_with_profile = await self.store.get_users_in_room_with_profiles(room_id)

# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -1293,7 +1293,7 @@ async def _on_user_joined_room(

remote_host = get_domain_from_id(user_id)

users = await self.state.get_current_users_in_room(room_id)
users = await self.store.get_users_in_room(room_id)
user_ids = list(filter(self.is_mine_id, users))

states_d = await self.current_state_for_users(user_ids)
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,7 @@ async def shutdown_room(
new_room_id = None
logger.info("Shutting down room %r", room_id)

users = await self.state.get_current_users_in_room(room_id)
users = await self.store.get_users_in_room(room_id)
kicked_users = []
failed_to_kick_users = []
for user_id in users:
Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@ async def _generate_sync_entry_for_device_list(

# Step 1b, check for newly joined rooms
for room_id in newly_joined_rooms:
joined_users = await self.state.get_current_users_in_room(room_id)
joined_users = await self.store.get_users_in_room(room_id)
newly_joined_or_invited_users.update(joined_users)

# TODO: Check that these users are actually new, i.e. either they
Expand All @@ -1206,7 +1206,7 @@ async def _generate_sync_entry_for_device_list(

# Now find users that we no longer track
for room_id in newly_left_rooms:
left_users = await self.state.get_current_users_in_room(room_id)
left_users = await self.store.get_users_in_room(room_id)
newly_left_users.update(left_users)

# Remove any users that we still share a room with.
Expand Down Expand Up @@ -1361,7 +1361,7 @@ async def _generate_sync_entry_for_presence(

extra_users_ids = set(newly_joined_or_invited_users)
for room_id in newly_joined_rooms:
users = await self.state.get_current_users_in_room(room_id)
users = await self.store.get_users_in_room(room_id)
extra_users_ids.update(users)
extra_users_ids.discard(user.to_string())

Expand Down
10 changes: 7 additions & 3 deletions synapse/state/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,19 +213,23 @@ async def get_current_state_ids(
return ret.state

async def get_current_users_in_room(
self, room_id: str, latest_event_ids: Optional[List[str]] = None
self, room_id: str, latest_event_ids: List[str]
) -> Dict[str, ProfileInfo]:
"""
Get the users who are currently in a room.

Note: This is much slower than using the equivalent method
`DataStore.get_users_in_room` or `DataStore.get_users_in_room_with_profiles`,
so this should only be used when wanting the users at a particular point
in the room.

Args:
room_id: The ID of the room.
latest_event_ids: Precomputed list of latest event IDs. Will be computed if None.
Returns:
Dictionary of user IDs to their profileinfo.
"""
if not latest_event_ids:
latest_event_ids = await self.store.get_latest_event_ids_in_room(room_id)

assert latest_event_ids is not None

logger.debug("calling resolve_state_groups from get_current_users_in_room")
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def _invalidate_state_caches(
self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))

self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, err, we didn't ever invalidate this cache. I think we only ever used this function when handling visibility changes in the user directory, so I think its probably fairly unlikely that this bit anyone...

await self.store.get_users_in_room_with_profiles(room_id)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the question is whether we want to pull this into a separate PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"meh". I woudn't bother.

You could always stick in a .bugfix as well as a .feature newsfile if you're feeling keen.

self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,))

Expand Down
8 changes: 6 additions & 2 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,12 @@ async def get_users_in_room_with_profiles(

def _get_users_in_room_with_profiles(txn) -> Dict[str, ProfileInfo]:
sql = """
SELECT user_id, display_name, avatar_url FROM room_memberships
WHERE room_id = ? AND membership = ?
SELECT state_key, display_name, avatar_url FROM room_memberships as m
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
"""
txn.execute(sql, (room_id, Membership.JOIN))

Expand Down
4 changes: 1 addition & 3 deletions synapse/storage/databases/main/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ async def _populate_user_directory_process_rooms(self, progress, batch_size):
batch_size (int): Maximum number of state events to process
per cycle.
"""
state = self.hs.get_state_handler()

# If we don't have progress filed, delete everything.
if not progress:
await self.delete_all_from_user_dir()
Expand Down Expand Up @@ -197,7 +195,7 @@ def _get_next_batch(txn):
room_id
)

users_with_profile = await state.get_current_users_in_room(room_id)
users_with_profile = await self.get_users_in_room_with_profiles(room_id)
user_ids = set(users_with_profile)

# Update each user in the user directory.
Expand Down