Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

add SpamChecker callback for silently dropping inbound federated events #12744

Merged
merged 8 commits into from
May 23, 2022
53 changes: 32 additions & 21 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,33 @@ async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None:
pdu.room_id, room_version, lock, origin, pdu
)

async def _get_next_valid_staged_event_for_room(
jesopo marked this conversation as resolved.
Show resolved Hide resolved
self, room_id: str, room_version: RoomVersion
) -> Optional[Tuple[str, EventBase]]:
"""Return the first non-spam event from staging queue."""
jesopo marked this conversation as resolved.
Show resolved Hide resolved

while True:
# We need to do this check outside the lock to avoid a race between
# a new event being inserted by another instance and it attempting
# to acquire the lock.
next = await self.store.get_next_staged_event_for_room(
room_id, room_version
)

if next is None:
return None

origin, event = next

if await self._spam_checker.should_drop_federated_event(event):
logger.warning(
"Staged federated event contains spam, dropping %s",
event.event_id,
)
continue

return next

@wrap_as_background_process("_process_incoming_pdus_in_room_inner")
async def _process_incoming_pdus_in_room_inner(
self,
Expand Down Expand Up @@ -1116,31 +1143,15 @@ async def _process_incoming_pdus_in_room_inner(
(self._clock.time_msec() - received_ts) / 1000
)

while True:
# We need to do this check outside the lock to avoid a race between
# a new event being inserted by another instance and it attempting
# to acquire the lock.
next = await self.store.get_next_staged_event_for_room(
room_id, room_version
)

if next is None:
break

origin, event = next

if await self._spam_checker.should_drop_federated_event(event):
logger.warning(
"Staged federated event contains spam, dropping %s",
event.event_id,
)
continue

break
next = await self._get_next_valid_staged_event_for_room(
room_id, room_version
)

if not next:
break

origin, event = next

# Prune the event queue if it's getting large.
#
# We do this *after* handling the first event as the common case is
Expand Down