Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Run replication streamers on workers #7146

Merged
merged 4 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7146.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Run replication streamers on workers.
13 changes: 9 additions & 4 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -947,17 +947,22 @@ def start(config_options):

synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts

ss = GenericWorkerServer(
hs = GenericWorkerServer(
config.server_name,
config=config,
version_string="Synapse/" + get_version_string(synapse),
)

setup_logging(ss, config, use_worker_options=True)
setup_logging(hs, config, use_worker_options=True)

hs.setup()

# Ensure the replication streamer is always started in case we write to any
# streams. Will no-op if no streams can be written to by this worker.
hs.get_replication_streamer()

ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners
"before", "startup", _base.start, hs, config.worker_listeners
)

_base.start_worker_reactor("synapse-generic-worker", config)
Expand Down
33 changes: 15 additions & 18 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

import logging
import random
from typing import Dict

from six import itervalues
from typing import Dict, List

from prometheus_client import Counter

Expand Down Expand Up @@ -71,29 +69,28 @@ class ReplicationStreamer(object):

def __init__(self, hs):
self.store = hs.get_datastore()
self.presence_handler = hs.get_presence_handler()
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
self._server_notices_sender = hs.get_server_notices_sender()

self._replication_torture_level = hs.config.replication_torture_level

# List of streams that clients can subscribe to.
# We only support federation stream if federation sending hase been
# disabled on the master.
self.streams = [
stream(hs)
for stream in itervalues(STREAMS_MAP)
if stream != FederationStream or not hs.config.send_federation
]
# Work out list of streams that this instance is the source of.
self.streams = [] # type: List[Stream]
if hs.config.worker_app is None:
for stream in STREAMS_MAP.values():
if stream == FederationStream and hs.config.send_federation:
# We only support federation stream if federation sending
# hase been disabled on the master.
continue

self.streams_by_name = {stream.NAME: stream for stream in self.streams}
self.streams.append(stream(hs))

self.federation_sender = None
if not hs.config.send_federation:
self.federation_sender = hs.get_federation_sender()
self.streams_by_name = {stream.NAME: stream for stream in self.streams}

self.notifier.add_replication_callback(self.on_notifier_poke)
# Only bother registering the notifier callback if we have streams to
# publish.
if self.streams:
self.notifier.add_replication_callback(self.on_notifier_poke)

# Keeps track of whether we are currently checking for updates
self.is_looping = False
Expand Down