Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Reimplement get_rooms_for_user and get_rooms_for_users
Browse files Browse the repository at this point in the history
This avoids the join on `events` to get stream ordering that is mostly
unused.
  • Loading branch information
Fizzadar committed Sep 13, 2022
1 parent fa2f3d8 commit 1008106
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 7 deletions.
3 changes: 3 additions & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
self.get_rooms_for_user_with_stream_ordering.invalidate(
(data.state_key,)
)
self.get_rooms_for_user.invalidate(
(data.state_key,)
)
else:
raise Exception("Unknown events stream row type %s" % (row.type,))

Expand Down
4 changes: 4 additions & 0 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,10 @@ def _update_current_state_txn(
self.store.get_rooms_for_user_with_stream_ordering.invalidate,
(member,),
)
txn.call_after(
self.store.get_rooms_for_user.invalidate,
(member,),
)

self.store._invalidate_state_caches_and_stream(
txn, room_id, members_changed
Expand Down
79 changes: 72 additions & 7 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import logging
from typing import (
TYPE_CHECKING,
Callable,
Collection,
Dict,
FrozenSet,
Expand Down Expand Up @@ -52,7 +51,6 @@
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -670,19 +668,86 @@ def _get_users_server_still_shares_room_with_txn(
_get_users_server_still_shares_room_with_txn,
)

@cancellable
@cached(max_entries=500000, iterable=True)
async def get_rooms_for_user(
self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None
self, user_id: str
) -> FrozenSet[str]:
"""Returns a set of room_ids the user is currently joined to.
If a remote user only returns rooms this server is currently
participating in.
"""
rooms = await self.get_rooms_for_user_with_stream_ordering(
user_id, on_invalidate=on_invalidate
rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate(user_id)
if rooms:
return frozenset(r.room_id for r in rooms)

return await self.db_pool.runInteraction(
"get_rooms_for_user",
self._get_rooms_for_user_txn,
user_id,
)
return frozenset(r.room_id for r in rooms)

def _get_rooms_for_user_txn(
self, txn: LoggingTransaction, user_id: str
) -> FrozenSet[str]:
sql = """
SELECT room_id
FROM current_state_events AS c
WHERE
c.type = 'm.room.member'
AND c.state_key = ?
AND c.membership = ?
"""

txn.execute(sql, (user_id, Membership.JOIN))
return frozenset(txn)

@cachedList(
cached_method_name="get_rooms_for_user",
list_name="user_ids",
)
async def get_rooms_for_users(
self, user_ids: Collection[str]
) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:
"""A batched version of `get_rooms_for_user`.
Returns:
Map from user_id to set of rooms that is currently in.
"""
return await self.db_pool.runInteraction(
"get_rooms_for_users",
self._get_rooms_for_users_txn,
user_ids,
)

def _get_rooms_for_users_txn(
self, txn: LoggingTransaction, user_ids: Collection[str]
) -> Dict[str, FrozenSet[str]]:

clause, args = make_in_list_sql_clause(
self.database_engine,
"c.state_key",
user_ids,
)

sql = f"""
SELECT c.state_key, room_id
FROM current_state_events AS c
WHERE
c.type = 'm.room.member'
AND c.membership = ?
AND {clause}
"""

txn.execute(sql, [Membership.JOIN] + args)

result: Dict[str, Set[str]] = {
user_id: set() for user_id in user_ids
}
for user_id, room_id in txn:
result[user_id].add(room_id)

return {user_id: frozenset(v) for user_id, v in result.items()}

@cached(max_entries=10000)
async def does_pair_of_users_share_a_room(
Expand Down

0 comments on commit 1008106

Please sign in to comment.