diff --git a/changelog.d/11207.bugfix b/changelog.d/11207.bugfix new file mode 100644 index 000000000000..7e98d565a14f --- /dev/null +++ b/changelog.d/11207.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug which could result in serialization errors and potentially duplicate transaction data when sending ephemeral events to application services. Contributed by @Fizzadar at Beeper. diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 67f8ffcafffd..ddc9105ee926 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -34,6 +34,7 @@ ) from synapse.storage.databases.main.directory import RoomAliasMapping from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID +from synapse.util.async_helpers import Linearizer from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -58,6 +59,10 @@ def __init__(self, hs: "HomeServer"): self.current_max = 0 self.is_processing = False + self._ephemeral_events_linearizer = Linearizer( + name="appservice_ephemeral_events" + ) + def notify_interested_services(self, max_token: RoomStreamToken) -> None: """Notifies (pushes) all application services interested in this event. @@ -260,26 +265,37 @@ async def _notify_interested_services_ephemeral( events = await self._handle_typing(service, new_token) if events: self.scheduler.submit_ephemeral_events_for_as(service, events) + continue - elif stream_key == "receipt_key": - events = await self._handle_receipts(service) - if events: - self.scheduler.submit_ephemeral_events_for_as(service, events) - - # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( - service, "read_receipt", new_token + # Since we read/update the stream position for this AS/stream + with ( + await self._ephemeral_events_linearizer.queue( + (service.id, stream_key) ) + ): + if stream_key == "receipt_key": + events = await self._handle_receipts(service, new_token) + if events: + self.scheduler.submit_ephemeral_events_for_as( + service, events + ) + + # Persist the latest handled stream token for this appservice + await self.store.set_type_stream_id_for_appservice( + service, "read_receipt", new_token + ) - elif stream_key == "presence_key": - events = await self._handle_presence(service, users) - if events: - self.scheduler.submit_ephemeral_events_for_as(service, events) + elif stream_key == "presence_key": + events = await self._handle_presence(service, users, new_token) + if events: + self.scheduler.submit_ephemeral_events_for_as( + service, events + ) - # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( - service, "presence", new_token - ) + # Persist the latest handled stream token for this appservice + await self.store.set_type_stream_id_for_appservice( + service, "presence", new_token + ) async def _handle_typing( self, service: ApplicationService, new_token: int @@ -316,7 +332,9 @@ async def _handle_typing( ) return typing - async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]: + async def _handle_receipts( + self, service: ApplicationService, new_token: Optional[int] + ) -> List[JsonDict]: """ Return the latest read receipts that the given application service should receive. @@ -335,6 +353,12 @@ async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]: from_key = await self.store.get_type_stream_id_for_appservice( service, "read_receipt" ) + if new_token is not None and new_token <= from_key: + logger.debug( + "Rejecting token lower than or equal to stored: %s" % (new_token,) + ) + return [] + receipts_source = self.event_sources.sources.receipt receipts, _ = await receipts_source.get_new_events_as( service=service, from_key=from_key @@ -342,7 +366,10 @@ async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]: return receipts async def _handle_presence( - self, service: ApplicationService, users: Collection[Union[str, UserID]] + self, + service: ApplicationService, + users: Collection[Union[str, UserID]], + new_token: Optional[int], ) -> List[JsonDict]: """ Return the latest presence updates that the given application service should receive. @@ -365,6 +392,12 @@ async def _handle_presence( from_key = await self.store.get_type_stream_id_for_appservice( service, "presence" ) + if new_token is not None and new_token <= from_key: + logger.debug( + "Rejecting token lower than or equal to stored: %s" % (new_token,) + ) + return [] + for user in users: if isinstance(user, str): user = UserID.from_string(user) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 43998020b2eb..1f6a924452ad 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -40,6 +40,7 @@ def setUp(self): hs.get_application_service_scheduler.return_value = self.mock_scheduler hs.get_clock.return_value = MockClock() self.handler = ApplicationServicesHandler(hs) + self.event_source = hs.get_event_sources() def test_notify_interested_services(self): interested_service = self._mkservice(is_interested=True) @@ -252,6 +253,56 @@ async def get_3pe_protocol(service, unusedProtocol): }, ) + def test_notify_interested_services_ephemeral(self): + """ + Test sending ephemeral events to the appservice handler are scheduled + to be pushed out to interested appservices, and that the stream ID is + updated accordingly. + """ + interested_service = self._mkservice(is_interested=True) + services = [interested_service] + + self.mock_store.get_app_services.return_value = services + self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable( + 579 + ) + + event = Mock(event_id="event_1") + self.event_source.sources.receipt.get_new_events_as.return_value = ( + make_awaitable(([event], None)) + ) + + self.handler.notify_interested_services_ephemeral("receipt_key", 580) + self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with( + interested_service, [event] + ) + self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with( + interested_service, + "read_receipt", + 580, + ) + + def test_notify_interested_services_ephemeral_out_of_order(self): + """ + Test sending out of order ephemeral events to the appservice handler + are ignored. + """ + interested_service = self._mkservice(is_interested=True) + services = [interested_service] + + self.mock_store.get_app_services.return_value = services + self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable( + 580 + ) + + event = Mock(event_id="event_1") + self.event_source.sources.receipt.get_new_events_as.return_value = ( + make_awaitable(([event], None)) + ) + + self.handler.notify_interested_services_ephemeral("receipt_key", 579) + self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called() + def _mkservice(self, is_interested, protocols=None): service = Mock() service.is_interested.return_value = make_awaitable(is_interested)