Skip to content

Commit

Permalink
Add stream_ordering sort to Sliding Sync /sync (#17293)
Browse files Browse the repository at this point in the history
Sort is no longer configurable and we always sort rooms by the `stream_ordering` of the last event in the room or the point where the user can see up to in cases of leave/ban/invite/knock.
  • Loading branch information
MadLittleMods authored Jun 17, 2024
1 parent e88332b commit e5b8a3e
Show file tree
Hide file tree
Showing 9 changed files with 459 additions and 121 deletions.
1 change: 1 addition & 0 deletions changelog.d/17293.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `stream_ordering` sort to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
2 changes: 1 addition & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ async def get_state_events(

if at_token:
last_event_id = (
await self.store.get_last_event_in_room_before_stream_ordering(
await self.store.get_last_event_id_in_room_before_stream_ordering(
room_id,
end_token=at_token.room_key,
)
Expand Down
158 changes: 129 additions & 29 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,20 @@
#
#
import logging
from typing import TYPE_CHECKING, AbstractSet, Dict, List, Optional
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple

from immutabledict import immutabledict

from synapse.api.constants import AccountDataTypes, Membership
from synapse.events import EventBase
from synapse.types import Requester, RoomStreamToken, StreamToken, UserID
from synapse.storage.roommember import RoomsForUser
from synapse.types import (
PersistedEventPosition,
Requester,
RoomStreamToken,
StreamToken,
UserID,
)
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult

if TYPE_CHECKING:
Expand All @@ -33,6 +40,27 @@
logger = logging.getLogger(__name__)


def convert_event_to_rooms_for_user(event: EventBase) -> RoomsForUser:
"""
Quick helper to convert an event to a `RoomsForUser` object.
"""
# These fields should be present for all persisted events
assert event.internal_metadata.stream_ordering is not None
assert event.internal_metadata.instance_name is not None

return RoomsForUser(
room_id=event.room_id,
sender=event.sender,
membership=event.membership,
event_id=event.event_id,
event_pos=PersistedEventPosition(
event.internal_metadata.instance_name,
event.internal_metadata.stream_ordering,
),
room_version_id=event.room_version.identifier,
)


def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool:
"""
Returns True if the membership event should be included in the sync response,
Expand Down Expand Up @@ -169,26 +197,28 @@ async def current_sync_for_user(
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()

# Get all of the room IDs that the user should be able to see in the sync
# response
room_id_set = await self.get_sync_room_ids_for_user(
sync_config.user,
from_token=from_token,
to_token=to_token,
)

# Assemble sliding window lists
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
if sync_config.lists:
# Get all of the room IDs that the user should be able to see in the sync
# response
sync_room_map = await self.get_sync_room_ids_for_user(
sync_config.user,
from_token=from_token,
to_token=to_token,
)

for list_key, list_config in sync_config.lists.items():
# Apply filters
filtered_room_ids = room_id_set
filtered_sync_room_map = sync_room_map
if list_config.filters is not None:
filtered_room_ids = await self.filter_rooms(
sync_config.user, room_id_set, list_config.filters, to_token
filtered_sync_room_map = await self.filter_rooms(
sync_config.user, sync_room_map, list_config.filters, to_token
)
# TODO: Apply sorts
sorted_room_ids = sorted(filtered_room_ids)

sorted_room_info = await self.sort_rooms(
filtered_sync_room_map, to_token
)

ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
if list_config.ranges:
Expand All @@ -197,12 +227,17 @@ async def current_sync_for_user(
SlidingSyncResult.SlidingWindowList.Operation(
op=OperationType.SYNC,
range=range,
room_ids=sorted_room_ids[range[0] : range[1]],
room_ids=[
room_id
for room_id, _ in sorted_room_info[
range[0] : range[1]
]
],
)
)

lists[list_key] = SlidingSyncResult.SlidingWindowList(
count=len(sorted_room_ids),
count=len(sorted_room_info),
ops=ops,
)

Expand All @@ -219,7 +254,7 @@ async def get_sync_room_ids_for_user(
user: UserID,
to_token: StreamToken,
from_token: Optional[StreamToken] = None,
) -> AbstractSet[str]:
) -> Dict[str, RoomsForUser]:
"""
Fetch room IDs that should be listed for this user in the sync response (the
full room list that will be filtered, sorted, and sliced).
Expand All @@ -237,11 +272,14 @@ async def get_sync_room_ids_for_user(
to tell when a room was forgotten at the moment so we can't factor it into the
from/to range.
Args:
user: User to fetch rooms for
to_token: The token to fetch rooms up to.
from_token: The point in the stream to sync from.
Returns:
A dictionary of room IDs that should be listed in the sync response along
with membership information in that room at the time of `to_token`.
"""
user_id = user.to_string()

Expand All @@ -261,11 +299,11 @@ async def get_sync_room_ids_for_user(

# If the user has never joined any rooms before, we can just return an empty list
if not room_for_user_list:
return set()
return {}

# Our working list of rooms that can show up in the sync response
sync_room_id_set = {
room_for_user.room_id
room_for_user.room_id: room_for_user
for room_for_user in room_for_user_list
if filter_membership_for_sync(
membership=room_for_user.membership,
Expand Down Expand Up @@ -415,7 +453,9 @@ async def get_sync_room_ids_for_user(
not was_last_membership_already_included
and should_prev_membership_be_included
):
sync_room_id_set.add(room_id)
sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
last_membership_change_after_to_token
)
# 1b) Remove rooms that the user joined (hasn't left) after the `to_token`
#
# For example, if the last membership event after the `to_token` is a "join"
Expand All @@ -426,7 +466,7 @@ async def get_sync_room_ids_for_user(
was_last_membership_already_included
and not should_prev_membership_be_included
):
sync_room_id_set.discard(room_id)
del sync_room_id_set[room_id]

# 2) -----------------------------------------------------
# We fix-up newly_left rooms after the first fixup because it may have removed
Expand Down Expand Up @@ -461,25 +501,32 @@ async def get_sync_room_ids_for_user(
# include newly_left rooms because the last event that the user should see
# is their own leave event
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
sync_room_id_set.add(room_id)
sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
last_membership_change_in_from_to_range
)

return sync_room_id_set

async def filter_rooms(
self,
user: UserID,
room_id_set: AbstractSet[str],
sync_room_map: Dict[str, RoomsForUser],
filters: SlidingSyncConfig.SlidingSyncList.Filters,
to_token: StreamToken,
) -> AbstractSet[str]:
) -> Dict[str, RoomsForUser]:
"""
Filter rooms based on the sync request.
Args:
user: User to filter rooms for
room_id_set: Set of room IDs to filter down
sync_room_map: Dictionary of room IDs to sort along with membership
information in the room at the time of `to_token`.
filters: Filters to apply
to_token: We filter based on the state of the room at this token
Returns:
A filtered dictionary of room IDs along with membership information in the
room at the time of `to_token`.
"""
user_id = user.to_string()

Expand All @@ -488,7 +535,7 @@ async def filter_rooms(
# TODO: Exclude partially stated rooms unless the `required_state` has
# `["m.room.member", "$LAZY"]`

filtered_room_id_set = set(room_id_set)
filtered_room_id_set = set(sync_room_map.keys())

# Filter for Direct-Message (DM) rooms
if filters.is_dm is not None:
Expand Down Expand Up @@ -544,4 +591,57 @@ async def filter_rooms(
if filters.not_tags:
raise NotImplementedError()

return filtered_room_id_set
# Assemble a new sync room map but only with the `filtered_room_id_set`
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}

async def sort_rooms(
self,
sync_room_map: Dict[str, RoomsForUser],
to_token: StreamToken,
) -> List[Tuple[str, RoomsForUser]]:
"""
Sort by `stream_ordering` of the last event that the user should see in the
room. `stream_ordering` is unique so we get a stable sort.
Args:
sync_room_map: Dictionary of room IDs to sort along with membership
information in the room at the time of `to_token`.
to_token: We sort based on the events in the room at this token (<= `to_token`)
Returns:
A sorted list of room IDs by `stream_ordering` along with membership information.
"""

# Assemble a map of room ID to the `stream_ordering` of the last activity that the
# user should see in the room (<= `to_token`)
last_activity_in_room_map: Dict[str, int] = {}
for room_id, room_for_user in sync_room_map.items():
# If they are fully-joined to the room, let's find the latest activity
# at/before the `to_token`.
if room_for_user.membership == Membership.JOIN:
last_event_result = (
await self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id, to_token.room_key
)
)

# If the room has no events at/before the `to_token`, this is probably a
# mistake in the code that generates the `sync_room_map` since that should
# only give us rooms that the user had membership in during the token range.
assert last_event_result is not None

_, event_pos = last_event_result

last_activity_in_room_map[room_id] = event_pos.stream
else:
# Otherwise, if the user has left/been invited/knocked/been banned from
# a room, they shouldn't see anything past that point.
last_activity_in_room_map[room_id] = room_for_user.event_pos.stream

return sorted(
sync_room_map.items(),
# Sort by the last activity (stream_ordering) in the room
key=lambda room_info: last_activity_in_room_map[room_info[0]],
# We want descending order
reverse=True,
)
10 changes: 6 additions & 4 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1036,9 +1036,11 @@ async def get_state_at(
# FIXME: This gets the state at the latest event before the stream ordering,
# which might not be the same as the "current state" of the room at the time
# of the stream token if there were multiple forward extremities at the time.
last_event_id = await self.store.get_last_event_in_room_before_stream_ordering(
room_id,
end_token=stream_position.room_key,
last_event_id = (
await self.store.get_last_event_id_in_room_before_stream_ordering(
room_id,
end_token=stream_position.room_key,
)
)

if last_event_id:
Expand Down Expand Up @@ -1519,7 +1521,7 @@ async def _compute_state_delta_for_incremental_sync(
# We need to make sure the first event in our batch points to the
# last event in the previous batch.
last_event_id_prev_batch = (
await self.store.get_last_event_in_room_before_stream_ordering(
await self.store.get_last_event_id_in_room_before_stream_ordering(
room_id,
end_token=since_token.room_key,
)
Expand Down
42 changes: 36 additions & 6 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ def _f(txn: LoggingTransaction) -> Optional[Tuple[int, int, str]]:
"get_room_event_before_stream_ordering", _f
)

async def get_last_event_in_room_before_stream_ordering(
async def get_last_event_id_in_room_before_stream_ordering(
self,
room_id: str,
end_token: RoomStreamToken,
Expand All @@ -910,10 +910,38 @@ async def get_last_event_in_room_before_stream_ordering(
The ID of the most recent event, or None if there are no events in the room
before this stream ordering.
"""
last_event_result = (
await self.get_last_event_pos_in_room_before_stream_ordering(
room_id, end_token
)
)

if last_event_result:
return last_event_result[0]

return None

async def get_last_event_pos_in_room_before_stream_ordering(
self,
room_id: str,
end_token: RoomStreamToken,
) -> Optional[Tuple[str, PersistedEventPosition]]:
"""
Returns the ID and event position of the last event in a room at or before a
stream ordering.
Args:
room_id
end_token: The token used to stream from
Returns:
The ID of the most recent event and it's position, or None if there are no
events in the room before this stream ordering.
"""

def get_last_event_in_room_before_stream_ordering_txn(
def get_last_event_pos_in_room_before_stream_ordering_txn(
txn: LoggingTransaction,
) -> Optional[str]:
) -> Optional[Tuple[str, PersistedEventPosition]]:
# We're looking for the closest event at or before the token. We need to
# handle the fact that the stream token can be a vector clock (with an
# `instance_map`) and events can be persisted on different instances
Expand Down Expand Up @@ -975,13 +1003,15 @@ def get_last_event_in_room_before_stream_ordering_txn(
topological_ordering=topological_ordering,
stream_ordering=stream_ordering,
):
return event_id
return event_id, PersistedEventPosition(
instance_name, stream_ordering
)

return None

return await self.db_pool.runInteraction(
"get_last_event_in_room_before_stream_ordering",
get_last_event_in_room_before_stream_ordering_txn,
"get_last_event_pos_in_room_before_stream_ordering",
get_last_event_pos_in_room_before_stream_ordering_txn,
)

async def get_current_room_stream_token_for_room_id(
Expand Down
Loading

0 comments on commit e5b8a3e

Please sign in to comment.