Skip to content

Commit

Permalink
Clean out invalid destinations from outbox (element-hq#17242)
Browse files Browse the repository at this point in the history
We started ensuring we only insert valid destinations:
element-hq#17240
  • Loading branch information
erikjohnston authored and H-Shay committed May 31, 2024
1 parent 0c4451a commit 8e0e1d5
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 0 deletions.
1 change: 1 addition & 0 deletions changelog.d/17242.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Clean out invalid destinations from `device_federation_outbox` table.
76 changes: 76 additions & 0 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from synapse.util import json_encoder
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.stringutils import parse_and_validate_server_name

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -964,6 +965,7 @@ def _add_messages_to_local_device_inbox_txn(
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox"
CLEANUP_DEVICE_FEDERATION_OUTBOX = "cleanup_device_federation_outbox"

def __init__(
self,
Expand All @@ -989,6 +991,11 @@ def __init__(
self._remove_dead_devices_from_device_inbox,
)

self.db_pool.updates.register_background_update_handler(
self.CLEANUP_DEVICE_FEDERATION_OUTBOX,
self._cleanup_device_federation_outbox,
)

async def _background_drop_index_device_inbox(
self, progress: JsonDict, batch_size: int
) -> int:
Expand Down Expand Up @@ -1080,6 +1087,75 @@ def _remove_dead_devices_from_device_inbox_txn(

return batch_size

async def _cleanup_device_federation_outbox(
self,
progress: JsonDict,
batch_size: int,
) -> int:
def _cleanup_device_federation_outbox_txn(
txn: LoggingTransaction,
) -> bool:
if "max_stream_id" in progress:
max_stream_id = progress["max_stream_id"]
else:
txn.execute("SELECT max(stream_id) FROM device_federation_outbox")
res = cast(Tuple[Optional[int]], txn.fetchone())
if res[0] is None:
# this can only happen if the `device_inbox` table is empty, in which
# case we have no work to do.
return True
else:
max_stream_id = res[0]

start = progress.get("stream_id", 0)
stop = start + batch_size

sql = """
SELECT destination FROM device_federation_outbox
WHERE ? < stream_id AND stream_id <= ?
"""

txn.execute(sql, (start, stop))

destinations = {d for d, in txn}
to_remove = set()
for d in destinations:
try:
parse_and_validate_server_name(d)
except ValueError:
to_remove.add(d)

self.db_pool.simple_delete_many_txn(
txn,
table="device_federation_outbox",
column="destination",
values=to_remove,
keyvalues={},
)

self.db_pool.updates._background_update_progress_txn(
txn,
self.CLEANUP_DEVICE_FEDERATION_OUTBOX,
{
"stream_id": stop,
"max_stream_id": max_stream_id,
},
)

return stop >= max_stream_id

finished = await self.db_pool.runInteraction(
"_cleanup_device_federation_outbox",
_cleanup_device_federation_outbox_txn,
)

if finished:
await self.db_pool.updates._end_background_update(
self.CLEANUP_DEVICE_FEDERATION_OUTBOX,
)

return batch_size


class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2024 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.

INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(8504, 'cleanup_device_federation_outbox', '{}');

0 comments on commit 8e0e1d5

Please sign in to comment.