From bbf79dd2323958c7825831954a296ee028b9f950 Mon Sep 17 00:00:00 2001 From: jesopo Date: Mon, 23 May 2022 15:41:39 +0000 Subject: [PATCH] move should_drop_federated_event while look to a helper function Signed-off-by: jesopo --- synapse/federation/federation_server.py | 54 +++++++++++++++---------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 8b9717459d55..16eabf228e01 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -1039,6 +1039,34 @@ async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None: pdu.room_id, room_version, lock, origin, pdu ) + async def _get_next_valid_staged_event_for_room( + self, room_id: str, room_version: RoomVersion + ) -> Optional[Tuple[str, EventBase]]: + """Return the first non-spam event from staging queue. + """ + + while True: + # We need to do this check outside the lock to avoid a race between + # a new event being inserted by another instance and it attempting + # to acquire the lock. + next = await self.store.get_next_staged_event_for_room( + room_id, room_version + ) + + if next is None: + return None + + origin, event = next + + if await self._spam_checker.should_drop_federated_event(event): + logger.warning( + "Staged federated event contains spam, dropping %s", + event.event_id, + ) + continue + + return next + @wrap_as_background_process("_process_incoming_pdus_in_room_inner") async def _process_incoming_pdus_in_room_inner( self, @@ -1116,31 +1144,15 @@ async def _process_incoming_pdus_in_room_inner( (self._clock.time_msec() - received_ts) / 1000 ) - while True: - # We need to do this check outside the lock to avoid a race between - # a new event being inserted by another instance and it attempting - # to acquire the lock. - next = await self.store.get_next_staged_event_for_room( - room_id, room_version - ) - - if next is None: - break - - origin, event = next - - if await self._spam_checker.should_drop_federated_event(event): - logger.warning( - "Staged federated event contains spam, dropping %s", - event.event_id, - ) - continue - - break + next = await self._get_next_valid_staged_event_for_room( + room_id, room_version + ) if not next: break + origin, event = next + # Prune the event queue if it's getting large. # # We do this *after* handling the first event as the common case is