This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Clean up ShardedWorkerHandlingConfig #9466

Merged · 5 commits · Feb 24, 2021
1 change: 1 addition & 0 deletions changelog.d/9466.bugfix
@@ -0,0 +1 @@
Fix deleting pushers when using sharded pushers.
2 changes: 2 additions & 0 deletions synapse/app/admin_cmd.py
@@ -210,7 +210,9 @@ def start(config_options):
config.update_user_directory = False
config.run_background_tasks = False
config.start_pushers = False
config.pusher_shard_config.instances = []
config.send_federation = False
config.federation_shard_config.instances = []

synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts

32 changes: 0 additions & 32 deletions synapse/app/generic_worker.py
@@ -919,22 +919,6 @@ def start(config_options):
# For other worker types we force this to off.
config.appservice.notify_appservices = False

if config.worker_app == "synapse.app.pusher":
if config.server.start_pushers:
sys.stderr.write(
"\nThe pushers must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker."
"\nPlease add ``start_pushers: false`` to the main config"
"\n"
)
sys.exit(1)

# Force the pushers to start since they will be disabled in the main config
config.server.start_pushers = True
else:
# For other worker types we force this to off.
config.server.start_pushers = False

if config.worker_app == "synapse.app.user_dir":
if config.server.update_user_directory:
sys.stderr.write(
@@ -951,22 +935,6 @@
# For other worker types we force this to off.
config.server.update_user_directory = False

if config.worker_app == "synapse.app.federation_sender":
if config.worker.send_federation:
sys.stderr.write(
"\nThe send_federation must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker."
"\nPlease add ``send_federation: false`` to the main config"
"\n"
)
sys.exit(1)

# Force the pushers to start since they will be disabled in the main config
config.worker.send_federation = True
else:
# For other worker types we force this to off.
config.worker.send_federation = False

synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts

hs = GenericWorkerServer(
36 changes: 27 additions & 9 deletions synapse/config/_base.py
@@ -844,22 +844,23 @@ class ShardedWorkerHandlingConfig:

def should_handle(self, instance_name: str, key: str) -> bool:
"""Whether this instance is responsible for handling the given key."""
# If multiple instances are not defined we always return true
if not self.instances or len(self.instances) == 1:
return True
# If no instances are defined we assume some other worker is handling
# this.
if not self.instances:
return False

return self.get_instance(key) == instance_name
return self._get_instance(key) == instance_name

def get_instance(self, key: str) -> str:
def _get_instance(self, key: str) -> str:
"""Get the instance responsible for handling the given key.

Note: For things like federation sending the config for which instance
is sending is known only to the sender instance if there is only one.
Therefore `should_handle` should be used where possible.
Note: For federation sending and pushers the config for which instance
is sending is known only to the sender instance, so we don't expose this
method by default.
"""

if not self.instances:
return "master"
raise Exception("Unknown worker")

if len(self.instances) == 1:
return self.instances[0]
@@ -876,4 +877,21 @@ def get_instance(self, key: str) -> str:
return self.instances[remainder]


@attr.s
class RoutableShardedWorkerHandlingConfig(ShardedWorkerHandlingConfig):
"""A version of `ShardedWorkerHandlingConfig` that is used for config
options where all instances know which instances are responsible for the
sharded work.
"""

def __attrs_post_init__(self):
# We require that `self.instances` is non-empty.
if not self.instances:
raise Exception("Got empty list of instances for shard config")

def get_instance(self, key: str) -> str:
"""Get the instance responsible for handling the given key."""
return self._get_instance(key)


__all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
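The practical difference between the two classes, as a minimal usage sketch (the instance names and keys below are invented for illustration):

```python
from synapse.config._base import (
    RoutableShardedWorkerHandlingConfig,
    ShardedWorkerHandlingConfig,
)

# With no instances configured, the base class now assumes some other,
# unknown worker is handling the work, so nothing matches locally.
unsharded = ShardedWorkerHandlingConfig([])
assert unsharded.should_handle("master", "@alice:example.com") is False

# With instances configured, exactly one of them matches any given key.
pushers = ShardedWorkerHandlingConfig(["pusher1", "pusher2"])
assert sum(
    pushers.should_handle(name, "@alice:example.com")
    for name in ("pusher1", "pusher2")
) == 1

# The routable variant is for options where every worker must be able to
# look up the owning instance (e.g. event persistence), so it exposes
# `get_instance` and rejects an empty instance list at construction time.
events = RoutableShardedWorkerHandlingConfig(["worker1", "worker2"])
assert events.get_instance("!room:example.com") in ("worker1", "worker2")
```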
2 changes: 2 additions & 0 deletions synapse/config/_base.pyi
@@ -149,4 +149,6 @@ class ShardedWorkerHandlingConfig:
instances: List[str]
def __init__(self, instances: List[str]) -> None: ...
def should_handle(self, instance_name: str, key: str) -> bool: ...

class RoutableShardedWorkerHandlingConfig(ShardedWorkerHandlingConfig):
def get_instance(self, key: str) -> str: ...
5 changes: 1 addition & 4 deletions synapse/config/push.py
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from ._base import Config, ShardedWorkerHandlingConfig
from ._base import Config


class PushConfig(Config):
@@ -27,9 +27,6 @@ def read_config(self, config, **kwargs):
"group_unread_count_by_room", True
)

pusher_instances = config.get("pusher_instances") or []
self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)

# There was a a 'redact_content' setting but mistakenly read from the
# 'email'section'. Check for the flag in the 'push' section, and log,
# but do not honour it to avoid nasty surprises when people upgrade.
1 change: 0 additions & 1 deletion synapse/config/server.py
@@ -397,7 +397,6 @@ def read_config(self, config, **kwargs):
if self.public_baseurl is not None:
if self.public_baseurl[-1] != "/":
self.public_baseurl += "/"
self.start_pushers = config.get("start_pushers", True)

# (undocumented) option for torturing the worker-mode replication a bit,
# for testing. The value defines the number of milliseconds to pause before
93 changes: 86 additions & 7 deletions synapse/config/workers.py
@@ -17,9 +17,28 @@

import attr

from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
from ._base import (
Config,
ConfigError,
RoutableShardedWorkerHandlingConfig,
ShardedWorkerHandlingConfig,
)
from .server import ListenerConfig, parse_listener_def

_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
The send_federation config option must be disabled in the main
synapse process before they can be run in a separate worker.

Please add ``send_federation: false`` to the main config
"""

_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """
The start_pushers config option must be disabled in the main
synapse process before they can be run in a separate worker.

Please add ``start_pushers: false`` to the main config
"""


def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
"""Helper for allowing parsing a string or list of strings to a config
@@ -103,6 +122,7 @@ def read_config(self, config, **kwargs):
self.worker_replication_secret = config.get("worker_replication_secret", None)

self.worker_name = config.get("worker_name", self.worker_app)
self.instance_name = self.worker_name or "master"

self.worker_main_http_uri = config.get("worker_main_http_uri", None)

@@ -118,12 +138,41 @@
)
)

# Whether to send federation traffic out in this process. This only
# applies to some federation traffic, and so shouldn't be used to
# "disable" federation
self.send_federation = config.get("send_federation", True)
# Handle federation sender configuration.
Member: It looks like the logic for sharding federation sender and push is identical, can we abstract this somehow?

I think something like `self.send_federation = self._handle_shared_config("send_federation", "federation_sender_instances", "synapse.app.federation_sender")` might do it?

Member Author: I'm slightly in two minds about this. On the one hand, yes; on the other, we should never have more than these two, so I think it might actually be clearer just writing it out rather than factoring it out and passing a bunch of stuff in, etc. 🤷

Member: If there aren't additional workers that we'll need to apply this to, then that sounds OK.

Member Author: Yeah, this is only for working around the old-style config options; new stuff shouldn't have that problem.

#
# There are two ways of configuring which instances handle federation
# sending:
# 1. The old way where "send_federation" is set to false and running a
# `synapse.app.federation_sender` worker app.
# 2. Specifying the workers sending federation in
# `federation_sender_instances`.
#

send_federation = config.get("send_federation", True)

federation_sender_instances = config.get("federation_sender_instances")
if federation_sender_instances is None:
# Default to an empty list, which means "another, unknown, worker is
# responsible for it".
federation_sender_instances = []
Comment on lines +155 to +157

Member: This happens if you're using send_federation, but this config is being parsed on a non-federation-sender worker. Is that correct?

Member Author: Yes, exactly.


federation_sender_instances = config.get("federation_sender_instances") or []
# If no federation sender instances are set we check if
# `send_federation` is set, which means use master
if send_federation:
federation_sender_instances = ["master"]

if self.worker_app == "synapse.app.federation_sender":
if send_federation:
# If we're running federation senders, and not using
# `federation_sender_instances`, then we should have
# explicitly set `send_federation` to false.
raise ConfigError(
_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
)

federation_sender_instances = [self.worker_name]

self.send_federation = self.instance_name in federation_sender_instances
self.federation_shard_config = ShardedWorkerHandlingConfig(
federation_sender_instances
)
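As an aside, a rough sketch of the `_handle_shared_config` abstraction the reviewer floated above (purely hypothetical: no such helper exists in the codebase, and the PR keeps the federation and pusher blocks written out instead):

```python
from typing import List, Optional

from synapse.config._base import ConfigError


def _handle_shared_config(
    config: dict,
    option_name: str,        # e.g. "send_federation"
    instances_name: str,     # e.g. "federation_sender_instances"
    shared_worker_app: str,  # e.g. "synapse.app.federation_sender"
    worker_app: Optional[str],
    instance_name: str,
    error: str,
) -> List[str]:
    """Resolve an old-style boolean option plus a new-style instance list
    into the list of instances responsible for the sharded work."""
    enabled = config.get(option_name, True)
    instances = config.get(instances_name)
    if instances is None:
        # No explicit list: either the main process handles it (old style,
        # option left at its default) or some other, unknown worker does.
        instances = ["master"] if enabled else []

        if worker_app == shared_worker_app:
            if enabled:
                # Running the dedicated worker app requires the option to be
                # disabled in the main config first.
                raise ConfigError(error)
            instances = [instance_name]

    return instances
```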
@@ -164,7 +213,37 @@ def read_config(self, config, **kwargs):
"Must only specify one instance to handle `receipts` messages."
)

self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
if len(self.writers.events) == 0:
raise ConfigError("Must specify at least one instance to handle `events`.")
Comment on lines +216 to +217

Member: I wonder if it would help us remember to add these if we used validators on the WriterLocations attributes?

Member Author: Yeah, though I'm not sure how easy it is to make a ConfigError out of validation failures?

Member: There are a few places where we raise a ConfigError from another exception, but I'm not sure it's worth refactoring that here. 🤷


self.events_shard_config = RoutableShardedWorkerHandlingConfig(
self.writers.events
)

# Handle sharded push
start_pushers = config.get("start_pushers", True)
pusher_instances = config.get("pusher_instances")
if pusher_instances is None:
# Default to an empty list, which means "another, unknown, worker is
# responsible for it".
pusher_instances = []

# If no pushers instances are set we check if `start_pushers` is
# set, which means use master
if start_pushers:
pusher_instances = ["master"]

if self.worker_app == "synapse.app.pusher":
if start_pushers:
# If we're running pushers, and not using
# `pusher_instances`, then we should have explicitly set
# `start_pushers` to false.
raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)

pusher_instances = [self.instance_name]

self.start_pushers = self.instance_name in pusher_instances
self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)

# Whether this worker should run background tasks or not.
#
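Walking the pusher branch above through both configuration styles (a standalone restatement for illustration; the worker names are invented, and `resolve_pusher_instances` is not a real helper in the codebase):

```python
from typing import Optional

from synapse.config._base import ShardedWorkerHandlingConfig


def resolve_pusher_instances(
    config: dict, worker_app: Optional[str], instance_name: str
) -> list:
    """Mirrors the pusher block in WorkerConfig.read_config above."""
    start_pushers = config.get("start_pushers", True)
    pusher_instances = config.get("pusher_instances")
    if pusher_instances is None:
        pusher_instances = ["master"] if start_pushers else []
        if worker_app == "synapse.app.pusher":
            if start_pushers:
                raise ValueError("start_pushers must be disabled in the main config")
            pusher_instances = [instance_name]
    return pusher_instances


# Old style: `start_pushers: false` in the main config plus a single
# `synapse.app.pusher` worker, which then handles every pusher.
assert resolve_pusher_instances({"start_pushers": False}, None, "master") == []
assert resolve_pusher_instances(
    {"start_pushers": False}, "synapse.app.pusher", "pusher1"
) == ["pusher1"]

# New style: instances listed explicitly; every process computes the same
# shard map, and only the named workers start pushers.
instances = resolve_pusher_instances(
    {"pusher_instances": ["pusher1", "pusher2"]}, None, "master"
)
assert ShardedWorkerHandlingConfig(instances).should_handle(
    "master", "@alice:example.com"
) is False
```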
4 changes: 3 additions & 1 deletion synapse/push/pusherpool.py
@@ -59,7 +59,6 @@ class PusherPool:
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.pusher_factory = PusherFactory(hs)
self._should_start_pushers = hs.config.start_pushers
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()

@@ -68,6 +67,9 @@ def __init__(self, hs: "HomeServer"):
# We shard the handling of push notifications by user ID.
self._pusher_shard_config = hs.config.push.pusher_shard_config
self._instance_name = hs.get_instance_name()
self._should_start_pushers = (
self._instance_name in self._pusher_shard_config.instances
)

# We can only delete pushers on master.
self._remove_pusher_client = None
7 changes: 2 additions & 5 deletions synapse/server.py
@@ -248,7 +248,7 @@ def __init__(
self.start_time = None # type: Optional[int]

self._instance_id = random_string(5)
self._instance_name = config.worker_name or "master"
self._instance_name = config.worker.instance_name

self.version_string = version_string

@@ -760,7 +760,4 @@ def get_outbound_redis_connection(self) -> Optional["RedisProtocol"]:

def should_send_federation(self) -> bool:
"Should this server be sending federation traffic directly?"
return self.config.send_federation and (
not self.config.worker_app
or self.config.worker_app == "synapse.app.federation_sender"
)
return self.config.send_federation
2 changes: 1 addition & 1 deletion tests/replication/tcp/streams/test_federation.py
@@ -24,7 +24,7 @@ def _get_worker_hs_config(self) -> dict:
# enable federation sending on the worker
config = super()._get_worker_hs_config()
# TODO: make it so we don't need both of these
config["send_federation"] = True
config["send_federation"] = False
config["worker_app"] = "synapse.app.federation_sender"
return config

2 changes: 1 addition & 1 deletion tests/replication/test_federation_ack.py
@@ -27,7 +27,7 @@ class FederationAckTestCase(HomeserverTestCase):
def default_config(self) -> dict:
config = super().default_config()
config["worker_app"] = "synapse.app.federation_sender"
config["send_federation"] = True
config["send_federation"] = False
return config

def make_homeserver(self, reactor, clock):
2 changes: 1 addition & 1 deletion tests/replication/test_federation_sender_shard.py
@@ -49,7 +49,7 @@ def test_send_event_single_sender(self):

self.make_worker_hs(
"synapse.app.federation_sender",
{"send_federation": True},
{"send_federation": False},
federation_http_client=mock_client,
)

2 changes: 1 addition & 1 deletion tests/replication/test_pusher_shard.py
@@ -95,7 +95,7 @@ def test_send_push_single_worker(self):

self.make_worker_hs(
"synapse.app.pusher",
{"start_pushers": True},
{"start_pushers": False},
Member: I'm a bit confused why we need to provide this? Does the test config default to starting push?

Member Author: `start_pushers` should be false in the config file; previously it got set to true during `generic_worker.start` (which doesn't get run from here), and now it gets set to true in the worker config class (which does get run).

proxied_blacklisted_http_client=http_client_mock,
)
