Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor federation_sender and pusher configuration loading. #14496

Merged
merged 13 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14496.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor `federation_sender` and `pusher` configuration loading.
135 changes: 61 additions & 74 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,6 @@
)
from .server import DIRECT_TCP_ERROR, ListenerConfig, parse_listener_def

# ConfigError message used by `read_config` when this process runs the
# `synapse.app.federation_sender` worker app while `send_federation` has not
# been set to false.
_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
The send_federation config option must be disabled in the main
synapse process before they can be run in a separate worker.

Please add ``send_federation: false`` to the main config
"""

# ConfigError message used by `read_config` when this process runs the
# `synapse.app.pusher` worker app while `start_pushers` has not been set to
# false.
_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """
The start_pushers config option must be disabled in the main
synapse process before they can be run in a separate worker.

Please add ``start_pushers: false`` to the main config
"""

_DEPRECATED_WORKER_DUTY_OPTION_USED = """
The '%s' configuration option is deprecated and will be removed in a future
Synapse version. Please use ``%s: name_of_worker`` instead.
Expand Down Expand Up @@ -182,45 +168,6 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
)
)

# Handle federation sender configuration.
#
# There are two ways of configuring which instances handle federation
# sending:
# 1. The old way where "send_federation" is set to false and running a
# `synapse.app.federation_sender` worker app.
# 2. Specifying the workers sending federation in
# `federation_sender_instances`.
#

send_federation = config.get("send_federation", True)

federation_sender_instances = config.get("federation_sender_instances")
if federation_sender_instances is None:
# Default to an empty list, which means "another, unknown, worker is
# responsible for it".
federation_sender_instances = []

# If no federation sender instances are set we check if
# `send_federation` is set, which means use master
if send_federation:
federation_sender_instances = ["master"]

if self.worker_app == "synapse.app.federation_sender":
if send_federation:
# If we're running federation senders, and not using
# `federation_sender_instances`, then we should have
# explicitly set `send_federation` to false.
raise ConfigError(
_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
)

federation_sender_instances = [self.worker_name]

self.send_federation = self.instance_name in federation_sender_instances
self.federation_shard_config = ShardedWorkerHandlingConfig(
federation_sender_instances
)

# A map from instance name to host/port of their HTTP replication endpoint.
instance_map = config.get("instance_map") or {}
self.instance_map = {
Expand Down Expand Up @@ -281,28 +228,24 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.writers.events
)

# Handle sharded push
start_pushers = config.get("start_pushers", True)
pusher_instances = config.get("pusher_instances")
if pusher_instances is None:
# Default to an empty list, which means "another, unknown, worker is
# responsible for it".
pusher_instances = []

# If no pushers instances are set we check if `start_pushers` is
# set, which means use master
if start_pushers:
pusher_instances = ["master"]

if self.worker_app == "synapse.app.pusher":
if start_pushers:
# If we're running pushers, and not using
# `pusher_instances`, then we should have explicitly set
# `start_pushers` to false.
raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)

pusher_instances = [self.instance_name]
federation_sender_instances = self._worker_names_performing_this_duty(
config,
"send_federation",
"synapse.app.federation_sender",
"federation_sender_instances",
)
self.send_federation = self.instance_name in federation_sender_instances
self.federation_shard_config = ShardedWorkerHandlingConfig(
federation_sender_instances
)
realtyem marked this conversation as resolved.
Show resolved Hide resolved

# Handle sharded push
pusher_instances = self._worker_names_performing_this_duty(
config,
"start_pushers",
"synapse.app.pusher",
"pusher_instances",
)
self.start_pushers = self.instance_name in pusher_instances
self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)

Expand Down Expand Up @@ -425,6 +368,50 @@ def _should_this_worker_perform_duty(
# (By this point, these are either the same value or only one is not None.)
return bool(new_option_should_run_here or legacy_option_should_run_here)

def _worker_names_performing_this_duty(
    self,
    config: Dict[str, Any],
    legacy_option_name: str,
    legacy_app_name: str,
    modern_instance_map_name: str,
) -> List[str]:
    """
    Retrieve the names of the workers handling a given duty, using either the
    modern instance-list option or the legacy boolean option.

    Resolution order:

    1. If `modern_instance_map_name` is present in the config (and not None),
       its value is returned unchanged.
    2. Otherwise, if this process is running `legacy_app_name`, the legacy
       option must have been disabled, and this worker alone is returned.
    3. Otherwise `["master"]` is returned when the legacy option is enabled
       (it defaults to enabled), or an empty list when it is disabled.

    Args:
        config: settings read from yaml.
        legacy_option_name: the old way of enabling options.
            e.g. 'start_pushers'
        legacy_app_name: the historical app name. e.g. 'synapse.app.pusher'
        modern_instance_map_name: the config key naming the workers that
            perform the duty. e.g. 'pusher_instances'

    Returns:
        The list of instance names performing the duty.

    Raises:
        ConfigError: if this process runs `legacy_app_name` while the legacy
            option is still enabled and no explicit instance list is set.
    """
    legacy_enabled = config.get(legacy_option_name, True)
    configured_instances = config.get(modern_instance_map_name)

    # An explicit instance list always wins over the legacy settings.
    if configured_instances is not None:
        return configured_instances

    if self.worker_app == legacy_app_name:
        if legacy_enabled:
            # We're running the legacy worker app without an explicit
            # instance list, so the legacy option should have been
            # explicitly set to false.
            raise ConfigError(
                f"The '{legacy_option_name}' config option must be disabled in "
                "the main synapse process before they can be run in a separate "
                "worker.\n"
                f"Please add `{legacy_option_name}: false` to the main config.\n",
            )

        # The legacy worker app itself is the one performing the duty.
        return [self.worker_name]

    # No instance list given: the legacy option being set means use master;
    # otherwise an empty list means "another, unknown, worker is responsible
    # for it".
    return ["master"] if legacy_enabled else []

def read_arguments(self, args: argparse.Namespace) -> None:
# We support a bunch of command line arguments that override options in
# the config. A lot of these options have a worker_* prefix when running
Expand Down