From 2999a14aed298ad4358e280fa9938010f1edd0d7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Aug 2024 16:22:57 +0100 Subject: [PATCH] Sliding Sync: Make `PerConnectionState` immutable (#17600) This is so that we can cache it. We also move the sliding sync types to `synapse/types/handlers/sliding_sync.py`. This is mainly in-prep for #17599 to avoid circular imports. The only change in behaviour is that `RoomSyncConfig.combine_sync_config(..)` now returns a new room sync config rather than mutating in-place. Reviewable commit-by-commit. --------- Co-authored-by: Eric Eastwood --- changelog.d/17600.misc | 1 + scripts-dev/mypy_synapse_plugin.py | 19 +- synapse/handlers/sliding_sync/__init__.py | 16 +- synapse/handlers/sliding_sync/extensions.py | 14 +- synapse/handlers/sliding_sync/room_lists.py | 34 +- synapse/handlers/sliding_sync/store.py | 8 +- synapse/types/handlers/__init__.py | 357 +-------------- .../handlers/sliding_sync.py} | 413 ++++++++++++++++-- tests/handlers/test_sliding_sync.py | 23 +- 9 files changed, 441 insertions(+), 444 deletions(-) create mode 100644 changelog.d/17600.misc rename synapse/{handlers/sliding_sync/types.py => types/handlers/sliding_sync.py} (52%) diff --git a/changelog.d/17600.misc b/changelog.d/17600.misc new file mode 100644 index 00000000000..a81c67f6d18 --- /dev/null +++ b/changelog.d/17600.misc @@ -0,0 +1 @@ +Make the sliding sync `PerConnectionState` class immutable. diff --git a/scripts-dev/mypy_synapse_plugin.py b/scripts-dev/mypy_synapse_plugin.py index 877b831751f..509047b41b1 100644 --- a/scripts-dev/mypy_synapse_plugin.py +++ b/scripts-dev/mypy_synapse_plugin.py @@ -38,6 +38,7 @@ NoneType, TupleType, TypeAliasType, + TypeVarType, UninhabitedType, UnionType, ) @@ -233,6 +234,7 @@ def check_is_cacheable( "synapse.synapse_rust.push.FilteredPushRules", # This is technically not immutable, but close enough. "signedjson.types.VerifyKey", + "synapse.types.StrCollection", } # Immutable containers only if the values are also immutable. @@ -298,7 +300,7 @@ def is_cacheable( elif rt.type.fullname in MUTABLE_CONTAINER_TYPES: # Mutable containers are mutable regardless of their underlying type. - return False, None + return False, f"container {rt.type.fullname} is mutable" elif "attrs" in rt.type.metadata: # attrs classes are only cachable iff it is frozen (immutable itself) @@ -318,6 +320,9 @@ def is_cacheable( else: return False, "non-frozen attrs class" + elif rt.type.is_enum: + # We assume Enum values are immutable + return True, None else: # Ensure we fail for unknown types, these generally means that the # above code is not complete. @@ -326,6 +331,18 @@ def is_cacheable( f"Don't know how to handle {rt.type.fullname} return type instance", ) + elif isinstance(rt, TypeVarType): + # We consider TypeVars immutable if they are bound to a set of immutable + # types. + if rt.values: + for value in rt.values: + ok, note = is_cacheable(value, signature, verbose) + if not ok: + return False, f"TypeVar bound not cacheable {value}" + return True, None + + return False, "TypeVar is unbound" + elif isinstance(rt, NoneType): # None is cachable. return True, None diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index c34ba83cd6e..7d4f6415c0e 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -29,13 +29,6 @@ _RoomMembershipForUser, ) from synapse.handlers.sliding_sync.store import SlidingSyncConnectionStore -from synapse.handlers.sliding_sync.types import ( - HaveSentRoomFlag, - MutablePerConnectionState, - PerConnectionState, - RoomSyncConfig, - StateValues, -) from synapse.logging.opentracing import ( SynapseTags, log_kv, @@ -57,10 +50,15 @@ StreamKeyType, StreamToken, ) -from synapse.types.handlers import ( - SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES, +from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES +from synapse.types.handlers.sliding_sync import ( + HaveSentRoomFlag, + MutablePerConnectionState, + PerConnectionState, + RoomSyncConfig, SlidingSyncConfig, SlidingSyncResult, + StateValues, ) from synapse.types.state import StateFilter from synapse.util.async_helpers import concurrently_execute diff --git a/synapse/handlers/sliding_sync/extensions.py b/synapse/handlers/sliding_sync/extensions.py index a2d4f24f9ce..d9f4c56e6e0 100644 --- a/synapse/handlers/sliding_sync/extensions.py +++ b/synapse/handlers/sliding_sync/extensions.py @@ -20,11 +20,6 @@ from synapse.api.constants import AccountDataTypes, EduTypes from synapse.handlers.receipts import ReceiptEventSource -from synapse.handlers.sliding_sync.types import ( - HaveSentRoomFlag, - MutablePerConnectionState, - PerConnectionState, -) from synapse.logging.opentracing import trace from synapse.storage.databases.main.receipts import ReceiptInRoom from synapse.types import ( @@ -35,7 +30,14 @@ StrCollection, StreamToken, ) -from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult +from synapse.types.handlers.sliding_sync import ( + HaveSentRoomFlag, + MutablePerConnectionState, + OperationType, + PerConnectionState, + SlidingSyncConfig, + SlidingSyncResult, +) if TYPE_CHECKING: from synapse.server import HomeServer diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 4718e8092b9..0e6cb285244 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -40,11 +40,6 @@ ) from synapse.events import StrippedStateEvent from synapse.events.utils import parse_stripped_state_event -from synapse.handlers.sliding_sync.types import ( - HaveSentRoomFlag, - PerConnectionState, - RoomSyncConfig, -) from synapse.logging.opentracing import start_active_span, trace from synapse.storage.databases.main.state import ( ROOM_UNKNOWN_SENTINEL, @@ -61,7 +56,14 @@ StreamToken, UserID, ) -from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult +from synapse.types.handlers.sliding_sync import ( + HaveSentRoomFlag, + OperationType, + PerConnectionState, + RoomSyncConfig, + SlidingSyncConfig, + SlidingSyncResult, +) from synapse.types.state import StateFilter if TYPE_CHECKING: @@ -279,15 +281,11 @@ async def compute_interested_rooms( room_id ) if existing_room_sync_config is not None: - existing_room_sync_config.combine_room_sync_config( + room_sync_config = existing_room_sync_config.combine_room_sync_config( room_sync_config ) - else: - # Make a copy so if we modify it later, it doesn't - # affect all references. - relevant_room_map[room_id] = ( - room_sync_config.deep_copy() - ) + + relevant_room_map[room_id] = room_sync_config room_ids_in_list.append(room_id) @@ -351,11 +349,13 @@ async def compute_interested_rooms( # and need to fetch more info about. existing_room_sync_config = relevant_room_map.get(room_id) if existing_room_sync_config is not None: - existing_room_sync_config.combine_room_sync_config( - room_sync_config + room_sync_config = ( + existing_room_sync_config.combine_room_sync_config( + room_sync_config + ) ) - else: - relevant_room_map[room_id] = room_sync_config + + relevant_room_map[room_id] = room_sync_config # Filtered subset of `relevant_room_map` for rooms that may have updates # (in the event stream) diff --git a/synapse/handlers/sliding_sync/store.py b/synapse/handlers/sliding_sync/store.py index 3b727432fb1..e38fe3556ff 100644 --- a/synapse/handlers/sliding_sync/store.py +++ b/synapse/handlers/sliding_sync/store.py @@ -18,13 +18,13 @@ import attr from synapse.api.errors import SlidingSyncUnknownPosition -from synapse.handlers.sliding_sync.types import ( +from synapse.logging.opentracing import trace +from synapse.types import SlidingSyncStreamToken +from synapse.types.handlers.sliding_sync import ( MutablePerConnectionState, PerConnectionState, + SlidingSyncConfig, ) -from synapse.logging.opentracing import trace -from synapse.types import SlidingSyncStreamToken -from synapse.types.handlers import SlidingSyncConfig if TYPE_CHECKING: pass diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index 126f03dd900..f2fbc1dddf2 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -17,34 +17,11 @@ # [This file includes modifications made by New Vector Limited] # # -from enum import Enum -from typing import TYPE_CHECKING, Dict, Final, List, Mapping, Optional, Sequence, Tuple -import attr -from typing_extensions import TypedDict -from synapse._pydantic_compat import HAS_PYDANTIC_V2 - -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import Extra -else: - from pydantic import Extra +from typing import List, Optional, TypedDict from synapse.api.constants import EventTypes -from synapse.events import EventBase -from synapse.types import ( - DeviceListUpdates, - JsonDict, - JsonMapping, - Requester, - SlidingSyncStreamToken, - StreamToken, - UserID, -) -from synapse.types.rest.client import SlidingSyncBody - -if TYPE_CHECKING: - from synapse.handlers.relations import BundledAggregations # Sliding Sync: The event types that clients should consider as new activity and affect # the `bump_stamp` @@ -114,335 +91,3 @@ class ShutdownRoomResponse(TypedDict): failed_to_kick_users: List[str] local_aliases: List[str] new_room_id: Optional[str] - - -class SlidingSyncConfig(SlidingSyncBody): - """ - Inherit from `SlidingSyncBody` since we need all of the same fields and add a few - extra fields that we need in the handler - """ - - user: UserID - requester: Requester - - # Pydantic config - class Config: - # By default, ignore fields that we don't recognise. - extra = Extra.ignore - # By default, don't allow fields to be reassigned after parsing. - allow_mutation = False - # Allow custom types like `UserID` to be used in the model - arbitrary_types_allowed = True - - -class OperationType(Enum): - """ - Represents the operation types in a Sliding Sync window. - - Attributes: - SYNC: Sets a range of entries. Clients SHOULD discard what they previous knew about - entries in this range. - INSERT: Sets a single entry. If the position is not empty then clients MUST move - entries to the left or the right depending on where the closest empty space is. - DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move - places. - INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for - offline support, but they should be treated as empty when additional operations - which concern indexes in the range arrive from the server. - """ - - SYNC: Final = "SYNC" - INSERT: Final = "INSERT" - DELETE: Final = "DELETE" - INVALIDATE: Final = "INVALIDATE" - - -@attr.s(slots=True, frozen=True, auto_attribs=True) -class SlidingSyncResult: - """ - The Sliding Sync result to be serialized to JSON for a response. - - Attributes: - next_pos: The next position token in the sliding window to request (next_batch). - lists: Sliding window API. A map of list key to list results. - rooms: Room subscription API. A map of room ID to room results. - extensions: Extensions API. A map of extension key to extension results. - """ - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class RoomResult: - """ - Attributes: - name: Room name or calculated room name. - avatar: Room avatar - heroes: List of stripped membership events (containing `user_id` and optionally - `avatar_url` and `displayname`) for the users used to calculate the room name. - is_dm: Flag to specify whether the room is a direct-message room (most likely - between two people). - initial: Flag which is set when this is the first time the server is sending this - data on this connection. Clients can use this flag to replace or update - their local state. When there is an update, servers MUST omit this flag - entirely and NOT send "initial":false as this is wasteful on bandwidth. The - absence of this flag means 'false'. - unstable_expanded_timeline: Flag which is set if we're returning more historic - events due to the timeline limit having increased. See "XXX: Odd behavior" - comment ing `synapse.handlers.sliding_sync`. - required_state: The current state of the room - timeline: Latest events in the room. The last event is the most recent. - bundled_aggregations: A mapping of event ID to the bundled aggregations for - the timeline events above. This allows clients to show accurate reaction - counts (or edits, threads), even if some of the reaction events were skipped - over in a gappy sync. - stripped_state: Stripped state events (for rooms where the usre is - invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2, - absent on joined/left rooms - prev_batch: A token that can be passed as a start parameter to the - `/rooms//messages` API to retrieve earlier messages. - limited: True if there are more events than `timeline_limit` looking - backwards from the `response.pos` to the `request.pos`. - num_live: The number of timeline events which have just occurred and are not historical. - The last N events are 'live' and should be treated as such. This is mostly - useful to determine whether a given @mention event should make a noise or not. - Clients cannot rely solely on the absence of `initial: true` to determine live - events because if a room not in the sliding window bumps into the window because - of an @mention it will have `initial: true` yet contain a single live event - (with potentially other old events in the timeline). - bump_stamp: The `stream_ordering` of the last event according to the - `bump_event_types`. This helps clients sort more readily without them - needing to pull in a bunch of the timeline to determine the last activity. - `bump_event_types` is a thing because for example, we don't want display - name changes to mark the room as unread and bump it to the top. For - encrypted rooms, we just have to consider any activity as a bump because we - can't see the content and the client has to figure it out for themselves. - joined_count: The number of users with membership of join, including the client's - own user ID. (same as sync `v2 m.joined_member_count`) - invited_count: The number of users with membership of invite. (same as sync v2 - `m.invited_member_count`) - notification_count: The total number of unread notifications for this room. (same - as sync v2) - highlight_count: The number of unread notifications for this room with the highlight - flag set. (same as sync v2) - """ - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class StrippedHero: - user_id: str - display_name: Optional[str] - avatar_url: Optional[str] - - name: Optional[str] - avatar: Optional[str] - heroes: Optional[List[StrippedHero]] - is_dm: bool - initial: bool - unstable_expanded_timeline: bool - # Should be empty for invite/knock rooms with `stripped_state` - required_state: List[EventBase] - # Should be empty for invite/knock rooms with `stripped_state` - timeline_events: List[EventBase] - bundled_aggregations: Optional[Dict[str, "BundledAggregations"]] - # Optional because it's only relevant to invite/knock rooms - stripped_state: List[JsonDict] - # Only optional because it won't be included for invite/knock rooms with `stripped_state` - prev_batch: Optional[StreamToken] - # Only optional because it won't be included for invite/knock rooms with `stripped_state` - limited: Optional[bool] - # Only optional because it won't be included for invite/knock rooms with `stripped_state` - num_live: Optional[int] - bump_stamp: int - joined_count: int - invited_count: int - notification_count: int - highlight_count: int - - def __bool__(self) -> bool: - return ( - # If this is the first time the client is seeing the room, we should not filter it out - # under any circumstance. - self.initial - # We need to let the client know if there are any new events - or bool(self.required_state) - or bool(self.timeline_events) - or bool(self.stripped_state) - ) - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class SlidingWindowList: - """ - Attributes: - count: The total number of entries in the list. Always present if this list - is. - ops: The sliding list operations to perform. - """ - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class Operation: - """ - Attributes: - op: The operation type to perform. - range: Which index positions are affected by this operation. These are - both inclusive. - room_ids: Which room IDs are affected by this operation. These IDs match - up to the positions in the `range`, so the last room ID in this list - matches the 9th index. The room data is held in a separate object. - """ - - op: OperationType - range: Tuple[int, int] - room_ids: List[str] - - count: int - ops: List[Operation] - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class Extensions: - """Responses for extensions - - Attributes: - to_device: The to-device extension (MSC3885) - e2ee: The E2EE device extension (MSC3884) - """ - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class ToDeviceExtension: - """The to-device extension (MSC3885) - - Attributes: - next_batch: The to-device stream token the client should use - to get more results - events: A list of to-device messages for the client - """ - - next_batch: str - events: Sequence[JsonMapping] - - def __bool__(self) -> bool: - return bool(self.events) - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class E2eeExtension: - """The E2EE device extension (MSC3884) - - Attributes: - device_list_updates: List of user_ids whose devices have changed or left (only - present on incremental syncs). - device_one_time_keys_count: Map from key algorithm to the number of - unclaimed one-time keys currently held on the server for this device. If - an algorithm is unlisted, the count for that algorithm is assumed to be - zero. If this entire parameter is missing, the count for all algorithms - is assumed to be zero. - device_unused_fallback_key_types: List of unused fallback key algorithms - for this device. - """ - - # Only present on incremental syncs - device_list_updates: Optional[DeviceListUpdates] - device_one_time_keys_count: Mapping[str, int] - device_unused_fallback_key_types: Sequence[str] - - def __bool__(self) -> bool: - # Note that "signed_curve25519" is always returned in key count responses - # regardless of whether we uploaded any keys for it. This is necessary until - # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed. - # - # Also related: - # https://github.com/element-hq/element-android/issues/3725 and - # https://github.com/matrix-org/synapse/issues/10456 - default_otk = self.device_one_time_keys_count.get("signed_curve25519") - more_than_default_otk = len(self.device_one_time_keys_count) > 1 or ( - default_otk is not None and default_otk > 0 - ) - - return bool( - more_than_default_otk - or self.device_list_updates - or self.device_unused_fallback_key_types - ) - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class AccountDataExtension: - """The Account Data extension (MSC3959) - - Attributes: - global_account_data_map: Mapping from `type` to `content` of global account - data events. - account_data_by_room_map: Mapping from room_id to mapping of `type` to - `content` of room account data events. - """ - - global_account_data_map: Mapping[str, JsonMapping] - account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] - - def __bool__(self) -> bool: - return bool( - self.global_account_data_map or self.account_data_by_room_map - ) - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class ReceiptsExtension: - """The Receipts extension (MSC3960) - - Attributes: - room_id_to_receipt_map: Mapping from room_id to `m.receipt` ephemeral - event (type, content) - """ - - room_id_to_receipt_map: Mapping[str, JsonMapping] - - def __bool__(self) -> bool: - return bool(self.room_id_to_receipt_map) - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class TypingExtension: - """The Typing Notification extension (MSC3961) - - Attributes: - room_id_to_typing_map: Mapping from room_id to `m.typing` ephemeral - event (type, content) - """ - - room_id_to_typing_map: Mapping[str, JsonMapping] - - def __bool__(self) -> bool: - return bool(self.room_id_to_typing_map) - - to_device: Optional[ToDeviceExtension] = None - e2ee: Optional[E2eeExtension] = None - account_data: Optional[AccountDataExtension] = None - receipts: Optional[ReceiptsExtension] = None - typing: Optional[TypingExtension] = None - - def __bool__(self) -> bool: - return bool( - self.to_device - or self.e2ee - or self.account_data - or self.receipts - or self.typing - ) - - next_pos: SlidingSyncStreamToken - lists: Mapping[str, SlidingWindowList] - rooms: Dict[str, RoomResult] - extensions: Extensions - - def __bool__(self) -> bool: - """Make the result appear empty if there are no updates. This is used - to tell if the notifier needs to wait for more events when polling for - events. - """ - # We don't include `self.lists` here, as a) `lists` is always non-empty even if - # there are no changes, and b) since we're sorting rooms by `stream_ordering` of - # the latest activity, anything that would cause the order to change would end - # up in `self.rooms` and cause us to send down the change. - return bool(self.rooms or self.extensions) - - @staticmethod - def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult": - "Return a new empty result" - return SlidingSyncResult( - next_pos=next_pos, - lists={}, - rooms={}, - extensions=SlidingSyncResult.Extensions(), - ) diff --git a/synapse/handlers/sliding_sync/types.py b/synapse/types/handlers/sliding_sync.py similarity index 52% rename from synapse/handlers/sliding_sync/types.py rename to synapse/types/handlers/sliding_sync.py index 003419d40a5..bca1ff7b541 100644 --- a/synapse/handlers/sliding_sync/types.py +++ b/synapse/types/handlers/sliding_sync.py @@ -18,30 +18,382 @@ from enum import Enum from typing import ( TYPE_CHECKING, + AbstractSet, Callable, Dict, Final, Generic, + List, Mapping, MutableMapping, Optional, + Sequence, Set, + Tuple, TypeVar, cast, ) import attr +from synapse._pydantic_compat import HAS_PYDANTIC_V2 from synapse.api.constants import EventTypes from synapse.types import MultiWriterStreamToken, RoomStreamToken, StrCollection, UserID -from synapse.types.handlers import SlidingSyncConfig + +if TYPE_CHECKING or HAS_PYDANTIC_V2: + from pydantic.v1 import Extra +else: + from pydantic import Extra + +from synapse.events import EventBase +from synapse.types import ( + DeviceListUpdates, + JsonDict, + JsonMapping, + Requester, + SlidingSyncStreamToken, + StreamToken, +) +from synapse.types.rest.client import SlidingSyncBody if TYPE_CHECKING: - pass + from synapse.handlers.relations import BundledAggregations logger = logging.getLogger(__name__) +class SlidingSyncConfig(SlidingSyncBody): + """ + Inherit from `SlidingSyncBody` since we need all of the same fields and add a few + extra fields that we need in the handler + """ + + user: UserID + requester: Requester + + # Pydantic config + class Config: + # By default, ignore fields that we don't recognise. + extra = Extra.ignore + # By default, don't allow fields to be reassigned after parsing. + allow_mutation = False + # Allow custom types like `UserID` to be used in the model + arbitrary_types_allowed = True + + +class OperationType(Enum): + """ + Represents the operation types in a Sliding Sync window. + + Attributes: + SYNC: Sets a range of entries. Clients SHOULD discard what they previous knew about + entries in this range. + INSERT: Sets a single entry. If the position is not empty then clients MUST move + entries to the left or the right depending on where the closest empty space is. + DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move + places. + INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for + offline support, but they should be treated as empty when additional operations + which concern indexes in the range arrive from the server. + """ + + SYNC: Final = "SYNC" + INSERT: Final = "INSERT" + DELETE: Final = "DELETE" + INVALIDATE: Final = "INVALIDATE" + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class SlidingSyncResult: + """ + The Sliding Sync result to be serialized to JSON for a response. + + Attributes: + next_pos: The next position token in the sliding window to request (next_batch). + lists: Sliding window API. A map of list key to list results. + rooms: Room subscription API. A map of room ID to room results. + extensions: Extensions API. A map of extension key to extension results. + """ + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class RoomResult: + """ + Attributes: + name: Room name or calculated room name. + avatar: Room avatar + heroes: List of stripped membership events (containing `user_id` and optionally + `avatar_url` and `displayname`) for the users used to calculate the room name. + is_dm: Flag to specify whether the room is a direct-message room (most likely + between two people). + initial: Flag which is set when this is the first time the server is sending this + data on this connection. Clients can use this flag to replace or update + their local state. When there is an update, servers MUST omit this flag + entirely and NOT send "initial":false as this is wasteful on bandwidth. The + absence of this flag means 'false'. + unstable_expanded_timeline: Flag which is set if we're returning more historic + events due to the timeline limit having increased. See "XXX: Odd behavior" + comment ing `synapse.handlers.sliding_sync`. + required_state: The current state of the room + timeline: Latest events in the room. The last event is the most recent. + bundled_aggregations: A mapping of event ID to the bundled aggregations for + the timeline events above. This allows clients to show accurate reaction + counts (or edits, threads), even if some of the reaction events were skipped + over in a gappy sync. + stripped_state: Stripped state events (for rooms where the usre is + invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2, + absent on joined/left rooms + prev_batch: A token that can be passed as a start parameter to the + `/rooms//messages` API to retrieve earlier messages. + limited: True if there are more events than `timeline_limit` looking + backwards from the `response.pos` to the `request.pos`. + num_live: The number of timeline events which have just occurred and are not historical. + The last N events are 'live' and should be treated as such. This is mostly + useful to determine whether a given @mention event should make a noise or not. + Clients cannot rely solely on the absence of `initial: true` to determine live + events because if a room not in the sliding window bumps into the window because + of an @mention it will have `initial: true` yet contain a single live event + (with potentially other old events in the timeline). + bump_stamp: The `stream_ordering` of the last event according to the + `bump_event_types`. This helps clients sort more readily without them + needing to pull in a bunch of the timeline to determine the last activity. + `bump_event_types` is a thing because for example, we don't want display + name changes to mark the room as unread and bump it to the top. For + encrypted rooms, we just have to consider any activity as a bump because we + can't see the content and the client has to figure it out for themselves. + joined_count: The number of users with membership of join, including the client's + own user ID. (same as sync `v2 m.joined_member_count`) + invited_count: The number of users with membership of invite. (same as sync v2 + `m.invited_member_count`) + notification_count: The total number of unread notifications for this room. (same + as sync v2) + highlight_count: The number of unread notifications for this room with the highlight + flag set. (same as sync v2) + """ + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class StrippedHero: + user_id: str + display_name: Optional[str] + avatar_url: Optional[str] + + name: Optional[str] + avatar: Optional[str] + heroes: Optional[List[StrippedHero]] + is_dm: bool + initial: bool + unstable_expanded_timeline: bool + # Should be empty for invite/knock rooms with `stripped_state` + required_state: List[EventBase] + # Should be empty for invite/knock rooms with `stripped_state` + timeline_events: List[EventBase] + bundled_aggregations: Optional[Dict[str, "BundledAggregations"]] + # Optional because it's only relevant to invite/knock rooms + stripped_state: List[JsonDict] + # Only optional because it won't be included for invite/knock rooms with `stripped_state` + prev_batch: Optional[StreamToken] + # Only optional because it won't be included for invite/knock rooms with `stripped_state` + limited: Optional[bool] + # Only optional because it won't be included for invite/knock rooms with `stripped_state` + num_live: Optional[int] + bump_stamp: int + joined_count: int + invited_count: int + notification_count: int + highlight_count: int + + def __bool__(self) -> bool: + return ( + # If this is the first time the client is seeing the room, we should not filter it out + # under any circumstance. + self.initial + # We need to let the client know if there are any new events + or bool(self.required_state) + or bool(self.timeline_events) + or bool(self.stripped_state) + ) + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class SlidingWindowList: + """ + Attributes: + count: The total number of entries in the list. Always present if this list + is. + ops: The sliding list operations to perform. + """ + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class Operation: + """ + Attributes: + op: The operation type to perform. + range: Which index positions are affected by this operation. These are + both inclusive. + room_ids: Which room IDs are affected by this operation. These IDs match + up to the positions in the `range`, so the last room ID in this list + matches the 9th index. The room data is held in a separate object. + """ + + op: OperationType + range: Tuple[int, int] + room_ids: List[str] + + count: int + ops: List[Operation] + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class Extensions: + """Responses for extensions + + Attributes: + to_device: The to-device extension (MSC3885) + e2ee: The E2EE device extension (MSC3884) + """ + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class ToDeviceExtension: + """The to-device extension (MSC3885) + + Attributes: + next_batch: The to-device stream token the client should use + to get more results + events: A list of to-device messages for the client + """ + + next_batch: str + events: Sequence[JsonMapping] + + def __bool__(self) -> bool: + return bool(self.events) + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class E2eeExtension: + """The E2EE device extension (MSC3884) + + Attributes: + device_list_updates: List of user_ids whose devices have changed or left (only + present on incremental syncs). + device_one_time_keys_count: Map from key algorithm to the number of + unclaimed one-time keys currently held on the server for this device. If + an algorithm is unlisted, the count for that algorithm is assumed to be + zero. If this entire parameter is missing, the count for all algorithms + is assumed to be zero. + device_unused_fallback_key_types: List of unused fallback key algorithms + for this device. + """ + + # Only present on incremental syncs + device_list_updates: Optional[DeviceListUpdates] + device_one_time_keys_count: Mapping[str, int] + device_unused_fallback_key_types: Sequence[str] + + def __bool__(self) -> bool: + # Note that "signed_curve25519" is always returned in key count responses + # regardless of whether we uploaded any keys for it. This is necessary until + # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed. + # + # Also related: + # https://github.com/element-hq/element-android/issues/3725 and + # https://github.com/matrix-org/synapse/issues/10456 + default_otk = self.device_one_time_keys_count.get("signed_curve25519") + more_than_default_otk = len(self.device_one_time_keys_count) > 1 or ( + default_otk is not None and default_otk > 0 + ) + + return bool( + more_than_default_otk + or self.device_list_updates + or self.device_unused_fallback_key_types + ) + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class AccountDataExtension: + """The Account Data extension (MSC3959) + + Attributes: + global_account_data_map: Mapping from `type` to `content` of global account + data events. + account_data_by_room_map: Mapping from room_id to mapping of `type` to + `content` of room account data events. + """ + + global_account_data_map: Mapping[str, JsonMapping] + account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] + + def __bool__(self) -> bool: + return bool( + self.global_account_data_map or self.account_data_by_room_map + ) + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class ReceiptsExtension: + """The Receipts extension (MSC3960) + + Attributes: + room_id_to_receipt_map: Mapping from room_id to `m.receipt` ephemeral + event (type, content) + """ + + room_id_to_receipt_map: Mapping[str, JsonMapping] + + def __bool__(self) -> bool: + return bool(self.room_id_to_receipt_map) + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class TypingExtension: + """The Typing Notification extension (MSC3961) + + Attributes: + room_id_to_typing_map: Mapping from room_id to `m.typing` ephemeral + event (type, content) + """ + + room_id_to_typing_map: Mapping[str, JsonMapping] + + def __bool__(self) -> bool: + return bool(self.room_id_to_typing_map) + + to_device: Optional[ToDeviceExtension] = None + e2ee: Optional[E2eeExtension] = None + account_data: Optional[AccountDataExtension] = None + receipts: Optional[ReceiptsExtension] = None + typing: Optional[TypingExtension] = None + + def __bool__(self) -> bool: + return bool( + self.to_device + or self.e2ee + or self.account_data + or self.receipts + or self.typing + ) + + next_pos: SlidingSyncStreamToken + lists: Mapping[str, SlidingWindowList] + rooms: Dict[str, RoomResult] + extensions: Extensions + + def __bool__(self) -> bool: + """Make the result appear empty if there are no updates. This is used + to tell if the notifier needs to wait for more events when polling for + events. + """ + # We don't include `self.lists` here, as a) `lists` is always non-empty even if + # there are no changes, and b) since we're sorting rooms by `stream_ordering` of + # the latest activity, anything that would cause the order to change would end + # up in `self.rooms` and cause us to send down the change. + return bool(self.rooms or self.extensions) + + @staticmethod + def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult": + "Return a new empty result" + return SlidingSyncResult( + next_pos=next_pos, + lists={}, + rooms={}, + extensions=SlidingSyncResult.Extensions(), + ) + + class StateValues: """ Understood values of the (type, state_key) tuple in `required_state`. @@ -60,7 +412,7 @@ class StateValues: # We can't freeze this class because we want to update it in place with the # de-duplicated data. -@attr.s(slots=True, auto_attribs=True) +@attr.s(slots=True, auto_attribs=True, frozen=True) class RoomSyncConfig: """ Holds the config for what data we should fetch for a room in the sync response. @@ -74,7 +426,7 @@ class RoomSyncConfig: """ timeline_limit: int - required_state_map: Dict[str, Set[str]] + required_state_map: Mapping[str, AbstractSet[str]] @classmethod def from_room_config( @@ -146,27 +498,22 @@ def from_room_config( required_state_map=required_state_map, ) - def deep_copy(self) -> "RoomSyncConfig": - required_state_map: Dict[str, Set[str]] = { - state_type: state_key_set.copy() - for state_type, state_key_set in self.required_state_map.items() - } - - return RoomSyncConfig( - timeline_limit=self.timeline_limit, - required_state_map=required_state_map, - ) - def combine_room_sync_config( self, other_room_sync_config: "RoomSyncConfig" - ) -> None: + ) -> "RoomSyncConfig": """ - Combine this `RoomSyncConfig` with another `RoomSyncConfig` and take the + Combine this `RoomSyncConfig` with another `RoomSyncConfig` and return the superset union of the two. """ + timeline_limit = self.timeline_limit + required_state_map = { + event_type: set(state_keys) + for event_type, state_keys in self.required_state_map.items() + } + # Take the highest timeline limit - if self.timeline_limit < other_room_sync_config.timeline_limit: - self.timeline_limit = other_room_sync_config.timeline_limit + if timeline_limit < other_room_sync_config.timeline_limit: + timeline_limit = other_room_sync_config.timeline_limit # Union the required state for ( @@ -175,14 +522,14 @@ def combine_room_sync_config( ) in other_room_sync_config.required_state_map.items(): # If we already have a wildcard for everything, we don't need to add # anything else - if StateValues.WILDCARD in self.required_state_map.get( + if StateValues.WILDCARD in required_state_map.get( StateValues.WILDCARD, set() ): break # If we already have a wildcard `state_key` for this `state_type`, we don't need # to add anything else - if StateValues.WILDCARD in self.required_state_map.get(state_type, set()): + if StateValues.WILDCARD in required_state_map.get(state_type, set()): continue # If we're getting wildcards for the `state_type` and `state_key`, that's @@ -191,16 +538,14 @@ def combine_room_sync_config( state_type == StateValues.WILDCARD and StateValues.WILDCARD in state_key_set ): - self.required_state_map = {state_type: {StateValues.WILDCARD}} + required_state_map = {state_type: {StateValues.WILDCARD}} # We can break, since we don't need to add anything else break for state_key in state_key_set: # If we already have a wildcard for this specific `state_key`, we don't need # to add it since the wildcard already covers it. - if state_key in self.required_state_map.get( - StateValues.WILDCARD, set() - ): + if state_key in required_state_map.get(StateValues.WILDCARD, set()): continue # If we're getting a wildcard for the `state_type`, get rid of any other @@ -211,7 +556,7 @@ def combine_room_sync_config( # Make a copy so we don't run into an error: `dictionary changed size # during iteration`, when we remove items for existing_state_type, existing_state_key_set in list( - self.required_state_map.items() + required_state_map.items() ): # Make a copy so we don't run into an error: `Set changed size during # iteration`, when we filter out and remove items @@ -221,19 +566,21 @@ def combine_room_sync_config( # If we've the left the `set()` empty, remove it from the map if existing_state_key_set == set(): - self.required_state_map.pop(existing_state_type, None) + required_state_map.pop(existing_state_type, None) # If we're getting a wildcard `state_key`, get rid of any other state_keys # for this `state_type` since the wildcard will cover it already. if state_key == StateValues.WILDCARD: - self.required_state_map[state_type] = {state_key} + required_state_map[state_type] = {state_key} break # Otherwise, just add it to the set else: - if self.required_state_map.get(state_type) is None: - self.required_state_map[state_type] = {state_key} + if required_state_map.get(state_type) is None: + required_state_map[state_type] = {state_key} else: - self.required_state_map[state_type].add(state_key) + required_state_map[state_type].add(state_key) + + return RoomSyncConfig(timeline_limit, required_state_map) def must_await_full_state( self, @@ -324,7 +671,7 @@ class HaveSentRoomFlag(Enum): LIVE = "live" -T = TypeVar("T") +T = TypeVar("T", str, RoomStreamToken, MultiWriterStreamToken) @attr.s(auto_attribs=True, slots=True, frozen=True) @@ -439,7 +786,7 @@ def record_unsent_rooms(self, room_ids: StrCollection, from_token: T) -> None: self._statuses[room_id] = HaveSentRoom.previously(from_token) -@attr.s(auto_attribs=True) +@attr.s(auto_attribs=True, frozen=True) class PerConnectionState: """The per-connection state. A snapshot of what we've sent down the connection before. diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 2cf2f2982fd..2ef9f665f9f 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -18,7 +18,6 @@ # # import logging -from copy import deepcopy from typing import Dict, List, Optional from unittest.mock import patch @@ -47,7 +46,7 @@ from synapse.server import HomeServer from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.types import JsonDict, StreamToken, UserID -from synapse.types.handlers import SlidingSyncConfig +from synapse.types.handlers.sliding_sync import SlidingSyncConfig from synapse.util import Clock from tests.replication._base import BaseMultiWorkerStreamTestCase @@ -566,23 +565,11 @@ def test_combine_room_sync_config( """ Combine A into B and B into A to make sure we get the same result. """ - # Since we're mutating these in place, make a copy for each of our trials - room_sync_config_a = deepcopy(a) - room_sync_config_b = deepcopy(b) + combined_config = a.combine_room_sync_config(b) + self._assert_room_config_equal(combined_config, expected, "B into A") - # Combine B into A - room_sync_config_a.combine_room_sync_config(room_sync_config_b) - - self._assert_room_config_equal(room_sync_config_a, expected, "B into A") - - # Since we're mutating these in place, make a copy for each of our trials - room_sync_config_a = deepcopy(a) - room_sync_config_b = deepcopy(b) - - # Combine A into B - room_sync_config_b.combine_room_sync_config(room_sync_config_a) - - self._assert_room_config_equal(room_sync_config_b, expected, "A into B") + combined_config = a.combine_room_sync_config(b) + self._assert_room_config_equal(combined_config, expected, "A into B") class GetRoomMembershipForUserAtToTokenTestCase(HomeserverTestCase):