From 1764f2585cba79985f6182c0a2048fdd82903d6b Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Mon, 3 Oct 2022 10:45:29 -0700 Subject: [PATCH 1/7] add function to create protected _PendingRoomEventEntry --- synapse/notifier.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/synapse/notifier.py b/synapse/notifier.py index c42bb8266add..3a7b26abc3e9 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -354,6 +354,25 @@ async def on_new_room_event_args( self.notify_replication() + def create_pending_room_event_entry( + self, + event_pos: PersistedEventPosition, + extra_users: Optional[Collection[UserID]], + room_id: str, + event_type: str, + state_key: Optional[str], + membership: Optional[str], + ) -> _PendingRoomEventEntry: + """Creates and returns a _PendingRoomEventEntry""" + return _PendingRoomEventEntry( + event_pos=event_pos, + extra_users=extra_users or [], + room_id=room_id, + type=event_type, + state_key=state_key, + membership=membership, + ) + def _notify_pending_new_room_events( self, max_room_stream_token: RoomStreamToken ) -> None: From 36ce06e4e8e769a4fbf68cad435e4f6ee3105cf8 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Mon, 3 Oct 2022 10:49:18 -0700 Subject: [PATCH 2/7] rewrite function to take list of events/event positions --- synapse/notifier.py | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 3a7b26abc3e9..e8adee1cf8cb 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -294,24 +294,26 @@ def add_new_join_in_room_callback(self, cb: Callable[[str, str], None]) -> None: """ self._new_join_in_room_callbacks.append(cb) - async def on_new_room_event( + async def on_new_room_events( self, - event: EventBase, - event_pos: PersistedEventPosition, + events_and_pos: List[Tuple[EventBase, PersistedEventPosition]], max_room_stream_token: RoomStreamToken, extra_users: Optional[Collection[UserID]] = None, ) -> None: - """Unwraps event and calls `on_new_room_event_args`.""" - await self.on_new_room_event_args( - event_pos=event_pos, - room_id=event.room_id, - event_id=event.event_id, - event_type=event.type, - state_key=event.get("state_key"), - membership=event.content.get("membership"), - max_room_stream_token=max_room_stream_token, - extra_users=extra_users or [], - ) + """Creates a _PendingRoomEventEntry for each of the listed events and calls + notify_new_room_events with the results.""" + event_entries = [] + for event, pos in events_and_pos: + entry = self.create_pending_room_event_entry( + pos, + extra_users, + event.room_id, + event.type, + event.get("state_key"), + event.content.get("membership"), + ) + event_entries.append((entry, event.event_id)) + await self.notify_new_room_events(event_entries, max_room_stream_token) async def on_new_room_event_args( self, From 6ddf50bd42572ba7cbf343592850dac0370c7aca Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Mon, 3 Oct 2022 10:49:55 -0700 Subject: [PATCH 3/7] rewrite function to take list of pending room event entries --- synapse/notifier.py | 26 ++++++-------------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index e8adee1cf8cb..26b97cf766c3 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -315,16 +315,10 @@ async def on_new_room_events( event_entries.append((entry, event.event_id)) await self.notify_new_room_events(event_entries, max_room_stream_token) - async def on_new_room_event_args( + async def notify_new_room_events( self, - room_id: str, - event_id: str, - event_type: str, - state_key: Optional[str], - membership: Optional[str], - event_pos: PersistedEventPosition, + event_entries: List[Tuple[_PendingRoomEventEntry, str]], max_room_stream_token: RoomStreamToken, - extra_users: Optional[Collection[UserID]] = None, ) -> None: """Used by handlers to inform the notifier something has happened in the room, room event wise. @@ -340,19 +334,11 @@ async def on_new_room_event_args( until all previous events have been persisted before notifying the client streams. """ - self.pending_new_room_events.append( - _PendingRoomEventEntry( - event_pos=event_pos, - extra_users=extra_users or [], - room_id=room_id, - type=event_type, - state_key=state_key, - membership=membership, - ) - ) - self._notify_pending_new_room_events(max_room_stream_token) + for event_entry, event_id in event_entries: + self.pending_new_room_events.append(event_entry) + await self._third_party_rules.on_new_event(event_id) - await self._third_party_rules.on_new_event(event_id) + self._notify_pending_new_room_events(max_room_stream_token) self.notify_replication() From 30ac62dc4ff47c9c86254c79bec7406d07842f46 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Mon, 3 Oct 2022 10:50:38 -0700 Subject: [PATCH 4/7] create a pending room event entry from row data and call notify_new_room_events with it --- synapse/replication/tcp/client.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index b2522f98cade..18252a2958dc 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -210,15 +210,16 @@ async def on_rdata( max_token = self.store.get_room_max_token() event_pos = PersistedEventPosition(instance_name, token) - await self.notifier.on_new_room_event_args( - event_pos=event_pos, - max_room_stream_token=max_token, - extra_users=extra_users, - room_id=row.data.room_id, - event_id=row.data.event_id, - event_type=row.data.type, - state_key=row.data.state_key, - membership=row.data.membership, + event_entry = self.notifier.create_pending_room_event_entry( + event_pos, + extra_users, + row.data.room_id, + row.data.type, + row.data.state_key, + row.data.membership, + ) + await self.notifier.notify_new_room_events( + [(event_entry, row.data.event_id)], max_token ) # If this event is a join, make a note of it so we have an accurate From b4f247719ebfd68ecba0549efca7294e9aa5b7a0 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Mon, 3 Oct 2022 10:51:54 -0700 Subject: [PATCH 5/7] rewrite notification flow after event persistence to use on_new_room_events --- synapse/handlers/message.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 00e7645ba5cc..da1acea2755b 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1872,6 +1872,7 @@ async def persist_and_notify_client_events( events_and_context, backfilled=backfilled ) + events_and_pos = [] for event in persisted_events: if self._ephemeral_events_enabled: # If there's an expiry timestamp on the event, schedule its expiry. @@ -1880,25 +1881,23 @@ async def persist_and_notify_client_events( stream_ordering = event.internal_metadata.stream_ordering assert stream_ordering is not None pos = PersistedEventPosition(self._instance_name, stream_ordering) - - async def _notify() -> None: - try: - await self.notifier.on_new_room_event( - event, pos, max_stream_token, extra_users=extra_users - ) - except Exception: - logger.exception( - "Error notifying about new room event %s", - event.event_id, - ) - - run_in_background(_notify) + events_and_pos.append((event, pos)) if event.type == EventTypes.Message: # We don't want to block sending messages on any presence code. This # matters as sometimes presence code can take a while. run_in_background(self._bump_active_time, requester.user) + async def _notify() -> None: + try: + await self.notifier.on_new_room_events( + events_and_pos, max_stream_token, extra_users=extra_users + ) + except Exception: + logger.exception("Error notifying about new room events") + + run_in_background(_notify) + return persisted_events[-1] async def _maybe_kick_guest_users( From b82c846f684bf8c2a4083449c80b5a5c627d0611 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Mon, 3 Oct 2022 10:53:37 -0700 Subject: [PATCH 6/7] update callsite to use on_new_room_events rather than on_new_room_event --- synapse/handlers/federation_event.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 778d8869b3c7..da319943cc19 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -2240,8 +2240,8 @@ async def _notify_persisted_event( event_pos = PersistedEventPosition( self._instance_name, event.internal_metadata.stream_ordering ) - await self._notifier.on_new_room_event( - event, event_pos, max_stream_token, extra_users=extra_users + await self._notifier.on_new_room_events( + [(event, event_pos)], max_stream_token, extra_users=extra_users ) if event.type == EventTypes.Member and event.membership == Membership.JOIN: From c49da8e795568fdaa0a11c04c7ebf1354f7832a2 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Mon, 3 Oct 2022 11:11:50 -0700 Subject: [PATCH 7/7] newsfragement --- changelog.d/14033.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/14033.misc diff --git a/changelog.d/14033.misc b/changelog.d/14033.misc new file mode 100644 index 000000000000..fe42852aa57c --- /dev/null +++ b/changelog.d/14033.misc @@ -0,0 +1 @@ +Don't repeatedly wake up the same users for batched events. \ No newline at end of file