Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Use get_current_users_in_room from store and not StateHandler (#9910)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston authored May 5, 2021
1 parent d530500 commit d0aee69
Show file tree
Hide file tree
Showing 12 changed files with 26 additions and 17 deletions.
1 change: 1 addition & 0 deletions changelog.d/9910.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug where user directory could get out of sync if room visibility and membership changed in quick succession.
1 change: 1 addition & 0 deletions changelog.d/9910.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance after joining a large room when presence is enabled.
4 changes: 2 additions & 2 deletions synapse/handlers/directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async def _create_association(
# TODO(erikj): Add transactions.
# TODO(erikj): Check if there is a current association.
if not servers:
users = await self.state.get_current_users_in_room(room_id)
users = await self.store.get_users_in_room(room_id)
servers = {get_domain_from_id(u) for u in users}

if not servers:
Expand Down Expand Up @@ -270,7 +270,7 @@ async def get_association(self, room_alias: RoomAlias) -> JsonDict:
Codes.NOT_FOUND,
)

users = await self.state.get_current_users_in_room(room_id)
users = await self.store.get_users_in_room(room_id)
extra_servers = {get_domain_from_id(u) for u in users}
servers = set(extra_servers) | set(servers)

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async def get_stream(
# Send down presence.
if event.state_key == auth_user_id:
# Send down presence for everyone in the room.
users = await self.state.get_current_users_in_room(
users = await self.store.get_users_in_room(
event.room_id
) # type: Iterable[str]
else:
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ async def get_joined_members(self, requester: Requester, room_id: str) -> dict:
"Getting joined members after leaving is not implemented"
)

users_with_profile = await self.state.get_current_users_in_room(room_id)
users_with_profile = await self.store.get_users_in_room_with_profiles(room_id)

# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -1293,7 +1293,7 @@ async def _on_user_joined_room(

remote_host = get_domain_from_id(user_id)

users = await self.state.get_current_users_in_room(room_id)
users = await self.store.get_users_in_room(room_id)
user_ids = list(filter(self.is_mine_id, users))

states_d = await self.current_state_for_users(user_ids)
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,7 @@ async def shutdown_room(
new_room_id = None
logger.info("Shutting down room %r", room_id)

users = await self.state.get_current_users_in_room(room_id)
users = await self.store.get_users_in_room(room_id)
kicked_users = []
failed_to_kick_users = []
for user_id in users:
Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@ async def _generate_sync_entry_for_device_list(

# Step 1b, check for newly joined rooms
for room_id in newly_joined_rooms:
joined_users = await self.state.get_current_users_in_room(room_id)
joined_users = await self.store.get_users_in_room(room_id)
newly_joined_or_invited_users.update(joined_users)

# TODO: Check that these users are actually new, i.e. either they
Expand All @@ -1206,7 +1206,7 @@ async def _generate_sync_entry_for_device_list(

# Now find users that we no longer track
for room_id in newly_left_rooms:
left_users = await self.state.get_current_users_in_room(room_id)
left_users = await self.store.get_users_in_room(room_id)
newly_left_users.update(left_users)

# Remove any users that we still share a room with.
Expand Down Expand Up @@ -1361,7 +1361,7 @@ async def _generate_sync_entry_for_presence(

extra_users_ids = set(newly_joined_or_invited_users)
for room_id in newly_joined_rooms:
users = await self.state.get_current_users_in_room(room_id)
users = await self.store.get_users_in_room(room_id)
extra_users_ids.update(users)
extra_users_ids.discard(user.to_string())

Expand Down
10 changes: 7 additions & 3 deletions synapse/state/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,19 +213,23 @@ async def get_current_state_ids(
return ret.state

async def get_current_users_in_room(
self, room_id: str, latest_event_ids: Optional[List[str]] = None
self, room_id: str, latest_event_ids: List[str]
) -> Dict[str, ProfileInfo]:
"""
Get the users who are currently in a room.
Note: This is much slower than using the equivalent method
`DataStore.get_users_in_room` or `DataStore.get_users_in_room_with_profiles`,
so this should only be used when wanting the users at a particular point
in the room.
Args:
room_id: The ID of the room.
latest_event_ids: Precomputed list of latest event IDs. Will be computed if None.
Returns:
Dictionary of user IDs to their profileinfo.
"""
if not latest_event_ids:
latest_event_ids = await self.store.get_latest_event_ids_in_room(room_id)

assert latest_event_ids is not None

logger.debug("calling resolve_state_groups from get_current_users_in_room")
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def _invalidate_state_caches(
self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))

self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,))
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,))

Expand Down
8 changes: 6 additions & 2 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,12 @@ async def get_users_in_room_with_profiles(

def _get_users_in_room_with_profiles(txn) -> Dict[str, ProfileInfo]:
sql = """
SELECT user_id, display_name, avatar_url FROM room_memberships
WHERE room_id = ? AND membership = ?
SELECT state_key, display_name, avatar_url FROM room_memberships as m
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
"""
txn.execute(sql, (room_id, Membership.JOIN))

Expand Down
4 changes: 1 addition & 3 deletions synapse/storage/databases/main/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ async def _populate_user_directory_process_rooms(self, progress, batch_size):
batch_size (int): Maximum number of state events to process
per cycle.
"""
state = self.hs.get_state_handler()

# If we don't have progress filed, delete everything.
if not progress:
await self.delete_all_from_user_dir()
Expand Down Expand Up @@ -197,7 +195,7 @@ def _get_next_batch(txn):
room_id
)

users_with_profile = await state.get_current_users_in_room(room_id)
users_with_profile = await self.get_users_in_room_with_profiles(room_id)
user_ids = set(users_with_profile)

# Update each user in the user directory.
Expand Down

0 comments on commit d0aee69

Please sign in to comment.