Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Optimise get_rooms_for_user (drop with_stream_ordering) #13787

Merged
merged 17 commits into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog.d/13787.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Optimise get rooms for user calls. Contributed by Nick @ Beeper (@fizzadar).

Fizzadar marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 2 additions & 4 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,9 @@ async def get_user_ids_changed(
possibly_left = possibly_changed | possibly_left

# Double check if we still share rooms with the given user.
users_rooms = await self.store.get_rooms_for_users_with_stream_ordering(
possibly_left
)
users_rooms = await self.store.get_rooms_for_users(possibly_left)
for changed_user_id, entries in users_rooms.items():
if any(e.room_id in room_ids for e in entries):
if any(rid in room_ids for rid in entries):
possibly_left.discard(changed_user_id)
else:
possibly_joined.discard(changed_user_id)
Expand Down
14 changes: 4 additions & 10 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1474,16 +1474,14 @@ async def _generate_sync_entry_for_device_list(
since_token.device_list_key
)
if changed_users is not None:
result = await self.store.get_rooms_for_users_with_stream_ordering(
changed_users
)
result = await self.store.get_rooms_for_users(changed_users)

for changed_user_id, entries in result.items():
# Check if the changed user shares any rooms with the user,
# or if the changed user is the syncing user (as we always
# want to include device list updates of their own devices).
if user_id == changed_user_id or any(
e.room_id in joined_rooms for e in entries
rid in joined_rooms for rid in entries
):
users_that_have_changed.add(changed_user_id)
else:
Expand Down Expand Up @@ -1517,13 +1515,9 @@ async def _generate_sync_entry_for_device_list(
newly_left_users.update(left_users)

# Remove any users that we still share a room with.
left_users_rooms = (
await self.store.get_rooms_for_users_with_stream_ordering(
newly_left_users
)
)
left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
for user_id, entries in left_users_rooms.items():
if any(e.room_id in joined_rooms for e in entries):
if any(rid in joined_rooms for rid in entries):
newly_left_users.discard(user_id)

return DeviceListUpdates(
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
self.get_rooms_for_user_with_stream_ordering.invalidate(
(data.state_key,)
)
self.get_rooms_for_user.invalidate((data.state_key,))
else:
raise Exception("Unknown events stream row type %s" % (row.type,))

Expand Down
4 changes: 4 additions & 0 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,10 @@ def _update_current_state_txn(
self.store.get_rooms_for_user_with_stream_ordering.invalidate,
(member,),
)
txn.call_after(
self.store.get_rooms_for_user.invalidate,
(member,),
)

self.store._invalidate_state_caches_and_stream(
txn, room_id, members_changed
Expand Down
135 changes: 74 additions & 61 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import logging
from typing import (
TYPE_CHECKING,
Callable,
Collection,
Dict,
FrozenSet,
Expand Down Expand Up @@ -52,7 +51,6 @@
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -583,58 +581,6 @@ def _get_rooms_for_user_with_stream_ordering_txn(
for room_id, instance, stream_id in txn
)

@cachedList(
cached_method_name="get_rooms_for_user_with_stream_ordering",
list_name="user_ids",
)
async def get_rooms_for_users_with_stream_ordering(
self, user_ids: Collection[str]
) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:
"""A batched version of `get_rooms_for_user_with_stream_ordering`.

Returns:
Map from user_id to set of rooms that is currently in.
"""
return await self.db_pool.runInteraction(
"get_rooms_for_users_with_stream_ordering",
self._get_rooms_for_users_with_stream_ordering_txn,
user_ids,
)

def _get_rooms_for_users_with_stream_ordering_txn(
self, txn: LoggingTransaction, user_ids: Collection[str]
) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:

clause, args = make_in_list_sql_clause(
self.database_engine,
"c.state_key",
user_ids,
)

sql = f"""
SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
FROM current_state_events AS c
INNER JOIN events AS e USING (room_id, event_id)
WHERE
c.type = 'm.room.member'
AND c.membership = ?
AND {clause}
"""

txn.execute(sql, [Membership.JOIN] + args)

result: Dict[str, Set[GetRoomsForUserWithStreamOrdering]] = {
user_id: set() for user_id in user_ids
}
for user_id, room_id, instance, stream_id in txn:
result[user_id].add(
GetRoomsForUserWithStreamOrdering(
room_id, PersistedEventPosition(instance, stream_id)
)
)

return {user_id: frozenset(v) for user_id, v in result.items()}

async def get_users_server_still_shares_room_with(
self, user_ids: Collection[str]
) -> Set[str]:
Expand Down Expand Up @@ -670,19 +616,86 @@ def _get_users_server_still_shares_room_with_txn(
_get_users_server_still_shares_room_with_txn,
)

@cancellable
async def get_rooms_for_user(
self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None
) -> FrozenSet[str]:
@cached(max_entries=500000, iterable=True)
async def get_rooms_for_user(self, user_id: str) -> FrozenSet[str]:
"""Returns a set of room_ids the user is currently joined to.

If a remote user only returns rooms this server is currently
participating in.
"""
rooms = await self.get_rooms_for_user_with_stream_ordering(
user_id, on_invalidate=on_invalidate
rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate(
(user_id,),
None,
update_metrics=False,
)
if rooms:
return frozenset(r.room_id for r in rooms)

return await self.db_pool.runInteraction(
"get_rooms_for_user",
self._get_rooms_for_user_txn,
user_id,
)

def _get_rooms_for_user_txn(
self, txn: LoggingTransaction, user_id: str
) -> FrozenSet[str]:
sql = """
SELECT room_id
FROM current_state_events AS c
WHERE
c.type = 'm.room.member'
AND c.state_key = ?
AND c.membership = ?
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may as well just use simple_select_onecol here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


txn.execute(sql, (user_id, Membership.JOIN))
return frozenset(row[0] for row in txn)

@cachedList(
cached_method_name="get_rooms_for_user",
list_name="user_ids",
)
async def get_rooms_for_users(
self, user_ids: Collection[str]
) -> Dict[str, FrozenSet[str]]:
"""A batched version of `get_rooms_for_user`.

Returns:
Map from user_id to set of rooms that is currently in.
"""
return await self.db_pool.runInteraction(
"get_rooms_for_users",
self._get_rooms_for_users_txn,
user_ids,
)
return frozenset(r.room_id for r in rooms)

def _get_rooms_for_users_txn(
self, txn: LoggingTransaction, user_ids: Collection[str]
) -> Dict[str, FrozenSet[str]]:

clause, args = make_in_list_sql_clause(
self.database_engine,
"c.state_key",
user_ids,
)

sql = f"""
SELECT c.state_key, room_id
FROM current_state_events AS c
WHERE
c.type = 'm.room.member'
AND c.membership = ?
AND {clause}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto, I think we can use simple_select_many_batch to simplify things.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"""

txn.execute(sql, [Membership.JOIN] + args)

result: Dict[str, Set[str]] = {user_id: set() for user_id in user_ids}
for user_id, room_id in txn:
result[user_id].add(room_id)

return {user_id: frozenset(v) for user_id, v in result.items()}

@cached(max_entries=10000)
async def does_pair_of_users_share_a_room(
Expand Down
1 change: 1 addition & 0 deletions tests/handlers/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def test_unknown_room_version(self):

# Blow away caches (supported room versions can only change due to a restart).
self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
self.store.get_rooms_for_user.invalidate_all()
self.get_success(self.store._get_event_cache.clear())
self.store._event_ref.clear()

Expand Down