Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Don't pull event from DB when handling replication traffic. #8669

Merged
merged 10 commits into from
Oct 28, 2020
1 change: 1 addition & 0 deletions changelog.d/8669.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Don't pull event from DB when handling replication traffic.
68 changes: 55 additions & 13 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
Union,
)

import attr
from prometheus_client import Counter

from twisted.internet import defer
Expand Down Expand Up @@ -173,6 +174,17 @@ def __bool__(self):
return bool(self.events)


@attr.s(slots=True, frozen=True)
class _PendingRoomEventEntry:
event_pos = attr.ib(type=PersistedEventPosition)
extra_users = attr.ib(type=Collection[UserID])

room_id = attr.ib(type=str)
type = attr.ib(type=str)
state_key = attr.ib(type=Optional[str])
membership = attr.ib(type=Optional[str])


class Notifier:
""" This class is responsible for notifying any listeners when there are
new events available for it.
Expand All @@ -190,9 +202,7 @@ def __init__(self, hs: "synapse.server.HomeServer"):
self.storage = hs.get_storage()
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore()
self.pending_new_room_events = (
[]
) # type: List[Tuple[PersistedEventPosition, EventBase, Collection[UserID]]]
self.pending_new_room_events = [] # type: List[_PendingRoomEventEntry]

# Called when there are new things to stream over replication
self.replication_callbacks = [] # type: List[Callable[[], None]]
Expand Down Expand Up @@ -255,7 +265,29 @@ def on_new_room_event(
max_room_stream_token: RoomStreamToken,
extra_users: Collection[UserID] = [],
):
""" Used by handlers to inform the notifier something has happened
"""Unrwaps event and calls `on_new_room_event_args`.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
"""
self.on_new_room_event_args(
event_pos=event_pos,
room_id=event.room_id,
event_type=event.type,
state_key=event.get("state_key"),
membership=event.get("membership"),
max_room_stream_token=max_room_stream_token,
extra_users=extra_users,
)

def on_new_room_event_args(
self,
room_id: str,
event_type: str,
state_key: Optional[str],
membership: Optional[str],
event_pos: PersistedEventPosition,
max_room_stream_token: RoomStreamToken,
extra_users: Collection[UserID] = [],
):
"""Used by handlers to inform the notifier something has happened
in the room, room event wise.

This triggers the notifier to wake up any listeners that are
Expand All @@ -266,7 +298,16 @@ def on_new_room_event(
until all previous events have been persisted before notifying
the client streams.
"""
self.pending_new_room_events.append((event_pos, event, extra_users))
self.pending_new_room_events.append(
_PendingRoomEventEntry(
event_pos=event_pos,
extra_users=extra_users,
room_id=room_id,
type=event_type,
state_key=state_key,
membership=membership,
)
)
self._notify_pending_new_room_events(max_room_stream_token)

self.notify_replication()
Expand All @@ -284,18 +325,19 @@ def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken
users = set() # type: Set[UserID]
rooms = set() # type: Set[str]

for event_pos, event, extra_users in pending:
if event_pos.persisted_after(max_room_stream_token):
self.pending_new_room_events.append((event_pos, event, extra_users))
for entry in pending:
if entry.event_pos.persisted_after(max_room_stream_token):
self.pending_new_room_events.append(entry)
else:
if (
event.type == EventTypes.Member
and event.membership == Membership.JOIN
entry.type == EventTypes.Member
and entry.membership == Membership.JOIN
and entry.state_key
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is checking the state key a bug fix or would we not have previously reached this otherwise?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

event.state_key raises if its not a state event, so its kinda a bit of both

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, so previously this would have bubbled into an exception, which was actually unwanted?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup

):
self._user_joined_room(event.state_key, event.room_id)
self._user_joined_room(entry.state_key, entry.room_id)

users.update(extra_users)
rooms.add(event.room_id)
users.update(entry.extra_users)
rooms.add(entry.room_id)

if users or rooms:
self.on_new_event(
Expand Down
21 changes: 11 additions & 10 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,21 +141,22 @@ async def on_rdata(
if row.type != EventsStreamEventRow.TypeId:
continue
assert isinstance(row, EventsStreamRow)

event = await self.store.get_event(
row.data.event_id, allow_rejected=True
)
if event.rejected_reason:
continue
clokep marked this conversation as resolved.
Show resolved Hide resolved
assert isinstance(row.data, EventsStreamEventRow)

extra_users = () # type: Tuple[UserID, ...]
if event.type == EventTypes.Member:
extra_users = (UserID.from_string(event.state_key),)
if row.data.type == EventTypes.Member and row.data.state_key:
extra_users = (UserID.from_string(row.data.state_key),)

max_token = self.store.get_room_max_token()
event_pos = PersistedEventPosition(instance_name, token)
self.notifier.on_new_room_event(
event, event_pos, max_token, extra_users
self.notifier.on_new_room_event_args(
event_pos=event_pos,
max_room_stream_token=max_token,
extra_users=extra_users,
room_id=row.data.room_id,
event_type=row.data.type,
state_key=row.data.state_key,
membership=row.data.membership,
)

# Notify any waiting deferreds. The list is ordered by position so we
Expand Down
21 changes: 13 additions & 8 deletions synapse/replication/tcp/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
# limitations under the License.
import heapq
from collections.abc import Iterable
from typing import List, Tuple, Type
from typing import TYPE_CHECKING, List, Optional, Tuple, Type

import attr

from ._base import Stream, StreamUpdateResult, Token

if TYPE_CHECKING:
from synapse.server import HomeServer

"""Handling of the 'events' replication stream

This stream contains rows of various types. Each row therefore contains a 'type'
Expand Down Expand Up @@ -81,12 +84,14 @@ def from_data(cls, data):
class EventsStreamEventRow(BaseEventsStreamRow):
TypeId = "ev"

event_id = attr.ib() # str
room_id = attr.ib() # str
type = attr.ib() # str
state_key = attr.ib() # str, optional
redacts = attr.ib() # str, optional
relates_to = attr.ib() # str, optional
event_id = attr.ib(type=str)
room_id = attr.ib(type=str)
type = attr.ib(type=str)
state_key = attr.ib(type=Optional[str])
redacts = attr.ib(type=Optional[str])
relates_to = attr.ib(type=Optional[str])
membership = attr.ib(type=Optional[str])
rejected = attr.ib(type=bool)


@attr.s(slots=True, frozen=True)
Expand All @@ -113,7 +118,7 @@ class EventsStream(Stream):

NAME = "events"

def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self._store = hs.get_datastore()
super().__init__(
hs.get_instance_name(),
Expand Down
8 changes: 6 additions & 2 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1065,11 +1065,13 @@ async def get_all_new_forward_event_rows(
def get_all_new_forward_event_rows(txn):
sql = (
"SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
" state_key, redacts, relates_to_id"
" state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
" FROM events AS e"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" LEFT JOIN room_memberships USING (event_id)"
" LEFT JOIN rejections USING (event_id)"
" WHERE ? < stream_ordering AND stream_ordering <= ?"
" AND instance_name = ?"
" ORDER BY stream_ordering ASC"
Expand Down Expand Up @@ -1100,12 +1102,14 @@ async def get_ex_outlier_stream_rows(
def get_ex_outlier_stream_rows_txn(txn):
sql = (
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
" state_key, redacts, relates_to_id"
" state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
" FROM events AS e"
" INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" LEFT JOIN room_memberships USING (event_id)"
" LEFT JOIN rejections USING (event_id)"
" WHERE ? < event_stream_ordering"
" AND event_stream_ordering <= ?"
" AND out.instance_name = ?"
Expand Down