Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream_ordering sort to Sliding Sync /sync #17293

Merged
merged 34 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
ce45cc1
Sliding sync sort stub
MadLittleMods Jun 10, 2024
75b701f
Add changelog
MadLittleMods Jun 10, 2024
c8a240f
Prefer `? < a AND a <= ?`
MadLittleMods Jun 11, 2024
2a82ac0
Fix `get_last_event_in_room_before_stream_ordering(...)` not finding …
MadLittleMods Jun 12, 2024
2e1d142
Add actual guranteed order for UNION
MadLittleMods Jun 12, 2024
b1af992
Some clean-up
MadLittleMods Jun 12, 2024
901ce62
Try to better explain why
MadLittleMods Jun 12, 2024
87ad458
Fix `get_last_event_in_room_before_stream_ordering(...)` not finding …
MadLittleMods Jun 12, 2024
431b31e
Add actual guranteed order for UNION
MadLittleMods Jun 12, 2024
d7f40ae
Try to better explain why
MadLittleMods Jun 12, 2024
a8056ae
Add changelog
MadLittleMods Jun 12, 2024
3f317a9
We're actually using sub-query syntax so we can ORDER each query
MadLittleMods Jun 12, 2024
54bdc0c
Fix invalid syntax
MadLittleMods Jun 12, 2024
4d585b6
Merge branch 'develop' into madlittlemods/fix-and-tests-for-get_last_…
MadLittleMods Jun 12, 2024
42f24de
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort
MadLittleMods Jun 12, 2024
03547b0
Merge branch 'madlittlemods/fix-and-tests-for-get_last_event_in_room_…
MadLittleMods Jun 12, 2024
c94550d
Add `event.internal_metadata.instance_name` and event position to `ge…
MadLittleMods Jun 12, 2024
afb6627
Add rust changes for `event.internal_metadata.instance_name`
MadLittleMods Jun 12, 2024
af60f7b
First pass on `sort_rooms` and refactor to include room membership al…
MadLittleMods Jun 12, 2024
bd49c34
Add some tests
MadLittleMods Jun 12, 2024
5060588
Fix newly_left not being added back if we returned early (when `membe…
MadLittleMods Jun 12, 2024
5243a30
Fix ban test case
MadLittleMods Jun 12, 2024
d5929f1
Adjust wording
MadLittleMods Jun 12, 2024
8935c6c
Fix lints
MadLittleMods Jun 12, 2024
93aa4ff
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort
MadLittleMods Jun 12, 2024
185e0b5
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort
MadLittleMods Jun 13, 2024
8244b25
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort
MadLittleMods Jun 13, 2024
35808b3
Fix filtering
MadLittleMods Jun 13, 2024
a917eda
No more sort option
MadLittleMods Jun 13, 2024
84eaeea
Add rest test
MadLittleMods Jun 13, 2024
99ed012
Update changelog
MadLittleMods Jun 13, 2024
7d80418
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort
MadLittleMods Jun 17, 2024
ef92f3c
Stable sort with just `stream_ordering`
MadLittleMods Jun 17, 2024
63ff8f9
Rename to `get_last_event_pos_in_room_before_stream_ordering(...)`
MadLittleMods Jun 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17293.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add recency sort to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
1 change: 1 addition & 0 deletions changelog.d/17295.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix edge case in `/sync` returning the wrong the state when using sharded event persisters.
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
21 changes: 20 additions & 1 deletion synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ async def current_sync_for_user(
# `["m.room.member", "$LAZY"]`
filtered_room_ids = room_id_set
# TODO: Apply sorts
sorted_room_ids = sorted(filtered_room_ids)
sorted_room_ids = await self.sort_rooms(filtered_room_ids, to_token)

ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
if list_config.ranges:
Expand Down Expand Up @@ -439,3 +439,22 @@ async def get_sync_room_ids_for_user(
sync_room_id_set.add(room_id)

return sync_room_id_set

async def sort_rooms(
self,
room_id_set: AbstractSet[str],
to_token: StreamToken,
) -> List[str]:
"""
Sort by recency of the last event in the room (stream_ordering). In order to get
a stable sort, we tie-break by room ID.
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

Args:
room_id_set: Set of room IDs to sort
to_token: We sort based on the events in the room at this token
"""
# TODO: `get_last_event_in_room_before_stream_ordering()`

# TODO: Handle when people are left/banned from the room and shouldn't see past that point

return list(room_id_set)
32 changes: 22 additions & 10 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -914,12 +914,23 @@ async def get_last_event_in_room_before_stream_ordering(
def get_last_event_in_room_before_stream_ordering_txn(
txn: LoggingTransaction,
) -> Optional[str]:
# We need to handle the fact that the stream tokens can be vector
# clocks. We do this by getting all rows between the minimum and
# maximum stream ordering in the token, plus one row less than the
# minimum stream ordering. We then filter the results against the
# token and return the first row that matches.

# We're looking for the closest event at or before the token. We need to
# handle the fact that the stream token can be a vector clock (with an
# `instance_map`) and events can be persisted on different instances
# (sharded event persisters). The first subquery handles the events that
# would be within the vector clock and gets all rows between the minimum and
# maximum stream ordering in the token which need to be filtered against the
# `instance_map`. The second subquery handles the "before" case and finds a
# row before the token. We then filter out any results past the token's
# vector clock and return the first row that matches.
min_stream = end_token.stream
max_stream = end_token.get_max_stream_pos()

# We use `union all` because we don't need any of the deduplication logic
# (`union` is really a union + distinct). `UNION ALL`` does preserve the
# ordering of the operand queries but there is no actual gurantee that it
# has this behavior in all scenarios so we need the extra `ORDER BY` at the
# bottom.
sql = """
SELECT * FROM (
SELECT instance_name, stream_ordering, topological_ordering, event_id
Expand All @@ -931,7 +942,7 @@ def get_last_event_in_room_before_stream_ordering_txn(
AND rejections.event_id IS NULL
ORDER BY stream_ordering DESC
) AS a
UNION
UNION ALL
SELECT * FROM (
SELECT instance_name, stream_ordering, topological_ordering, event_id
FROM events
Expand All @@ -943,15 +954,16 @@ def get_last_event_in_room_before_stream_ordering_txn(
ORDER BY stream_ordering DESC
LIMIT 1
) AS b
ORDER BY stream_ordering DESC
"""
txn.execute(
sql,
(
room_id,
end_token.stream,
end_token.get_max_stream_pos(),
min_stream,
max_stream,
room_id,
end_token.stream,
min_stream,
),
)

Expand Down
15 changes: 0 additions & 15 deletions synapse/types/rest/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,20 +175,6 @@ class SlidingSyncList(CommonRoomParameters):
ranges: Sliding window ranges. If this field is missing, no sliding window
is used and all rooms are returned in this list. Integers are
*inclusive*.
sort: How the list should be sorted on the server. The first value is
applied first, then tiebreaks are performed with each subsequent sort
listed.

FIXME: Furthermore, it's not currently defined how servers should behave
if they encounter a filter or sort operation they do not recognise. If
the server rejects the request with an HTTP 400 then that will break
backwards compatibility with new clients vs old servers. However, the
client would be otherwise unaware that only some of the sort/filter
operations have taken effect. We may need to include a "warnings"
section to indicate which sort/filter operations are unrecognised,
allowing for some form of graceful degradation of service.
-- https://github.com/matrix-org/matrix-spec-proposals/blob/kegan/sync-v3/proposals/3575-sync.md#filter-and-sort-extensions

slow_get_all_rooms: Just get all rooms (for clients that don't want to deal with
sliding windows). When true, the `ranges` and `sort` fields are ignored.
required_state: Required state for each room returned. An array of event
Expand Down Expand Up @@ -253,7 +239,6 @@ class Filters(RequestBodyModel):
ranges: Optional[List[Tuple[int, int]]] = None
else:
ranges: Optional[List[Tuple[conint(ge=0, strict=True), conint(ge=0, strict=True)]]] = None # type: ignore[valid-type]
sort: Optional[List[StrictStr]] = None
slow_get_all_rooms: Optional[StrictBool] = False
include_heroes: Optional[StrictBool] = False
filters: Optional[Filters] = None
Expand Down
3 changes: 0 additions & 3 deletions tests/handlers/test_sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,6 @@ def test_sharded_event_persisters(self) -> None:

# Get a token while things are stuck after our activity
stuck_activity_token = self.event_sources.get_current_token()
logger.info("stuck_activity_token %s", stuck_activity_token)
# Let's make sure we're working with a token that has an `instance_map`
self.assertNotEqual(len(stuck_activity_token.room_key.instance_map), 0)

Expand All @@ -1057,7 +1056,6 @@ def test_sharded_event_persisters(self) -> None:
join_on_worker2_pos = self.get_success(
self.store.get_position_for_event(join_on_worker2_response["event_id"])
)
logger.info("join_on_worker2_pos %s", join_on_worker2_pos)
# Ensure the join technially came after our token
self.assertGreater(
join_on_worker2_pos.stream,
Expand All @@ -1076,7 +1074,6 @@ def test_sharded_event_persisters(self) -> None:
join_on_worker3_pos = self.get_success(
self.store.get_position_for_event(join_on_worker3_response["event_id"])
)
logger.info("join_on_worker3_pos %s", join_on_worker3_pos)
# Ensure the join came after the min but still encapsulated by the token
self.assertGreaterEqual(
join_on_worker3_pos.stream,
Expand Down
Loading
Loading