Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sliding sync: Store the per-connection state in the database. #17599

Merged
merged 27 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e2ade85
Move sliding sync types
erikjohnston Aug 21, 2024
5b77f4a
Update mypy plugin to handle enums and typevars
erikjohnston Aug 22, 2024
7087c7c
Make RoomSyncConfig immutable
erikjohnston Aug 22, 2024
e34d634
Make PerConnectionState immutable
erikjohnston Aug 22, 2024
87d5336
Newsfile
erikjohnston Aug 22, 2024
d1ee253
Allow making columns AUTOINCREMENT
erikjohnston Aug 22, 2024
b3d8e2d
Add simple_insert_returning_txn
erikjohnston Aug 22, 2024
3838b18
Store state
erikjohnston Aug 19, 2024
ed7591c
Remove mark_token_seen
erikjohnston Aug 22, 2024
03eac5a
Newsfile
erikjohnston Aug 22, 2024
7935423
Apply suggestions from code review
erikjohnston Aug 26, 2024
948456b
Add comment about why we ignore zero position
erikjohnston Aug 26, 2024
68a2a98
Don't bother to use zip
erikjohnston Aug 26, 2024
8ed1c07
Add timestamp index
erikjohnston Aug 26, 2024
bae50d3
Rename column
erikjohnston Aug 26, 2024
1e5a3a7
Fix errors
erikjohnston Aug 26, 2024
0e07f65
Rename get_and_clear_connection_positions
erikjohnston Aug 27, 2024
4a68975
Don't assert unkown streams, log
erikjohnston Aug 27, 2024
ac14e57
Index on created ts
erikjohnston Aug 27, 2024
5fe6466
Update synapse/types/handlers/sliding_sync.py
erikjohnston Aug 27, 2024
9065382
Add cast comment
erikjohnston Aug 28, 2024
52f2199
Rename to 'effective_device_id'
erikjohnston Aug 28, 2024
174e1ad
Merge remote-tracking branch 'origin/develop' into erikj/ss_immutable
erikjohnston Aug 29, 2024
5152f19
Merge branch 'erikj/ss_immutable' into erikj/ss_store_state
erikjohnston Aug 29, 2024
8a54f48
Merge remote-tracking branch 'origin/develop' into erikj/ss_store_state
erikjohnston Aug 29, 2024
2da80fe
Add commente on new tables in in schema notes
erikjohnston Aug 29, 2024
57a9506
Merge remote-tracking branch 'origin/develop' into erikj/ss_store_state
erikjohnston Aug 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17599.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Store sliding sync per-connection state in the database.
1 change: 1 addition & 0 deletions changelog.d/17600.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make the sliding sync `PerConnectionState` class immutable.
19 changes: 18 additions & 1 deletion scripts-dev/mypy_synapse_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
NoneType,
TupleType,
TypeAliasType,
TypeVarType,
UninhabitedType,
UnionType,
)
Expand Down Expand Up @@ -233,6 +234,7 @@ def check_is_cacheable(
"synapse.synapse_rust.push.FilteredPushRules",
# This is technically not immutable, but close enough.
"signedjson.types.VerifyKey",
"synapse.types.StrCollection",
}

# Immutable containers only if the values are also immutable.
Expand Down Expand Up @@ -298,7 +300,7 @@ def is_cacheable(

elif rt.type.fullname in MUTABLE_CONTAINER_TYPES:
# Mutable containers are mutable regardless of their underlying type.
return False, None
return False, f"container {rt.type.fullname} is mutable"

elif "attrs" in rt.type.metadata:
# attrs classes are only cachable iff it is frozen (immutable itself)
Expand All @@ -318,6 +320,9 @@ def is_cacheable(
else:
return False, "non-frozen attrs class"

elif rt.type.is_enum:
# We assume Enum values are immutable
return True, None
else:
# Ensure we fail for unknown types, these generally means that the
# above code is not complete.
Expand All @@ -326,6 +331,18 @@ def is_cacheable(
f"Don't know how to handle {rt.type.fullname} return type instance",
)

elif isinstance(rt, TypeVarType):
# We consider TypeVars immutable if they are bound to a set of immutable
# types.
if rt.values:
for value in rt.values:
ok, note = is_cacheable(value, signature, verbose)
if not ok:
return False, f"TypeVar bound not cacheable {value}"
return True, None

return False, "TypeVar is unbound"

elif isinstance(rt, NoneType):
# None is cachable.
return True, None
Expand Down
2 changes: 2 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
from synapse.storage.databases.main.search import SearchStore
from synapse.storage.databases.main.session import SessionStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.storage.databases.main.sliding_sync import SlidingSyncStore
from synapse.storage.databases.main.state import StateGroupWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.stream import StreamWorkerStore
Expand Down Expand Up @@ -159,6 +160,7 @@ class GenericWorkerStore(
SessionStore,
TaskSchedulerWorkerStore,
ExperimentalFeaturesStore,
SlidingSyncStore,
):
# Properties that multiple storage classes define. Tell mypy what the
# expected type is.
Expand Down
26 changes: 11 additions & 15 deletions synapse/handlers/sliding_sync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@
_RoomMembershipForUser,
)
from synapse.handlers.sliding_sync.store import SlidingSyncConnectionStore
from synapse.handlers.sliding_sync.types import (
HaveSentRoomFlag,
MutablePerConnectionState,
PerConnectionState,
RoomSyncConfig,
StateValues,
)
from synapse.logging.opentracing import (
SynapseTags,
log_kv,
Expand All @@ -57,7 +50,15 @@
StreamKeyType,
StreamToken,
)
from synapse.types.handlers import SlidingSyncConfig, SlidingSyncResult
from synapse.types.handlers.sliding_sync import (
HaveSentRoomFlag,
MutablePerConnectionState,
PerConnectionState,
RoomSyncConfig,
SlidingSyncConfig,
SlidingSyncResult,
StateValues,
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import concurrently_execute
from synapse.visibility import filter_events_for_client
Expand Down Expand Up @@ -99,7 +100,7 @@ def __init__(self, hs: "HomeServer"):
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
self.is_mine_id = hs.is_mine_id

self.connection_store = SlidingSyncConnectionStore()
self.connection_store = SlidingSyncConnectionStore(self.store)
self.extensions = SlidingSyncExtensionHandler(hs)
self.room_lists = SlidingSyncRoomLists(hs)

Expand Down Expand Up @@ -220,16 +221,11 @@ async def current_sync_for_user(
# amount of time (more with round-trips and re-processing) in the end to
# get everything again.
previous_connection_state = (
await self.connection_store.get_per_connection_state(
await self.connection_store.get_and_clear_connection_positions(
sync_config, from_token
)
)

await self.connection_store.mark_token_seen(
sync_config=sync_config,
from_token=from_token,
)

# Get all of the room IDs that the user should be able to see in the sync
# response
has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
Expand Down
14 changes: 8 additions & 6 deletions synapse/handlers/sliding_sync/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@

from synapse.api.constants import AccountDataTypes, EduTypes
from synapse.handlers.receipts import ReceiptEventSource
from synapse.handlers.sliding_sync.types import (
HaveSentRoomFlag,
MutablePerConnectionState,
PerConnectionState,
)
from synapse.logging.opentracing import trace
from synapse.storage.databases.main.receipts import ReceiptInRoom
from synapse.types import (
Expand All @@ -35,7 +30,14 @@
StrCollection,
StreamToken,
)
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
from synapse.types.handlers.sliding_sync import (
HaveSentRoomFlag,
MutablePerConnectionState,
OperationType,
PerConnectionState,
SlidingSyncConfig,
SlidingSyncResult,
)

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down
34 changes: 17 additions & 17 deletions synapse/handlers/sliding_sync/room_lists.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@
)
from synapse.events import StrippedStateEvent
from synapse.events.utils import parse_stripped_state_event
from synapse.handlers.sliding_sync.types import (
HaveSentRoomFlag,
PerConnectionState,
RoomSyncConfig,
)
from synapse.logging.opentracing import start_active_span, trace
from synapse.storage.databases.main.state import (
ROOM_UNKNOWN_SENTINEL,
Expand All @@ -61,7 +56,14 @@
StreamToken,
UserID,
)
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
from synapse.types.handlers.sliding_sync import (
HaveSentRoomFlag,
OperationType,
PerConnectionState,
RoomSyncConfig,
SlidingSyncConfig,
SlidingSyncResult,
)
from synapse.types.state import StateFilter

if TYPE_CHECKING:
Expand Down Expand Up @@ -279,15 +281,11 @@ async def compute_interested_rooms(
room_id
)
if existing_room_sync_config is not None:
existing_room_sync_config.combine_room_sync_config(
room_sync_config = existing_room_sync_config.combine_room_sync_config(
room_sync_config
)
else:
# Make a copy so if we modify it later, it doesn't
# affect all references.
relevant_room_map[room_id] = (
room_sync_config.deep_copy()
)

relevant_room_map[room_id] = room_sync_config

room_ids_in_list.append(room_id)

Expand Down Expand Up @@ -351,11 +349,13 @@ async def compute_interested_rooms(
# and need to fetch more info about.
existing_room_sync_config = relevant_room_map.get(room_id)
if existing_room_sync_config is not None:
existing_room_sync_config.combine_room_sync_config(
room_sync_config
room_sync_config = (
existing_room_sync_config.combine_room_sync_config(
room_sync_config
)
)
else:
relevant_room_map[room_id] = room_sync_config

relevant_room_map[room_id] = room_sync_config

# Filtered subset of `relevant_room_map` for rooms that may have updates
# (in the event stream)
Expand Down
Loading
Loading