Skip to content

Commit

Permalink
Limit amount of replication we send (#17358)
Browse files Browse the repository at this point in the history
Fixes up #17333, where we failed to actually send less data (the
`DISTINCT` didn't work due to `stream_id` being different).

We fix this by making it so that every device list outbound poke for a
given user ID has the same stream ID. We can't change the query to only
return e.g. max stream ID as the receivers look up the destinations to
send to by doing `SELECT WHERE stream_id = ?`
  • Loading branch information
erikjohnston committed Jun 25, 2024
1 parent 554a926 commit c89fea3
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
1 change: 1 addition & 0 deletions changelog.d/17358.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Handle device lists notifications for large accounts more efficiently in worker mode.
15 changes: 7 additions & 8 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -2131,19 +2131,18 @@ def _add_device_outbound_poke_to_stream_txn(
user_id: str,
device_id: str,
hosts: Collection[str],
stream_ids: List[int],
stream_id: int,
context: Optional[Dict[str, str]],
) -> None:
if self._device_list_federation_stream_cache:
for host in hosts:
txn.call_after(
self._device_list_federation_stream_cache.entity_has_changed,
host,
stream_ids[-1],
stream_id,
)

now = self._clock.time_msec()
stream_id_iterator = iter(stream_ids)

encoded_context = json_encoder.encode(context)
mark_sent = not self.hs.is_mine_id(user_id)
Expand All @@ -2152,7 +2151,7 @@ def _add_device_outbound_poke_to_stream_txn(
(
destination,
self._instance_name,
next(stream_id_iterator),
stream_id,
user_id,
device_id,
mark_sent,
Expand Down Expand Up @@ -2337,22 +2336,22 @@ async def add_device_list_outbound_pokes(
return

def add_device_list_outbound_pokes_txn(
txn: LoggingTransaction, stream_ids: List[int]
txn: LoggingTransaction, stream_id: int
) -> None:
self._add_device_outbound_poke_to_stream_txn(
txn,
user_id=user_id,
device_id=device_id,
hosts=hosts,
stream_ids=stream_ids,
stream_id=stream_id,
context=context,
)

async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids:
async with self._device_list_id_gen.get_next() as stream_id:
return await self.db_pool.runInteraction(
"add_device_list_outbound_pokes",
add_device_list_outbound_pokes_txn,
stream_ids,
stream_id,
)

async def add_remote_device_list_to_pending(
Expand Down

0 comments on commit c89fea3

Please sign in to comment.