Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Bound ephemeral events by key #12544

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12544.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug where attempting to send a large amount of read receipts to an application service all at once would result in duplicate content and abnormally high memory usage. Contributed by Brad & Nick @ Beeper.
4 changes: 2 additions & 2 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ async def _handle_typing(
return typing

async def _handle_receipts(
self, service: ApplicationService, new_token: Optional[int]
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
"""
Return the latest read receipts that the given application service should receive.
Expand Down Expand Up @@ -447,7 +447,7 @@ async def _handle_receipts(

receipts_source = self.event_sources.sources.receipt
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key
service=service, from_key=from_key, to_key=new_token
)
return receipts

Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,14 @@ async def get_new_events(
return events, to_key

async def get_new_events_as(
self, from_key: int, service: ApplicationService
self, from_key: int, to_key: int, service: ApplicationService
) -> Tuple[List[JsonDict], int]:
"""Returns a set of new read receipt events that an appservice
may be interested in.

Args:
from_key: the stream position at which events should be fetched from
to_key: the stream position up to which events should be fetched to
service: The appservice which may be interested

Returns:
Expand All @@ -255,7 +256,6 @@ async def get_new_events_as(
* The current read receipt stream token.
"""
from_key = int(from_key)
to_key = self.get_current_key()

if from_key == to_key:
return [], to_key
Expand Down
82 changes: 82 additions & 0 deletions tests/handlers/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,88 @@ def prepare(self, reactor, clock, hs):
"exclusive_as_user", "password", self.exclusive_as_user_device_id
)

def test_sending_read_receipt_batches_to_application_services(self):
"""Tests that a large batch of read receipts are sent correctly to
interested application services.
"""
# Register an application service that's interested in a certain user
# and room prefix
interested_appservice = self._register_application_service(
namespaces={
ApplicationService.NS_USERS: [
{
"regex": "@exclusive_as_user:.+",
"exclusive": True,
}
],
ApplicationService.NS_ROOMS: [
{
"regex": "!fakeroom_.*",
"exclusive": True,
}
],
},
)

# "Complete" a transaction.
# All this really does for us is make an entry in the application_services_state
# database table, which tracks the current stream_token per stream ID per AS.
self.get_success(
self.hs.get_datastores().main.complete_appservice_txn(
0,
interested_appservice,
)
)

# Now, pretend that we receive a large burst of read receipts (300 total) that
# all come in at once.
for i in range(300):
self.get_success(
# Insert a fake read receipt into the database
self.hs.get_datastores().main.insert_receipt(
# We have to use unique room ID + user ID combinations here, as the db query
# is an upsert.
room_id=f"!fakeroom_{i}:test",
receipt_type="m.read",
user_id=self.local_user,
event_ids=[f"$eventid_{i}"],
data={},
)
)

# Now notify the appservice handler that 300 read receipts have all arrived
# at once. What will it do!
# note: stream tokens start at 2
for stream_token in range(2, 303):
self.get_success(
self.hs.get_application_service_handler()._notify_interested_services_ephemeral(
services=[interested_appservice],
stream_key="receipt_key",
new_token=stream_token,
users=[self.exclusive_as_user],
)
)

# Using our txn send mock, we can see what the AS received. After iterating over every
# transaction, we'd like to see all 300 read receipts accounted for.
# No more, no less.
all_ephemeral_events = []
for call in self.send_mock.call_args_list:
ephemeral_events = call[0][2]
all_ephemeral_events += ephemeral_events

# Ensure that no duplicate events were sent
self.assertEqual(len(all_ephemeral_events), 300)

# Check that the ephemeral event is a read receipt with the expected structure
latest_read_receipt = all_ephemeral_events[-1]
self.assertEqual(latest_read_receipt["type"], "m.receipt")

event_id = list(latest_read_receipt["content"].keys())[0]
self.assertEqual(
latest_read_receipt["content"][event_id]["m.read"], {self.local_user: {}}
)

@unittest.override_config(
{"experimental_features": {"msc2409_to_device_messages_enabled": True}}
)
Expand Down