Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix caching devices for remote servers in worker. #6332

Merged
merged 3 commits into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6332.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix caching devices for remote users when using workers, so that we don't attempt to refetch (and potentially fail) each time a user requests devices.
19 changes: 16 additions & 3 deletions synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import (
UserID,
get_domain_from_id,
Expand All @@ -53,6 +54,12 @@ def __init__(self, hs):

self._edu_updater = SigningKeyEduUpdater(hs, self)

self._is_master = hs.config.worker_app is None
if not self._is_master:
self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client(
hs
)

federation_registry = hs.get_federation_registry()

# FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
Expand Down Expand Up @@ -191,9 +198,15 @@ def do_remote_query(destination):
# probably be tracking their device lists. However, we haven't
# done an initial sync on the device list so we do it now.
try:
user_devices = yield self.device_handler.device_list_updater.user_device_resync(
user_id
)
if self._is_master:
user_devices = yield self.device_handler.device_list_updater.user_device_resync(
user_id
)
else:
user_devices = yield self._user_device_resync_client(
user_id=user_id
)

user_devices = user_devices["devices"]
for device in user_devices:
results[user_id] = {device["device_id"]: device["keys"]}
Expand Down
10 changes: 9 additions & 1 deletion synapse/replication/http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@
# limitations under the License.

from synapse.http.server import JsonResource
from synapse.replication.http import federation, login, membership, register, send_event
from synapse.replication.http import (
devices,
federation,
login,
membership,
register,
send_event,
)

REPLICATION_PREFIX = "/_synapse/replication"

Expand All @@ -30,3 +37,4 @@ def register_servlets(self, hs):
federation.register_servlets(hs, self)
login.register_servlets(hs, self)
register.register_servlets(hs, self)
devices.register_servlets(hs, self)
73 changes: 73 additions & 0 deletions synapse/replication/http/devices.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from synapse.replication.http._base import ReplicationEndpoint

logger = logging.getLogger(__name__)


class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
"""Ask master to resync the device list for a user by contacting their
server.
This must happen on master so that the results can be correctly cached in
the database and streamed to workers.
Request format:
POST /_synapse/replication/user_device_resync/:user_id
{}
Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id`
response, e.g.:
{
"user_id": "@alice:example.org",
"devices": [
{
"device_id": "JLAFKJWSCS",
"keys": { ... },
"device_display_name": "Alice's Mobile Phone"
}
]
}
"""

NAME = "user_device_resync"
PATH_ARGS = ("user_id",)
CACHE = False

def __init__(self, hs):
super(ReplicationUserDevicesResyncRestServlet, self).__init__(hs)

self.device_list_updater = hs.get_device_handler().device_list_updater
self.store = hs.get_datastore()
self.clock = hs.get_clock()

@staticmethod
def _serialize_payload(user_id):
return {}

async def _handle_request(self, request, user_id):
user_devices = await self.device_list_updater.user_device_resync(user_id)

return 200, user_devices


def register_servlets(hs, http_server):
ReplicationUserDevicesResyncRestServlet(hs).register(http_server)