From 2efa64c4891cc13f0208d2f1a2af63f3d7d5618f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 21 Apr 2020 16:50:09 +0100 Subject: [PATCH 1/8] Remove as_event from PresenceHandler.get_state(s) methods which return different things depending on their arguments are bad karma. --- synapse/api/constants.py | 2 ++ synapse/handlers/events.py | 20 +++++++++++-------- synapse/handlers/initial_sync.py | 10 ++++++++-- synapse/handlers/presence.py | 34 +++++++++----------------------- 4 files changed, 31 insertions(+), 35 deletions(-) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index fda2c2e5bbf8..bcaf2c3600e4 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -97,6 +97,8 @@ class EventTypes(object): Retention = "m.room.retention" + Presence = "m.presence" + class RejectedReason(object): AUTH_ERROR = "auth_error" diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index ec18a42a68b8..71a89f09c765 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -19,6 +19,7 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, SynapseError from synapse.events import EventBase +from synapse.handlers.presence import format_user_presence_state from synapse.logging.utils import log_function from synapse.types import UserID from synapse.visibility import filter_events_for_client @@ -97,6 +98,8 @@ async def get_stream( explicit_room_id=room_id, ) + time_now = self.clock.time_msec() + # When the user joins a new room, or another user joins a currently # joined room, we need to send down presence for those users. 
to_add = [] @@ -112,19 +115,20 @@ async def get_stream( users = await self.state.get_current_users_in_room( event.room_id ) - states = await presence_handler.get_states(users, as_event=True) - to_add.extend(states) else: + users = [event.state_key] - ev = await presence_handler.get_state( - UserID.from_string(event.state_key), as_event=True - ) - to_add.append(ev) + states = await presence_handler.get_states(users) + to_add.extend( + { + "type": EventTypes.Presence, + "content": format_user_presence_state(state, time_now), + } + for state in states + ) events.extend(to_add) - time_now = self.clock.time_msec() - chunks = await self._event_serializer.serialize_events( events, time_now, diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index b116500c7dc8..f88bad5f2512 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -381,10 +381,16 @@ async def get_presence(): return [] states = await presence_handler.get_states( - [m.user_id for m in room_members], as_event=True + [m.user_id for m in room_members] ) - return states + return [ + { + "type": EventTypes.Presence, + "content": format_user_presence_state(s, time_now), + } + for s in states + ] async def get_receipts(): receipts = await self.store.get_linearized_receipts_for_room( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6912165622ec..04b82782b910 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -24,7 +25,7 @@ import logging from contextlib import contextmanager -from typing import Dict, List, Set +from typing import Dict, Iterable, List, Set from six import iteritems, itervalues @@ -669,21 +670,14 @@ async def incoming_presence(self, origin, content): federation_presence_counter.inc(len(updates)) await self._update_states(updates) - async def get_state(self, target_user, as_event=False): - results = await self.get_states([target_user.to_string()], as_event=as_event) - + async def get_state(self, target_user: UserID) -> UserPresenceState: + results = await self.get_states([target_user.to_string()]) return results[0] - async def get_states(self, target_user_ids, as_event=False): - """Get the presence state for users. - - Args: - target_user_ids (list) - as_event (bool): Whether to format it as a client event or not. - - Returns: - list - """ + async def get_states( + self, target_user_ids: Iterable[str] + ) -> List[UserPresenceState]: + """Get the presence state for users.""" updates = await self.current_state_for_users(target_user_ids) updates = list(updates.values()) @@ -691,17 +685,7 @@ async def get_states(self, target_user_ids, as_event=False): for user_id in set(target_user_ids) - {u.user_id for u in updates}: updates.append(UserPresenceState.default(user_id)) - now = self.clock.time_msec() - if as_event: - return [ - { - "type": "m.presence", - "content": format_user_presence_state(state, now), - } - for state in updates - ] - else: - return updates + return updates async def set_state(self, target_user, state, ignore_status_msg=False): """Set the presence state of the user. From c1029c8bfa13a8fcbcaab93bffa7f02f9ec928be Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 21 Apr 2020 15:35:39 +0100 Subject: [PATCH 2/8] Implement an AbstractPresenceHandler ... to make it clearer what is part of the interface, and what is not. 
--- synapse/app/generic_worker.py | 8 +++-- synapse/handlers/presence.py | 68 +++++++++++++++++++++++++++++++++-- synapse/server.pyi | 4 ++- 3 files changed, 74 insertions(+), 6 deletions(-) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index dcd0709a02ea..05a4ce23ee86 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -38,7 +38,11 @@ from synapse.config.logger import setup_logging from synapse.federation import send_queue from synapse.federation.transport.server import TransportLayerServer -from synapse.handlers.presence import PresenceHandler, get_interested_parties +from synapse.handlers.presence import ( + AbstractPresenceHandler, + PresenceHandler, + get_interested_parties, +) from synapse.http.server import JsonResource from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseSite @@ -224,7 +228,7 @@ async def on_POST(self, request, device_id): UPDATE_SYNCING_USERS_MS = 10 * 1000 -class GenericWorkerPresence(object): +class GenericWorkerPresence(AbstractPresenceHandler): def __init__(self, hs): self.hs = hs self.is_mine_id = hs.is_mine_id diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 04b82782b910..7812cd100ccb 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -22,7 +22,7 @@ - PresenceHandler._handle_timeouts - should_notify """ - +import abc import logging from contextlib import contextmanager from typing import Dict, Iterable, List, Set @@ -42,7 +42,7 @@ from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.presence import UserPresenceState -from synapse.types import UserID, get_domain_from_id +from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches.descriptors import cached from synapse.util.metrics import Measure 
@@ -100,7 +100,69 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER -class PresenceHandler(object): +class AbstractPresenceHandler(abc.ABC): + """The base interface for things implementing PresenceHandler""" + + @abc.abstractmethod + async def user_syncing( + self, user_id: str, affect_presence: bool = True + ) -> ContextManager[None]: + """Returns a context manager that should surround any stream requests + from the user. + + This allows us to keep track of who is currently streaming and who isn't + without having to have timers outside of this module to avoid flickering + when users disconnect/reconnect. + + Args: + user_id (str) + affect_presence (bool): If false this function will be a no-op. + Useful for streams that are not associated with an actual + client that is being used by a user. + """ + + @abc.abstractmethod + def get_currently_syncing_users(self) -> Set[str]: + """Get the set of user ids that are currently syncing on this HS. + Returns: + set(str): A set of user_id strings. + """ + + @abc.abstractmethod + async def current_state_for_users( + self, user_ids: Iterable[str] + ) -> Dict[str, UserPresenceState]: + """Get the current presence state for multiple users. + + Returns: + dict: `user_id` -> `UserPresenceState` + """ + + @abc.abstractmethod + async def get_state(self, target_user: UserID) -> UserPresenceState: + ... + + @abc.abstractmethod + async def get_states( + self, target_user_ids: Iterable[str] + ) -> List[UserPresenceState]: + """Get the presence state for users. + + Args: + target_user_ids (list) + + Returns: + list + """ + + @abc.abstractmethod + async def set_state( + self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False + ) -> None: + """Set the presence state of the user. 
""" + + +class PresenceHandler(AbstractPresenceHandler): def __init__(self, hs: "synapse.server.HomeServer"): self.hs = hs self.is_mine_id = hs.is_mine_id diff --git a/synapse/server.pyi b/synapse/server.pyi index 9013e9bac9dd..d539d3c3ec55 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -97,7 +97,9 @@ class HomeServer(object): pass def get_notifier(self) -> synapse.notifier.Notifier: pass - def get_presence_handler(self) -> synapse.handlers.presence.PresenceHandler: + def get_presence_handler( + self, + ) -> synapse.handlers.presence.AbstractPresenceHandler: pass def get_clock(self) -> synapse.util.Clock: pass From 27ccbc17091924e312f5a3a2f51161f415a7d8f4 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 21 Apr 2020 16:09:42 +0100 Subject: [PATCH 3/8] Factor out BasePresenceHandler Define a common base class for GenericWorkerPresence and PresenceHandler. --- synapse/app/generic_worker.py | 21 ++---- synapse/handlers/presence.py | 126 ++++++++++++++-------------------- synapse/server.pyi | 4 +- 3 files changed, 56 insertions(+), 95 deletions(-) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 05a4ce23ee86..e546e88a20b6 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -17,6 +17,7 @@ import contextlib import logging import sys +from typing import Iterable from twisted.internet import defer, reactor from twisted.web.resource import NoResource @@ -38,18 +39,14 @@ from synapse.config.logger import setup_logging from synapse.federation import send_queue from synapse.federation.transport.server import TransportLayerServer -from synapse.handlers.presence import ( - AbstractPresenceHandler, - PresenceHandler, - get_interested_parties, -) +from synapse.handlers.presence import BasePresenceHandler, get_interested_parties from synapse.http.server import JsonResource from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseSite from 
synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.replication.slave.storage._base import BaseSlavedStore, __func__ +from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore @@ -228,20 +225,16 @@ async def on_POST(self, request, device_id): UPDATE_SYNCING_USERS_MS = 10 * 1000 -class GenericWorkerPresence(AbstractPresenceHandler): +class GenericWorkerPresence(BasePresenceHandler): def __init__(self, hs): + super().__init__(hs) self.hs = hs self.is_mine_id = hs.is_mine_id self.http_client = hs.get_simple_http_client() - self.store = hs.get_datastore() self.user_to_num_current_syncs = {} - self.clock = hs.get_clock() self.notifier = hs.get_notifier() self.instance_id = hs.get_instance_id() - active_presence = self.store.take_presence_startup_info() - self.user_to_current_state = {state.user_id: state for state in active_presence} - # user_id -> last_sync_ms. Lists the users that have stopped syncing # but we haven't notified the master of that yet self.users_going_offline = {} @@ -307,10 +300,6 @@ def set_state(self, user, state, ignore_status_msg=False): # TODO Hows this supposed to work? 
return defer.succeed(None) - get_states = __func__(PresenceHandler.get_states) - get_state = __func__(PresenceHandler.get_state) - current_state_for_users = __func__(PresenceHandler.current_state_for_users) - def user_syncing(self, user_id, affect_presence): if affect_presence: curr_sync = self.user_to_num_current_syncs.get(user_id, 0) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7812cd100ccb..7fa38602cf71 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -100,12 +100,19 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER -class AbstractPresenceHandler(abc.ABC): - """The base interface for things implementing PresenceHandler""" +class BasePresenceHandler(abc.ABC): + """Parts of the PresenceHandler that are shared between workers and master""" + + def __init__(self, hs: "synapse.server.HomeServer"): + self.clock = hs.get_clock() + self.store = hs.get_datastore() + + active_presence = self.store.take_presence_startup_info() + self.user_to_current_state = {state.user_id: state for state in active_presence} @abc.abstractmethod async def user_syncing( - self, user_id: str, affect_presence: bool = True + self, user_id: str, affect_presence: bool ) -> ContextManager[None]: """Returns a context manager that should surround any stream requests from the user. @@ -128,7 +135,23 @@ def get_currently_syncing_users(self) -> Set[str]: set(str): A set of user_id strings. 
""" - @abc.abstractmethod + async def get_state(self, target_user: UserID) -> UserPresenceState: + results = await self.get_states([target_user.to_string()]) + return results[0] + + async def get_states( + self, target_user_ids: Iterable[str] + ) -> List[UserPresenceState]: + """Get the presence state for users.""" + + updates_d = await self.current_state_for_users(target_user_ids) + updates = list(updates_d.values()) + + for user_id in set(target_user_ids) - {u.user_id for u in updates}: + updates.append(UserPresenceState.default(user_id)) + + return updates + async def current_state_for_users( self, user_ids: Iterable[str] ) -> Dict[str, UserPresenceState]: @@ -137,23 +160,27 @@ async def current_state_for_users( Returns: dict: `user_id` -> `UserPresenceState` """ + states = { + user_id: self.user_to_current_state.get(user_id, None) + for user_id in user_ids + } - @abc.abstractmethod - async def get_state(self, target_user: UserID) -> UserPresenceState: - ... - - @abc.abstractmethod - async def get_states( - self, target_user_ids: Iterable[str] - ) -> List[UserPresenceState]: - """Get the presence state for users. + missing = [user_id for user_id, state in iteritems(states) if not state] + if missing: + # There are things not in our in memory cache. Lets pull them out of + # the database. + res = await self.store.get_presence_for_users(missing) + states.update(res) - Args: - target_user_ids (list) + missing = [user_id for user_id, state in iteritems(states) if not state] + if missing: + new = { + user_id: UserPresenceState.default(user_id) for user_id in missing + } + states.update(new) + self.user_to_current_state.update(new) - Returns: - list - """ + return states @abc.abstractmethod async def set_state( @@ -162,13 +189,12 @@ async def set_state( """Set the presence state of the user. 
""" -class PresenceHandler(AbstractPresenceHandler): +class PresenceHandler(BasePresenceHandler): def __init__(self, hs: "synapse.server.HomeServer"): + super().__init__(hs) self.hs = hs self.is_mine_id = hs.is_mine_id self.server_name = hs.hostname - self.clock = hs.get_clock() - self.store = hs.get_datastore() self.wheel_timer = WheelTimer() self.notifier = hs.get_notifier() self.federation = hs.get_federation_sender() @@ -178,13 +204,6 @@ def __init__(self, hs: "synapse.server.HomeServer"): federation_registry.register_edu_handler("m.presence", self.incoming_presence) - active_presence = self.store.take_presence_startup_info() - - # A dictionary of the current state of users. This is prefilled with - # non-offline presence from the DB. We should fetch from the DB if - # we can't find a users presence in here. - self.user_to_current_state = {state.user_id: state for state in active_presence} - LaterGauge( "synapse_handlers_presence_user_to_current_state_size", "", @@ -193,7 +212,7 @@ def __init__(self, hs: "synapse.server.HomeServer"): ) now = self.clock.time_msec() - for state in active_presence: + for state in self.user_to_current_state.values(): self.wheel_timer.insert( now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER ) @@ -617,34 +636,6 @@ async def current_state_for_user(self, user_id): res = await self.current_state_for_users([user_id]) return res[user_id] - async def current_state_for_users(self, user_ids): - """Get the current presence state for multiple users. - - Returns: - dict: `user_id` -> `UserPresenceState` - """ - states = { - user_id: self.user_to_current_state.get(user_id, None) - for user_id in user_ids - } - - missing = [user_id for user_id, state in iteritems(states) if not state] - if missing: - # There are things not in our in memory cache. Lets pull them out of - # the database. 
- res = await self.store.get_presence_for_users(missing) - states.update(res) - - missing = [user_id for user_id, state in iteritems(states) if not state] - if missing: - new = { - user_id: UserPresenceState.default(user_id) for user_id in missing - } - states.update(new) - self.user_to_current_state.update(new) - - return states - async def _persist_and_notify(self, states): """Persist states in the database, poke the notifier and send to interested remote servers @@ -732,23 +723,6 @@ async def incoming_presence(self, origin, content): federation_presence_counter.inc(len(updates)) await self._update_states(updates) - async def get_state(self, target_user: UserID) -> UserPresenceState: - results = await self.get_states([target_user.to_string()]) - return results[0] - - async def get_states( - self, target_user_ids: Iterable[str] - ) -> List[UserPresenceState]: - """Get the presence state for users.""" - - updates = await self.current_state_for_users(target_user_ids) - updates = list(updates.values()) - - for user_id in set(target_user_ids) - {u.user_id for u in updates}: - updates.append(UserPresenceState.default(user_id)) - - return updates - async def set_state(self, target_user, state, ignore_status_msg=False): """Set the presence state of the user. """ @@ -935,7 +909,7 @@ async def _on_user_joined_room(self, room_id, user_id): user_ids = await self.state.get_current_users_in_room(room_id) user_ids = list(filter(self.is_mine_id, user_ids)) - states = await self.current_state_for_users(user_ids) + states_d = await self.current_state_for_users(user_ids) # Filter out old presence, i.e. offline presence states where # the user hasn't been active for a week. 
We can change this @@ -945,7 +919,7 @@ async def _on_user_joined_room(self, room_id, user_id): now = self.clock.time_msec() states = [ state - for state in states.values() + for state in states_d.values() if state.state != PresenceState.OFFLINE or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000 or state.status_msg is not None diff --git a/synapse/server.pyi b/synapse/server.pyi index d539d3c3ec55..f1a5717028c8 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -97,9 +97,7 @@ class HomeServer(object): pass def get_notifier(self) -> synapse.notifier.Notifier: pass - def get_presence_handler( - self, - ) -> synapse.handlers.presence.AbstractPresenceHandler: + def get_presence_handler(self) -> synapse.handlers.presence.BasePresenceHandler: pass def get_clock(self) -> synapse.util.Clock: pass From f47ea0f9e3bd16d77a5e6dbd20d52bf37f1538c2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 21 Apr 2020 17:40:51 +0100 Subject: [PATCH 4/8] Cleanups for GenericWorkerPresence --- synapse/app/generic_worker.py | 72 ++++++++++++++++++++++------------- 1 file changed, 46 insertions(+), 26 deletions(-) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index e546e88a20b6..9fc745b11803 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -17,7 +17,9 @@ import contextlib import logging import sys -from typing import Iterable +from typing import Dict, Iterable + +from typing_extensions import ContextManager from twisted.internet import defer, reactor from twisted.web.resource import NoResource @@ -222,6 +224,13 @@ async def on_POST(self, request, device_id): return 200, {"one_time_key_counts": result} +class _NullContextManager(ContextManager[None]): + """A context manager which does nothing.""" + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + UPDATE_SYNCING_USERS_MS = 10 * 1000 @@ -231,7 +240,13 @@ def __init__(self, hs): self.hs = hs self.is_mine_id = hs.is_mine_id self.http_client = 
hs.get_simple_http_client() - self.user_to_num_current_syncs = {} + + self._presence_enabled = hs.config.use_presence + + # The number of ongoing syncs on this process, by user id. + # Empty if _presence_enabled is false. + self._user_to_num_current_syncs = {} # type: Dict[str, int] + self.notifier = hs.get_notifier() self.instance_id = hs.get_instance_id() @@ -252,13 +267,13 @@ def __init__(self, hs): ) def _on_shutdown(self): - if self.hs.config.use_presence: + if self._presence_enabled: self.hs.get_tcp_replication().send_command( ClearUserSyncsCommand(self.instance_id) ) def send_user_sync(self, user_id, is_syncing, last_sync_ms): - if self.hs.config.use_presence: + if self._presence_enabled: self.hs.get_tcp_replication().send_user_sync( self.instance_id, user_id, is_syncing, last_sync_ms ) @@ -300,24 +315,33 @@ def set_state(self, user, state, ignore_status_msg=False): # TODO Hows this supposed to work? return defer.succeed(None) - def user_syncing(self, user_id, affect_presence): - if affect_presence: - curr_sync = self.user_to_num_current_syncs.get(user_id, 0) - self.user_to_num_current_syncs[user_id] = curr_sync + 1 + async def user_syncing( + self, user_id: str, affect_presence: bool + ) -> ContextManager[None]: + """Record that a user is syncing. + + Called by the sync and events servlets to record that a user has connected to + this worker and is waiting for some events. 
+ """ + if not affect_presence or not self._presence_enabled: + return _NullContextManager() - # If we went from no in flight sync to some, notify replication - if self.user_to_num_current_syncs[user_id] == 1: - self.mark_as_coming_online(user_id) + curr_sync = self._user_to_num_current_syncs.get(user_id, 0) + self._user_to_num_current_syncs[user_id] = curr_sync + 1 + + # If we went from no in flight sync to some, notify replication + if self._user_to_num_current_syncs[user_id] == 1: + self.mark_as_coming_online(user_id) def _end(): # We check that the user_id is in user_to_num_current_syncs because # user_to_num_current_syncs may have been cleared if we are # shutting down. - if affect_presence and user_id in self.user_to_num_current_syncs: - self.user_to_num_current_syncs[user_id] -= 1 + if user_id in self._user_to_num_current_syncs: + self._user_to_num_current_syncs[user_id] -= 1 # If we went from one in flight sync to non, notify replication - if self.user_to_num_current_syncs[user_id] == 0: + if self._user_to_num_current_syncs[user_id] == 0: self.mark_as_going_offline(user_id) @contextlib.contextmanager @@ -327,7 +351,7 @@ def _user_syncing(): finally: _end() - return defer.succeed(_user_syncing()) + return _user_syncing() @defer.inlineCallbacks def notify_from_replication(self, states, stream_id): @@ -362,15 +386,12 @@ def process_replication_rows(self, token, rows): stream_id = token yield self.notify_from_replication(states, stream_id) - def get_currently_syncing_users(self): - if self.hs.config.use_presence: - return [ - user_id - for user_id, count in self.user_to_num_current_syncs.items() - if count > 0 - ] - else: - return set() + def get_currently_syncing_users(self) -> Set[str]: + return { + user_id + for user_id, count in self._user_to_num_current_syncs.items() + if count > 0 + } class GenericWorkerTyping(object): @@ -612,8 +633,7 @@ def __init__(self, hs): self.store = hs.get_datastore() self.typing_handler = hs.get_typing_handler() - # NB this is a 
SynchrotronPresence, not a normal PresenceHandler - self.presence_handler = hs.get_presence_handler() + self.presence_handler = hs.get_presence_handler() # type: GenericWorkerPresence self.notifier = hs.get_notifier() self.notify_pushers = hs.config.start_pushers From bb892bdb93d2dc96eccfd0ec98100475851075a0 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 21 Apr 2020 18:03:49 +0100 Subject: [PATCH 5/8] Stop sending USER_SYNC commands for syncs on other processes --- docs/tcp_replication.md | 6 +---- synapse/app/generic_worker.py | 6 ++--- synapse/handlers/presence.py | 40 ++++++++++++++--------------- synapse/replication/tcp/commands.py | 7 +++-- synapse/replication/tcp/handler.py | 15 ++++------- 5 files changed, 34 insertions(+), 40 deletions(-) diff --git a/docs/tcp_replication.md b/docs/tcp_replication.md index 3be8e50c4c6f..b922d9cf7e66 100644 --- a/docs/tcp_replication.md +++ b/docs/tcp_replication.md @@ -196,7 +196,7 @@ Asks the server for the current position of all streams. #### USER_SYNC (C) - A user has started or stopped syncing + A user has started or stopped syncing on this process. #### CLEAR_USER_SYNC (C) @@ -216,10 +216,6 @@ Asks the server for the current position of all streams. Inform the server a cache should be invalidated -#### SYNC (S, C) - - Used exclusively in tests - ### REMOTE_SERVER_UP (S, C) Inform other processes that a remote server may have come back online. 
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 9fc745b11803..fba5b20ed933 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -386,12 +386,12 @@ def process_replication_rows(self, token, rows): stream_id = token yield self.notify_from_replication(states, stream_id) - def get_currently_syncing_users(self) -> Set[str]: - return { + def get_currently_syncing_users_for_replication(self) -> Iterable[str]: + return [ user_id for user_id, count in self._user_to_num_current_syncs.items() if count > 0 - } + ] class GenericWorkerTyping(object): diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7fa38602cf71..e55b92a569f0 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -129,10 +129,15 @@ async def user_syncing( """ @abc.abstractmethod - def get_currently_syncing_users(self) -> Set[str]: - """Get the set of user ids that are currently syncing on this HS. + def get_currently_syncing_users_for_replication(self) -> Iterable[str]: + """Get the set of syncing users on this worker, to send to the presence handler + + This is called when a replication connection is established. It should return + a list of user ids, which are then sent as USER_SYNC commands to inform the + process handling presence about those users. + Returns: - set(str): A set of user_id strings. + A set of user_id strings. 
""" async def get_state(self, target_user: UserID) -> UserPresenceState: @@ -443,10 +448,18 @@ async def _handle_timeouts(self): timers_fired_counter.inc(len(states)) + syncing_user_ids = { + user_id + for user_id, count in self.user_to_num_current_syncs.items() + if count + } + for user_ids in self.external_process_to_current_syncs.values(): + syncing_user_ids.update(user_ids) + changes = handle_timeouts( states, is_mine_fn=self.is_mine_id, - syncing_user_ids=self.get_currently_syncing_users(), + syncing_user_ids=syncing_user_ids, now=now, ) @@ -544,22 +557,9 @@ def _user_syncing(): return _user_syncing() - def get_currently_syncing_users(self): - """Get the set of user ids that are currently syncing on this HS. - Returns: - set(str): A set of user_id strings. - """ - if self.hs.config.use_presence: - syncing_user_ids = { - user_id - for user_id, count in self.user_to_num_current_syncs.items() - if count - } - for user_ids in self.external_process_to_current_syncs.values(): - syncing_user_ids.update(user_ids) - return syncing_user_ids - else: - return set() + def get_currently_syncing_users_for_replication(self) -> Iterable[str]: + # since we are the process handling presence, there is nothing to do here. + return [] async def update_external_syncs_row( self, process_id, user_id, is_syncing, sync_time_msec diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 5ec89d0fb875..8721f37ba722 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -210,7 +210,10 @@ def to_line(self): class UserSyncCommand(Command): """Sent by the client to inform the server that a user has started or - stopped syncing. Used to calculate presence on the master. + stopped syncing on this process. + + This is used by the process handling presence (typically the master) to + calculate who is online and who is not. Includes a timestamp of when the last user sync was. 
@@ -218,7 +221,7 @@ class UserSyncCommand(Command): USER_SYNC <instance_id> <user_id> <state> <last_sync_ms> - Where <state> is either "start" or "stop" + Where <state> is either "start" or "end" """ NAME = "USER_SYNC" diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index e32e68e8c4bb..40e2d09299c3 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -301,13 +301,6 @@ async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand): if self._is_master: self._notifier.notify_remote_server_up(cmd.data) - def get_currently_syncing_users(self): - """Get the list of currently syncing users (if any). This is called - when a connection has been established and we need to send the - currently syncing users. - """ - return self._presence_handler.get_currently_syncing_users() - def new_connection(self, connection: AbstractConnection): """Called when we have a new connection. """ @@ -325,9 +318,11 @@ def new_connection(self, connection: AbstractConnection): if self._factory: self._factory.resetDelay() - # Tell the server if we have any users currently syncing (should only - # happen on synchrotrons) - currently_syncing = self.get_currently_syncing_users() + # Tell the other end if we have any users currently syncing. + currently_syncing = ( + self._presence_handler.get_currently_syncing_users_for_replication() + ) + now = self._clock.time_msec() for user_id in currently_syncing: connection.send_command( From 84b6ccfef60c2036968ff1ca87bb00c3e94f073f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 21 Apr 2020 18:52:29 +0100 Subject: [PATCH 6/8] changelog --- changelog.d/7318.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/7318.misc diff --git a/changelog.d/7318.misc b/changelog.d/7318.misc new file mode 100644 index 000000000000..676f285377f5 --- /dev/null +++ b/changelog.d/7318.misc @@ -0,0 +1 @@ +Move catchup of replication streams logic to worker.
From c925ccc4f6d34b8f946be760fc122226a56c3221 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 22 Apr 2020 15:17:45 +0100 Subject: [PATCH 7/8] Apply suggestions from code review Co-Authored-By: Patrick Cloke --- synapse/handlers/presence.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index e55b92a569f0..ef96361b42d0 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -130,14 +130,14 @@ async def user_syncing( @abc.abstractmethod def get_currently_syncing_users_for_replication(self) -> Iterable[str]: - """Get the set of syncing users on this worker, to send to the presence handler + """Get an iterable of syncing users on this worker, to send to the presence handler This is called when a replication connection is established. It should return a list of user ids, which are then sent as USER_SYNC commands to inform the process handling presence about those users. Returns: - A set of user_id strings. + An iterable of user_id strings. """ async def get_state(self, target_user: UserID) -> UserPresenceState: From 2a7df614a6ef7704271b510af71cd606d5351416 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 22 Apr 2020 15:20:49 +0100 Subject: [PATCH 8/8] remove duplicate type annotation --- synapse/handlers/presence.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index ef96361b42d0..5cbefae1777a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -122,8 +122,8 @@ async def user_syncing( when users disconnect/reconnect. Args: - user_id (str) - affect_presence (bool): If false this function will be a no-op. + user_id: the user that is starting a sync + affect_presence: If false this function will be a no-op. 
Useful for streams that are not associated with an actual client that is being used by a user. """