Skip to content

Commit

Permalink
Fixups to new push stream (#17038)
Browse files Browse the repository at this point in the history
Follow on from #17037
  • Loading branch information
erikjohnston committed Mar 28, 2024
1 parent ea6bfae commit fd48fc4
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 12 deletions.
2 changes: 1 addition & 1 deletion changelog.d/17037.feature
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Add support for moving `/push_rules` off of main process.
Add support for moving `/pushrules` off of main process.
1 change: 1 addition & 0 deletions changelog.d/17038.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for moving `/pushrules` off of main process.
8 changes: 8 additions & 0 deletions docker/configure_workers_and_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,13 @@
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"push_rules": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
}

# Templates for sections that may be inserted multiple times in config files
Expand Down Expand Up @@ -401,6 +408,7 @@ def add_worker_roles_to_shared_config(
"receipts",
"to_device",
"typing",
"push_rules",
]

# Worker-type specific sharding config. Now a single worker can fulfill multiple
Expand Down
4 changes: 2 additions & 2 deletions docs/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -532,12 +532,12 @@ the stream writer for the `presence` stream:

^/_matrix/client/(api/v1|r0|v3|unstable)/presence/

##### The `push` stream
##### The `push_rules` stream

The following endpoints should be routed directly to the worker configured as
the stream writer for the `push` stream:

^/_matrix/client/(api/v1|r0|v3|unstable)/push_rules/
^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/

#### Restrict outbound federation traffic to a specific set of workers

Expand Down
8 changes: 4 additions & 4 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class WriterLocations:
can only be a single instance.
presence: The instances that write to the presence stream. Currently
can only be a single instance.
push: The instances that write to the push stream. Currently
push_rules: The instances that write to the push stream. Currently
can only be a single instance.
"""

Expand Down Expand Up @@ -184,7 +184,7 @@ class WriterLocations:
default=["master"],
converter=_instance_to_list_converter,
)
push: List[str] = attr.ib(
push_rules: List[str] = attr.ib(
default=["master"],
converter=_instance_to_list_converter,
)
Expand Down Expand Up @@ -347,7 +347,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
"account_data",
"receipts",
"presence",
"push",
"push_rules",
):
instances = _instance_to_list_converter(getattr(self.writers, stream))
for instance in instances:
Expand Down Expand Up @@ -385,7 +385,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
"Must only specify one instance to handle `presence` messages."
)

if len(self.writers.push) != 1:
if len(self.writers.push_rules) != 1:
raise ConfigError(
"Must only specify one instance to handle `push` messages."
)
Expand Down
6 changes: 4 additions & 2 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,10 @@ def __init__(self, hs: "HomeServer"):
hs.config.server.forgotten_room_retention_period
)

self._is_push_writer = hs.get_instance_name() in hs.config.worker.writers.push
self._push_writer = hs.config.worker.writers.push[0]
self._is_push_writer = (
hs.get_instance_name() in hs.config.worker.writers.push_rules
)
self._push_writer = hs.config.worker.writers.push_rules[0]
self._copy_push_client = ReplicationCopyPusherRestServlet.make_client(hs)

def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def __init__(self, hs: "HomeServer"):
continue

if isinstance(stream, PushRulesStream):
if hs.get_instance_name() in hs.config.worker.writers.push:
if hs.get_instance_name() in hs.config.worker.writers.push_rules:
self._streams_to_replicate.append(stream)

continue
Expand Down
4 changes: 3 additions & 1 deletion synapse/rest/client/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ def __init__(self, hs: "HomeServer"):
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self.notifier = hs.get_notifier()
self._is_push_worker = hs.get_instance_name() in hs.config.worker.writers.push
self._is_push_worker = (
hs.get_instance_name() in hs.config.worker.writers.push_rules
)
self._push_rules_handler = hs.get_push_rules_handler()
self._push_rule_linearizer = Linearizer(name="push_rules")

Expand Down
4 changes: 3 additions & 1 deletion synapse/storage/databases/main/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ def __init__(
):
super().__init__(database, db_conn, hs)

self._is_push_writer = hs.get_instance_name() in hs.config.worker.writers.push
self._is_push_writer = (
hs.get_instance_name() in hs.config.worker.writers.push_rules
)

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
Expand Down

0 comments on commit fd48fc4

Please sign in to comment.