Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream_ordering sort to Sliding Sync /sync #17293

Merged
merged 34 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
ce45cc1
Sliding sync sort stub
MadLittleMods Jun 10, 2024
75b701f
Add changelog
MadLittleMods Jun 10, 2024
c8a240f
Prefer `? < a AND a <= ?`
MadLittleMods Jun 11, 2024
2a82ac0
Fix `get_last_event_in_room_before_stream_ordering(...)` not finding …
MadLittleMods Jun 12, 2024
2e1d142
Add actual guranteed order for UNION
MadLittleMods Jun 12, 2024
b1af992
Some clean-up
MadLittleMods Jun 12, 2024
901ce62
Try to better explain why
MadLittleMods Jun 12, 2024
87ad458
Fix `get_last_event_in_room_before_stream_ordering(...)` not finding …
MadLittleMods Jun 12, 2024
431b31e
Add actual guranteed order for UNION
MadLittleMods Jun 12, 2024
d7f40ae
Try to better explain why
MadLittleMods Jun 12, 2024
a8056ae
Add changelog
MadLittleMods Jun 12, 2024
3f317a9
We're actually using sub-query syntax so we can ORDER each query
MadLittleMods Jun 12, 2024
54bdc0c
Fix invalid syntax
MadLittleMods Jun 12, 2024
4d585b6
Merge branch 'develop' into madlittlemods/fix-and-tests-for-get_last_…
MadLittleMods Jun 12, 2024
42f24de
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort
MadLittleMods Jun 12, 2024
03547b0
Merge branch 'madlittlemods/fix-and-tests-for-get_last_event_in_room_…
MadLittleMods Jun 12, 2024
c94550d
Add `event.internal_metadata.instance_name` and event position to `ge…
MadLittleMods Jun 12, 2024
afb6627
Add rust changes for `event.internal_metadata.instance_name`
MadLittleMods Jun 12, 2024
af60f7b
First pass on `sort_rooms` and refactor to include room membership al…
MadLittleMods Jun 12, 2024
bd49c34
Add some tests
MadLittleMods Jun 12, 2024
5060588
Fix newly_left not being added back if we returned early (when `membe…
MadLittleMods Jun 12, 2024
5243a30
Fix ban test case
MadLittleMods Jun 12, 2024
d5929f1
Adjust wording
MadLittleMods Jun 12, 2024
8935c6c
Fix lints
MadLittleMods Jun 12, 2024
93aa4ff
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort
MadLittleMods Jun 12, 2024
185e0b5
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort
MadLittleMods Jun 13, 2024
8244b25
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort
MadLittleMods Jun 13, 2024
35808b3
Fix filtering
MadLittleMods Jun 13, 2024
a917eda
No more sort option
MadLittleMods Jun 13, 2024
84eaeea
Add rest test
MadLittleMods Jun 13, 2024
99ed012
Update changelog
MadLittleMods Jun 13, 2024
7d80418
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort
MadLittleMods Jun 17, 2024
ef92f3c
Stable sort with just `stream_ordering`
MadLittleMods Jun 17, 2024
63ff8f9
Rename to `get_last_event_pos_in_room_before_stream_ordering(...)`
MadLittleMods Jun 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17293.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `stream_ordering` sort to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
2 changes: 1 addition & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ async def get_state_events(

if at_token:
last_event_id = (
await self.store.get_last_event_in_room_before_stream_ordering(
await self.store.get_last_event_id_in_room_before_stream_ordering(
room_id,
end_token=at_token.room_key,
)
Expand Down
161 changes: 132 additions & 29 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,20 @@
#
#
import logging
from typing import TYPE_CHECKING, AbstractSet, Dict, List, Optional
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple

from immutabledict import immutabledict

from synapse.api.constants import AccountDataTypes, Membership
from synapse.events import EventBase
from synapse.types import Requester, RoomStreamToken, StreamToken, UserID
from synapse.storage.roommember import RoomsForUser
from synapse.types import (
PersistedEventPosition,
Requester,
RoomStreamToken,
StreamToken,
UserID,
)
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult

if TYPE_CHECKING:
Expand All @@ -33,6 +40,27 @@
logger = logging.getLogger(__name__)


def convert_event_to_rooms_for_user(event: EventBase) -> RoomsForUser:
    """
    Build a `RoomsForUser` record out of an event.

    Args:
        event: The event to convert. Its `room_id`, `sender`, `membership`,
            `event_id`, `room_version` and `internal_metadata` are read.

    Returns:
        A `RoomsForUser` whose `event_pos` is the `PersistedEventPosition` made from
        the event's `instance_name` and `stream_ordering`.
    """
    metadata = event.internal_metadata

    # Both of these fields should be present for all persisted events
    stream_ordering = metadata.stream_ordering
    assert stream_ordering is not None
    instance_name = metadata.instance_name
    assert instance_name is not None

    position = PersistedEventPosition(instance_name, stream_ordering)

    return RoomsForUser(
        room_id=event.room_id,
        sender=event.sender,
        membership=event.membership,
        event_id=event.event_id,
        event_pos=position,
        room_version_id=event.room_version.identifier,
    )


def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool:
"""
Returns True if the membership event should be included in the sync response,
Expand Down Expand Up @@ -169,26 +197,28 @@ async def current_sync_for_user(
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()

# Get all of the room IDs that the user should be able to see in the sync
# response
room_id_set = await self.get_sync_room_ids_for_user(
sync_config.user,
from_token=from_token,
to_token=to_token,
)

# Assemble sliding window lists
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
if sync_config.lists:
# Get all of the room IDs that the user should be able to see in the sync
# response
sync_room_map = await self.get_sync_room_ids_for_user(
sync_config.user,
from_token=from_token,
to_token=to_token,
)

for list_key, list_config in sync_config.lists.items():
# Apply filters
filtered_room_ids = room_id_set
filtered_sync_room_map = sync_room_map
if list_config.filters is not None:
filtered_room_ids = await self.filter_rooms(
sync_config.user, room_id_set, list_config.filters, to_token
filtered_sync_room_map = await self.filter_rooms(
sync_config.user, sync_room_map, list_config.filters, to_token
)
# TODO: Apply sorts
sorted_room_ids = sorted(filtered_room_ids)

sorted_room_info = await self.sort_rooms(
filtered_sync_room_map, to_token
)

ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
if list_config.ranges:
Expand All @@ -197,12 +227,17 @@ async def current_sync_for_user(
SlidingSyncResult.SlidingWindowList.Operation(
op=OperationType.SYNC,
range=range,
room_ids=sorted_room_ids[range[0] : range[1]],
room_ids=[
room_id
for room_id, _ in sorted_room_info[
range[0] : range[1]
]
],
)
)

lists[list_key] = SlidingSyncResult.SlidingWindowList(
count=len(sorted_room_ids),
count=len(sorted_room_info),
ops=ops,
)

Expand All @@ -219,7 +254,7 @@ async def get_sync_room_ids_for_user(
user: UserID,
to_token: StreamToken,
from_token: Optional[StreamToken] = None,
) -> AbstractSet[str]:
) -> Dict[str, RoomsForUser]:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated things so that instead of just a list of room IDs, we now also return the corresponding membership information for the user in each room. This way we can share the membership info across the filtering and sorting steps, and soon the room data as well.

"""
Fetch room IDs that should be listed for this user in the sync response (the
full room list that will be filtered, sorted, and sliced).
Expand All @@ -237,11 +272,14 @@ async def get_sync_room_ids_for_user(
to tell when a room was forgotten at the moment so we can't factor it into the
from/to range.


Args:
user: User to fetch rooms for
to_token: The token to fetch rooms up to.
from_token: The point in the stream to sync from.

Returns:
A dictionary of room IDs that should be listed in the sync response along
with membership information in that room at the time of `to_token`.
"""
user_id = user.to_string()

Expand All @@ -261,11 +299,11 @@ async def get_sync_room_ids_for_user(

# If the user has never joined any rooms before, we can just return an empty list
if not room_for_user_list:
return set()
return {}

# Our working list of rooms that can show up in the sync response
sync_room_id_set = {
room_for_user.room_id
room_for_user.room_id: room_for_user
for room_for_user in room_for_user_list
if filter_membership_for_sync(
membership=room_for_user.membership,
Expand Down Expand Up @@ -415,7 +453,9 @@ async def get_sync_room_ids_for_user(
not was_last_membership_already_included
and should_prev_membership_be_included
):
sync_room_id_set.add(room_id)
sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
last_membership_change_after_to_token
)
# 1b) Remove rooms that the user joined (hasn't left) after the `to_token`
#
# For example, if the last membership event after the `to_token` is a "join"
Expand All @@ -426,7 +466,7 @@ async def get_sync_room_ids_for_user(
was_last_membership_already_included
and not should_prev_membership_be_included
):
sync_room_id_set.discard(room_id)
del sync_room_id_set[room_id]

# 2) -----------------------------------------------------
# We fix-up newly_left rooms after the first fixup because it may have removed
Expand Down Expand Up @@ -461,25 +501,32 @@ async def get_sync_room_ids_for_user(
# include newly_left rooms because the last event that the user should see
# is their own leave event
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
sync_room_id_set.add(room_id)
sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
last_membership_change_in_from_to_range
)

return sync_room_id_set

async def filter_rooms(
self,
user: UserID,
room_id_set: AbstractSet[str],
sync_room_map: Dict[str, RoomsForUser],
filters: SlidingSyncConfig.SlidingSyncList.Filters,
to_token: StreamToken,
) -> AbstractSet[str]:
) -> Dict[str, RoomsForUser]:
"""
Filter rooms based on the sync request.

Args:
user: User to filter rooms for
room_id_set: Set of room IDs to filter down
sync_room_map: Dictionary of room IDs to filter along with membership
information in the room at the time of `to_token`.
filters: Filters to apply
to_token: We filter based on the state of the room at this token

Returns:
A filtered dictionary of room IDs along with membership information in the
room at the time of `to_token`.
"""
user_id = user.to_string()

Expand All @@ -488,7 +535,7 @@ async def filter_rooms(
# TODO: Exclude partially stated rooms unless the `required_state` has
# `["m.room.member", "$LAZY"]`

filtered_room_id_set = set(room_id_set)
filtered_room_id_set = set(sync_room_map.keys())

# Filter for Direct-Message (DM) rooms
if filters.is_dm is not None:
Expand Down Expand Up @@ -544,4 +591,60 @@ async def filter_rooms(
if filters.not_tags:
raise NotImplementedError()

return filtered_room_id_set
# Assemble a new sync room map but only with the `filtered_room_id_set`
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}

async def sort_rooms(
    self,
    sync_room_map: Dict[str, RoomsForUser],
    to_token: StreamToken,
) -> List[Tuple[str, RoomsForUser]]:
    """
    Order rooms by the `stream_ordering` of the last activity the user should see
    in each room, most recent first. Ties are broken on the room ID so that the
    result is stable.

    Args:
        sync_room_map: Dictionary of room IDs to sort along with membership
            information in the room at the time of `to_token`.
        to_token: We sort based on the events in the room at this token (<= `to_token`)

    Returns:
        A list of `(room_id, membership information)` pairs sorted by
        `stream_ordering` in descending order.
    """
    # Room ID -> `stream_ordering` of the last activity that the user should see
    # in the room (<= `to_token`)
    latest_stream_by_room: Dict[str, int] = {}
    for room_id, membership_info in sync_room_map.items():
        if membership_info.membership != Membership.JOIN:
            # The user has left/been invited/knocked/been banned from the room, so
            # they shouldn't see anything past their own membership event.
            latest_stream_by_room[room_id] = membership_info.event_pos.stream
            continue

        # Fully-joined: look up the latest activity at/before the `to_token`.
        lookup = await self.store.get_last_event_in_room_before_stream_ordering(
            room_id, to_token.room_key
        )

        # A room with no events at/before the `to_token` is probably a mistake in
        # the code that generates the `sync_room_map`, since that should only give
        # us rooms that the user had membership in during the token range.
        assert lookup is not None

        _event_id, position = lookup
        latest_stream_by_room[room_id] = position.stream

    def _activity_then_room_id(entry: Tuple[str, RoomsForUser]) -> Tuple[int, str]:
        # Primary key is the last activity; the room ID only breaks ties.
        current_room_id = entry[0]
        return (latest_stream_by_room[current_room_id], current_room_id)

    ordered_rooms = list(sync_room_map.items())
    # We want descending order
    ordered_rooms.sort(key=_activity_then_room_id, reverse=True)
    return ordered_rooms
10 changes: 6 additions & 4 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1038,9 +1038,11 @@ async def get_state_at(
# FIXME: This gets the state at the latest event before the stream ordering,
# which might not be the same as the "current state" of the room at the time
# of the stream token if there were multiple forward extremities at the time.
last_event_id = await self.store.get_last_event_in_room_before_stream_ordering(
room_id,
end_token=stream_position.room_key,
last_event_id = (
await self.store.get_last_event_id_in_room_before_stream_ordering(
room_id,
end_token=stream_position.room_key,
)
)

if last_event_id:
Expand Down Expand Up @@ -1521,7 +1523,7 @@ async def _compute_state_delta_for_incremental_sync(
# We need to make sure the first event in our batch points to the
# last event in the previous batch.
last_event_id_prev_batch = (
await self.store.get_last_event_in_room_before_stream_ordering(
await self.store.get_last_event_id_in_room_before_stream_ordering(
room_id,
end_token=since_token.room_key,
)
Expand Down
34 changes: 31 additions & 3 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ def _f(txn: LoggingTransaction) -> Optional[Tuple[int, int, str]]:
"get_room_event_before_stream_ordering", _f
)

async def get_last_event_in_room_before_stream_ordering(
async def get_last_event_id_in_room_before_stream_ordering(
self,
room_id: str,
end_token: RoomStreamToken,
Expand All @@ -910,10 +910,36 @@ async def get_last_event_in_room_before_stream_ordering(
The ID of the most recent event, or None if there are no events in the room
before this stream ordering.
"""
last_event_result = await self.get_last_event_in_room_before_stream_ordering(
room_id, end_token
)

if last_event_result:
return last_event_result[0]

return None

async def get_last_event_in_room_before_stream_ordering(
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
self,
room_id: str,
end_token: RoomStreamToken,
) -> Optional[Tuple[str, PersistedEventPosition]]:
"""
Returns the ID and event position of the last event in a room at or before a
stream ordering.

Args:
room_id
end_token: The token used to stream from

Returns:
The ID of the most recent event and its position, or None if there are no
events in the room before this stream ordering.
"""

def get_last_event_in_room_before_stream_ordering_txn(
txn: LoggingTransaction,
) -> Optional[str]:
) -> Optional[Tuple[str, PersistedEventPosition]]:
# We're looking for the closest event at or before the token. We need to
# handle the fact that the stream token can be a vector clock (with an
# `instance_map`) and events can be persisted on different instances
Expand Down Expand Up @@ -975,7 +1001,9 @@ def get_last_event_in_room_before_stream_ordering_txn(
topological_ordering=topological_ordering,
stream_ordering=stream_ordering,
):
return event_id
return event_id, PersistedEventPosition(
instance_name, stream_ordering
)

return None

Expand Down
Loading
Loading