Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Only send out device list updates for our own users (#12465)
Browse files Browse the repository at this point in the history
Broke in #12365
  • Loading branch information
erikjohnston authored Apr 14, 2022
1 parent 535a689 commit 0b014eb
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 8 deletions.
1 change: 1 addition & 0 deletions changelog.d/12465.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Enable processing of device list updates asynchronously.
10 changes: 7 additions & 3 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,9 +649,13 @@ async def _handle_new_device_update_async(self) -> None:
return

for user_id, device_id, room_id, stream_id, opentracing_context in rows:
joined_user_ids = await self.store.get_users_in_room(room_id)
hosts = {get_domain_from_id(u) for u in joined_user_ids}
hosts.discard(self.server_name)
hosts = set()

# Ignore any users that aren't ours
if self.hs.is_mine_id(user_id):
joined_user_ids = await self.store.get_users_in_room(room_id)
hosts = {get_domain_from_id(u) for u in joined_user_ids}
hosts.discard(self.server_name)

# Check if we've already sent this update to some hosts
if current_stream_id == stream_id:
Expand Down
4 changes: 3 additions & 1 deletion synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1703,7 +1703,9 @@ def _add_device_outbound_poke_to_stream_txn(
next(stream_id_iterator),
user_id,
device_id,
False,
not self.hs.is_mine_id(
user_id
), # We only need to send out update for *our* users
now,
encoded_context if whitelisted_homeserver(destination) else "{}",
)
Expand Down
43 changes: 42 additions & 1 deletion tests/federation/test_federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,9 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):

def make_homeserver(self, reactor, clock):
return self.setup_test_homeserver(
federation_transport_client=Mock(spec=["send_transaction"]),
federation_transport_client=Mock(
spec=["send_transaction", "query_user_devices"]
),
)

def default_config(self):
Expand Down Expand Up @@ -218,6 +220,45 @@ def test_send_device_updates(self):
self.assertEqual(len(self.edus), 1)
self.check_device_update_edu(self.edus.pop(0), u1, "D2", stream_id)

def test_dont_send_device_updates_for_remote_users(self):
"""Check that we don't send device updates for remote users"""

# Send the server a device list EDU for the other user, this will cause
# it to try and resync the device lists.
self.hs.get_federation_transport_client().query_user_devices.return_value = (
defer.succeed(
{
"stream_id": "1",
"user_id": "@user2:host2",
"devices": [{"device_id": "D1"}],
}
)
)

self.get_success(
self.hs.get_device_handler().device_list_updater.incoming_device_list_update(
"host2",
{
"user_id": "@user2:host2",
"device_id": "D1",
"stream_id": "1",
"prev_ids": [],
},
)
)

self.reactor.advance(1)

# We shouldn't see an EDU for that update
self.assertEqual(self.edus, [])

# Check that we did successfully process the inbound EDU (otherwise this
# test would pass if we failed to process the EDU)
devices = self.get_success(
self.hs.get_datastores().main.get_cached_devices_for_user("@user2:host2")
)
self.assertIn("D1", devices)

def test_upload_signatures(self):
"""Uploading signatures on some devices should produce updates for that user"""

Expand Down
6 changes: 3 additions & 3 deletions tests/storage/test_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def test_get_device_updates_by_remote(self):
device_ids = ["device_id1", "device_id2"]

# Add two device updates with sequential `stream_id`s
self.add_device_change("user_id", device_ids, "somehost")
self.add_device_change("@user_id:test", device_ids, "somehost")

# Get all device updates ever meant for this remote
now_stream_id, device_updates = self.get_success(
Expand All @@ -142,7 +142,7 @@ def test_get_device_updates_by_remote_can_limit_properly(self):
"device_id4",
"device_id5",
]
self.add_device_change("user_id", device_ids, "somehost")
self.add_device_change("@user_id:test", device_ids, "somehost")

# Get device updates meant for this remote
next_stream_id, device_updates = self.get_success(
Expand All @@ -162,7 +162,7 @@ def test_get_device_updates_by_remote_can_limit_properly(self):

# Add some more device updates to ensure it still resumes properly
device_ids = ["device_id6", "device_id7"]
self.add_device_change("user_id", device_ids, "somehost")
self.add_device_change("@user_id:test", device_ids, "somehost")

# Get the next batch of device updates
next_stream_id, device_updates = self.get_success(
Expand Down

0 comments on commit 0b014eb

Please sign in to comment.