Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sliding Sync: Only send rooms with updates down sliding sync #17479

Merged
merged 56 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
30263b4
Add SlidingSyncStreamToken
erikjohnston Jul 17, 2024
1ad1cce
Pass throught SlidingSyncStreamToken
erikjohnston Jul 16, 2024
e8df0d7
Don't create tokens manually in SSS tests
erikjohnston Jul 16, 2024
f3030af
Fix to use new token format
erikjohnston Jul 16, 2024
f3a4cfb
Newsfile
erikjohnston Jul 17, 2024
d44f7e1
WIP/PoC of storing whether we have sent rooms down to clients
erikjohnston Jul 15, 2024
53273db
Add conn_id field
erikjohnston Jul 17, 2024
e2a88e4
Use new room store to track if we've sent a room down
erikjohnston Jul 16, 2024
1858317
Handle initial flag correctly
erikjohnston Jul 17, 2024
de6e3bd
Handle state deltas in non-initial rooms
erikjohnston Jul 17, 2024
e2c47bf
Fix tests
erikjohnston Jul 17, 2024
a90c408
Newsfile
erikjohnston Jul 17, 2024
2968f2e
Bump typing-extensions for 'assert_never'
erikjohnston Jul 17, 2024
2784881
Add tests
erikjohnston Jul 17, 2024
d689204
Add docstring
erikjohnston Jul 18, 2024
560087b
Remove '_token' prefix
erikjohnston Jul 18, 2024
40d9587
Apply suggestions from code review
erikjohnston Jul 18, 2024
14eb781
Reword doc
erikjohnston Jul 18, 2024
9ae2551
Keyword args
erikjohnston Jul 18, 2024
51f7602
Rename bounds
erikjohnston Jul 18, 2024
eab092b
Add context to conn_id
erikjohnston Jul 18, 2024
605b358
Refactor to avoid SyncConfig.connection_id()
erikjohnston Jul 18, 2024
37c4463
Add some unit tests for 'get_room_sync_data'
erikjohnston Jul 18, 2024
7f26fc4
Merge remote-tracking branch 'origin/develop' into erikj/ss_room_store
erikjohnston Jul 18, 2024
532594e
Fix linting in tests
erikjohnston Jul 18, 2024
1230a51
Merge remote-tracking branch 'origin/develop' into erikj/ss_tokens
erikjohnston Jul 19, 2024
01f1dca
Merge branch 'erikj/ss_tokens' into erikj/ss_room_store
erikjohnston Jul 19, 2024
6f738a4
Apply suggestions from code review
erikjohnston Jul 23, 2024
588dfb8
Merge remote-tracking branch 'origin/develop' into erikj/ss_tokens
erikjohnston Jul 23, 2024
0c4e633
Fix tests
erikjohnston Jul 23, 2024
60790d6
Change token names again
erikjohnston Jul 23, 2024
4ce3e51
Ensure there is only one SlidingSyncHandler
erikjohnston Jul 23, 2024
602b6c8
Add test for cache being cleared
erikjohnston Jul 23, 2024
e2f3d48
Merge branch 'erikj/ss_tokens' into erikj/ss_room_store
erikjohnston Jul 23, 2024
81b2162
Merge remote-tracking branch 'origin/develop' into erikj/ss_room_store
erikjohnston Jul 24, 2024
75f7c01
Only send rooms with updates down sliding sync
erikjohnston Jul 24, 2024
b36cada
Newsfile
erikjohnston Jul 24, 2024
0812894
Update comments
erikjohnston Jul 25, 2024
66f80cc
Merge remote-tracking branch 'origin/develop' into erikj/ss_room_store
erikjohnston Jul 25, 2024
2c48784
Use do_sync in tests
erikjohnston Jul 25, 2024
abd7a5b
Move to integration tests
erikjohnston Jul 25, 2024
470c5d3
Review comments
erikjohnston Jul 26, 2024
90f2184
Previously state test
erikjohnston Jul 26, 2024
e7a6c19
Add never test
erikjohnston Jul 26, 2024
b9facaf
Review comments
erikjohnston Jul 26, 2024
b396450
Add test that empty room comes down initial sync
erikjohnston Jul 26, 2024
c56a745
Make it clear we only filter out rooms in incremental sync
erikjohnston Jul 26, 2024
c53e83d
Don't send down rooms if nothing has happened
erikjohnston Jul 26, 2024
1740f11
Merge remote-tracking branch 'origin/erikj/ss_room_store' into erikj/…
erikjohnston Jul 26, 2024
fd61672
Add dedicated function 'get_rooms_that_might_have_updates'
erikjohnston Jul 26, 2024
7ae5f78
Apply suggestions from code review
erikjohnston Jul 29, 2024
76c69e6
Add clarifying comments
erikjohnston Jul 29, 2024
d043009
Merge remote-tracking branch 'origin/develop' into erikj/ss_noop_rooms
erikjohnston Jul 29, 2024
4646edb
Fix comment
erikjohnston Jul 29, 2024
7088cb0
Apply suggestions from code review
erikjohnston Jul 29, 2024
07c0561
Change comment and andd assertion
erikjohnston Jul 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17479.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Do not send down empty room entries down experimental sliding sync endpoint.
51 changes: 49 additions & 2 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,51 @@ async def current_sync_for_user(
# Fetch room data
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}

# Filter out rooms that haven't received updates and we've sent down
# previously.
Comment on lines +622 to +623
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading the Sliding Sync MSC3575, I don't see this as allowed. In the spec, the delta_tokens only applied to timeline/required_state.

I'm assuming this is another "Simplified" Sliding Sync change we're making.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From discussion elsewhere: the proxy won't send down rooms where there haven't been changes. I think the confusion is that the list ops that the proxy will use in that case is not sync but a delete / insert.

Since we're getting rid of list ops then this distinction doesn't matter, and its ok for us to filter stuff out.

if from_token:
rooms_should_send = set()

# First we check if there are rooms that match a list/room
# subscription and have updates we need to send (i.e. either because
# we haven't sent the room down, or we have but there are missing
# updates).
for room_id in relevant_room_map:
status = await self.connection_store.have_sent_room(
sync_config,
from_token.connection_position,
room_id,
)
if (
# The room was never sent down before so the client needs to know
# about it regardless of any updates.
status.status == HaveSentRoomFlag.NEVER
# `PREVIOUSLY` literally means the "room was sent down before *AND*
# there are updates we haven't sent down" so we already know this
# room has updates.
or status.status == HaveSentRoomFlag.PREVIOUSLY
):
rooms_should_send.add(room_id)
elif status.status == HaveSentRoomFlag.LIVE:
# We know that we've sent all updates up until `from_token`,
# so we just need to check if there have been updates since
# then.
pass
else:
assert_never(status.status)

# We only need to check for new events since any state changes
# will also come down as new events.
rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
relevant_room_map.keys(), from_token.stream_token.room_key
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
)
rooms_should_send.update(rooms_that_have_updates)
relevant_room_map = {
room_id: room_sync_config
for room_id, room_sync_config in relevant_room_map.items()
if room_id in rooms_should_send
}

@trace
@tag_args
async def handle_room(room_id: str) -> None:
Expand All @@ -633,7 +678,9 @@ async def handle_room(room_id: str) -> None:
to_token=to_token,
)

rooms[room_id] = room_sync_result
# Filter out empty room results during incremental sync
if room_sync_result or not from_token:
rooms[room_id] = room_sync_result

with start_active_span("sliding_sync.generate_room_entries"):
await concurrently_execute(handle_room, relevant_room_map, 10)
Expand Down Expand Up @@ -2198,7 +2245,7 @@ class SlidingSyncConnectionStore:
a connection position of 5 might have totally different states on worker A and
worker B.

One complication that we need to deal with here is needing to handle requests being
One complication that we need to deal with here is needing to handle requests being
resent, i.e. if we sent down a room in a response that the client received, we must
consider the room *not* sent when we get the request again.

Expand Down
10 changes: 10 additions & 0 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2104,3 +2104,13 @@ async def get_timeline_gaps(
return RoomStreamToken(stream=last_position.stream - 1)

return None

def get_rooms_that_might_have_updates(
self, room_ids: StrCollection, from_token: RoomStreamToken
) -> StrCollection:
"""Filters given room IDs down to those that might have updates, i.e.
removes rooms that definitely do not have updates.
"""
return self._events_stream_cache.get_entities_changed(
room_ids, from_token.stream
)
17 changes: 16 additions & 1 deletion synapse/types/handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,17 @@ class StrippedHero:
notification_count: int
highlight_count: int

def __bool__(self) -> bool:
return (
# If this is the first time the client is seeing the room, we should not filter it out
# under any circumstance.
self.initial
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# We need to let the client know if there are any new events
or bool(self.required_state)
or bool(self.timeline_events)
or bool(self.stripped_state)
)

@attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingWindowList:
"""
Expand Down Expand Up @@ -367,7 +378,11 @@ def __bool__(self) -> bool:
to tell if the notifier needs to wait for more events when polling for
events.
"""
return bool(self.lists or self.rooms or self.extensions)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
# We don't include `self.lists` here, as a) `lists` is always non-empty even if
# there are no changes, and b) since we're sorting rooms by `stream_ordering` of
# the latest activity, anything that would cause the order to change would end
# up in `self.rooms` and cause us to send down the change.
return bool(self.rooms or self.extensions)

@staticmethod
def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
Expand Down
89 changes: 62 additions & 27 deletions tests/rest/client/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
)
from tests.server import TimedOutException
from tests.test_utils.event_injection import create_event, mark_event_as_partial_state
from tests.unittest import skip_unless

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -1656,12 +1655,6 @@ def test_wait_for_new_data(self) -> None:
channel.json_body["rooms"][room_id]["timeline"],
)

# TODO: Once we remove `ops`, we should be able to add a `RoomResult.__bool__` to
# check if there are any updates since the `from_token`.
@skip_unless(
False,
"Once we remove ops from the Sliding Sync response, this test should pass",
)
def test_wait_for_new_data_timeout(self) -> None:
"""
Test to make sure that the Sliding Sync request waits for new data to arrive but
Expand Down Expand Up @@ -1711,12 +1704,8 @@ def test_wait_for_new_data_timeout(self) -> None:
channel.await_result(timeout_ms=1200)
self.assertEqual(channel.code, 200, channel.json_body)

# We still see rooms because that's how Sliding Sync lists work but we reached
# the timeout before seeing them
self.assertEqual(
[event["event_id"] for event in channel.json_body["rooms"].keys()],
[room_id],
)
# There should be no room sent down.
self.assertFalse(channel.json_body["rooms"])

def test_filter_list(self) -> None:
"""
Expand Down Expand Up @@ -3556,19 +3545,7 @@ def test_rooms_ban_incremental_sync2(self) -> None:
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)

# Nothing to see for this banned user in the room in the token range
self.assertIsNone(response_body["rooms"][room_id1].get("timeline"))
# No events returned in the timeline so nothing is "live"
self.assertEqual(
response_body["rooms"][room_id1]["num_live"],
0,
response_body["rooms"][room_id1],
)
# There aren't anymore events to paginate to in this range
self.assertEqual(
response_body["rooms"][room_id1]["limited"],
False,
response_body["rooms"][room_id1],
)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
self.assertIsNone(response_body["rooms"].get(room_id1))

def test_rooms_no_required_state(self) -> None:
"""
Expand Down Expand Up @@ -3668,12 +3645,15 @@ def test_rooms_required_state_incremental_sync(self) -> None:
# This one doesn't exist in the room
[EventTypes.Tombstone, ""],
],
"timeline_limit": 0,
"timeline_limit": 1,
}
}
}
_, from_token = self.do_sync(sync_body, tok=user1_tok)

# Send a message so the room comes down sync.
self.helper.send(room_id1, "msg", tok=user1_tok)

# Make the incremental Sliding Sync request
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)

Expand Down Expand Up @@ -4880,6 +4860,61 @@ def test_rooms_timeline_incremental_sync_NEVER(self) -> None:
self.assertEqual(response_body["rooms"][room_id1]["limited"], True)
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)

def test_rooms_with_no_updates_do_not_come_down_incremental_sync(self) -> None:
"""
Test that rooms with no updates are returned in subsequent incremental
syncs.
"""

user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")

room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)

sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 0,
}
}
}

_, from_token = self.do_sync(sync_body, tok=user1_tok)

# Make the incremental Sliding Sync request
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)

# Nothing has happened in the room, so the room should not come down
# /sync.
self.assertIsNone(response_body["rooms"].get(room_id1))

def test_empty_initial_room_comes_down_sync(self) -> None:
"""
Test that rooms come down /sync even with empty required state and
timeline limit in initial sync.
"""

user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")

room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)

sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 0,
}
}
}

# Make the Sliding Sync request
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)


class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
"""Tests for the to-device sliding sync extension"""
Expand Down
Loading