diff --git a/changelog.d/11138.misc b/changelog.d/11138.misc new file mode 100644 index 000000000000..79b777697593 --- /dev/null +++ b/changelog.d/11138.misc @@ -0,0 +1 @@ +Add docstrings and comments to the application service ephemeral event sending code. \ No newline at end of file diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 163278708c9e..36c206dae6a0 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -185,19 +185,26 @@ def notify_interested_services_ephemeral( new_token: Optional[int], users: Optional[Collection[Union[str, UserID]]] = None, ) -> None: - """This is called by the notifier in the background - when a ephemeral event handled by the homeserver. - - This will determine which appservices - are interested in the event, and submit them. + """ + This is called by the notifier in the background when an ephemeral event is handled + by the homeserver. - Events will only be pushed to appservices - that have opted into ephemeral events + This will determine which appservices are interested in the event, and submit them. Args: stream_key: The stream the event came from. - new_token: The latest stream token - users: The user(s) involved with the event. + + `stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other + value for `stream_key` will cause this function to return early. + + Ephemeral events will only be pushed to appservices that have opted into + them. + + Appservices will only receive ephemeral events that fall within their + registered user and room namespaces. + + new_token: The latest stream token. + users: The users that should be informed of the new event, if any. """ if not self.notify_appservices: return @@ -232,21 +239,32 @@ async def _notify_interested_services_ephemeral( for service in services: # Only handle typing if we have the latest token if stream_key == "typing_key" and new_token is not None: + # Note that we don't persist the token (via set_type_stream_id_for_appservice) + # for typing_key due to performance reasons and due to their highly + # ephemeral nature. + # + # Instead we simply grab the latest typing updates in _handle_typing + # and, if they apply to this application service, send it off. events = await self._handle_typing(service, new_token) if events: self.scheduler.submit_ephemeral_events_for_as(service, events) - # We don't persist the token for typing_key for performance reasons + elif stream_key == "receipt_key": events = await self._handle_receipts(service) if events: self.scheduler.submit_ephemeral_events_for_as(service, events) + + # Persist the latest handled stream token for this appservice await self.store.set_type_stream_id_for_appservice( service, "read_receipt", new_token ) + elif stream_key == "presence_key": events = await self._handle_presence(service, users) if events: self.scheduler.submit_ephemeral_events_for_as(service, events) + + # Persist the latest handled stream token for this appservice await self.store.set_type_stream_id_for_appservice( service, "presence", new_token ) @@ -254,18 +272,54 @@ async def _notify_interested_services_ephemeral( async def _handle_typing( self, service: ApplicationService, new_token: int ) -> List[JsonDict]: + """ + Return the typing events since the given stream token that the given application + service should receive. + + First fetch all typing events between the given typing stream token (non-inclusive) + and the latest typing event stream token (inclusive). Then return only those typing + events that the given application service may be interested in. + + Args: + service: The application service to check for which events it should receive. + new_token: A typing event stream token. + + Returns: + A list of JSON dictionaries containing data derived from the typing events that + should be sent to the given application service. + """ typing_source = self.event_sources.sources.typing # Get the typing events from just before current typing, _ = await typing_source.get_new_events_as( service=service, # For performance reasons, we don't persist the previous - # token in the DB and instead fetch the latest typing information + # token in the DB and instead fetch the latest typing event # for appservices. + # TODO: It'd likely be more efficient to simply fetch the + # typing event with the given 'new_token' stream token and + # check if the given service was interested, rather than + # iterating over all typing events and only grabbing the + # latest few. from_key=new_token - 1, ) return typing async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]: + """ + Return the latest read receipts that the given application service should receive. + + First fetch all read receipts between the last receipt stream token that this + application service should have previously received (non-inclusive) and the + latest read receipt stream token (inclusive). Then from that set, return only + those read receipts that the given application service may be interested in. + + Args: + service: The application service to check for which events it should receive. + + Returns: + A list of JSON dictionaries containing data derived from the read receipts that + should be sent to the given application service. + """ from_key = await self.store.get_type_stream_id_for_appservice( service, "read_receipt" ) @@ -278,6 +332,22 @@ async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]: async def _handle_presence( self, service: ApplicationService, users: Collection[Union[str, UserID]] ) -> List[JsonDict]: + """ + Return the latest presence updates that the given application service should receive. + + First, filter the given users list to those that the application service is + interested in. Then retrieve the latest presence updates since the + the last-known previously received presence stream token for the given + application service. Return those presence updates. + + Args: + service: The application service that ephemeral events are being sent to. + users: The users that should receive the presence update. + + Returns: + A list of json dictionaries containing data derived from the presence events + that should be sent to the given application service. + """ events: List[JsonDict] = [] presence_source = self.event_sources.sources.presence from_key = await self.store.get_type_stream_id_for_appservice( @@ -290,9 +360,9 @@ async def _handle_presence( interested = await service.is_interested_in_presence(user, self.store) if not interested: continue + presence_events, _ = await presence_source.get_new_events( user=user, - service=service, from_key=from_key, ) time_now = self.clock.time_msec() diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 6eafbea25d02..68b446eb66c8 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -454,6 +454,10 @@ async def notify_device_update( ) -> None: """Notify that a user's device(s) has changed. Pokes the notifier, and remote servers if the user is local. + + Args: + user_id: The Matrix ID of the user who's device list has been updated. + device_ids: The device IDs that have changed. """ if not device_ids: # No changes to notify about, so this is a no-op. diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index b5968e047bf1..fdab50da373c 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -52,7 +52,6 @@ from synapse.api.constants import EventTypes, Membership, PresenceState from synapse.api.errors import SynapseError from synapse.api.presence import UserPresenceState -from synapse.appservice import ApplicationService from synapse.events.presence_router import PresenceRouter from synapse.logging.context import run_in_background from synapse.logging.utils import log_function @@ -1483,11 +1482,37 @@ def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> def format_user_presence_state( state: UserPresenceState, now: int, include_user_id: bool = True ) -> JsonDict: - """Convert UserPresenceState to a format that can be sent down to clients + """Convert UserPresenceState to a JSON format that can be sent down to clients and to other servers. - The "user_id" is optional so that this function can be used to format presence - updates for client /sync responses and for federation /send requests. + Args: + state: The user presence state to format. + now: The current timestamp since the epoch in ms. + include_user_id: Whether to include `user_id` in the returned dictionary. + As this function can be used both to format presence updates for client /sync + responses and for federation /send requests, only the latter needs the include + the `user_id` field. + + Returns: + A JSON dictionary with the following keys: + * presence: The presence state as a str. + * user_id: Optional. Included if `include_user_id` is truthy. The canonical + Matrix ID of the user. + * last_active_ago: Optional. Included if `last_active_ts` is set on `state`. + The timestamp that the user was last active. + * status_msg: Optional. Included if `status_msg` is set on `state`. The user's + status. + * currently_active: Optional. Included only if `state.state` is "online". + + Example: + + { + "presence": "online", + "user_id": "@alice:example.com", + "last_active_ago": 16783813918, + "status_msg": "Hello world!", + "currently_active": True + } """ content: JsonDict = {"presence": state.state} if include_user_id: @@ -1526,7 +1551,6 @@ async def get_new_events( is_guest: bool = False, explicit_room_id: Optional[str] = None, include_offline: bool = True, - service: Optional[ApplicationService] = None, ) -> Tuple[List[UserPresenceState], int]: # The process for getting presence events are: # 1. Get the rooms the user is in. diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 374e961e3bf8..4911a1153519 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -241,12 +241,18 @@ async def get_new_events( async def get_new_events_as( self, from_key: int, service: ApplicationService ) -> Tuple[List[JsonDict], int]: - """Returns a set of new receipt events that an appservice + """Returns a set of new read receipt events that an appservice may be interested in. Args: from_key: the stream position at which events should be fetched from service: The appservice which may be interested + + Returns: + A two-tuple containing the following: + * A list of json dictionaries derived from read receipts that the + appservice may be interested in. + * The current read receipt stream token. """ from_key = int(from_key) to_key = self.get_current_key() diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index d10e9b8ec441..c411d6992421 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -465,17 +465,23 @@ async def get_new_events_as( may be interested in. Args: - from_key: the stream position at which events should be fetched from - service: The appservice which may be interested + from_key: the stream position at which events should be fetched from. + service: The appservice which may be interested. + + Returns: + A two-tuple containing the following: + * A list of json dictionaries derived from typing events that the + appservice may be interested in. + * The latest known room serial. """ with Measure(self.clock, "typing.get_new_events_as"): - from_key = int(from_key) handler = self.get_typing_handler() events = [] for room_id in handler._room_serials.keys(): if handler._room_serials[room_id] <= from_key: continue + if not await service.matches_user_in_member_list( room_id, handler.store ): diff --git a/synapse/notifier.py b/synapse/notifier.py index 1a9f84ba4533..1acd899fab79 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -379,7 +379,14 @@ def _notify_app_services_ephemeral( stream_key: str, new_token: Union[int, RoomStreamToken], users: Optional[Collection[Union[str, UserID]]] = None, - ): + ) -> None: + """Notify application services of ephemeral event activity. + + Args: + stream_key: The stream the event came from. + new_token: The value of the new stream token. + users: The users that should be informed of the new event, if any. + """ try: stream_token = None if isinstance(new_token, int): @@ -402,10 +409,17 @@ def on_new_event( new_token: Union[int, RoomStreamToken], users: Optional[Collection[Union[str, UserID]]] = None, rooms: Optional[Collection[str]] = None, - ): + ) -> None: """Used to inform listeners that something has happened event wise. Will wake up all listeners for the given users and rooms. + + Args: + stream_key: The stream the event came from. + new_token: The value of the new stream token. + users: The users that should be informed of the new event. + rooms: A collection of room IDs for which each joined member will be + informed of the new event. """ users = users or [] rooms = rooms or []