Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Implement MSC3395: Synthetic appservice events #10909

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions synapse/appservice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ async def matches_user_in_member_list(
for user_id in member_list:
if self.is_interested_in_user(user_id):
return True

return False

def _matches_room_id(self, event: EventBase) -> bool:
Expand Down Expand Up @@ -233,6 +234,15 @@ async def is_interested(

return False

def is_interested_in_synthetic_user_event(self, event_type: str, user_id: UserID):
for regex_obj in self.namespaces["users"]:
if not regex_obj["regex"].match(user_id):
continue
# TODO: Validate structure further up.
if event_type in regex_obj.get("uk.half-shot.msc3395.synthetic_events", {"events": []})["events"]:
return True
return False

@cached(num_args=1)
async def is_interested_in_presence(
self, user_id: UserID, store: "DataStore"
Expand All @@ -258,10 +268,10 @@ async def is_interested_in_presence(
return False

def is_interested_in_user(self, user_id: str) -> bool:
return (
bool(self._matches_regex(user_id, ApplicationService.NS_USERS))
or user_id == self.sender
)
if user_id == self.sender:
return True
regex_obj = self._matches_regex(user_id, ApplicationService.NS_USERS)
return regex_obj and regex_obj.get("uk.half-shot.msc3395.events", True)

def is_interested_in_alias(self, alias: str) -> bool:
return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES))
Expand Down Expand Up @@ -329,11 +339,13 @@ def __init__(
id: int,
events: List[EventBase],
ephemeral: List[JsonDict],
synthetic_events: Optional[JsonDict] = None,
):
self.service = service
self.id = id
self.events = events
self.ephemeral = ephemeral
self.synthetic_events = synthetic_events

async def send(self, as_api: "ApplicationServiceApi") -> bool:
"""Sends this transaction using the provided AS API interface.
Expand All @@ -347,6 +359,7 @@ async def send(self, as_api: "ApplicationServiceApi") -> bool:
service=self.service,
events=self.events,
ephemeral=self.ephemeral,
synthetic_events=self.synthetic_events,
txn_id=self.id,
)

Expand Down
11 changes: 8 additions & 3 deletions synapse/appservice/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ async def push_bulk(
service: "ApplicationService",
events: List[EventBase],
ephemeral: List[JsonDict],
synthetic_events: Optional[List[JsonDict]],
txn_id: Optional[int] = None,
):
if service.url is None:
Expand All @@ -218,11 +219,15 @@ async def push_bulk(

uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))

body = {"events": events}

# Never send ephemeral events to appservices that do not support it
if service.supports_ephemeral:
body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral}
else:
body = {"events": events}
body["de.sorunome.msc2409.ephemeral"] = ephemeral

# We will only populate this if the appservice requests synthetic events
if synthetic_events and len(synthetic_events):
body["uk.half-shot.msc3395.synthetic_events"] = synthetic_events

try:
await self.put_json(
Expand Down
23 changes: 20 additions & 3 deletions synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
# Maximum number of ephemeral events to provide in an AS transaction.
MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100

# Maximum number of syntethci events to provide in an AS transaction.
MAX_SYNTHETIC_EVENTS_PER_TRANSACTION = 100

class ApplicationServiceScheduler:
"""Public facing API for this module. Does the required DI to tie the
Expand Down Expand Up @@ -99,6 +101,11 @@ def submit_ephemeral_events_for_as(
):
self.queuer.enqueue_ephemeral(service, events)

def submit_synthetic_events_for_as(
self, service: ApplicationService, events: List[JsonDict]
):
self.queuer.enqueue_ephemeral(service, events)


class _ServiceQueuer:
"""Queue of events waiting to be sent to appservices.
Expand All @@ -111,6 +118,7 @@ class _ServiceQueuer:
def __init__(self, txn_ctrl, clock):
self.queued_events = {} # dict of {service_id: [events]}
self.queued_ephemeral = {} # dict of {service_id: [events]}
self.queued_synthetic = {} # dict of {service_id: [events]}

# the appservices which currently have a transaction in flight
self.requests_in_flight = set()
Expand All @@ -134,6 +142,10 @@ def enqueue_ephemeral(self, service: ApplicationService, events: List[JsonDict])
self.queued_ephemeral.setdefault(service.id, []).extend(events)
self._start_background_request(service)

def enqueue_syntheic(self, service: ApplicationService, events: List[JsonDict]):
self.queued_synthetic.setdefault(service.id, []).extend(events)
self._start_background_request(service)

async def _send_request(self, service: ApplicationService):
# sanity-check: we shouldn't get here if this service already has a sender
# running.
Expand All @@ -150,11 +162,15 @@ async def _send_request(self, service: ApplicationService):
ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]

if not events and not ephemeral:
all_events_synthetic = self.queued_synthetic.get(service.id, [])
synthetic = all_events_ephemeral[:MAX_SYNTHETIC_EVENTS_PER_TRANSACTION]
del all_events_synthetic[:MAX_SYNTHETIC_EVENTS_PER_TRANSACTION]

if not events and not ephemeral and not synthetic:
return

try:
await self.txn_ctrl.send(service, events, ephemeral)
await self.txn_ctrl.send(service, events, ephemeral, synthetic)
except Exception:
logger.exception("AS request failed")
finally:
Expand Down Expand Up @@ -191,10 +207,11 @@ async def send(
service: ApplicationService,
events: List[EventBase],
ephemeral: Optional[List[JsonDict]] = None,
synthetic_events: Optional[List[JsonDict]] = None,
):
try:
txn = await self.store.create_appservice_txn(
service=service, events=events, ephemeral=ephemeral or []
service=service, events=events, ephemeral=ephemeral or [], synthetic_events=synthetic_events
)
service_is_up = await self._is_service_up(service)
if service_is_up:
Expand Down
58 changes: 57 additions & 1 deletion synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,58 @@ async def handle_room_events(events: Iterable[EventBase]) -> None:
finally:
self.is_processing = False

def notify_synthetic_event(
self,
event_type: str,
user_id: UserID,
content: JsonDict,
) -> None:
"""This is called when another service wishes to
notify about a synthetic event.

This will determine which appservices
are interested in the event, and submit them.

Events will only be pushed to appservices
that have opted into ephemeral events

Args:
event_type: The type of event to notify about.
user_id: The user_id of the user involved in the event.
content: The content of the event itself.
"""
if not self.notify_appservices:
return

logger.debug("Checking interested services for synthetic event %s:%s" % (event_type, user_id))
services = self._get_services_for_user_synthetic_event(event_type, user_id)

if not services:
return

# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
self._notify_synthetic_event(
services, user_id, {
"type": event_type,
"content": content,
}
)

@wrap_as_background_process("notify_synthetic_event")
async def _notify_synthetic_event(
self,
services: List[ApplicationService],
user_id: UserID,
event: JsonDict,
) -> None:
logger.debug("Submitting synthetic event to interested services %s:%s" % (event["type"], user_id))
with Measure(self.clock, "notify_synthetic_event"):
for service in services:
# TODO: Store event in DB if we can't submit it now.
self.scheduler.submit_synthetic_events_for_as(service, [event])


def notify_interested_services_ephemeral(
self,
stream_key: str,
Expand Down Expand Up @@ -434,12 +486,16 @@ async def _get_services_for_event(

def _get_services_for_user(self, user_id: str) -> List[ApplicationService]:
services = self.store.get_app_services()
return [s for s in services if (s.is_interested_in_user(user_id))]
return [s for s in services if s.is_interested_in_user(user_id)]

def _get_services_for_3pn(self, protocol: str) -> List[ApplicationService]:
services = self.store.get_app_services()
return [s for s in services if s.is_interested_in_protocol(protocol)]

def _get_services_for_user_synthetic_event(self, event_type: str, user_id: str) -> List[ApplicationService]:
services = self.store.get_app_services()
return [s for s in services if s.is_interested_in_synthetic_user_event(event_type, user_id)]

async def _is_unknown_user(self, user_id: str) -> bool:
if not self.is_mine_id(user_id):
# we don't know if they are unknown or not since it isn't one of our
Expand Down
21 changes: 21 additions & 0 deletions synapse/handlers/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -1371,6 +1371,17 @@ async def delete_access_token(self, access_token: str) -> None:
access_token=access_token,
)


# Inform interested appservices
self.hs.get_application_service_handler().notify_synthetic_event(
"m.user.logout",
user_id,
{
"user_id": user_info.user_id,
"device_id": user_info.device_id,
}
)

# delete pushers associated with this access token
if user_info.token_id is not None:
await self.hs.get_pusherpool().remove_pushers_by_access_token(
Expand Down Expand Up @@ -1408,6 +1419,16 @@ async def delete_access_tokens_for_user(
user_id, (token_id for _, token_id, _ in tokens_and_devices)
)

# Inform interested appservices
self.hs.get_application_service_handler().notify_synthetic_event(
"m.user.logout",
user_id,
{
"user_id": user_id,
"device_id": device_id,
}
)

async def add_threepid(
self, user_id: str, medium: str, address: str, validated_at: int
) -> None:
Expand Down
11 changes: 11 additions & 0 deletions synapse/handlers/deactivate_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(self, hs: "HomeServer"):
self._room_member_handler = hs.get_room_member_handler()
self._identity_handler = hs.get_identity_handler()
self._profile_handler = hs.get_profile_handler()
self._application_service_handler = hs.get_application_service_handler()
self.user_directory_handler = hs.get_user_directory_handler()
self._server_name = hs.hostname

Expand Down Expand Up @@ -159,6 +160,16 @@ async def deactivate_account(
# Mark the user as deactivated.
await self.store.set_user_deactivated_status(user_id, True)


# Inform interested appservices
self._application_service_handler.notify_synthetic_event(
"m.user.deactivated",
user_id,
{
"user_id": user_id,
}
)

return identity_server_supports_unbinding

async def _reject_pending_invites_for_user(self, user_id: str) -> None:
Expand Down
15 changes: 15 additions & 0 deletions synapse/replication/http/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(self, hs):
super().__init__(hs)
self.store = hs.get_datastore()
self.registration_handler = hs.get_registration_handler()
self._application_service_handler = hs.get_application_service_handler()

@staticmethod
async def _serialize_payload(
Expand Down Expand Up @@ -91,6 +92,20 @@ async def _handle_request(self, request, user_id):
shadow_banned=content["shadow_banned"],
)

# Inform interested appservices
self._application_service_handler.notify_synthetic_event(
"m.user.registration",
user_id,
{
"user_id": user_id,
"guest": content["make_guest"],
"org.matrix.synapse.admin": content["admin"],
"org.matrix.synapse.user_type": content["user_type"],
"org.matrix.synapse.shadow_banned": content["shadow_banned"],
"org.matrix.synapse.appservice_id": content["appservice_id"],
}
)

return 200, {}


Expand Down
12 changes: 12 additions & 0 deletions synapse/rest/client/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def __init__(self, hs: "HomeServer"):

self.auth_handler = self.hs.get_auth_handler()
self.registration_handler = hs.get_registration_handler()
self.appservice_handler = hs.get_application_service_handler()
self._sso_handler = hs.get_sso_handler()

self._well_known_builder = WellKnownBuilder(hs)
Expand Down Expand Up @@ -353,6 +354,17 @@ async def _complete_login(
if callback is not None:
await callback(result)

# Inform interested appservices
self.appservice_handler.notify_synthetic_event(
"m.user.login",
user_id,
{
"user_id": user_id,
"device_id": device_id,
}
)


return result

async def _do_token_login(
Expand Down
3 changes: 2 additions & 1 deletion synapse/storage/databases/main/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ async def create_appservice_txn(
service: ApplicationService,
events: List[EventBase],
ephemeral: List[JsonDict],
synthetic_events: Optional[List[JsonDict]] = None,
) -> AppServiceTransaction:
"""Atomically creates a new transaction for this application service
with the given list of events. Ephemeral events are NOT persisted to the
Expand Down Expand Up @@ -233,7 +234,7 @@ def _create_appservice_txn(txn):
(service.id, new_txn_id, event_ids),
)
return AppServiceTransaction(
service=service, id=new_txn_id, events=events, ephemeral=ephemeral
service=service, id=new_txn_id, events=events, ephemeral=ephemeral, synthetic_events=synthetic_events
)

return await self.db_pool.runInteraction(
Expand Down