Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Improve docstrings for methods related to sending EDUs to application…
Browse files Browse the repository at this point in the history
… services (#11138)
  • Loading branch information
anoadragon453 authored Oct 21, 2021
1 parent 0f9adc9 commit 6408372
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 23 deletions.
1 change: 1 addition & 0 deletions changelog.d/11138.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add docstrings and comments to the application service ephemeral event sending code.
94 changes: 82 additions & 12 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,19 +185,26 @@ def notify_interested_services_ephemeral(
new_token: Optional[int],
users: Optional[Collection[Union[str, UserID]]] = None,
) -> None:
"""This is called by the notifier in the background
when a ephemeral event handled by the homeserver.
This will determine which appservices
are interested in the event, and submit them.
"""
This is called by the notifier in the background when an ephemeral event is handled
by the homeserver.
Events will only be pushed to appservices
that have opted into ephemeral events
This will determine which appservices are interested in the event, and submit them.
Args:
stream_key: The stream the event came from.
new_token: The latest stream token
users: The user(s) involved with the event.
`stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other
value for `stream_key` will cause this function to return early.
Ephemeral events will only be pushed to appservices that have opted into
them.
Appservices will only receive ephemeral events that fall within their
registered user and room namespaces.
new_token: The latest stream token.
users: The users that should be informed of the new event, if any.
"""
if not self.notify_appservices:
return
Expand Down Expand Up @@ -232,40 +239,87 @@ async def _notify_interested_services_ephemeral(
for service in services:
# Only handle typing if we have the latest token
if stream_key == "typing_key" and new_token is not None:
# Note that we don't persist the token (via set_type_stream_id_for_appservice)
# for typing_key due to performance reasons and due to their highly
# ephemeral nature.
#
# Instead we simply grab the latest typing updates in _handle_typing
# and, if they apply to this application service, send it off.
events = await self._handle_typing(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
# We don't persist the token for typing_key for performance reasons

elif stream_key == "receipt_key":
events = await self._handle_receipts(service)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)

# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
)

elif stream_key == "presence_key":
events = await self._handle_presence(service, users)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)

# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)

async def _handle_typing(
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
"""
Return the typing events since the given stream token that the given application
service should receive.
First fetch all typing events between the given typing stream token (non-inclusive)
and the latest typing event stream token (inclusive). Then return only those typing
events that the given application service may be interested in.
Args:
service: The application service to check for which events it should receive.
new_token: A typing event stream token.
Returns:
A list of JSON dictionaries containing data derived from the typing events that
should be sent to the given application service.
"""
typing_source = self.event_sources.sources.typing
# Get the typing events from just before current
typing, _ = await typing_source.get_new_events_as(
service=service,
# For performance reasons, we don't persist the previous
# token in the DB and instead fetch the latest typing information
# token in the DB and instead fetch the latest typing event
# for appservices.
# TODO: It'd likely be more efficient to simply fetch the
# typing event with the given 'new_token' stream token and
# check if the given service was interested, rather than
# iterating over all typing events and only grabbing the
# latest few.
from_key=new_token - 1,
)
return typing

async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
"""
Return the latest read receipts that the given application service should receive.
First fetch all read receipts between the last receipt stream token that this
application service should have previously received (non-inclusive) and the
latest read receipt stream token (inclusive). Then from that set, return only
those read receipts that the given application service may be interested in.
Args:
service: The application service to check for which events it should receive.
Returns:
A list of JSON dictionaries containing data derived from the read receipts that
should be sent to the given application service.
"""
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
Expand All @@ -278,6 +332,22 @@ async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
async def _handle_presence(
self, service: ApplicationService, users: Collection[Union[str, UserID]]
) -> List[JsonDict]:
"""
Return the latest presence updates that the given application service should receive.
First, filter the given users list to those that the application service is
interested in. Then retrieve the latest presence updates since the
the last-known previously received presence stream token for the given
application service. Return those presence updates.
Args:
service: The application service that ephemeral events are being sent to.
users: The users that should receive the presence update.
Returns:
A list of json dictionaries containing data derived from the presence events
that should be sent to the given application service.
"""
events: List[JsonDict] = []
presence_source = self.event_sources.sources.presence
from_key = await self.store.get_type_stream_id_for_appservice(
Expand All @@ -290,9 +360,9 @@ async def _handle_presence(
interested = await service.is_interested_in_presence(user, self.store)
if not interested:
continue

presence_events, _ = await presence_source.get_new_events(
user=user,
service=service,
from_key=from_key,
)
time_now = self.clock.time_msec()
Expand Down
4 changes: 4 additions & 0 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,10 @@ async def notify_device_update(
) -> None:
"""Notify that a user's device(s) has changed. Pokes the notifier, and
remote servers if the user is local.
Args:
user_id: The Matrix ID of the user who's device list has been updated.
device_ids: The device IDs that have changed.
"""
if not device_ids:
# No changes to notify about, so this is a no-op.
Expand Down
34 changes: 29 additions & 5 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
from synapse.api.presence import UserPresenceState
from synapse.appservice import ApplicationService
from synapse.events.presence_router import PresenceRouter
from synapse.logging.context import run_in_background
from synapse.logging.utils import log_function
Expand Down Expand Up @@ -1483,11 +1482,37 @@ def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) ->
def format_user_presence_state(
state: UserPresenceState, now: int, include_user_id: bool = True
) -> JsonDict:
"""Convert UserPresenceState to a format that can be sent down to clients
"""Convert UserPresenceState to a JSON format that can be sent down to clients
and to other servers.
The "user_id" is optional so that this function can be used to format presence
updates for client /sync responses and for federation /send requests.
Args:
state: The user presence state to format.
now: The current timestamp since the epoch in ms.
include_user_id: Whether to include `user_id` in the returned dictionary.
As this function can be used both to format presence updates for client /sync
responses and for federation /send requests, only the latter needs the include
the `user_id` field.
Returns:
A JSON dictionary with the following keys:
* presence: The presence state as a str.
* user_id: Optional. Included if `include_user_id` is truthy. The canonical
Matrix ID of the user.
* last_active_ago: Optional. Included if `last_active_ts` is set on `state`.
The timestamp that the user was last active.
* status_msg: Optional. Included if `status_msg` is set on `state`. The user's
status.
* currently_active: Optional. Included only if `state.state` is "online".
Example:
{
"presence": "online",
"user_id": "@alice:example.com",
"last_active_ago": 16783813918,
"status_msg": "Hello world!",
"currently_active": True
}
"""
content: JsonDict = {"presence": state.state}
if include_user_id:
Expand Down Expand Up @@ -1526,7 +1551,6 @@ async def get_new_events(
is_guest: bool = False,
explicit_room_id: Optional[str] = None,
include_offline: bool = True,
service: Optional[ApplicationService] = None,
) -> Tuple[List[UserPresenceState], int]:
# The process for getting presence events are:
# 1. Get the rooms the user is in.
Expand Down
8 changes: 7 additions & 1 deletion synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,18 @@ async def get_new_events(
async def get_new_events_as(
self, from_key: int, service: ApplicationService
) -> Tuple[List[JsonDict], int]:
"""Returns a set of new receipt events that an appservice
"""Returns a set of new read receipt events that an appservice
may be interested in.
Args:
from_key: the stream position at which events should be fetched from
service: The appservice which may be interested
Returns:
A two-tuple containing the following:
* A list of json dictionaries derived from read receipts that the
appservice may be interested in.
* The current read receipt stream token.
"""
from_key = int(from_key)
to_key = self.get_current_key()
Expand Down
12 changes: 9 additions & 3 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,17 +465,23 @@ async def get_new_events_as(
may be interested in.
Args:
from_key: the stream position at which events should be fetched from
service: The appservice which may be interested
from_key: the stream position at which events should be fetched from.
service: The appservice which may be interested.
Returns:
A two-tuple containing the following:
* A list of json dictionaries derived from typing events that the
appservice may be interested in.
* The latest known room serial.
"""
with Measure(self.clock, "typing.get_new_events_as"):
from_key = int(from_key)
handler = self.get_typing_handler()

events = []
for room_id in handler._room_serials.keys():
if handler._room_serials[room_id] <= from_key:
continue

if not await service.matches_user_in_member_list(
room_id, handler.store
):
Expand Down
18 changes: 16 additions & 2 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,14 @@ def _notify_app_services_ephemeral(
stream_key: str,
new_token: Union[int, RoomStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
):
) -> None:
"""Notify application services of ephemeral event activity.
Args:
stream_key: The stream the event came from.
new_token: The value of the new stream token.
users: The users that should be informed of the new event, if any.
"""
try:
stream_token = None
if isinstance(new_token, int):
Expand All @@ -402,10 +409,17 @@ def on_new_event(
new_token: Union[int, RoomStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
rooms: Optional[Collection[str]] = None,
):
) -> None:
"""Used to inform listeners that something has happened event wise.
Will wake up all listeners for the given users and rooms.
Args:
stream_key: The stream the event came from.
new_token: The value of the new stream token.
users: The users that should be informed of the new event.
rooms: A collection of room IDs for which each joined member will be
informed of the new event.
"""
users = users or []
rooms = rooms or []
Expand Down

0 comments on commit 6408372

Please sign in to comment.