From 97c5a7a6b2e6fb2fdcbc6ceecf840e971b262a39 Mon Sep 17 00:00:00 2001 From: jesopo Date: Mon, 16 May 2022 14:15:26 +0000 Subject: [PATCH 1/8] add SpamChecker callback for hard rejecting federated events Signed-off-by: jesopo --- synapse/events/spamcheck.py | 35 +++++++++++++++++++++++ synapse/federation/federation_server.py | 37 +++++++++++++++++++------ synapse/module_api/__init__.py | 3 ++ 3 files changed, 67 insertions(+), 8 deletions(-) diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index f30207376ae2..1a241631520b 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -44,6 +44,10 @@ ["synapse.events.EventBase"], Awaitable[Union[bool, str]], ] +DROP_FEDERATED_EVENT_CALLBACK = Callable[ + ["synapse.events.EventBase"], + Awaitable[Union[bool, str]], +] USER_MAY_JOIN_ROOM_CALLBACK = Callable[[str, str, bool], Awaitable[bool]] USER_MAY_INVITE_CALLBACK = Callable[[str, str, str], Awaitable[bool]] USER_MAY_SEND_3PID_INVITE_CALLBACK = Callable[[str, str, str, str], Awaitable[bool]] @@ -93,6 +97,7 @@ def load_legacy_spam_checkers(hs: "synapse.server.HomeServer") -> None: # which name appears in this set, we'll want to register it. spam_checker_methods = { "check_event_for_spam", + "drop_federated_event", "user_may_invite", "user_may_create_room", "user_may_create_room_alias", @@ -168,6 +173,7 @@ def __init__(self, hs: "synapse.server.HomeServer") -> None: self.clock = hs.get_clock() self._check_event_for_spam_callbacks: List[CHECK_EVENT_FOR_SPAM_CALLBACK] = [] + self._drop_federated_event_callbacks: List[DROP_FEDERATED_EVENT_CALLBACK] = [] self._user_may_join_room_callbacks: List[USER_MAY_JOIN_ROOM_CALLBACK] = [] self._user_may_invite_callbacks: List[USER_MAY_INVITE_CALLBACK] = [] self._user_may_send_3pid_invite_callbacks: List[ @@ -191,6 +197,7 @@ def __init__(self, hs: "synapse.server.HomeServer") -> None: def register_callbacks( self, check_event_for_spam: Optional[CHECK_EVENT_FOR_SPAM_CALLBACK] = None, + drop_federated_event: Optional[DROP_FEDERATED_EVENT_CALLBACK] = None, user_may_join_room: Optional[USER_MAY_JOIN_ROOM_CALLBACK] = None, user_may_invite: Optional[USER_MAY_INVITE_CALLBACK] = None, user_may_send_3pid_invite: Optional[USER_MAY_SEND_3PID_INVITE_CALLBACK] = None, @@ -209,6 +216,9 @@ def register_callbacks( if check_event_for_spam is not None: self._check_event_for_spam_callbacks.append(check_event_for_spam) + if drop_federated_event is not None: + self._drop_federated_event_callbacks.append(drop_federated_event) + if user_may_join_room is not None: self._user_may_join_room_callbacks.append(user_may_join_room) @@ -268,6 +278,31 @@ async def check_event_for_spam( return False + async def drop_federated_event( + self, event: "synapse.events.EventBase" + ) -> Union[bool, str]: + """Checks if a given federated event is considered "spammy" by this + server. + + If the server considers an event spammy, it will be silently dropped, + and in doing so will split-brain our view of the room's DAG. + + Args: + event: the event to be checked + + Returns: + True if the event should be silently dropped + """ + for callback in self._drop_federated_event_callbacks: + with Measure( + self.clock, "{}.{}".format(callback.__module__, callback.__qualname__) + ): + res: Union[bool, str] = await delay_cancellation(callback(event)) + if res: + return res + + return False + async def user_may_join_room( self, user_id: str, room_id: str, is_invited: bool ) -> bool: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 884b5d60b4f9..9a6b487f4ae4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -110,6 +110,7 @@ def __init__(self, hs: "HomeServer"): self.handler = hs.get_federation_handler() self.storage = hs.get_storage() + self._spam_checker = hs.get_spam_checker() self._federation_event_handler = hs.get_federation_event_handler() self.state = hs.get_state_handler() self._event_auth_handler = hs.get_event_auth_handler() @@ -1019,6 +1020,12 @@ async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None: except SynapseError as e: raise FederationError("ERROR", e.code, e.msg, affected=pdu.event_id) + if await self._spam_checker.drop_federated_event(pdu): + logger.warning( + "Unstaged federated event contains spam, dropping %s", pdu.event_id + ) + return + # Add the event to our staging area await self.store.insert_received_event_to_staging(origin, pdu) @@ -1109,16 +1116,30 @@ async def _process_incoming_pdus_in_room_inner( (self._clock.time_msec() - received_ts) / 1000 ) - # We need to do this check outside the lock to avoid a race between - # a new event being inserted by another instance and it attempting - # to acquire the lock. - next = await self.store.get_next_staged_event_for_room( - room_id, room_version - ) - if not next: + while True: + # We need to do this check outside the lock to avoid a race between + # a new event being inserted by another instance and it attempting + # to acquire the lock. + next = await self.store.get_next_staged_event_for_room( + room_id, room_version + ) + + if next is None: + break + + origin, event = next + + if await self._spam_checker.drop_federated_event(event): + logger.warning( + "Staged federated event contains spam, dropping %s", + event.event_id, + ) + continue + break - origin, event = next + if not next: + break # Prune the event queue if it's getting large. # diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 73f92d2df8d6..993342e7aba1 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -47,6 +47,7 @@ CHECK_MEDIA_FILE_FOR_SPAM_CALLBACK, CHECK_REGISTRATION_FOR_SPAM_CALLBACK, CHECK_USERNAME_FOR_SPAM_CALLBACK, + DROP_FEDERATED_EVENT_CALLBACK, USER_MAY_CREATE_ROOM_ALIAS_CALLBACK, USER_MAY_CREATE_ROOM_CALLBACK, USER_MAY_INVITE_CALLBACK, @@ -234,6 +235,7 @@ def register_spam_checker_callbacks( self, *, check_event_for_spam: Optional[CHECK_EVENT_FOR_SPAM_CALLBACK] = None, + drop_federated_event: Optional[DROP_FEDERATED_EVENT_CALLBACK] = None, user_may_join_room: Optional[USER_MAY_JOIN_ROOM_CALLBACK] = None, user_may_invite: Optional[USER_MAY_INVITE_CALLBACK] = None, user_may_send_3pid_invite: Optional[USER_MAY_SEND_3PID_INVITE_CALLBACK] = None, @@ -254,6 +256,7 @@ def register_spam_checker_callbacks( """ return self._spam_checker.register_callbacks( check_event_for_spam=check_event_for_spam, + drop_federated_event=drop_federated_event, user_may_join_room=user_may_join_room, user_may_invite=user_may_invite, user_may_send_3pid_invite=user_may_send_3pid_invite, From e7c841c92fb761c463afbf4d83725252813e5513 Mon Sep 17 00:00:00 2001 From: jesopo Date: Mon, 16 May 2022 14:26:18 +0000 Subject: [PATCH 2/8] changelog.d/12744.feature Signed-off-by: jesopo --- changelog.d/12744.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12744.feature diff --git a/changelog.d/12744.feature b/changelog.d/12744.feature new file mode 100644 index 000000000000..9836d94f8ca6 --- /dev/null +++ b/changelog.d/12744.feature @@ -0,0 +1 @@ +Add a `drop_federated_event` callback to `SpamChecker` to disregard inbound federated events before they take up much processing power, in an emergency. From f2183dd8a1e02f351facda9ab7024b03bf6d1cf0 Mon Sep 17 00:00:00 2001 From: jesopo Date: Mon, 16 May 2022 14:59:40 +0000 Subject: [PATCH 3/8] update documentation for `reject_federated_spam_event` Signed-off-by: jesopo --- docs/modules/spam_checker_callbacks.md | 18 ++++++++++++++++++ docs/spam_checker.md | 5 +++++ 2 files changed, 23 insertions(+) diff --git a/docs/modules/spam_checker_callbacks.md b/docs/modules/spam_checker_callbacks.md index 472d95718087..0ea2121fa6fe 100644 --- a/docs/modules/spam_checker_callbacks.md +++ b/docs/modules/spam_checker_callbacks.md @@ -249,6 +249,24 @@ callback returns `False`, Synapse falls through to the next one. The value of th callback that does not return `False` will be used. If this happens, Synapse will not call any of the subsequent implementations of this callback. +### `drop_federated_event` + +_First introduced in Synapse v1.?.?_ + +```python +async def drop_federated_event(event: "synapse.events.EventBase") -> bool +``` + +Called when checking whether a remote server can federate an event with us. **Returning +`True` from this function will silently drop a federated event and split-brain our view +of a room's DAG, and thus you shouldn't use this callback unless you know what you are +doing.** + +If multiple modules implement this callback, they will be considered in order. If a +callback returns `False`, Synapse falls through to the next one. The value of the first +callback that does not return `False` will be used. If this happens, Synapse will not call +any of the subsequent implementations of this callback. + ## Example The example below is a module that implements the spam checker callback diff --git a/docs/spam_checker.md b/docs/spam_checker.md index 1b6d814937c2..ffaa5395eb6c 100644 --- a/docs/spam_checker.md +++ b/docs/spam_checker.md @@ -32,6 +32,7 @@ well as some specific methods: * `check_username_for_spam` * `check_registration_for_spam` * `check_media_file_for_spam` +* `drop_federated_event` The details of each of these methods (as well as their inputs and outputs) are documented in the `synapse.events.spamcheck.SpamChecker` class. @@ -86,6 +87,10 @@ class ExampleSpamChecker: async def check_media_file_for_spam(self, file_wrapper, file_info): return False # allow all media + + + async def drop_federated_event(self, foo): + return False # don't silently drop any inbound federated events ``` ## Configuration From 7b7d7133db3cb646bf70747b356e9463cb07876f Mon Sep 17 00:00:00 2001 From: jesopo Date: Mon, 16 May 2022 15:14:42 +0000 Subject: [PATCH 4/8] optimistically aim this change at v1.60.0 Signed-off-by: jesopo --- docs/modules/spam_checker_callbacks.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/spam_checker_callbacks.md b/docs/modules/spam_checker_callbacks.md index 0ea2121fa6fe..4b3c0655098e 100644 --- a/docs/modules/spam_checker_callbacks.md +++ b/docs/modules/spam_checker_callbacks.md @@ -251,7 +251,7 @@ any of the subsequent implementations of this callback. ### `drop_federated_event` -_First introduced in Synapse v1.?.?_ +_First introduced in Synapse v1.60.0_ ```python async def drop_federated_event(event: "synapse.events.EventBase") -> bool From a1489f527572cec5e258068b4ee1d983df540489 Mon Sep 17 00:00:00 2001 From: jesopo Date: Mon, 23 May 2022 14:02:12 +0000 Subject: [PATCH 5/8] 'drop_federated_event' -> 'should_drop_federated_event' Signed-off-by: jesopo --- docs/modules/spam_checker_callbacks.md | 4 ++-- docs/spam_checker.md | 4 ++-- synapse/events/spamcheck.py | 20 +++++++++++++------- synapse/federation/federation_server.py | 4 ++-- synapse/module_api/__init__.py | 8 +++++--- 5 files changed, 24 insertions(+), 16 deletions(-) diff --git a/docs/modules/spam_checker_callbacks.md b/docs/modules/spam_checker_callbacks.md index 4b3c0655098e..27c5a0ed5cfe 100644 --- a/docs/modules/spam_checker_callbacks.md +++ b/docs/modules/spam_checker_callbacks.md @@ -249,12 +249,12 @@ callback returns `False`, Synapse falls through to the next one. The value of th callback that does not return `False` will be used. If this happens, Synapse will not call any of the subsequent implementations of this callback. -### `drop_federated_event` +### `should_drop_federated_event` _First introduced in Synapse v1.60.0_ ```python -async def drop_federated_event(event: "synapse.events.EventBase") -> bool +async def should_drop_federated_event(event: "synapse.events.EventBase") -> bool ``` Called when checking whether a remote server can federate an event with us. **Returning diff --git a/docs/spam_checker.md b/docs/spam_checker.md index ffaa5395eb6c..df2d280f31d6 100644 --- a/docs/spam_checker.md +++ b/docs/spam_checker.md @@ -32,7 +32,7 @@ well as some specific methods: * `check_username_for_spam` * `check_registration_for_spam` * `check_media_file_for_spam` -* `drop_federated_event` +* `should_drop_federated_event` The details of each of these methods (as well as their inputs and outputs) are documented in the `synapse.events.spamcheck.SpamChecker` class. @@ -89,7 +89,7 @@ class ExampleSpamChecker: return False # allow all media - async def drop_federated_event(self, foo): + async def should_drop_federated_event(self, foo): return False # don't silently drop any inbound federated events ``` diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index 1a241631520b..298465ac783f 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -44,7 +44,7 @@ ["synapse.events.EventBase"], Awaitable[Union[bool, str]], ] -DROP_FEDERATED_EVENT_CALLBACK = Callable[ +SHOULD_DROP_FEDERATED_EVENT_CALLBACK = Callable[ ["synapse.events.EventBase"], Awaitable[Union[bool, str]], ] @@ -173,7 +173,9 @@ def __init__(self, hs: "synapse.server.HomeServer") -> None: self.clock = hs.get_clock() self._check_event_for_spam_callbacks: List[CHECK_EVENT_FOR_SPAM_CALLBACK] = [] - self._drop_federated_event_callbacks: List[DROP_FEDERATED_EVENT_CALLBACK] = [] + self._should_drop_federated_event_callbacks: List[ + SHOULD_DROP_FEDERATED_EVENT_CALLBACK + ] = [] self._user_may_join_room_callbacks: List[USER_MAY_JOIN_ROOM_CALLBACK] = [] self._user_may_invite_callbacks: List[USER_MAY_INVITE_CALLBACK] = [] self._user_may_send_3pid_invite_callbacks: List[ @@ -197,7 +199,9 @@ def __init__(self, hs: "synapse.server.HomeServer") -> None: def register_callbacks( self, check_event_for_spam: Optional[CHECK_EVENT_FOR_SPAM_CALLBACK] = None, - drop_federated_event: Optional[DROP_FEDERATED_EVENT_CALLBACK] = None, + should_drop_federated_event: Optional[ + SHOULD_DROP_FEDERATED_EVENT_CALLBACK + ] = None, user_may_join_room: Optional[USER_MAY_JOIN_ROOM_CALLBACK] = None, user_may_invite: Optional[USER_MAY_INVITE_CALLBACK] = None, user_may_send_3pid_invite: Optional[USER_MAY_SEND_3PID_INVITE_CALLBACK] = None, @@ -216,8 +220,10 @@ def register_callbacks( if check_event_for_spam is not None: self._check_event_for_spam_callbacks.append(check_event_for_spam) - if drop_federated_event is not None: - self._drop_federated_event_callbacks.append(drop_federated_event) + if should_drop_federated_event is not None: + self._should_drop_federated_event_callbacks.append( + should_drop_federated_event + ) if user_may_join_room is not None: self._user_may_join_room_callbacks.append(user_may_join_room) @@ -278,7 +284,7 @@ async def check_event_for_spam( return False - async def drop_federated_event( + async def should_drop_federated_event( self, event: "synapse.events.EventBase" ) -> Union[bool, str]: """Checks if a given federated event is considered "spammy" by this @@ -293,7 +299,7 @@ async def drop_federated_event( Returns: True if the event should be silently dropped """ - for callback in self._drop_federated_event_callbacks: + for callback in self._should_drop_federated_event_callbacks: with Measure( self.clock, "{}.{}".format(callback.__module__, callback.__qualname__) ): diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9a6b487f4ae4..8b9717459d55 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -1020,7 +1020,7 @@ async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None: except SynapseError as e: raise FederationError("ERROR", e.code, e.msg, affected=pdu.event_id) - if await self._spam_checker.drop_federated_event(pdu): + if await self._spam_checker.should_drop_federated_event(pdu): logger.warning( "Unstaged federated event contains spam, dropping %s", pdu.event_id ) @@ -1129,7 +1129,7 @@ async def _process_incoming_pdus_in_room_inner( origin, event = next - if await self._spam_checker.drop_federated_event(event): + if await self._spam_checker.should_drop_federated_event(event): logger.warning( "Staged federated event contains spam, dropping %s", event.event_id, diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 993342e7aba1..c4f661bb9382 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -47,7 +47,7 @@ CHECK_MEDIA_FILE_FOR_SPAM_CALLBACK, CHECK_REGISTRATION_FOR_SPAM_CALLBACK, CHECK_USERNAME_FOR_SPAM_CALLBACK, - DROP_FEDERATED_EVENT_CALLBACK, + SHOULD_DROP_FEDERATED_EVENT_CALLBACK, USER_MAY_CREATE_ROOM_ALIAS_CALLBACK, USER_MAY_CREATE_ROOM_CALLBACK, USER_MAY_INVITE_CALLBACK, @@ -235,7 +235,9 @@ def register_spam_checker_callbacks( self, *, check_event_for_spam: Optional[CHECK_EVENT_FOR_SPAM_CALLBACK] = None, - drop_federated_event: Optional[DROP_FEDERATED_EVENT_CALLBACK] = None, + should_drop_federated_event: Optional[ + SHOULD_DROP_FEDERATED_EVENT_CALLBACK + ] = None, user_may_join_room: Optional[USER_MAY_JOIN_ROOM_CALLBACK] = None, user_may_invite: Optional[USER_MAY_INVITE_CALLBACK] = None, user_may_send_3pid_invite: Optional[USER_MAY_SEND_3PID_INVITE_CALLBACK] = None, @@ -256,7 +258,7 @@ def register_spam_checker_callbacks( """ return self._spam_checker.register_callbacks( check_event_for_spam=check_event_for_spam, - drop_federated_event=drop_federated_event, + should_drop_federated_event=should_drop_federated_event, user_may_join_room=user_may_join_room, user_may_invite=user_may_invite, user_may_send_3pid_invite=user_may_send_3pid_invite, From 10ae9e3017e37c99aeae66193818e86398fe64d1 Mon Sep 17 00:00:00 2001 From: jesopo Date: Mon, 23 May 2022 14:02:24 +0000 Subject: [PATCH 6/8] don't pull in legacy spamchecker modules Signed-off-by: jesopo --- synapse/events/spamcheck.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index 298465ac783f..61bcbe2abe60 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -97,7 +97,6 @@ def load_legacy_spam_checkers(hs: "synapse.server.HomeServer") -> None: # which name appears in this set, we'll want to register it. spam_checker_methods = { "check_event_for_spam", - "drop_federated_event", "user_may_invite", "user_may_create_room", "user_may_create_room_alias", From 0f8b253cf6ae4a3c81a89b99855376925c6bd2fc Mon Sep 17 00:00:00 2001 From: jesopo Date: Mon, 23 May 2022 15:41:39 +0000 Subject: [PATCH 7/8] move should_drop_federated_event while look to a helper function Signed-off-by: jesopo --- synapse/federation/federation_server.py | 53 +++++++++++++++---------- 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 8b9717459d55..5262b511688f 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -1039,6 +1039,33 @@ async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None: pdu.room_id, room_version, lock, origin, pdu ) + async def _get_next_valid_staged_event_for_room( + self, room_id: str, room_version: RoomVersion + ) -> Optional[Tuple[str, EventBase]]: + """Return the first non-spam event from staging queue.""" + + while True: + # We need to do this check outside the lock to avoid a race between + # a new event being inserted by another instance and it attempting + # to acquire the lock. + next = await self.store.get_next_staged_event_for_room( + room_id, room_version + ) + + if next is None: + return None + + origin, event = next + + if await self._spam_checker.should_drop_federated_event(event): + logger.warning( + "Staged federated event contains spam, dropping %s", + event.event_id, + ) + continue + + return next + @wrap_as_background_process("_process_incoming_pdus_in_room_inner") async def _process_incoming_pdus_in_room_inner( self, @@ -1116,31 +1143,15 @@ async def _process_incoming_pdus_in_room_inner( (self._clock.time_msec() - received_ts) / 1000 ) - while True: - # We need to do this check outside the lock to avoid a race between - # a new event being inserted by another instance and it attempting - # to acquire the lock. - next = await self.store.get_next_staged_event_for_room( - room_id, room_version - ) - - if next is None: - break - - origin, event = next - - if await self._spam_checker.should_drop_federated_event(event): - logger.warning( - "Staged federated event contains spam, dropping %s", - event.event_id, - ) - continue - - break + next = await self._get_next_valid_staged_event_for_room( + room_id, room_version + ) if not next: break + origin, event = next + # Prune the event queue if it's getting large. # # We do this *after* handling the first event as the common case is From a43ef53311cac4eca36cfcb8a9395bf98b2c2ac6 Mon Sep 17 00:00:00 2001 From: jesopo Date: Mon, 23 May 2022 16:05:19 +0000 Subject: [PATCH 8/8] response to feedback Signed-off-by: jesopo --- docs/spam_checker.md | 5 ----- synapse/federation/federation_server.py | 14 +++++++++++--- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/docs/spam_checker.md b/docs/spam_checker.md index df2d280f31d6..1b6d814937c2 100644 --- a/docs/spam_checker.md +++ b/docs/spam_checker.md @@ -32,7 +32,6 @@ well as some specific methods: * `check_username_for_spam` * `check_registration_for_spam` * `check_media_file_for_spam` -* `should_drop_federated_event` The details of each of these methods (as well as their inputs and outputs) are documented in the `synapse.events.spamcheck.SpamChecker` class. @@ -87,10 +86,6 @@ class ExampleSpamChecker: async def check_media_file_for_spam(self, file_wrapper, file_info): return False # allow all media - - - async def should_drop_federated_event(self, foo): - return False # don't silently drop any inbound federated events ``` ## Configuration diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 5262b511688f..b8232e5257d2 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -1039,10 +1039,18 @@ async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None: pdu.room_id, room_version, lock, origin, pdu ) - async def _get_next_valid_staged_event_for_room( + async def _get_next_nonspam_staged_event_for_room( self, room_id: str, room_version: RoomVersion ) -> Optional[Tuple[str, EventBase]]: - """Return the first non-spam event from staging queue.""" + """Fetch the first non-spam event from staging queue. + + Args: + room_id: the room to fetch the first non-spam event in. + room_version: the version of the room. + + Returns: + The first non-spam event in that room. + """ while True: # We need to do this check outside the lock to avoid a race between @@ -1143,7 +1151,7 @@ async def _process_incoming_pdus_in_room_inner( (self._clock.time_msec() - received_ts) / 1000 ) - next = await self._get_next_valid_staged_event_for_room( + next = await self._get_next_nonspam_staged_event_for_room( room_id, room_version )