Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Support sending device lists everywhere; needs cleaning up
Browse files Browse the repository at this point in the history
  • Loading branch information
anoadragon453 committed Feb 1, 2022
1 parent d6a123a commit f1a31c4
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 41 deletions.
10 changes: 5 additions & 5 deletions synapse/appservice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.types import GroupID, JsonDict, UserID, get_domain_from_id
from synapse.types import DeviceLists, GroupID, JsonDict, UserID, get_domain_from_id
from synapse.util.caches.descriptors import _CacheContext, cached

if TYPE_CHECKING:
Expand Down Expand Up @@ -325,10 +325,7 @@ async def is_interested_in_presence(
return False

def is_user_in_namespace(self, user_id: str) -> bool:
return (
bool(self._matches_regex(ApplicationService.NS_USERS, user_id))
or user_id == self.sender
)
return bool(self._matches_regex(ApplicationService.NS_USERS, user_id))

def is_room_alias_in_namespace(self, alias: str) -> bool:
return bool(self._matches_regex(ApplicationService.NS_ALIASES, alias))
Expand Down Expand Up @@ -397,12 +394,14 @@ def __init__(
events: List[EventBase],
ephemeral: List[JsonDict],
to_device_messages: List[JsonDict],
device_list_summary: DeviceLists,
):
self.service = service
self.id = id
self.events = events
self.ephemeral = ephemeral
self.to_device_messages = to_device_messages
self.device_list_summary = device_list_summary

async def send(self, as_api: "ApplicationServiceApi") -> bool:
"""Sends this transaction using the provided AS API interface.
Expand All @@ -417,6 +416,7 @@ async def send(self, as_api: "ApplicationServiceApi") -> bool:
events=self.events,
ephemeral=self.ephemeral,
to_device_messages=self.to_device_messages,
device_list_summary=self.device_list_summary,
txn_id=self.id,
)

Expand Down
21 changes: 18 additions & 3 deletions synapse/appservice/api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -13,7 +14,7 @@
# limitations under the License.
import logging
import urllib.parse
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union

from prometheus_client import Counter

Expand All @@ -22,7 +23,7 @@
from synapse.events import EventBase
from synapse.events.utils import serialize_event
from synapse.http.client import SimpleHttpClient
from synapse.types import JsonDict, ThirdPartyInstanceID
from synapse.types import DeviceLists, JsonDict, ThirdPartyInstanceID
from synapse.util.caches.response_cache import ResponseCache

if TYPE_CHECKING:
Expand Down Expand Up @@ -219,6 +220,7 @@ async def push_bulk(
events: List[EventBase],
ephemeral: List[JsonDict],
to_device_messages: List[JsonDict],
device_list_summary: DeviceLists,
txn_id: Optional[int] = None,
) -> bool:
"""
Expand Down Expand Up @@ -252,7 +254,7 @@ async def push_bulk(
uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))

# Never send ephemeral events to appservices that do not support it
body: Dict[str, List[JsonDict]] = {"events": serialized_events}
body: Dict[str, Union[JsonDict, List[JsonDict]]] = {"events": serialized_events}
if service.supports_ephemeral:
body.update(
{
Expand All @@ -262,6 +264,19 @@ async def push_bulk(
}
)

# Send device list summaries if needed
if device_list_summary:
logger.info("Sending device list summary: %s", device_list_summary)
body.update(
{
# TODO: Update to stable prefix once MSC3202 completes FCP merge
"org.matrix.msc3202.device_lists": {
"changed": list(device_list_summary.changed),
"left": list(device_list_summary.left),
}
}
)

try:
await self.put_json(
uri=uri,
Expand Down
63 changes: 59 additions & 4 deletions synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main import DataStore
from synapse.types import JsonDict
from synapse.types import DeviceLists, JsonDict
from synapse.util import Clock

if TYPE_CHECKING:
Expand Down Expand Up @@ -115,6 +115,7 @@ def enqueue_for_appservice(
events: Optional[Collection[EventBase]] = None,
ephemeral: Optional[Collection[JsonDict]] = None,
to_device_messages: Optional[Collection[JsonDict]] = None,
device_list_summary: Optional[DeviceLists] = None,
) -> None:
"""
Enqueue some data to be sent off to an application service.
Expand All @@ -126,10 +127,18 @@ def enqueue_for_appservice(
to_device_messages: The to-device messages to send. These differ from normal
to-device messages sent to clients, as they have 'to_device_id' and
'to_user_id' fields.
device_list_summary: A summary of users that the application service either needs
to refresh the device lists of, or those that the application service need no
longer track the device lists of.
"""
# We purposefully allow this method to run with empty events/ephemeral
# collections, so that callers do not need to check iterable size themselves.
if not events and not ephemeral and not to_device_messages:
if (
not events
and not ephemeral
and not to_device_messages
and not device_list_summary
):
return

if events:
Expand All @@ -140,6 +149,10 @@ def enqueue_for_appservice(
self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend(
to_device_messages
)
if device_list_summary:
self.queuer.queued_device_list_summaries.setdefault(
appservice.id, []
).append(device_list_summary)

# Kick off a new application service transaction
self.queuer.start_background_request(appservice)
Expand All @@ -160,6 +173,8 @@ def __init__(self, txn_ctrl: "_TransactionController", clock: Clock):
self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
# dict of {service_id: [to_device_message_json]}
self.queued_to_device_messages: Dict[str, List[JsonDict]] = {}
# dict of {service_id: [device_list_summary]}
self.queued_device_list_summaries: Dict[str, List[DeviceLists]] = {}

# the appservices which currently have a transaction in flight
self.requests_in_flight: Set[str] = set()
Expand Down Expand Up @@ -199,12 +214,49 @@ async def _send_request(self, service: ApplicationService) -> None:
]
del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION]

if not events and not ephemeral and not to_device_messages_to_send:
# Consolidate any pending device list summaries into a single, up-to-date
# summary.
# Note: this code assumes that in a single DeviceLists, a user will
# never be in both "changed" and "left" sets.
device_list_summary = DeviceLists()
while self.queued_device_list_summaries.get(service.id, []):
# Pop a summary off the front of the queue
summary = self.queued_device_list_summaries[service.id].pop(0)

# For every user in the incoming "changed" set:
# * Remove them from the existing "left" set if necessary
# (as we need to start tracking them again)
# * Add them to the existing "changed" set if necessary.
for user_id in summary.changed:
if user_id in device_list_summary.left:
device_list_summary.left.remove(user_id)
device_list_summary.changed.add(user_id)

# For every user in the incoming "left" set:
# * Remove them from the existing "changed" set if necessary
# (we no longer need to track them)
# * Add them to the existing "left" set if necessary.
for user_id in summary.left:
if user_id in device_list_summary.changed:
device_list_summary.changed.remove(user_id)
device_list_summary.left.add(user_id)

if (
not events
and not ephemeral
and not to_device_messages_to_send
# Note that DeviceLists implements __bool__
and not device_list_summary
):
return

try:
await self.txn_ctrl.send(
service, events, ephemeral, to_device_messages_to_send
service,
events,
ephemeral,
to_device_messages_to_send,
device_list_summary,
)
except Exception:
logger.exception("AS request failed")
Expand Down Expand Up @@ -238,6 +290,7 @@ async def send(
events: List[EventBase],
ephemeral: Optional[List[JsonDict]] = None,
to_device_messages: Optional[List[JsonDict]] = None,
device_list_summary: Optional[DeviceLists] = None,
) -> None:
"""
Create a transaction with the given data and send to the provided
Expand All @@ -248,13 +301,15 @@ async def send(
events: The persistent events to include in the transaction.
ephemeral: The ephemeral events to include in the transaction.
to_device_messages: The to-device messages to include in the transaction.
device_list_summary: The device list summary to include in the transaction.
"""
try:
txn = await self.store.create_appservice_txn(
service=service,
events=events,
ephemeral=ephemeral or [],
to_device_messages=to_device_messages or [],
device_list_summary=device_list_summary or DeviceLists(),
)
service_is_up = await self._is_service_up(service)
if service_is_up:
Expand Down
56 changes: 29 additions & 27 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@
wrap_as_background_process,
)
from synapse.storage.databases.main.directory import RoomAliasMapping
from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID
from synapse.types import DeviceLists, JsonDict, RoomAlias, RoomStreamToken, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.descriptors import _CacheContext, cached
from synapse.util.metrics import Measure

if TYPE_CHECKING:
Expand Down Expand Up @@ -344,22 +343,16 @@ async def _notify_interested_services_ephemeral(
)

elif stream_key == "device_list_key":
users_whose_device_lists_changed = await self._get_device_list_changes(
device_list_summary = await self._get_device_list_summary(
service, new_token
)
if users_whose_device_lists_changed:
# TODO: Have a way of including things in an outgoing appservice
# transaction that's not "events" or "ephemeral"
payload = [{
"changed": users_whose_device_lists_changed,
"left": [],
}]
self.scheduler.submit_ephemeral_events_for_as(
service, payload
if device_list_summary:
self.scheduler.enqueue_for_appservice(
service, device_list_summary=device_list_summary
)

# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
await self.store.set_appservice_stream_type_pos(
service, "device_list", new_token
)

Expand Down Expand Up @@ -569,11 +562,11 @@ async def _get_to_device_messages(

return message_payload

async def _get_device_list_changes(
async def _get_device_list_summary(
self,
appservice: ApplicationService,
new_key: int,
) -> List[str]:
) -> DeviceLists:
"""
Retrieve a list of users who have changed their device lists.
Expand All @@ -582,8 +575,9 @@ async def _get_device_list_changes(
new_key: The stream key of the device list change that triggered this method call.
Returns:
A list of users whose device lists have changed and need to be resynced by the
appservice.
A set of device list updates, comprised of users that the appservices needs to:
* resync the device list of, and
* stop tracking the device list of.
"""
# Fetch the last successfully processed device list update stream ID
# for this appservice.
Expand All @@ -592,21 +586,31 @@ async def _get_device_list_changes(
)

# Fetch the users who have modified their device list since then.
users_with_changed_device_lists = await self.store.get_users_whose_devices_changed(
from_key, filter_user_ids=None, to_key=new_key
users_with_changed_device_lists = (
await self.store.get_users_whose_devices_changed(
from_key, filter_user_ids=None, to_key=new_key
)
)

# Filter out any users the application service is not interested in
#
# For each user who changed their device list, we want to check whether this
# appservice would be interested in the change
filtered_users_with_changed_device_lists = [
# appservice would be interested in the change.
filtered_users_with_changed_device_lists = {
user_id
for user_id in users_with_changed_device_lists
if self._is_appservice_interested_in_device_lists_of_user(appservice, user_id)
]
if self._is_appservice_interested_in_device_lists_of_user(
appservice, user_id
)
}

# Create a summary of "changed" and "left" users.
# TODO: Calculate "left" users.
device_list_summary = DeviceLists(
changed=filtered_users_with_changed_device_lists
)

return filtered_users_with_changed_device_lists
return device_list_summary

async def _is_appservice_interested_in_device_lists_of_user(
self,
Expand Down Expand Up @@ -642,9 +646,7 @@ async def _is_appservice_interested_in_device_lists_of_user(
for room_id in room_ids:
# This method covers checking room members for appservice interest as well as
# room ID and alias checks.
if await appservice.is_interested_in_room(
room_id, self.store
):
if await appservice.is_interested_in_room(room_id, self.store):
return True

return False
Expand Down
6 changes: 5 additions & 1 deletion synapse/storage/databases/main/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.types import JsonDict
from synapse.types import DeviceLists, JsonDict
from synapse.util import json_encoder

if TYPE_CHECKING:
Expand Down Expand Up @@ -199,6 +199,7 @@ async def create_appservice_txn(
events: List[EventBase],
ephemeral: List[JsonDict],
to_device_messages: List[JsonDict],
device_list_summary: DeviceLists,
) -> AppServiceTransaction:
"""Atomically creates a new transaction for this application service
with the given list of events. Ephemeral events are NOT persisted to the
Expand All @@ -209,6 +210,7 @@ async def create_appservice_txn(
events: A list of persistent events to put in the transaction.
ephemeral: A list of ephemeral events to put in the transaction.
to_device_messages: A list of to-device messages to put in the transaction.
device_list_summary: The device list summary to include in the transaction.
Returns:
A new transaction.
Expand Down Expand Up @@ -244,6 +246,7 @@ def _create_appservice_txn(txn):
events=events,
ephemeral=ephemeral,
to_device_messages=to_device_messages,
device_list_summary=device_list_summary,
)

return await self.db_pool.runInteraction(
Expand Down Expand Up @@ -341,6 +344,7 @@ def _get_oldest_unsent_txn(txn):
events=events,
ephemeral=[],
to_device_messages=[],
device_list_summary=DeviceLists(),
)

def _get_last_txn(self, txn, service_id: Optional[str]) -> int:
Expand Down
Loading

0 comments on commit f1a31c4

Please sign in to comment.