From b7d8d15d8a923832a793cc47126b9dcaf7930a93 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Mar 2020 15:06:35 +0000 Subject: [PATCH 1/4] Make ReplicationStreamer work on workers --- synapse/replication/tcp/resource.py | 33 +++++++++++++---------------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index b2d6baa2a231..33d2f589ac36 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -17,9 +17,7 @@ import logging import random -from typing import Dict - -from six import itervalues +from typing import Dict, List from prometheus_client import Counter @@ -71,29 +69,28 @@ class ReplicationStreamer(object): def __init__(self, hs): self.store = hs.get_datastore() - self.presence_handler = hs.get_presence_handler() self.clock = hs.get_clock() self.notifier = hs.get_notifier() - self._server_notices_sender = hs.get_server_notices_sender() self._replication_torture_level = hs.config.replication_torture_level - # List of streams that clients can subscribe to. - # We only support federation stream if federation sending hase been - # disabled on the master. - self.streams = [ - stream(hs) - for stream in itervalues(STREAMS_MAP) - if stream != FederationStream or not hs.config.send_federation - ] + # Work out list of streams that this instance is the source of. + self.streams = [] # type: List[Stream] + if hs.config.worker_app is None: + for stream in STREAMS_MAP.values(): + if stream == FederationStream and hs.config.send_federation: + # We only support federation stream if federation sending + # hase been disabled on the master. + continue - self.streams_by_name = {stream.NAME: stream for stream in self.streams} + self.streams.append(stream(hs)) - self.federation_sender = None - if not hs.config.send_federation: - self.federation_sender = hs.get_federation_sender() + self.streams_by_name = {stream.NAME: stream for stream in self.streams} - self.notifier.add_replication_callback(self.on_notifier_poke) + # Only bother registering the notifier callback if we have streams to + # publish. + if self.streams: + self.notifier.add_replication_callback(self.on_notifier_poke) # Keeps track of whether we are currently checking for updates self.is_looping = False From 9486faaf88ffc63f21ab668d32a0ee3e620f92b0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Mar 2020 11:25:02 +0000 Subject: [PATCH 2/4] Always start ReplicationStreamer on workers --- synapse/app/generic_worker.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 37afd2f81034..8ab34534c5d6 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -947,17 +947,21 @@ def start(config_options): synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts - ss = GenericWorkerServer( + hs = GenericWorkerServer( config.server_name, config=config, version_string="Synapse/" + get_version_string(synapse), ) - setup_logging(ss, config, use_worker_options=True) + setup_logging(hs, config, use_worker_options=True) + + hs.setup() + + # Ensure the replication streamer is always started. + hs.get_replication_streamer() - ss.setup() reactor.addSystemEventTrigger( - "before", "startup", _base.start, ss, config.worker_listeners + "before", "startup", _base.start, hs, config.worker_listeners ) _base.start_worker_reactor("synapse-generic-worker", config) From 69682ec79235446fd5b1894716074174226a93d4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 22 Apr 2020 18:05:22 +0100 Subject: [PATCH 3/4] Newsfile --- changelog.d/7146.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/7146.misc diff --git a/changelog.d/7146.misc b/changelog.d/7146.misc new file mode 100644 index 000000000000..facde0695951 --- /dev/null +++ b/changelog.d/7146.misc @@ -0,0 +1 @@ +Run replication streamers on workers. From 840f5eb6bcb4de1a94693ebfc7a8d90c0b06798f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 22 Apr 2020 18:12:41 +0100 Subject: [PATCH 4/4] Update comment --- synapse/app/generic_worker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 8ab34534c5d6..91acc2336a3e 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -957,7 +957,8 @@ def start(config_options): hs.setup() - # Ensure the replication streamer is always started. + # Ensure the replication streamer is always started in case we write to any + # streams. Will no-op if no streams can be written to by this worker. hs.get_replication_streamer() reactor.addSystemEventTrigger(