Add stream_ordering sort to Sliding Sync /sync #17293

Merged
merged 34 commits into from
Jun 17, 2024
Changes from 24 commits
ce45cc1
Sliding sync sort stub
MadLittleMods Jun 10, 2024
75b701f
Add changelog
MadLittleMods Jun 10, 2024
c8a240f
Prefer `? < a AND a <= ?`
MadLittleMods Jun 11, 2024
2a82ac0
Fix `get_last_event_in_room_before_stream_ordering(...)` not finding …
MadLittleMods Jun 12, 2024
2e1d142
Add actual guranteed order for UNION
MadLittleMods Jun 12, 2024
b1af992
Some clean-up
MadLittleMods Jun 12, 2024
901ce62
Try to better explain why
MadLittleMods Jun 12, 2024
87ad458
Fix `get_last_event_in_room_before_stream_ordering(...)` not finding …
MadLittleMods Jun 12, 2024
431b31e
Add actual guranteed order for UNION
MadLittleMods Jun 12, 2024
d7f40ae
Try to better explain why
MadLittleMods Jun 12, 2024
a8056ae
Add changelog
MadLittleMods Jun 12, 2024
3f317a9
We're actually using sub-query syntax so we can ORDER each query
MadLittleMods Jun 12, 2024
54bdc0c
Fix invalid syntax
MadLittleMods Jun 12, 2024
4d585b6
Merge branch 'develop' into madlittlemods/fix-and-tests-for-get_last_…
MadLittleMods Jun 12, 2024
42f24de
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort
MadLittleMods Jun 12, 2024
03547b0
Merge branch 'madlittlemods/fix-and-tests-for-get_last_event_in_room_…
MadLittleMods Jun 12, 2024
c94550d
Add `event.internal_metadata.instance_name` and event position to `ge…
MadLittleMods Jun 12, 2024
afb6627
Add rust changes for `event.internal_metadata.instance_name`
MadLittleMods Jun 12, 2024
af60f7b
First pass on `sort_rooms` and refactor to include room membership al…
MadLittleMods Jun 12, 2024
bd49c34
Add some tests
MadLittleMods Jun 12, 2024
5060588
Fix newly_left not being added back if we returned early (when `membe…
MadLittleMods Jun 12, 2024
5243a30
Fix ban test case
MadLittleMods Jun 12, 2024
d5929f1
Adjust wording
MadLittleMods Jun 12, 2024
8935c6c
Fix lints
MadLittleMods Jun 12, 2024
93aa4ff
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort
MadLittleMods Jun 12, 2024
185e0b5
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort
MadLittleMods Jun 13, 2024
8244b25
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort
MadLittleMods Jun 13, 2024
35808b3
Fix filtering
MadLittleMods Jun 13, 2024
a917eda
No more sort option
MadLittleMods Jun 13, 2024
84eaeea
Add rest test
MadLittleMods Jun 13, 2024
99ed012
Update changelog
MadLittleMods Jun 13, 2024
7d80418
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort
MadLittleMods Jun 17, 2024
ef92f3c
Stable sort with just `stream_ordering`
MadLittleMods Jun 17, 2024
63ff8f9
Rename to `get_last_event_pos_in_room_before_stream_ordering(...)`
MadLittleMods Jun 17, 2024
1 change: 1 addition & 0 deletions changelog.d/17293.feature
@@ -0,0 +1 @@
+ Add recency sort to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
1 change: 1 addition & 0 deletions changelog.d/17295.bugfix
@@ -0,0 +1 @@
+ Fix edge case in `/sync` returning the wrong state when using sharded event persisters.
3 changes: 3 additions & 0 deletions rust/src/events/internal_metadata.rs
Expand Up @@ -204,6 +204,8 @@ pub struct EventInternalMetadata {
/// The stream ordering of this event. None, until it has been persisted.
#[pyo3(get, set)]
stream_ordering: Option<NonZeroI64>,
+ #[pyo3(get, set)]
+ instance_name: Option<String>,

/// whether this event is an outlier (ie, whether we have the state at that
/// point in the DAG)
Expand Down Expand Up @@ -232,6 +234,7 @@ impl EventInternalMetadata {
Ok(EventInternalMetadata {
data,
stream_ordering: None,
+ instance_name: None,
outlier: false,
})
}
Expand Down
2 changes: 2 additions & 0 deletions synapse/events/utils.py
Expand Up @@ -90,6 +90,7 @@ def prune_event(event: EventBase) -> EventBase:
pruned_event.internal_metadata.stream_ordering = (
event.internal_metadata.stream_ordering
)
+ pruned_event.internal_metadata.instance_name = event.internal_metadata.instance_name
pruned_event.internal_metadata.outlier = event.internal_metadata.outlier

# Mark the event as redacted
Expand All @@ -116,6 +117,7 @@ def clone_event(event: EventBase) -> EventBase:
new_event.internal_metadata.stream_ordering = (
event.internal_metadata.stream_ordering
)
+ new_event.internal_metadata.instance_name = event.internal_metadata.instance_name
new_event.internal_metadata.outlier = event.internal_metadata.outlier

return new_event
Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/message.py
Expand Up @@ -201,7 +201,7 @@ async def get_state_events(

if at_token:
last_event_id = (
- await self.store.get_last_event_in_room_before_stream_ordering(
+ await self.store.get_last_event_id_in_room_before_stream_ordering(
room_id,
end_token=at_token.room_key,
)
Expand Down Expand Up @@ -1551,6 +1551,7 @@ async def _persist_events(
# stream_ordering entry manually (as it was persisted on
# another worker).
event.internal_metadata.stream_ordering = stream_id
+ event.internal_metadata.instance_name = writer_instance

return event

Expand Down
167 changes: 134 additions & 33 deletions synapse/handlers/sliding_sync.py
Expand Up @@ -18,13 +18,20 @@
#
#
import logging
- from typing import TYPE_CHECKING, AbstractSet, Dict, List, Optional
+ from typing import TYPE_CHECKING, Dict, List, Optional, Tuple

from immutabledict import immutabledict

from synapse.api.constants import Membership
from synapse.events import EventBase
- from synapse.types import Requester, RoomStreamToken, StreamToken, UserID
+ from synapse.storage.roommember import RoomsForUser
+ from synapse.types import (
+ PersistedEventPosition,
+ Requester,
+ RoomStreamToken,
+ StreamToken,
+ UserID,
+ )
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult

if TYPE_CHECKING:
Expand All @@ -33,6 +40,27 @@
logger = logging.getLogger(__name__)


+ def convert_event_to_rooms_for_user(event: EventBase) -> RoomsForUser:
+ """
+ Quick helper to convert an event to a `RoomsForUser` object.
+ """
+ # These fields should be present for all persisted events
+ assert event.internal_metadata.stream_ordering is not None
+ assert event.internal_metadata.instance_name is not None
+
+ return RoomsForUser(
+ room_id=event.room_id,
+ sender=event.sender,
+ membership=event.membership,
+ event_id=event.event_id,
+ event_pos=PersistedEventPosition(
+ event.internal_metadata.instance_name,
+ event.internal_metadata.stream_ordering,
+ ),
+ room_version_id=event.room_version.identifier,
+ )
+
+
def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool:
"""
Returns True if the membership event should be included in the sync response,
Expand Down Expand Up @@ -151,25 +179,25 @@ async def current_sync_for_user(
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()

- # Get all of the room IDs that the user should be able to see in the sync
- # response
- room_id_set = await self.get_sync_room_ids_for_user(
- sync_config.user,
- from_token=from_token,
- to_token=to_token,
- )
-
# Assemble sliding window lists
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
if sync_config.lists:
+ # Get all of the room IDs that the user should be able to see in the sync
+ # response
+ sync_room_map = await self.get_sync_room_ids_for_user(
+ sync_config.user,
+ from_token=from_token,
+ to_token=to_token,
+ )
+
for list_key, list_config in sync_config.lists.items():
# TODO: Apply filters
#
# TODO: Exclude partially stated rooms unless the `required_state` has
# `["m.room.member", "$LAZY"]`
- filtered_room_ids = room_id_set
+ filtered_room_map = sync_room_map
# TODO: Apply sorts
- sorted_room_ids = sorted(filtered_room_ids)
+ sorted_room_info = await self.sort_rooms(filtered_room_map, to_token)

ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
if list_config.ranges:
Expand All @@ -178,12 +206,17 @@ async def current_sync_for_user(
SlidingSyncResult.SlidingWindowList.Operation(
op=OperationType.SYNC,
range=range,
- room_ids=sorted_room_ids[range[0] : range[1]],
+ room_ids=[
+ room_id
+ for room_id, _ in sorted_room_info[
+ range[0] : range[1]
+ ]
+ ],
)
)

lists[list_key] = SlidingSyncResult.SlidingWindowList(
- count=len(sorted_room_ids),
+ count=len(sorted_room_info),
ops=ops,
)

Expand All @@ -200,7 +233,7 @@ async def get_sync_room_ids_for_user(
user: UserID,
to_token: StreamToken,
from_token: Optional[StreamToken] = None,
- ) -> AbstractSet[str]:
+ ) -> Dict[str, RoomsForUser]:
Updated things so that instead of just a list of room IDs, we now also return the user's corresponding membership information for each room. This way we can share the membership info across the filtering, the sorting, and (soon) the room data.
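A minimal sketch of the shape change described in this comment, for readers without the Synapse codebase at hand. `RoomMembership` and `build_sync_room_map` are hypothetical stand-ins (not Synapse's `RoomsForUser` or `get_sync_room_ids_for_user`), and the leave check is a simplified placeholder for `filter_membership_for_sync`:

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class RoomMembership:
    """Hypothetical stand-in for Synapse's `RoomsForUser` (illustration only)."""

    room_id: str
    membership: str
    stream_ordering: int


def build_sync_room_map(rooms: List[RoomMembership]) -> Dict[str, RoomMembership]:
    # Key by room ID so later filter/sort steps can look up the membership
    # directly instead of re-fetching it. Dropping plain leaves here is a
    # simplified placeholder for the real membership filter.
    return {room.room_id: room for room in rooms if room.membership != "leave"}


rooms = [
    RoomMembership("!a:example.org", "join", 5),
    RoomMembership("!b:example.org", "leave", 7),
    RoomMembership("!c:example.org", "invite", 9),
]
sync_room_map = build_sync_room_map(rooms)
```

Compared with a bare set of room IDs, the map lets a later step such as sorting read `sync_room_map[room_id].membership` without another lookup.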

"""
Fetch room IDs that should be listed for this user in the sync response (the
full room list that will be filtered, sorted, and sliced).
Expand All @@ -217,6 +250,15 @@ async def get_sync_room_ids_for_user(
`forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
to tell when a room was forgotten at the moment so we can't factor it into the
from/to range.

+ Args:
+ user: User to fetch rooms for
+ to_token: The token to fetch rooms up to.
+ from_token: The point in the stream to sync from.
+
+ Returns:
+ A dictionary of room IDs that should be listed in the sync response along
+ with membership information in that room at the time of `to_token`.
"""
user_id = user.to_string()

Expand All @@ -236,11 +278,11 @@ async def get_sync_room_ids_for_user(

# If the user has never joined any rooms before, we can just return an empty list
if not room_for_user_list:
- return set()
+ return {}

# Our working list of rooms that can show up in the sync response
sync_room_id_set = {
- room_for_user.room_id
+ room_for_user.room_id: room_for_user
for room_for_user in room_for_user_list
if filter_membership_for_sync(
membership=room_for_user.membership,
Expand Down Expand Up @@ -275,12 +317,6 @@ async def get_sync_room_ids_for_user(
instance_map=immutabledict(instance_to_max_stream_ordering_map),
)

- # If our `to_token` is already the same or ahead of the latest room membership
- # for the user, we can just straight-up return the room list (nothing has
- # changed)
- if membership_snapshot_token.is_before_or_eq(to_token.room_key):
- return sync_room_id_set
-
# Since we fetched the users room list at some point in time after the from/to
# tokens, we need to revert/rewind some membership changes to match the point in
# time of the `to_token`. In particular, we need to make these fixups:
Expand All @@ -300,14 +336,20 @@ async def get_sync_room_ids_for_user(

# 1) Fetch membership changes that fall in the range from `to_token` up to
# `membership_snapshot_token`
- membership_change_events_after_to_token = (
- await self.store.get_membership_changes_for_user(
- user_id,
- from_key=to_token.room_key,
- to_key=membership_snapshot_token,
- excluded_rooms=self.rooms_to_exclude_globally,
+ #
+ # If our `to_token` is already the same or ahead of the latest room membership
+ # for the user, we don't need to do any "2)" fix-ups and can just straight-up
+ # use the room list from the snapshot as a base (nothing has changed)
+ membership_change_events_after_to_token = []
+ if not membership_snapshot_token.is_before_or_eq(to_token.room_key):
+ membership_change_events_after_to_token = (
+ await self.store.get_membership_changes_for_user(
+ user_id,
+ from_key=to_token.room_key,
+ to_key=membership_snapshot_token,
+ excluded_rooms=self.rooms_to_exclude_globally,
+ )
)
- )

# 1) Assemble a list of the last membership events in some given ranges. Someone
# could have left and joined multiple times during the given range but we only
Expand Down Expand Up @@ -390,7 +432,9 @@ async def get_sync_room_ids_for_user(
not was_last_membership_already_included
and should_prev_membership_be_included
):
- sync_room_id_set.add(room_id)
+ sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
+ last_membership_change_after_to_token
+ )
# 1b) Remove rooms that the user joined (hasn't left) after the `to_token`
#
# For example, if the last membership event after the `to_token` is a "join"
Expand All @@ -401,7 +445,7 @@ async def get_sync_room_ids_for_user(
was_last_membership_already_included
and not should_prev_membership_be_included
):
- sync_room_id_set.discard(room_id)
+ del sync_room_id_set[room_id]

# 2) -----------------------------------------------------
# We fix-up newly_left rooms after the first fixup because it may have removed
Expand Down Expand Up @@ -436,6 +480,63 @@ async def get_sync_room_ids_for_user(
# include newly_left rooms because the last event that the user should see
# is their own leave event
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
- sync_room_id_set.add(room_id)
+ sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
+ last_membership_change_in_from_to_range
+ )

return sync_room_id_set
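
The rewind idea behind the "1)" fix-ups above can be sketched in isolation. This is a deliberately simplified illustration, not the handler's logic: `MembershipChange` and `rewind_to_token` are hypothetical names, the snapshot maps room ID straight to a membership string, and the real code additionally decides inclusion per room via `filter_membership_for_sync` and handles newly_left rooms in a separate step.

```python
from typing import Dict, List, NamedTuple, Optional


class MembershipChange(NamedTuple):
    """Hypothetical record of one membership event after the `to_token`."""

    room_id: str
    stream_ordering: int
    membership: str
    prev_membership: Optional[str]  # None if the user had no membership before


def rewind_to_token(
    snapshot: Dict[str, str],
    changes_after_to_token: List[MembershipChange],
) -> Dict[str, str]:
    rewound = dict(snapshot)

    # Only the *first* change per room after the token matters: its previous
    # membership is what was in effect at the time of the token.
    first_change: Dict[str, MembershipChange] = {}
    for change in sorted(changes_after_to_token, key=lambda c: c.stream_ordering):
        first_change.setdefault(change.room_id, change)

    for room_id, change in first_change.items():
        if change.prev_membership is None:
            # The user first appeared in this room after the token: drop it
            rewound.pop(room_id, None)
        else:
            # Restore whatever membership was in effect at the token
            rewound[room_id] = change.prev_membership
    return rewound


snapshot = {"!a:example.org": "join", "!b:example.org": "leave"}
changes = [
    MembershipChange("!a:example.org", 20, "join", None),
    MembershipChange("!b:example.org", 21, "leave", "join"),
]
rewound = rewind_to_token(snapshot, changes)
```

When there are no changes after the token, the snapshot is returned unchanged, which mirrors why the handler can skip the `get_membership_changes_for_user` query in that case.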

+ async def sort_rooms(
+ self,
+ sync_room_map: Dict[str, RoomsForUser],
+ to_token: StreamToken,
+ ) -> List[Tuple[str, RoomsForUser]]:
+ """
+ Sort by stream_ordering of the last event in the room. In order to get
+ a stable sort, we tie-break by room ID.
+
+ Args:
+ sync_room_map: Dictionary of room IDs to sort along with membership
+ information in the room at the time of `to_token`.
+ to_token: We sort based on the events in the room at this token (<= `to_token`)
+
+ Returns:
+ A sorted list of room IDs by stream_ordering along with membership information.
+ """
+
+ # Assemble a map of room ID to the `stream_ordering` of the last activity that the
+ # user should see in the room (<= `to_token`)
+ last_activity_in_room_map: Dict[str, int] = {}
+ for room_id, room_for_user in sync_room_map.items():
+ # If they are fully-joined to the room, let's find the latest activity
+ # at/before the `to_token`.
+ if room_for_user.membership == Membership.JOIN:
+ last_event_result = (
+ await self.store.get_last_event_in_room_before_stream_ordering(
+ room_id, to_token.room_key
+ )
+ )
+
+ # If the room has no events at/before the `to_token`, this is probably a
+ # mistake in the code that generates the `sync_room_map` since that should
+ # only give us rooms that the user had membership in during the token range.
+ assert last_event_result is not None
+
+ _, event_pos = last_event_result
+
+ last_activity_in_room_map[room_id] = event_pos.stream
+ else:
+ # Otherwise, if the user has left/been invited/knocked/been banned from
+ # a room, they shouldn't see anything past that point.
+ last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
+
+ return sorted(
+ sync_room_map.items(),
+ # Sort by the last activity (stream_ordering) in the room, tie-break on room_id
+ key=lambda room_info: (
+ last_activity_in_room_map[room_info[0]],
+ room_info[0],
+ ),
+ # We want descending order
+ reverse=True,
+ )
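
To make the ordering in `sort_rooms` concrete, here is a small standalone sketch of the same key function applied to plain data, followed by the kind of range slice used when building the `SYNC` ops. `sort_rooms_by_recency` and the room IDs are made up for illustration, and the map values stand in for the `stream_ordering` of each room's last visible activity:

```python
from typing import Dict, List, Tuple


def sort_rooms_by_recency(
    last_activity_in_room_map: Dict[str, int],
) -> List[Tuple[str, int]]:
    return sorted(
        last_activity_in_room_map.items(),
        # Primary key: last activity (stream_ordering); tie-break: room ID
        key=lambda room_info: (room_info[1], room_info[0]),
        # Descending, so the most recently active room comes first
        reverse=True,
    )


last_activity = {
    "!a:example.org": 10,
    "!b:example.org": 42,
    "!c:example.org": 10,
}
sorted_rooms = sort_rooms_by_recency(last_activity)

# Slice a window the same way the diff does: `[range[0] : range[1]]`
window = [room_id for room_id, _ in sorted_rooms[0:2]]
```

Because `reverse=True` applies to the whole key tuple, rooms with equal `stream_ordering` come out in descending room ID order (`!c` before `!a` here). The order is deterministic either way, which is what the tie-break is for.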
10 changes: 6 additions & 4 deletions synapse/handlers/sync.py
Expand Up @@ -1038,9 +1038,11 @@ async def get_state_at(
# FIXME: This gets the state at the latest event before the stream ordering,
# which might not be the same as the "current state" of the room at the time
# of the stream token if there were multiple forward extremities at the time.
- last_event_id = await self.store.get_last_event_in_room_before_stream_ordering(
- room_id,
- end_token=stream_position.room_key,
+ last_event_id = (
+ await self.store.get_last_event_id_in_room_before_stream_ordering(
+ room_id,
+ end_token=stream_position.room_key,
+ )
)

if last_event_id:
Expand Down Expand Up @@ -1521,7 +1523,7 @@ async def _compute_state_delta_for_incremental_sync(
# We need to make sure the first event in our batch points to the
# last event in the previous batch.
last_event_id_prev_batch = (
- await self.store.get_last_event_in_room_before_stream_ordering(
+ await self.store.get_last_event_id_in_room_before_stream_ordering(
room_id,
end_token=since_token.room_key,
)
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/databases/main/events.py
Expand Up @@ -207,6 +207,7 @@ async def _persist_events_and_state_updates(
async with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream
+ event.internal_metadata.instance_name = self._instance_name

await self.db_pool.runInteraction(
"persist_events",
Expand Down