Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Optimise get_rooms_for_user (drop with_stream_ordering) #13787

Merged
merged 17 commits into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13787.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimise get rooms for user calls. Contributed by Nick @ Beeper (@fizzadar).
6 changes: 2 additions & 4 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,9 @@ async def get_user_ids_changed(
possibly_left = possibly_changed | possibly_left

# Double check if we still share rooms with the given user.
users_rooms = await self.store.get_rooms_for_users_with_stream_ordering(
possibly_left
)
users_rooms = await self.store.get_rooms_for_users(possibly_left)
for changed_user_id, entries in users_rooms.items():
if any(e.room_id in room_ids for e in entries):
if any(rid in room_ids for rid in entries):
possibly_left.discard(changed_user_id)
else:
possibly_joined.discard(changed_user_id)
Expand Down
14 changes: 4 additions & 10 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1474,16 +1474,14 @@ async def _generate_sync_entry_for_device_list(
since_token.device_list_key
)
if changed_users is not None:
result = await self.store.get_rooms_for_users_with_stream_ordering(
changed_users
)
result = await self.store.get_rooms_for_users(changed_users)

for changed_user_id, entries in result.items():
# Check if the changed user shares any rooms with the user,
# or if the changed user is the syncing user (as we always
# want to include device list updates of their own devices).
if user_id == changed_user_id or any(
e.room_id in joined_rooms for e in entries
rid in joined_rooms for rid in entries
):
users_that_have_changed.add(changed_user_id)
else:
Expand Down Expand Up @@ -1517,13 +1515,9 @@ async def _generate_sync_entry_for_device_list(
newly_left_users.update(left_users)

# Remove any users that we still share a room with.
left_users_rooms = (
await self.store.get_rooms_for_users_with_stream_ordering(
newly_left_users
)
)
left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
for user_id, entries in left_users_rooms.items():
if any(e.room_id in joined_rooms for e in entries):
if any(rid in joined_rooms for rid in entries):
newly_left_users.discard(user_id)

return DeviceListUpdates(
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
self.get_rooms_for_user_with_stream_ordering.invalidate(
(data.state_key,)
)
self.get_rooms_for_user.invalidate((data.state_key,))
else:
raise Exception("Unknown events stream row type %s" % (row.type,))

Expand Down
4 changes: 4 additions & 0 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,10 @@ def _update_current_state_txn(
self.store.get_rooms_for_user_with_stream_ordering.invalidate,
(member,),
)
txn.call_after(
self.store.get_rooms_for_user.invalidate,
(member,),
)

self.store._invalidate_state_caches_and_stream(
txn, room_id, members_changed
Expand Down
117 changes: 56 additions & 61 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from collections import defaultdict
from typing import (
TYPE_CHECKING,
Callable,
Collection,
Dict,
FrozenSet,
Expand Down Expand Up @@ -52,7 +52,6 @@
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -600,58 +599,6 @@ def _get_rooms_for_user_with_stream_ordering_txn(
for room_id, instance, stream_id in txn
)

@cachedList(
cached_method_name="get_rooms_for_user_with_stream_ordering",
list_name="user_ids",
)
async def get_rooms_for_users_with_stream_ordering(
self, user_ids: Collection[str]
) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:
"""A batched version of `get_rooms_for_user_with_stream_ordering`.

Returns:
Map from user_id to set of rooms that is currently in.
"""
return await self.db_pool.runInteraction(
"get_rooms_for_users_with_stream_ordering",
self._get_rooms_for_users_with_stream_ordering_txn,
user_ids,
)

def _get_rooms_for_users_with_stream_ordering_txn(
self, txn: LoggingTransaction, user_ids: Collection[str]
) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:

clause, args = make_in_list_sql_clause(
self.database_engine,
"c.state_key",
user_ids,
)

sql = f"""
SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
FROM current_state_events AS c
INNER JOIN events AS e USING (room_id, event_id)
WHERE
c.type = 'm.room.member'
AND c.membership = ?
AND {clause}
"""

txn.execute(sql, [Membership.JOIN] + args)

result: Dict[str, Set[GetRoomsForUserWithStreamOrdering]] = {
user_id: set() for user_id in user_ids
}
for user_id, room_id, instance, stream_id in txn:
result[user_id].add(
GetRoomsForUserWithStreamOrdering(
room_id, PersistedEventPosition(instance, stream_id)
)
)

return {user_id: frozenset(v) for user_id, v in result.items()}

async def get_users_server_still_shares_room_with(
self, user_ids: Collection[str]
) -> Set[str]:
Expand Down Expand Up @@ -687,19 +634,67 @@ def _get_users_server_still_shares_room_with_txn(
_get_users_server_still_shares_room_with_txn,
)

@cancellable
async def get_rooms_for_user(
self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None
) -> FrozenSet[str]:
@cached(max_entries=500000, iterable=True)
async def get_rooms_for_user(self, user_id: str) -> FrozenSet[str]:
"""Returns a set of room_ids the user is currently joined to.

If a remote user only returns rooms this server is currently
participating in.
"""
rooms = await self.get_rooms_for_user_with_stream_ordering(
user_id, on_invalidate=on_invalidate
rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate(
(user_id,),
None,
update_metrics=False,
)
return frozenset(r.room_id for r in rooms)
if rooms:
return frozenset(r.room_id for r in rooms)

room_ids = await self.db_pool.simple_select_onecol(
table="current_state_events",
keyvalues={
"type": EventTypes.Member,
"membership": Membership.JOIN,
"state_key": user_id,
},
retcol="room_id",
desc="get_rooms_for_user",
)

return frozenset(room_ids)

@cachedList(
cached_method_name="get_rooms_for_user",
list_name="user_ids",
)
async def get_rooms_for_users(
self, user_ids: Collection[str]
) -> Dict[str, FrozenSet[str]]:
"""A batched version of `get_rooms_for_user`.

Returns:
Map from user_id to set of rooms that is currently in.
"""

rows = await self.db_pool.simple_select_many_batch(
table="current_state_events",
column="state_key",
iterable=user_ids,
retcols=(
"user_id",
"room_id",
),
keyvalues={
"type": EventTypes.Member,
"membership": Membership.JOIN,
},
desc="get_rooms_for_users",
)

user_rooms: Dict[str, Set[str]] = defaultdict(set)
for row in rows:
user_rooms[row["user_id"]].add(row["room_id"])

return {key: frozenset(rooms) for key, rooms in user_rooms.items()}

@cached(max_entries=10000)
async def does_pair_of_users_share_a_room(
Expand Down
1 change: 1 addition & 0 deletions tests/handlers/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def test_unknown_room_version(self):

# Blow away caches (supported room versions can only change due to a restart).
self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
self.store.get_rooms_for_user.invalidate_all()
self.get_success(self.store._get_event_cache.clear())
self.store._event_ref.clear()

Expand Down