Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix caching devices for remote servers in worker. #6332

Merged
merged 3 commits into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6332.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix caching devices for remote users when using workers.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bit cryptic. Perhaps you can explain what the user-visible symptoms were?

19 changes: 16 additions & 3 deletions synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import (
UserID,
get_domain_from_id,
Expand All @@ -53,6 +54,12 @@ def __init__(self, hs):

self._edu_updater = SigningKeyEduUpdater(hs, self)

self._is_master = hs.config.worker_app is None
if not self._is_master:
self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client(
hs
)

federation_registry = hs.get_federation_registry()

# FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
Expand Down Expand Up @@ -191,9 +198,15 @@ def do_remote_query(destination):
# probably be tracking their device lists. However, we haven't
# done an initial sync on the device list so we do it now.
try:
user_devices = yield self.device_handler.device_list_updater.user_device_resync(
user_id
)
if self._is_master:
user_devices = yield self.device_handler.device_list_updater.user_device_resync(
user_id
)
else:
user_devices = yield self._user_device_resync_client(
user_id=user_id
)

user_devices = user_devices["devices"]
for device in user_devices:
results[user_id] = {device["device_id"]: device["keys"]}
Expand Down
10 changes: 9 additions & 1 deletion synapse/replication/http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@
# limitations under the License.

from synapse.http.server import JsonResource
from synapse.replication.http import federation, login, membership, register, send_event
from synapse.replication.http import (
devices,
federation,
login,
membership,
register,
send_event,
)

REPLICATION_PREFIX = "/_synapse/replication"

Expand All @@ -30,3 +37,4 @@ def register_servlets(self, hs):
federation.register_servlets(hs, self)
login.register_servlets(hs, self)
register.register_servlets(hs, self)
devices.register_servlets(hs, self)
69 changes: 69 additions & 0 deletions synapse/replication/http/devices.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from synapse.replication.http._base import ReplicationEndpoint

logger = logging.getLogger(__name__)


class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
"""Notifies that a user has joined or left the room
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't seem to be about a notification? It seems to be a request to fetch the user's devices from the remote server?

We should probably also record here (or at least somewhere in the code comments, rather than just the PR where it will get lost) why it is important for this to happen on the master.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Woops, failed to delete the docstring when c+p'ed and then was like "yay i've already written a docstring" 🤦‍♂️


Request format:

POST /_synapse/replication/user_device_resync/:user_id

{}

Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id`
response, e.g.:

{
"user_id": "@alice:example.org",
"devices": [
{
"device_id": "JLAFKJWSCS",
"keys": { ... },
"device_display_name": "Alice's Mobile Phone"
}
]
}
"""

NAME = "user_device_resync"
PATH_ARGS = ("user_id",)
CACHE = False

def __init__(self, hs):
super(ReplicationUserDevicesResyncRestServlet, self).__init__(hs)

self.device_list_updater = hs.get_device_handler().device_list_updater
self.store = hs.get_datastore()
self.clock = hs.get_clock()

@staticmethod
def _serialize_payload(user_id):
return {}

async def _handle_request(self, request, user_id):
user_devices = await self.device_list_updater.user_device_resync(user_id)

return 200, user_devices


def register_servlets(hs, http_server):
ReplicationUserDevicesResyncRestServlet(hs).register(http_server)