Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Run replication streamers on workers (#7146)
Browse files Browse the repository at this point in the history
Currently we never write to streams from workers, but that will change soon
  • Loading branch information
erikjohnston committed Apr 28, 2020
1 parent 07337fe commit 38919b5
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 22 deletions.
1 change: 1 addition & 0 deletions changelog.d/7146.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Run replication streamers on workers.
13 changes: 9 additions & 4 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -960,17 +960,22 @@ def start(config_options):

synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts

ss = GenericWorkerServer(
hs = GenericWorkerServer(
config.server_name,
config=config,
version_string="Synapse/" + get_version_string(synapse),
)

setup_logging(ss, config, use_worker_options=True)
setup_logging(hs, config, use_worker_options=True)

hs.setup()

# Ensure the replication streamer is always started in case we write to any
# streams. Will no-op if no streams can be written to by this worker.
hs.get_replication_streamer()

ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners
"before", "startup", _base.start, hs, config.worker_listeners
)

_base.start_worker_reactor("synapse-generic-worker", config)
Expand Down
33 changes: 15 additions & 18 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

import logging
import random
from typing import Dict

from six import itervalues
from typing import Dict, List

from prometheus_client import Counter

Expand Down Expand Up @@ -71,29 +69,28 @@ class ReplicationStreamer(object):

def __init__(self, hs):
self.store = hs.get_datastore()
self.presence_handler = hs.get_presence_handler()
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
self._server_notices_sender = hs.get_server_notices_sender()

self._replication_torture_level = hs.config.replication_torture_level

# List of streams that clients can subscribe to.
# We only support federation stream if federation sending hase been
# disabled on the master.
self.streams = [
stream(hs)
for stream in itervalues(STREAMS_MAP)
if stream != FederationStream or not hs.config.send_federation
]
# Work out list of streams that this instance is the source of.
self.streams = [] # type: List[Stream]
if hs.config.worker_app is None:
for stream in STREAMS_MAP.values():
if stream == FederationStream and hs.config.send_federation:
# We only support federation stream if federation sending
# hase been disabled on the master.
continue

self.streams_by_name = {stream.NAME: stream for stream in self.streams}
self.streams.append(stream(hs))

self.federation_sender = None
if not hs.config.send_federation:
self.federation_sender = hs.get_federation_sender()
self.streams_by_name = {stream.NAME: stream for stream in self.streams}

self.notifier.add_replication_callback(self.on_notifier_poke)
# Only bother registering the notifier callback if we have streams to
# publish.
if self.streams:
self.notifier.add_replication_callback(self.on_notifier_poke)

# Keeps track of whether we are currently checking for updates
self.is_looping = False
Expand Down

0 comments on commit 38919b5

Please sign in to comment.