diff --git a/changelog.d/6786.misc b/changelog.d/6786.misc new file mode 100644 index 000000000000..94c692e53a7b --- /dev/null +++ b/changelog.d/6786.misc @@ -0,0 +1 @@ +Attempt to resync remote users' devices when detected as stale. diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 5c5fe77be2e1..05c4b3eec0e9 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -21,6 +21,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError +from synapse.logging.context import run_in_background from synapse.logging.opentracing import ( get_active_span_text_map, log_kv, @@ -48,6 +49,8 @@ def __init__(self, hs): "m.direct_to_device", self.on_direct_to_device_edu ) + self._device_list_updater = hs.get_device_handler().device_list_updater + @defer.inlineCallbacks def on_direct_to_device_edu(self, origin, content): local_messages = {} @@ -134,8 +137,11 @@ def _check_for_unknown_devices( unknown_devices, ) yield self.store.mark_remote_user_device_cache_as_stale(sender_user_id) - # TODO: Poke something to start trying to refetch user's - # keys. + + # Immediately attempt a resync in the background + run_in_background( + self._device_list_updater.user_device_resync, sender_user_id + ) @defer.inlineCallbacks def send_device_message(self, sender_user_id, message_type, messages): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 754fd802c57c..3e668c2424c7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -57,6 +57,7 @@ run_in_background, ) from synapse.logging.utils import log_function +from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.replication.http.federation import ( ReplicationCleanRoomRestServlet, ReplicationFederationSendEventsRestServlet, @@ -156,6 +157,13 @@ def __init__(self, hs): hs ) + if hs.config.worker_app: + self._user_device_resync = ReplicationUserDevicesResyncRestServlet.make_client( + hs + ) + else: + self._device_list_updater = hs.get_device_handler().device_list_updater + # When joining a room we need to queue any events for that room up self.room_queues = {} self._room_pdu_linearizer = Linearizer("fed_room_pdu") @@ -763,8 +771,14 @@ async def _process_received_pdu( await self.store.mark_remote_user_device_cache_as_stale( event.sender ) - # TODO: Poke something to start trying to refetch user's - # keys. + + # Immediately attempt a resync in the background + if self.config.worker_app: + return run_in_background(self._user_device_resync, event.sender) + else: + return run_in_background( + self._device_list_updater.user_device_resync, event.sender + ) @log_function async def backfill(self, dest, room_id, limit, extremities): diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 596ddc697050..68b9847bd2b2 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -81,6 +81,9 @@ def make_homeserver(self, reactor, clock): ] ) + # the tests assume that we are starting at unix time 1000 + reactor.pump((1000,)) + hs = self.setup_test_homeserver( notifier=Mock(), http_client=mock_federation_client, keyring=mock_keyring ) @@ -90,9 +93,6 @@ def make_homeserver(self, reactor, clock): return hs def prepare(self, reactor, clock, hs): - # the tests assume that we are starting at unix time 1000 - reactor.pump((1000,)) - mock_notifier = hs.get_notifier() self.on_new_event = mock_notifier.on_new_event