Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove need for worker_main_http_uri setting to use /keys/upload. #14400

Merged
merged 18 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14400.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove the `worker_main_http_uri` configuration setting. This is now handled via internal replication.
realtyem marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 1 addition & 4 deletions docker/configure_workers_and_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,7 @@
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
"shared_extra_conf": {},
"worker_extra_conf": (
"worker_main_http_uri: http://127.0.0.1:%d"
% (MAIN_PROCESS_HTTP_LISTENER_PORT,)
),
"worker_extra_conf": "",
},
"account_data": {
"app": "synapse.app.generic_worker",
Expand Down
7 changes: 3 additions & 4 deletions docs/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ In the config file for each worker, you must specify:
[`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)).
* If handling HTTP requests, a [`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners) option
with an `http` listener.
* If handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for
the main process (`worker_main_http_uri`).
* **Synapse 1.71 and older:** if handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for
the main process (`worker_main_http_uri`). This config option is no longer required and is ignored when running Synapse 1.72 and newer.
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

For example:

Expand Down Expand Up @@ -221,7 +221,6 @@ information.
^/_matrix/client/(api/v1|r0|v3|unstable)/search$

# Encryption requests
# Note that ^/_matrix/client/(r0|v3|unstable)/keys/upload/ requires `worker_main_http_uri`
^/_matrix/client/(r0|v3|unstable)/keys/query$
^/_matrix/client/(r0|v3|unstable)/keys/changes$
^/_matrix/client/(r0|v3|unstable)/keys/claim$
Expand Down Expand Up @@ -376,7 +375,7 @@ responsible for
- persisting them to the DB, and finally
- updating the events stream.

Because load is sharded in this way, you *must* restart all worker instances when
Because load is sharded in this way, you *must* restart all worker instances when
adding or removing event persisters.

An `event_persister` should not be mistaken for an `event_creator`.
Expand Down
103 changes: 2 additions & 101 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@
# limitations under the License.
import logging
import sys
from typing import Dict, List, Optional, Tuple
from typing import Dict, List

from twisted.internet import address
from twisted.web.resource import Resource

import synapse
import synapse.events
from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
from synapse.api.urls import (
CLIENT_API_PREFIX,
FEDERATION_PREFIX,
Expand All @@ -43,8 +41,6 @@
from synapse.config.server import ListenerConfig
from synapse.federation.transport.server import TransportLayerServer
from synapse.http.server import JsonResource, OptionsResource
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
Expand All @@ -70,12 +66,12 @@
versions,
voip,
)
from synapse.rest.client._base import client_patterns
from synapse.rest.client.account import ThreepidRestServlet, WhoamiRestServlet
from synapse.rest.client.devices import DevicesRestServlet
from synapse.rest.client.keys import (
KeyChangesServlet,
KeyQueryServlet,
KeyUploadServlet,
OneTimeKeyServlet,
)
from synapse.rest.client.register import (
Expand Down Expand Up @@ -132,107 +128,12 @@
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
from synapse.types import JsonDict
from synapse.util import SYNAPSE_VERSION
from synapse.util.httpresourcetree import create_resource_tree

logger = logging.getLogger("synapse.app.generic_worker")


class KeyUploadServlet(RestServlet):
    """An implementation of the `KeyUploadServlet` that responds to read only
    requests, but otherwise proxies through to the master instance.
    """

    PATTERNS = client_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")

    def __init__(self, hs: HomeServer):
        """
        Args:
            hs: server
        """
        super().__init__()
        self.auth = hs.get_auth()
        self.store = hs.get_datastores().main
        self.http_client = hs.get_simple_http_client()
        # Base URI of the main process; upload requests are forwarded there.
        self.main_uri = hs.config.worker.worker_main_http_uri

    async def on_POST(
        self, request: SynapseRequest, device_id: Optional[str]
    ) -> Tuple[int, JsonDict]:
        """Handle a key upload request.

        A non-empty body is proxied to the main process at
        `self.main_uri + request.uri`; an empty body is answered locally with
        the one-time key counts from the datastore.

        Args:
            request: the incoming request.
            device_id: the device ID from the request path, if one was given.

        Returns:
            A tuple of the HTTP status code and the JSON response body.

        Raises:
            SynapseError: 400 if no device ID is available from either the path
                or the requester; 502 if the request to the main process could
                not be sent. An `HttpResponseException` from the main process
                is re-raised via `to_synapse_error()`.
        """
        requester = await self.auth.get_user_by_req(request, allow_guest=True)
        user_id = requester.user.to_string()
        body = parse_json_object_from_request(request)

        if device_id is not None:
            # passing the device_id here is deprecated; however, we allow it
            # for now for compatibility with older clients.
            if requester.device_id is not None and device_id != requester.device_id:
                logger.warning(
                    "Client uploading keys for a different device "
                    "(logged in as %s, uploading for %s)",
                    requester.device_id,
                    device_id,
                )
        else:
            device_id = requester.device_id

        if device_id is None:
            raise SynapseError(
                400, "To upload keys, you must pass device_id when authenticating"
            )

        if body:
            # They're actually trying to upload something, proxy to main synapse.

            # Proxy headers from the original request, such as the auth headers
            # (in case the access token is there) and the original IP /
            # User-Agent of the request.
            headers: Dict[bytes, List[bytes]] = {
                header: list(request.requestHeaders.getRawHeaders(header, []))
                for header in (b"Authorization", b"User-Agent")
            }
            # Add the previous hop to the X-Forwarded-For header.
            x_forwarded_for = list(
                request.requestHeaders.getRawHeaders(b"X-Forwarded-For", [])
            )
            # we use request.client here, since we want the previous hop, not the
            # original client (as returned by request.getClientAddress()).
            if isinstance(request.client, (address.IPv4Address, address.IPv6Address)):
                previous_host = request.client.host.encode("ascii")
                # If the header exists, add to the comma-separated list of the first
                # instance of the header. Otherwise, generate a new header.
                if x_forwarded_for:
                    # Update the first instance in place (this list is our own
                    # copy) so that any further instances of the header are
                    # kept. Rebinding the name to a new one-element list and
                    # then extending it with its own `[1:]` slice would drop
                    # them.
                    x_forwarded_for[0] = x_forwarded_for[0] + b", " + previous_host
                else:
                    x_forwarded_for = [previous_host]
            headers[b"X-Forwarded-For"] = x_forwarded_for

            # Replicate the original X-Forwarded-Proto header. Note that
            # XForwardedForRequest overrides isSecure() to give us the original protocol
            # used by the client, as opposed to the protocol used by our upstream proxy
            # - which is what we want here.
            headers[b"X-Forwarded-Proto"] = [
                b"https" if request.isSecure() else b"http"
            ]

            try:
                result = await self.http_client.post_json_get_json(
                    self.main_uri + request.uri.decode("ascii"), body, headers=headers
                )
            except HttpResponseException as e:
                raise e.to_synapse_error() from e
            except RequestSendFailed as e:
                raise SynapseError(502, "Failed to talk to master") from e

            return 200, result
        else:
            # Just interested in counts.
            result = await self.store.count_e2e_one_time_keys(user_id, device_id)
            return 200, {"one_time_key_counts": result}


class GenericWorkerSlavedStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
# rather than going via the correct worker.
Expand Down
6 changes: 6 additions & 0 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,13 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.worker_name = config.get("worker_name", self.worker_app)
self.instance_name = self.worker_name or "master"

# FIXME: Remove this check after a suitable amount of time.
self.worker_main_http_uri = config.get("worker_main_http_uri", None)
if self.worker_main_http_uri is not None:
logger.warning(
"The config option worker_main_http_uri is unused since Synapse 1.72. "
"It can be safely removed from your configuration."
)

# This option is really only here to support `--manhole` command line
# argument.
Expand Down
67 changes: 67 additions & 0 deletions synapse/replication/http/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from twisted.web.server import Request

from synapse.http.server import HttpServer
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import JsonDict

Expand Down Expand Up @@ -78,5 +79,71 @@ async def _handle_request( # type: ignore[override]
return 200, user_devices


class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
    """Ask master to upload keys for the user and send them out over federation to
    update other servers.

    Calls to e2e_keys_handler.upload_keys_for_user(user_id, device_id, keys) on
    the main process to accomplish this. This endpoint does no caching of its
    own (`CACHE` is False).

    NOTE(review): the reason uploads go through the main process is presumably
    that key writes need a single writer (to avoid write races) and have not
    been heavy enough to move off the main process -- confirm before relying
    on this.

    Defined in https://spec.matrix.org/v1.4/client-server-api/#post_matrixclientv3keysupload
    Request format (borrowed and expanded from KeyUploadServlet):

        POST /_synapse/replication/upload_keys_for_user

        {
            "user_id": "<user_id>",
            "device_id": "<device_id>",
            "keys": {
                ....this part can be found in KeyUploadServlet in rest/client/keys.py....
            }
        }

    Response is equivalent to `/_matrix/client/v3/keys/upload` found in KeyUploadServlet

    """

    NAME = "upload_keys_for_user"
    PATH_ARGS = ()
    CACHE = False

    def __init__(self, hs: "HomeServer"):
        super().__init__(hs)

        self.e2e_keys_handler = hs.get_e2e_keys_handler()
        # NOTE(review): store and clock are not used by the methods of this
        # class; kept so the attributes remain available to any other users.
        self.store = hs.get_datastores().main
        self.clock = hs.get_clock()

    @staticmethod
    async def _serialize_payload(  # type: ignore[override]
        user_id: str, device_id: str, keys: JsonDict
    ) -> JsonDict:
        """Build the JSON request body from the given arguments.

        Args:
            user_id: the user the keys are uploaded for.
            device_id: the device the keys are uploaded for.
            keys: the key upload content, passed through unchanged.

        Returns:
            A dict with the "user_id", "device_id" and "keys" entries.
        """
        return {
            "user_id": user_id,
            "device_id": device_id,
            "keys": keys,
        }

    async def _handle_request(  # type: ignore[override]
        self, request: Request
    ) -> Tuple[int, JsonDict]:
        """Unpack the request body and pass it to the e2e keys handler.

        Returns:
            A tuple of 200 and whatever `upload_keys_for_user` returned.
        """
        content = parse_json_object_from_request(request)

        user_id = content["user_id"]
        device_id = content["device_id"]
        keys = content["keys"]

        results = await self.e2e_keys_handler.upload_keys_for_user(
            user_id, device_id, keys
        )

        return 200, results


def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
    """Instantiate each replication servlet below with `hs` and register it on
    `http_server`, in the order listed.
    """
    servlet_classes = (
        ReplicationUserDevicesResyncRestServlet,
        ReplicationUploadKeysForUserRestServlet,
    )
    for servlet_class in servlet_classes:
        servlet_class(hs).register(http_server)
68 changes: 50 additions & 18 deletions synapse/rest/client/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import log_kv, set_tag
from synapse.replication.http.devices import ReplicationUploadKeysForUserRestServlet
from synapse.rest.client._base import client_patterns, interactive_auth_handler
from synapse.types import JsonDict, StreamToken
from synapse.util.cancellation import cancellable
Expand All @@ -43,24 +44,48 @@ class KeyUploadServlet(RestServlet):
Content-Type: application/json

{
"device_keys": {
"user_id": "<user_id>",
"device_id": "<device_id>",
"valid_until_ts": <millisecond_timestamp>,
"algorithms": [
"m.olm.curve25519-aes-sha2",
]
"keys": {
"<algorithm>:<device_id>": "<key_base64>",
"device_keys": {
"user_id": "<user_id>",
"device_id": "<device_id>",
"valid_until_ts": <millisecond_timestamp>,
"algorithms": [
"m.olm.curve25519-aes-sha2",
]
"keys": {
"<algorithm>:<device_id>": "<key_base64>",
},
"signatures": {
"<user_id>": {
"<algorithm>:<device_id>": "<signature_base64>"
}
}
},
"fallback_keys": {
"<algorithm>:<device_id>": "<key_base64>",
"signed_<algorithm>:<device_id>": {
"fallback": true,
"key": "<key_base64>",
"signatures": {
"<user_id>": {
"<algorithm>:<device_id>": "<key_base64>"
}
}
}
}
"one_time_keys": {
"<algorithm>:<key_id>": "<key_base64>"
},
"signatures:" {
"<user_id>" {
"<algorithm>:<device_id>": "<signature_base64>"
} } },
"one_time_keys": {
"<algorithm>:<key_id>": "<key_base64>"
},
}

response, e.g.:

{
"one_time_key_counts": {
"curve25519": 10,
"signed_curve25519": 20
}
}

"""

PATTERNS = client_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
Expand All @@ -71,6 +96,13 @@ def __init__(self, hs: "HomeServer"):
self.e2e_keys_handler = hs.get_e2e_keys_handler()
self.device_handler = hs.get_device_handler()

if hs.config.worker.worker_app is None:
# if main process
self.key_uploader = self.e2e_keys_handler.upload_keys_for_user
else:
# then a worker
self.key_uploader = ReplicationUploadKeysForUserRestServlet.make_client(hs)

async def on_POST(
self, request: SynapseRequest, device_id: Optional[str]
) -> Tuple[int, JsonDict]:
Expand Down Expand Up @@ -109,8 +141,8 @@ async def on_POST(
400, "To upload keys, you must pass device_id when authenticating"
)

result = await self.e2e_keys_handler.upload_keys_for_user(
user_id, device_id, body
result = await self.key_uploader(
user_id=user_id, device_id=device_id, keys=body
)
return 200, result

Expand Down