Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Batch up notifications after event persistence #14033

Merged
merged 7 commits into from
Oct 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14033.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Don't repeatedly wake up the same users for batched events.
4 changes: 2 additions & 2 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -2240,8 +2240,8 @@ async def _notify_persisted_event(
event_pos = PersistedEventPosition(
self._instance_name, event.internal_metadata.stream_ordering
)
await self._notifier.on_new_room_event(
event, event_pos, max_stream_token, extra_users=extra_users
await self._notifier.on_new_room_events(
[(event, event_pos)], max_stream_token, extra_users=extra_users
)

if event.type == EventTypes.Member and event.membership == Membership.JOIN:
Expand Down
25 changes: 12 additions & 13 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -1872,6 +1872,7 @@ async def persist_and_notify_client_events(
events_and_context, backfilled=backfilled
)

events_and_pos = []
for event in persisted_events:
if self._ephemeral_events_enabled:
# If there's an expiry timestamp on the event, schedule its expiry.
Expand All @@ -1880,25 +1881,23 @@ async def persist_and_notify_client_events(
stream_ordering = event.internal_metadata.stream_ordering
assert stream_ordering is not None
pos = PersistedEventPosition(self._instance_name, stream_ordering)

async def _notify() -> None:
try:
await self.notifier.on_new_room_event(
event, pos, max_stream_token, extra_users=extra_users
)
except Exception:
logger.exception(
"Error notifying about new room event %s",
event.event_id,
)

run_in_background(_notify)
events_and_pos.append((event, pos))

if event.type == EventTypes.Message:
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user)

async def _notify() -> None:
try:
await self.notifier.on_new_room_events(
events_and_pos, max_stream_token, extra_users=extra_users
)
except Exception:
logger.exception("Error notifying about new room events")

run_in_background(_notify)

return persisted_events[-1]

async def _maybe_kick_guest_users(
Expand Down
75 changes: 41 additions & 34 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,35 +294,31 @@ def add_new_join_in_room_callback(self, cb: Callable[[str, str], None]) -> None:
"""
self._new_join_in_room_callbacks.append(cb)

async def on_new_room_event(
async def on_new_room_events(
self,
event: EventBase,
event_pos: PersistedEventPosition,
events_and_pos: List[Tuple[EventBase, PersistedEventPosition]],
max_room_stream_token: RoomStreamToken,
extra_users: Optional[Collection[UserID]] = None,
) -> None:
"""Unwraps event and calls `on_new_room_event_args`."""
await self.on_new_room_event_args(
event_pos=event_pos,
room_id=event.room_id,
event_id=event.event_id,
event_type=event.type,
state_key=event.get("state_key"),
membership=event.content.get("membership"),
max_room_stream_token=max_room_stream_token,
extra_users=extra_users or [],
)
"""Creates a _PendingRoomEventEntry for each of the listed events and calls
notify_new_room_events with the results."""
event_entries = []
for event, pos in events_and_pos:
entry = self.create_pending_room_event_entry(
pos,
extra_users,
event.room_id,
event.type,
event.get("state_key"),
event.content.get("membership"),
)
event_entries.append((entry, event.event_id))
await self.notify_new_room_events(event_entries, max_room_stream_token)

async def on_new_room_event_args(
async def notify_new_room_events(
self,
room_id: str,
event_id: str,
event_type: str,
state_key: Optional[str],
membership: Optional[str],
event_pos: PersistedEventPosition,
event_entries: List[Tuple[_PendingRoomEventEntry, str]],
max_room_stream_token: RoomStreamToken,
extra_users: Optional[Collection[UserID]] = None,
) -> None:
"""Used by handlers to inform the notifier something has happened
in the room, room event wise.
Expand All @@ -338,22 +334,33 @@ async def on_new_room_event_args(
until all previous events have been persisted before notifying
the client streams.
"""
self.pending_new_room_events.append(
_PendingRoomEventEntry(
event_pos=event_pos,
extra_users=extra_users or [],
room_id=room_id,
type=event_type,
state_key=state_key,
membership=membership,
)
)
self._notify_pending_new_room_events(max_room_stream_token)
for event_entry, event_id in event_entries:
self.pending_new_room_events.append(event_entry)
await self._third_party_rules.on_new_event(event_id)

await self._third_party_rules.on_new_event(event_id)
self._notify_pending_new_room_events(max_room_stream_token)

self.notify_replication()

def create_pending_room_event_entry(
self,
event_pos: PersistedEventPosition,
extra_users: Optional[Collection[UserID]],
room_id: str,
event_type: str,
state_key: Optional[str],
membership: Optional[str],
) -> _PendingRoomEventEntry:
"""Creates and returns a _PendingRoomEventEntry"""
return _PendingRoomEventEntry(
event_pos=event_pos,
extra_users=extra_users or [],
room_id=room_id,
type=event_type,
state_key=state_key,
membership=membership,
)

def _notify_pending_new_room_events(
self, max_room_stream_token: RoomStreamToken
) -> None:
Expand Down
19 changes: 10 additions & 9 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,16 @@ async def on_rdata(

max_token = self.store.get_room_max_token()
event_pos = PersistedEventPosition(instance_name, token)
await self.notifier.on_new_room_event_args(
event_pos=event_pos,
max_room_stream_token=max_token,
extra_users=extra_users,
room_id=row.data.room_id,
event_id=row.data.event_id,
event_type=row.data.type,
state_key=row.data.state_key,
membership=row.data.membership,
event_entry = self.notifier.create_pending_room_event_entry(
event_pos,
extra_users,
row.data.room_id,
row.data.type,
row.data.state_key,
row.data.membership,
)
await self.notifier.notify_new_room_events(
[(event_entry, row.data.event_id)], max_token
)

# If this event is a join, make a note of it so we have an accurate
Expand Down