Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Clean-up distributor code #8216

Merged
merged 8 commits into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8216.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Simplify the distributor code to avoid unnecessary work.
9 changes: 0 additions & 9 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,6 @@ class EventStreamHandler(BaseHandler):
def __init__(self, hs):
super(EventStreamHandler, self).__init__(hs)

# Count of active streams per user
self._streams_per_user = {}
# Grace timers per user to delay the "stopped" signal
self._stop_timer_per_user = {}

self.distributor = hs.get_distributor()
self.distributor.declare("started_user_eventstream")
self.distributor.declare("stopped_user_eventstream")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These variables were referenced anywhere else.


self.clock = hs.get_clock()

self.notifier = hs.get_notifier()
Expand Down
43 changes: 1 addition & 42 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
ReplicationFederationSendEventsRestServlet,
ReplicationStoreRoomOnInviteRestServlet,
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
Expand All @@ -80,7 +79,6 @@
get_domain_from_id,
)
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.distributor import user_joined_room
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server
Expand Down Expand Up @@ -141,9 +139,6 @@ def __init__(self, hs):
self._replication = hs.get_replication_data_handler()

self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client(
hs
)
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
)
Expand Down Expand Up @@ -704,31 +699,10 @@ async def _process_received_pdu(
logger.debug("[%s %s] Processing event: %s", room_id, event_id, event)

try:
context = await self._handle_new_event(origin, event, state=state)
await self._handle_new_event(origin, event, state=state)
except AuthError as e:
raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)

if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
# Only fire user_joined_room if the user has acutally
# joined the room. Don't bother if the user is just
# changing their profile info.
newly_joined = True

prev_state_ids = await context.get_prev_state_ids()

prev_state_id = prev_state_ids.get((event.type, event.state_key))
if prev_state_id:
prev_state = await self.store.get_event(
prev_state_id, allow_none=True
)
if prev_state and prev_state.membership == Membership.JOIN:
newly_joined = False

if newly_joined:
user = UserID.from_string(event.state_key)
await self.user_joined_room(user, room_id)

# For encrypted messages we check that we know about the sending device,
# if we don't then we mark the device cache for that user as stale.
if event.type == EventTypes.Encrypted:
Expand Down Expand Up @@ -1551,11 +1525,6 @@ async def on_send_join_request(self, origin, pdu):
event.signatures,
)

if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
user = UserID.from_string(event.state_key)
await self.user_joined_room(user, event.room_id)

prev_state_ids = await context.get_prev_state_ids()

state_ids = list(prev_state_ids.values())
Expand Down Expand Up @@ -3000,16 +2969,6 @@ async def _clean_room_for_join(self, room_id: str) -> None:
else:
await self.store.clean_room_for_join(room_id)

async def user_joined_room(self, user: UserID, room_id: str) -> None:
"""Called when a new user has joined the room
"""
if self.config.worker_app:
await self._notify_user_membership_change(
room_id=room_id, user_id=user.to_string(), change="joined"
)
else:
user_joined_room(self.distributor, user, room_id)

async def get_room_complexity(
self, remote_room_hosts: List[str], room_id: str
) -> Optional[dict]:
Expand Down
42 changes: 4 additions & 38 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from synapse.storage.roommember import RoomsForUser
from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
from synapse.util.distributor import user_left_room

from ._base import BaseHandler

Expand Down Expand Up @@ -141,17 +141,6 @@ async def remote_reject_invite(
"""
raise NotImplementedError()

@abc.abstractmethod
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has joined the
room.

Args:
target
room_id
"""
raise NotImplementedError()

@abc.abstractmethod
async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has left the
Expand Down Expand Up @@ -214,7 +203,6 @@ async def _local_membership_update(

prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)

newly_joined = False
if event.membership == Membership.JOIN:
newly_joined = True
if prev_member_event_id:
Expand All @@ -239,12 +227,7 @@ async def _local_membership_update(
requester, event, context, extra_users=[target], ratelimit=ratelimit,
)

if event.membership == Membership.JOIN and newly_joined:
# Only fire user_joined_room if the user has actually joined the
# room. Don't bother if the user is just changing their profile
# info.
await self._user_joined_room(target, room_id)
elif event.membership == Membership.LEAVE:
if event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN:
Expand Down Expand Up @@ -719,17 +702,7 @@ async def send_membership_event(
(EventTypes.Member, event.state_key), None
)

if event.membership == Membership.JOIN:
# Only fire user_joined_room if the user has actually joined the
# room. Don't bother if the user is just changing their profile
# info.
newly_joined = True
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN
if newly_joined:
await self._user_joined_room(target_user, room_id)
elif event.membership == Membership.LEAVE:
if event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN:
Expand Down Expand Up @@ -995,10 +968,9 @@ async def _is_server_notice_room(self, room_id: str) -> bool:

class RoomMemberMasterHandler(RoomMemberHandler):
def __init__(self, hs):
super(RoomMemberMasterHandler, self).__init__(hs)
super().__init__(hs)

self.distributor = hs.get_distributor()
self.distributor.declare("user_joined_room")
self.distributor.declare("user_left_room")

async def _is_remote_room_too_complex(
Expand Down Expand Up @@ -1078,7 +1050,6 @@ async def _remote_join(
event_id, stream_id = await self.federation_handler.do_invite_join(
remote_room_hosts, room_id, user.to_string(), content
)
await self._user_joined_room(user, room_id)

# Check the room we just joined wasn't too large, if we didn't fetch the
# complexity of it before.
Expand Down Expand Up @@ -1221,11 +1192,6 @@ async def _locally_reject_invite(
)
return event.event_id, stream_id

async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_joined_room
"""
user_joined_room(self.distributor, target, room_id)

async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_left_room
"""
Expand Down
9 changes: 0 additions & 9 deletions synapse/handlers/room_member_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ async def _remote_join(
content=content,
)

await self._user_joined_room(user, room_id)

return ret["event_id"], ret["stream_id"]

async def remote_reject_invite(
Expand All @@ -81,13 +79,6 @@ async def remote_reject_invite(
)
return ret["event_id"], ret["stream_id"]

async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_joined_room
"""
await self._notify_change_client(
user_id=target.to_string(), room_id=room_id, change="joined"
)

async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_left_room
"""
Expand Down
10 changes: 4 additions & 6 deletions synapse/replication/http/membership.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import JsonDict, Requester, UserID
from synapse.util.distributor import user_joined_room, user_left_room
from synapse.util.distributor import user_left_room

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -181,9 +181,9 @@ async def _serialize_payload(room_id, user_id, change):
Args:
room_id (str)
user_id (str)
change (str): Either "joined" or "left"
change (str): "left"
"""
assert change in ("joined", "left")
assert change == "left"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make more sense to update ReplicationUserJoinedLeftRoomRestServlet to be ReplicationUserLeftRoomRestServlet?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shrugface. Maybe. Probably better to do some more complete refactoring to sort out the user_left_room case.


return {}

Expand All @@ -192,9 +192,7 @@ def _handle_request(self, request, room_id, user_id, change):

user = UserID.from_string(user_id)

if change == "joined":
user_joined_room(self.distributor, user, room_id)
elif change == "left":
if change == "left":
user_left_room(self.distributor, user, room_id)
else:
raise Exception("Unrecognized change: %r", change)
Expand Down
65 changes: 12 additions & 53 deletions synapse/util/distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
import inspect
import logging

from twisted.internet import defer
from twisted.internet.defer import Deferred, fail, succeed
from twisted.python import failure

from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process

logger = logging.getLogger(__name__)
Expand All @@ -29,11 +25,6 @@ def user_left_room(distributor, user, room_id):
distributor.fire("user_left_room", user=user, room_id=room_id)


# XXX: this is no longer used. We should probably kill it.
def user_joined_room(distributor, user, room_id):
distributor.fire("user_joined_room", user=user, room_id=room_id)


class Distributor(object):
"""A central dispatch point for loosely-connected pieces of code to
register, observe, and fire signals.
Expand Down Expand Up @@ -81,28 +72,6 @@ def fire(self, name, *args, **kwargs):
run_as_background_process(name, self.signals[name].fire, *args, **kwargs)


def maybeAwaitableDeferred(f, *args, **kw):
"""
Invoke a function that may or may not return a Deferred or an Awaitable.

This is a modified version of twisted.internet.defer.maybeDeferred.
"""
try:
result = f(*args, **kw)
except Exception:
return fail(failure.Failure(captureVars=Deferred.debug))

if isinstance(result, Deferred):
return result
# Handle the additional case of an awaitable being returned.
elif inspect.isawaitable(result):
return defer.ensureDeferred(result)
elif isinstance(result, failure.Failure):
return fail(result)
else:
return succeed(result)


class Signal(object):
"""A Signal is a dispatch point that stores a list of callables as
observers of it.
Expand All @@ -128,31 +97,21 @@ def fire(self, *args, **kwargs):
"""Invokes every callable in the observer list, passing in the args and
kwargs. Exceptions thrown by observers are logged but ignored. It is
not an error to fire a signal with no observers.
"""

Returns a Deferred that will complete when all the observers have
completed."""

def do(observer):
def eb(failure):
async def do(observer):
try:
result = observer(*args, **kwargs)
if inspect.isawaitable(result):
result = await result
return result
except Exception as e:
logger.warning(
"%s signal observer %s failed: %r",
self.name,
observer,
failure,
exc_info=(
failure.type,
failure.value,
failure.getTracebackObject(),
),
"%s signal observer %s failed: %r", self.name, observer, e,
)

return maybeAwaitableDeferred(observer, *args, **kwargs).addErrback(eb)

deferreds = [run_in_background(do, o) for o in self.observers]

return make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True)
)
for observer in self.observers:
run_in_background(do, observer)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is changing the semantics so that we no longer wait for the observers to complete. Is that deliberate? what is the logic behind it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reasoning behind this nothing uses the Deferred returned by Signal.fire(). gatherResults and make_deferred_yieldable don't seem to actually "wait" since the caller doesn't wait on the result.

I can back out 6c27891 if you'd like.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/matrix-org/synapse/pull/8216/files/b400db3b5bd2d365d4e68b73f7d3f2a0530dd8a7#diff-755d6db2f812691f4b7ea7fd62894893R72 calls Signal.fire (and uses the deferred, via run_as_background_process) ?

I can back out 6c27891 if you'd like.

I think that would be correct.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good call. 👍 I guess I was looking at places that Distributor.fire is called. 😢

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted in 41ced4c


def __repr__(self):
return "<Signal name=%r>" % (self.name,)