This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove need for worker_main_http_uri setting to use /keys/upload. #14400

Merged: 18 commits, Nov 16, 2022
Changes from 10 commits
1 change: 1 addition & 0 deletions changelog.d/14400.misc
@@ -0,0 +1 @@
Remove the `worker_main_http_uri` configuration setting. This is now handled via internal replication.
5 changes: 1 addition & 4 deletions docker/configure_workers_and_start.py
@@ -213,10 +213,7 @@
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
"shared_extra_conf": {},
"worker_extra_conf": (
"worker_main_http_uri: http://127.0.0.1:%d"
% (MAIN_PROCESS_HTTP_LISTENER_PORT,)
),
"worker_extra_conf": "",
},
"account_data": {
"app": "synapse.app.generic_worker",
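The hunk above stops the Docker script from emitting a `worker_extra_conf` entry pointing at the main process. For admins who copied that pattern into hand-written worker configs, the line that can now be deleted looks roughly like this (the address is illustrative and should match whatever the main process's HTTP listener actually is; the documentation examples use port 8008):

```yaml
# Unused since Synapse 1.72; safe to remove from worker config files.
worker_main_http_uri: http://127.0.0.1:8008
```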
5 changes: 2 additions & 3 deletions docs/workers.md
@@ -135,7 +135,7 @@ In the config file for each worker, you must specify:
[`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)).
* If handling HTTP requests, a [`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners) option
with an `http` listener.
* If handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for
* **Deprecated as of Synapse v1.72.** If handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for
the main process (`worker_main_http_uri`).

For example:
@@ -221,7 +221,6 @@ information.
^/_matrix/client/(api/v1|r0|v3|unstable)/search$

# Encryption requests
# Note that ^/_matrix/client/(r0|v3|unstable)/keys/upload/ requires `worker_main_http_uri`
^/_matrix/client/(r0|v3|unstable)/keys/query$
^/_matrix/client/(r0|v3|unstable)/keys/changes$
^/_matrix/client/(r0|v3|unstable)/keys/claim$
@@ -376,7 +375,7 @@ responsible for
- persisting them to the DB, and finally
- updating the events stream.

Because load is sharded in this way, you *must* restart all worker instances when
adding or removing event persisters.

An `event_persister` should not be mistaken for an `event_creator`.
104 changes: 3 additions & 101 deletions synapse/app/generic_worker.py
@@ -14,14 +14,12 @@
# limitations under the License.
import logging
import sys
from typing import Dict, List, Optional, Tuple
from typing import Dict, List

from twisted.internet import address
from twisted.web.resource import Resource

import synapse
import synapse.events
from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
from synapse.api.urls import (
CLIENT_API_PREFIX,
FEDERATION_PREFIX,
@@ -43,8 +41,7 @@
from synapse.config.server import ListenerConfig
from synapse.federation.transport.server import TransportLayerServer
from synapse.http.server import JsonResource, OptionsResource
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseRequest, SynapseSite
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
@@ -70,12 +67,12 @@
versions,
voip,
)
from synapse.rest.client._base import client_patterns
from synapse.rest.client.account import ThreepidRestServlet, WhoamiRestServlet
from synapse.rest.client.devices import DevicesRestServlet
from synapse.rest.client.keys import (
KeyChangesServlet,
KeyQueryServlet,
KeyUploadServlet,
OneTimeKeyServlet,
)
from synapse.rest.client.register import (
@@ -132,107 +129,12 @@
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
from synapse.types import JsonDict
from synapse.util import SYNAPSE_VERSION
from synapse.util.httpresourcetree import create_resource_tree

logger = logging.getLogger("synapse.app.generic_worker")


class KeyUploadServlet(RestServlet):
"""An implementation of the `KeyUploadServlet` that responds to read only
requests, but otherwise proxies through to the master instance.
"""

PATTERNS = client_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")

def __init__(self, hs: HomeServer):
"""
Args:
hs: server
"""
super().__init__()
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self.http_client = hs.get_simple_http_client()
self.main_uri = hs.config.worker.worker_main_http_uri

async def on_POST(
self, request: SynapseRequest, device_id: Optional[str]
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request, allow_guest=True)
user_id = requester.user.to_string()
body = parse_json_object_from_request(request)

if device_id is not None:
# passing the device_id here is deprecated; however, we allow it
# for now for compatibility with older clients.
if requester.device_id is not None and device_id != requester.device_id:
logger.warning(
"Client uploading keys for a different device "
"(logged in as %s, uploading for %s)",
requester.device_id,
device_id,
)
else:
device_id = requester.device_id

if device_id is None:
raise SynapseError(
400, "To upload keys, you must pass device_id when authenticating"
)

if body:
# They're actually trying to upload something, proxy to main synapse.

# Proxy headers from the original request, such as the auth headers
# (in case the access token is there) and the original IP /
# User-Agent of the request.
headers: Dict[bytes, List[bytes]] = {
header: list(request.requestHeaders.getRawHeaders(header, []))
for header in (b"Authorization", b"User-Agent")
}
# Add the previous hop to the X-Forwarded-For header.
x_forwarded_for = list(
request.requestHeaders.getRawHeaders(b"X-Forwarded-For", [])
)
# we use request.client here, since we want the previous hop, not the
# original client (as returned by request.getClientAddress()).
if isinstance(request.client, (address.IPv4Address, address.IPv6Address)):
previous_host = request.client.host.encode("ascii")
# If the header exists, add to the comma-separated list of the first
# instance of the header. Otherwise, generate a new header.
if x_forwarded_for:
x_forwarded_for = [x_forwarded_for[0] + b", " + previous_host]
x_forwarded_for.extend(x_forwarded_for[1:])
else:
x_forwarded_for = [previous_host]
headers[b"X-Forwarded-For"] = x_forwarded_for

# Replicate the original X-Forwarded-Proto header. Note that
# XForwardedForRequest overrides isSecure() to give us the original protocol
# used by the client, as opposed to the protocol used by our upstream proxy
# - which is what we want here.
headers[b"X-Forwarded-Proto"] = [
b"https" if request.isSecure() else b"http"
]

try:
result = await self.http_client.post_json_get_json(
self.main_uri + request.uri.decode("ascii"), body, headers=headers
)
except HttpResponseException as e:
raise e.to_synapse_error() from e
except RequestSendFailed as e:
raise SynapseError(502, "Failed to talk to master") from e

return 200, result
else:
# Just interested in counts.
result = await self.store.count_e2e_one_time_keys(user_id, device_id)
return 200, {"one_time_key_counts": result}


class GenericWorkerSlavedStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
# rather than going via the correct worker.
2 changes: 0 additions & 2 deletions synapse/config/workers.py
@@ -155,8 +155,6 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.worker_name = config.get("worker_name", self.worker_app)
self.instance_name = self.worker_name or "master"

self.worker_main_http_uri = config.get("worker_main_http_uri", None)

Contributor:
Can we log a warning if "worker_main_http_uri" in config?

Something like "The config option worker_main_http_uri is unused since Synapse 1.72. It can be safely removed from your configuration."

Contributor Author:
Level INFO ok? Or would DEBUG be better?

Contributor:
Warning, please.

Contributor:
logger.warning(NON_SQLITE_DATABASE_PATH_WARNING), for example. More generally, grep for logger.warning in synapse/config.

Contributor Author:
Added in 8b4e099
That look ok?
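
For readers following along, here is a minimal sketch of the kind of check being discussed. The constant name, wording, and exact placement are illustrative, not necessarily what commit 8b4e099 contains; in Synapse the check would sit inline in `WorkerConfig.read_config` alongside the other option parsing.

```python
import logging

logger = logging.getLogger(__name__)

_WORKER_MAIN_HTTP_URI_DEPRECATION = (
    "The config option worker_main_http_uri is unused since Synapse 1.72. "
    "It can be safely removed from your configuration."
)


def warn_on_deprecated_worker_main_http_uri(config: dict) -> None:
    """Log (but do not fail) when the obsolete option is still present.

    Illustrative helper only; the real check lives inside the worker
    config parsing rather than in a standalone function.
    """
    if "worker_main_http_uri" in config:
        logger.warning(_WORKER_MAIN_HTTP_URI_DEPRECATION)
```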

# This option is really only here to support `--manhole` command line
# argument.
manhole = config.get("worker_manhole")
103 changes: 103 additions & 0 deletions synapse/replication/http/devices.py
@@ -18,6 +18,7 @@
from twisted.web.server import Request

from synapse.http.server import HttpServer
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import JsonDict

@@ -78,5 +79,107 @@ async def _handle_request( # type: ignore[override]
return 200, user_devices


class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
"""Ask master to upload keys for the user and send them out over federation to
update other servers.

This must happen on master so that the results can be correctly cached in
the database and streamed to workers.( Is this accurate?)

Contributor:
I suspect the point is:

  • for now, there must be exactly one worker writing keys (to avoid write races)
  • we haven't found the key writing to be heavy enough that it needs pulling off the master worker
  • so it's written by the master.

Contributor Author:
I guess that specifically the "cached in the database and streamed to workers" part is what I wanted to make sure was accurate. This is a copy-paste dangler. The replication HTTP servlet itself does no caching of its own (the CACHE modifier is set to False below).

Contributor:
Broadly speaking, the database doesn't cache anything; it is the authoritative source of truth for the state of the homeserver. The application maintains various in-memory caches of the database's data. Workers typically receive cache invalidation messages over redis rather than new updates.


Calls to e2e_keys_handler.upload_keys_for_user(user_id, device_id, keys) on
master to accomplish this.

Defined in https://spec.matrix.org/v1.4/client-server-api/#post_matrixclientv3keysupload
Request format (borrowed and expanded from KeyUploadServlet):

POST /_synapse/replication/upload_keys_for_user

{
    "user_id": "<user_id>",
    "device_id": "<device_id>",
    "keys": {
        "device_keys": {
            "user_id": "<user_id>",
            "device_id": "<device_id>",
            "valid_until_ts": <millisecond_timestamp>,
            "algorithms": [
                "m.olm.curve25519-aes-sha2",
            ],
            "keys": {
                "<algorithm>:<device_id>": "<key_base64>",
            },
            "signatures": {
                "<user_id>": {
                    "<algorithm>:<device_id>": "<signature_base64>"
                }
            }
        },
        "fallback_keys": {
            "<algorithm>:<device_id>": "<key_base64>",
            "signed_<algorithm>:<device_id>": {
                "fallback": true,
                "key": "<key_base64>",
                "signatures": {
                    "<user_id>": {
                        "<algorithm>:<device_id>": "<key_base64>"
                    }
                }
            }
        },
        "one_time_keys": {
            "<algorithm>:<key_id>": "<key_base64>"
        },
    }
}
Response is equivalent to the `/_matrix/client/v3/keys/upload` response, e.g.:

{
    "one_time_key_counts": {
        "curve25519": 10,
        "signed_curve25519": 20
    }
}

Contributor:
borrowed and expanded from KeyUploadServlet

KeyUploadServlet presumably refers to client/keys.py and not generic_worker (whose KeyUploadServlet was removed above).

I think it makes more sense to

  • update the docstring for KeyUploadServlet with your changes
  • have this docstring point us to that one. "See KeyUploadServlet for the format of the request body and response body."
  • keep the POST /_synapse/replication/upload_keys_for_user line here

Contributor Author (@realtyem, Nov 14, 2022):
Yeah, it's slightly different though. Because of how the information is passed to _serialize_payload(), the replication request has the user_id and the device_id (which aren't part of the original request body) prepended to the beginning of it... (I think). Plus it wraps the original body into a keys object as well.

Contributor Author:
Took me a minute, but I see what you mean. How about f2c780b?

"""

NAME = "upload_keys_for_user"
PATH_ARGS = ()
CACHE = False

def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self.e2e_keys_handler = hs.get_e2e_keys_handler()
self.store = hs.get_datastores().main
self.clock = hs.get_clock()

@staticmethod
async def _serialize_payload( # type: ignore[override]
user_id: str, device_id: str, keys: JsonDict
) -> JsonDict:

return {
"user_id": user_id,
"device_id": device_id,
"keys": keys,
}

async def _handle_request( # type: ignore[override]
self, request: Request
) -> Tuple[int, JsonDict]:
content = parse_json_object_from_request(request)

user_id = content["user_id"]
device_id = content["device_id"]
keys = content["keys"]

results = await self.e2e_keys_handler.upload_keys_for_user(
user_id, device_id, keys
)

return 200, results


def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
ReplicationUploadKeysForUserRestServlet(hs).register(http_server)
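
To make the point from the review thread concrete, here is a small, purely illustrative sketch of how the replication request body relates to the client's original `/keys/upload` body: the authenticated `user_id` and `device_id` are added explicitly and the client body is nested under `keys`, mirroring what `_serialize_payload` returns above. The helper name and example values are made up for illustration.

```python
from typing import Any, Dict


def build_replication_body(
    user_id: str, device_id: str, client_body: Dict[str, Any]
) -> Dict[str, Any]:
    # The client's /keys/upload body (device_keys / fallback_keys /
    # one_time_keys) goes under "keys"; user_id and device_id travel
    # alongside it because the replication request is not authenticated
    # with the end user's access token.
    return {"user_id": user_id, "device_id": device_id, "keys": client_body}


# Example: a worker relaying an upload of a single one-time key.
replication_body = build_replication_body(
    "@alice:example.org",
    "ABCDEFGH",
    {"one_time_keys": {"curve25519:AAAAAQ": "<key_base64>"}},
)
# The worker POSTs `replication_body` to
# /_synapse/replication/upload_keys_for_user on the main process and relays
# the resulting one_time_key_counts back to the client.
```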
12 changes: 10 additions & 2 deletions synapse/rest/client/keys.py
@@ -27,6 +27,7 @@
)
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import log_kv, set_tag
from synapse.replication.http.devices import ReplicationUploadKeysForUserRestServlet
from synapse.rest.client._base import client_patterns, interactive_auth_handler
from synapse.types import JsonDict, StreamToken
from synapse.util.cancellation import cancellable
@@ -71,6 +72,13 @@ def __init__(self, hs: "HomeServer"):
self.e2e_keys_handler = hs.get_e2e_keys_handler()
self.device_handler = hs.get_device_handler()

if hs.config.worker.worker_app is None:
# if main process
self.key_uploader = self.e2e_keys_handler.upload_keys_for_user
else:
# then a worker
self.key_uploader = ReplicationUploadKeysForUserRestServlet.make_client(hs)

async def on_POST(
self, request: SynapseRequest, device_id: Optional[str]
) -> Tuple[int, JsonDict]:
@@ -109,8 +117,8 @@ async def on_POST(
400, "To upload keys, you must pass device_id when authenticating"
)

result = await self.e2e_keys_handler.upload_keys_for_user(
user_id, device_id, body
result = await self.key_uploader(
user_id=user_id, device_id=device_id, keys=body
)
return 200, result
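
One note on the hunk above, for readers unfamiliar with the replication helpers: make_client returns an awaitable callable whose keyword arguments feed _serialize_payload, so the servlet can await the same call shape whether it is a direct handler call (main process) or an HTTP hop to the main process (worker). A simplified sketch of that shape; the type alias and wrapper function are illustrative, not Synapse's actual signatures.

```python
from typing import Any, Awaitable, Callable, Dict

JsonDict = Dict[str, Any]

# Either branch of __init__ above yields something matching this shape: a
# coroutine function taking user_id / device_id / keys and returning the
# one_time_key_counts response.
KeyUploader = Callable[..., Awaitable[JsonDict]]


async def upload_via(key_uploader: KeyUploader, body: JsonDict) -> JsonDict:
    # On the main process this is e2e_keys_handler.upload_keys_for_user; on
    # a worker it is a POST to /_synapse/replication/upload_keys_for_user.
    return await key_uploader(
        user_id="@alice:example.org", device_id="ABCDEFGH", keys=body
    )
```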
