From e2fc0acdfec6d8f2f569ad1580b7b3525fdd5b89 Mon Sep 17 00:00:00 2001 From: Christoph Johannes Kleine Date: Sun, 30 May 2021 20:24:09 +0200 Subject: [PATCH 01/19] fix #3599 --- synapse/storage/databases/main/devices.py | 15 +++++++++++++ ...move_deleted_devices_from_device_inbox.sql | 21 +++++++++++++++++++ 2 files changed, 36 insertions(+) create mode 100644 synapse/storage/schema/main/delta/60/99remove_deleted_devices_from_device_inbox.sql diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 18f07d96dcd0..d09112d91756 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1130,6 +1130,12 @@ async def delete_device(self, user_id: str, device_id: str) -> None: desc="delete_device", ) + await self.db_pool.simple_delete_one( + table="device_inbox", + keyvalues={"user_id": user_id, "device_id": device_id}, + desc="delete_device", + ) + self.device_id_exists_cache.invalidate((user_id, device_id)) async def delete_devices(self, user_id: str, device_ids: List[str]) -> None: @@ -1146,6 +1152,15 @@ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None: keyvalues={"user_id": user_id, "hidden": False}, desc="delete_devices", ) + + await self.db_pool.simple_delete_many( + table="device_inbox", + column="device_id", + iterable=device_ids, + keyvalues={"user_id": user_id}, + desc="delete_devices", + ) + for device_id in device_ids: self.device_id_exists_cache.invalidate((user_id, device_id)) diff --git a/synapse/storage/schema/main/delta/60/99remove_deleted_devices_from_device_inbox.sql b/synapse/storage/schema/main/delta/60/99remove_deleted_devices_from_device_inbox.sql new file mode 100644 index 000000000000..4b069bf0a629 --- /dev/null +++ b/synapse/storage/schema/main/delta/60/99remove_deleted_devices_from_device_inbox.sql @@ -0,0 +1,21 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +--- Remove messages from the device_inbox table which where sent to an +--- allready deleted device. +--- This schould run as the last task, it may take a little bit longer +--- to finish. + +DELETE FROM device_inbox WHERE device_id NOT IN (SELECT device_id FROM devices); From 12d20b8bc7cbf4f2052ed26b003700e37ce9bf32 Mon Sep 17 00:00:00 2001 From: Christoph Johannes Kleine Date: Sun, 30 May 2021 20:37:05 +0200 Subject: [PATCH 02/19] add changelog --- changelog.d/10098.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/10098.bugfix diff --git a/changelog.d/10098.bugfix b/changelog.d/10098.bugfix new file mode 100644 index 000000000000..11ab4436e1e8 --- /dev/null +++ b/changelog.d/10098.bugfix @@ -0,0 +1 @@ +Fix a bug where messages in `device_inbox` table where not deleted, when deleting device(s). From 40ee6525d5536856f7c12446ba022400ba8d7cce Mon Sep 17 00:00:00 2001 From: Christoph Johannes Kleine Date: Tue, 1 Jun 2021 17:10:11 +0200 Subject: [PATCH 03/19] move sql file --- .../14remove_deleted_devices_from_device_inbox.sql} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename synapse/storage/schema/main/delta/{60/99remove_deleted_devices_from_device_inbox.sql => 59/14remove_deleted_devices_from_device_inbox.sql} (100%) diff --git a/synapse/storage/schema/main/delta/60/99remove_deleted_devices_from_device_inbox.sql b/synapse/storage/schema/main/delta/59/14remove_deleted_devices_from_device_inbox.sql similarity index 100% rename from synapse/storage/schema/main/delta/60/99remove_deleted_devices_from_device_inbox.sql rename to synapse/storage/schema/main/delta/59/14remove_deleted_devices_from_device_inbox.sql From a543a27330c7dd08c2e12243547b4bb51279c630 Mon Sep 17 00:00:00 2001 From: dklimpel <5740567+dklimpel@users.noreply.github.com> Date: Fri, 1 Oct 2021 13:59:37 +0200 Subject: [PATCH 04/19] convert to background task and add tests --- synapse/storage/databases/main/devices.py | 70 +++++++++++++- ...move_deleted_devices_from_device_inbox.sql | 23 +++++ tests/handlers/test_device.py | 30 ++++++ tests/storage/databases/main/test_devices.py | 95 +++++++++++++++++++ 4 files changed, 214 insertions(+), 4 deletions(-) create mode 100644 synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql create mode 100644 tests/storage/databases/main/test_devices.py diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index ec91671e1e6c..9bdcd2c1511f 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -955,6 +955,11 @@ def __init__(self, database: DatabasePool, db_conn, hs): self._remove_duplicate_outbound_pokes, ) + self.db_pool.updates.register_background_update_handler( + "remove_deleted_devices_from_device_inbox", + self._remove_deleted_devices_from_device_inbox, + ) + # a pair of background updates that were added during the 1.14 release cycle, # but replaced with 58/06dlols_unique_idx.py self.db_pool.updates.register_noop_background_update( @@ -1045,6 +1050,63 @@ def _txn(txn): return rows + async def _remove_deleted_devices_from_device_inbox( + self, progress: dict, batch_size: int + ) -> int: + """A background update that deletes all device_inboxes for deleted devices. + + This should only need to be run once (when users upgrade to v1.45.0) + + Args: + progress: dict used to store progress of this background update + batch_size: the maximum number of rows to retrieve in a single select query + + Returns: + The number of deleted rows + """ + + def _remove_deleted_devices_from_device_inbox_txn( + txn: LoggingTransaction, + ) -> int: + + sql = """ + SELECT user_id, device_id, stream_id + FROM device_inbox + WHERE device_id + NOT IN (SELECT device_id FROM devices) + LIMIT ?; + """ + + txn.execute(sql, (batch_size,)) + rows = txn.fetchall() + + row = None + for row in rows: + self.db_pool.simple_delete_txn( + txn, + "device_inbox", + {"user_id": row[0], "device_id": row[1], "stream_id": row[2]}, + ) + + if row: + self.db_pool.updates._background_update_progress_txn( + txn, "remove_deleted_devices_from_device_inbox", row + ) + + return len(rows) + + number_deleted = await self.db_pool.runInteraction( + "_remove_deleted_devices_from_device_inbox", + _remove_deleted_devices_from_device_inbox_txn, + ) + + if number_deleted < batch_size: + await self.db_pool.updates._end_background_update( + "remove_deleted_devices_from_device_inbox" + ) + + return number_deleted + class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): def __init__(self, database: DatabasePool, db_conn, hs): @@ -1121,7 +1183,7 @@ async def store_device( raise StoreError(500, "Problem storing device.") async def delete_device(self, user_id: str, device_id: str) -> None: - """Delete a device. + """Delete a device and this device_inbox. Args: user_id: The ID of the user which owns the device @@ -1133,10 +1195,10 @@ async def delete_device(self, user_id: str, device_id: str) -> None: desc="delete_device", ) - await self.db_pool.simple_delete_one( + await self.db_pool.simple_delete( table="device_inbox", keyvalues={"user_id": user_id, "device_id": device_id}, - desc="delete_device", + desc="delete_device_inbox", ) self.device_id_exists_cache.invalidate((user_id, device_id)) @@ -1161,7 +1223,7 @@ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None: column="device_id", iterable=device_ids, keyvalues={"user_id": user_id}, - desc="delete_devices", + desc="delete_devices_inbox", ) for device_id in device_ids: diff --git a/synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql b/synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql new file mode 100644 index 000000000000..c4d715500cd3 --- /dev/null +++ b/synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql @@ -0,0 +1,23 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +--- Remove messages from the device_inbox table which where sent to an +--- allready deleted device. +--- This schould run as background task, it may take a little bit longer +--- to finish. + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (6402, 'remove_deleted_devices_from_device_inbox', '{}'); diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 3ac48e5e95f2..7e06e74d431f 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -160,6 +160,36 @@ def test_delete_device(self): # we'd like to check the access token was invalidated, but that's a # bit of a PITA. + def test_delete_device_and_device_inbox(self): + self._record_users() + + # add an device_inbox + self.get_success( + self.store.db_pool.simple_insert( + "device_inbox", + { + "user_id": user1, + "device_id": "abc", + "stream_id": 1, + "message_json": "{}", + }, + ) + ) + + # delete the device + self.get_success(self.handler.delete_device(user1, "abc")) + + # check that the device_inbox was deleted + self.get_failure( + self.store.db_pool.simple_select_one( + table="device_inbox", + keyvalues={"user_id": user1, "device_id": "abc"}, + retcols=("user_id", "device_id"), + desc="get_device_id_from_device_inbox", + ), + synapse.api.errors.StoreError, + ) + def test_update_device(self): self._record_users() diff --git a/tests/storage/databases/main/test_devices.py b/tests/storage/databases/main/test_devices.py new file mode 100644 index 000000000000..0494edf48e8f --- /dev/null +++ b/tests/storage/databases/main/test_devices.py @@ -0,0 +1,95 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.rest import admin +from synapse.rest.client import devices + +from tests.unittest import HomeserverTestCase + + +class DevicesBackgroundUpdateStoreTestCase(HomeserverTestCase): + + servlets = [ + admin.register_servlets, + devices.register_servlets, + ] + + def prepare(self, reactor, clock, hs): + self.store = hs.get_datastore() + self.user_id = self.register_user("foo", "pass") + + def test_background_remove_deleted_devices_from_device_inbox(self): + """Test that the background task to delete old device_inboxes works properly.""" + + # create a valid device + self.get_success( + self.store.store_device(self.user_id, "cur_device", "display_name") + ) + + # Add device_inbox to devices + self.get_success( + self.store.db_pool.simple_insert( + "device_inbox", + { + "user_id": self.user_id, + "device_id": "cur_device", + "stream_id": 1, + "message_json": "{}", + }, + ) + ) + self.get_success( + self.store.db_pool.simple_insert( + "device_inbox", + { + "user_id": self.user_id, + "device_id": "old_device", + "stream_id": 2, + "message_json": "{}", + }, + ) + ) + + # Insert and run the background update. + self.get_success( + self.store.db_pool.simple_insert( + "background_updates", + { + "update_name": "remove_deleted_devices_from_device_inbox", + "progress_json": "{}", + }, + ) + ) + + # ... and tell the DataStore that it hasn't finished all updates yet + self.store.db_pool.updates._all_done = False + + # Now let's actually drive the updates to completion + while not self.get_success( + self.store.db_pool.updates.has_completed_background_updates() + ): + self.get_success( + self.store.db_pool.updates.do_next_background_update(100), by=0.1 + ) + + # Make sure the background task deleted old device_inbox + res = self.get_success( + self.store.db_pool.simple_select_onecol( + table="device_inbox", + keyvalues={}, + retcol="device_id", + desc="get_device_id_from_device_inbox", + ) + ) + self.assertEqual(1, len(res)) From 1994a2af147fb87477b7f614628e15c92b84dbdd Mon Sep 17 00:00:00 2001 From: dklimpel <5740567+dklimpel@users.noreply.github.com> Date: Fri, 1 Oct 2021 14:06:10 +0200 Subject: [PATCH 05/19] rename newsfile --- changelog.d/{10098.bugfix => 10969.bugfix} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog.d/{10098.bugfix => 10969.bugfix} (100%) diff --git a/changelog.d/10098.bugfix b/changelog.d/10969.bugfix similarity index 100% rename from changelog.d/10098.bugfix rename to changelog.d/10969.bugfix From 6b74a0e10772d6650e0d29ea2a80d8a7e43cf63f Mon Sep 17 00:00:00 2001 From: dklimpel <5740567+dklimpel@users.noreply.github.com> Date: Fri, 1 Oct 2021 18:54:08 +0200 Subject: [PATCH 06/19] apply changes from review --- synapse/storage/databases/main/devices.py | 9 ++++----- tests/handlers/test_device.py | 7 ++++--- tests/storage/databases/main/test_devices.py | 1 + 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 9bdcd2c1511f..4bfe421bdfee 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1051,14 +1051,14 @@ def _txn(txn): return rows async def _remove_deleted_devices_from_device_inbox( - self, progress: dict, batch_size: int + self, progress: JsonDict, batch_size: int ) -> int: """A background update that deletes all device_inboxes for deleted devices. This should only need to be run once (when users upgrade to v1.45.0) Args: - progress: dict used to store progress of this background update + progress: JsonDict used to store progress of this background update batch_size: the maximum number of rows to retrieve in a single select query Returns: @@ -1080,7 +1080,6 @@ def _remove_deleted_devices_from_device_inbox_txn( txn.execute(sql, (batch_size,)) rows = txn.fetchall() - row = None for row in rows: self.db_pool.simple_delete_txn( txn, @@ -1088,7 +1087,7 @@ def _remove_deleted_devices_from_device_inbox_txn( {"user_id": row[0], "device_id": row[1], "stream_id": row[2]}, ) - if row: + if rows: self.db_pool.updates._background_update_progress_txn( txn, "remove_deleted_devices_from_device_inbox", row ) @@ -1183,7 +1182,7 @@ async def store_device( raise StoreError(500, "Problem storing device.") async def delete_device(self, user_id: str, device_id: str) -> None: - """Delete a device and this device_inbox. + """Delete a device and its device_inbox. Args: user_id: The ID of the user which owns the device diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 7e06e74d431f..43031e07ea77 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -180,15 +180,16 @@ def test_delete_device_and_device_inbox(self): self.get_success(self.handler.delete_device(user1, "abc")) # check that the device_inbox was deleted - self.get_failure( + res = self.get_success( self.store.db_pool.simple_select_one( table="device_inbox", keyvalues={"user_id": user1, "device_id": "abc"}, retcols=("user_id", "device_id"), + allow_none=True, desc="get_device_id_from_device_inbox", - ), - synapse.api.errors.StoreError, + ) ) + self.assertIsNone(res) def test_update_device(self): self._record_users() diff --git a/tests/storage/databases/main/test_devices.py b/tests/storage/databases/main/test_devices.py index 0494edf48e8f..82f9384d2f95 100644 --- a/tests/storage/databases/main/test_devices.py +++ b/tests/storage/databases/main/test_devices.py @@ -93,3 +93,4 @@ def test_background_remove_deleted_devices_from_device_inbox(self): ) ) self.assertEqual(1, len(res)) + self.assertEqual(res[0], "cur_device") From 26faaff35f83c2c280310b8c71b7111a50113b1c Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Fri, 1 Oct 2021 19:13:45 +0200 Subject: [PATCH 07/19] Update changelog.d/10969.bugfix Co-authored-by: David Robertson --- changelog.d/10969.bugfix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/10969.bugfix b/changelog.d/10969.bugfix index 11ab4436e1e8..89c299b8e8df 100644 --- a/changelog.d/10969.bugfix +++ b/changelog.d/10969.bugfix @@ -1 +1 @@ -Fix a bug where messages in `device_inbox` table where not deleted, when deleting device(s). +Fix a long-standing bug where messages in the `device_inbox` table for deleted devices would persist indefinitely. Contributed by @dklimpel and @JohannesKleine. From d42c17c1abe16eaf75b0efe9bb18add58a480348 Mon Sep 17 00:00:00 2001 From: dklimpel <5740567+dklimpel@users.noreply.github.com> Date: Tue, 5 Oct 2021 16:13:13 +0200 Subject: [PATCH 08/19] update background job --- synapse/storage/databases/main/devices.py | 31 ++++++++++++++--------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 4bfe421bdfee..f7e3f7210a0b 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1065,34 +1065,41 @@ async def _remove_deleted_devices_from_device_inbox( The number of deleted rows """ + last_device_id = progress.get("device_id", "") + def _remove_deleted_devices_from_device_inbox_txn( txn: LoggingTransaction, ) -> int: sql = """ - SELECT user_id, device_id, stream_id + SELECT device_id FROM device_inbox WHERE device_id NOT IN (SELECT device_id FROM devices) + AND device_id > ? + ORDER BY device_id ASC LIMIT ?; """ - txn.execute(sql, (batch_size,)) - rows = txn.fetchall() + txn.execute(sql, (last_device_id, batch_size)) + device_ids_to_delete = txn.fetchall() - for row in rows: - self.db_pool.simple_delete_txn( - txn, - "device_inbox", - {"user_id": row[0], "device_id": row[1], "stream_id": row[2]}, - ) + count_deleted_devices = self.db_pool.simple_delete_many_txn( + txn, + "device_inbox", + column="device_id", + values=device_ids_to_delete, + keyvalues={}, + ) - if rows: + if device_ids_to_delete: self.db_pool.updates._background_update_progress_txn( - txn, "remove_deleted_devices_from_device_inbox", row + txn, + "remove_deleted_devices_from_device_inbox", + {"device_id": device_ids_to_delete[-1]}, ) - return len(rows) + return count_deleted_devices number_deleted = await self.db_pool.runInteraction( "_remove_deleted_devices_from_device_inbox", From f484316353a90ebbd60b7414ab48fdd40f6541e8 Mon Sep 17 00:00:00 2001 From: dklimpel <5740567+dklimpel@users.noreply.github.com> Date: Tue, 5 Oct 2021 21:17:51 +0200 Subject: [PATCH 09/19] move delete_device(s) to transaction --- synapse/storage/databases/main/devices.py | 56 +++++++++++++---------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index f7e3f7210a0b..910cff2d2171 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1082,7 +1082,7 @@ def _remove_deleted_devices_from_device_inbox_txn( """ txn.execute(sql, (last_device_id, batch_size)) - device_ids_to_delete = txn.fetchall() + device_ids_to_delete = [row[0] for row in txn] count_deleted_devices = self.db_pool.simple_delete_many_txn( txn, @@ -1195,18 +1195,21 @@ async def delete_device(self, user_id: str, device_id: str) -> None: user_id: The ID of the user which owns the device device_id: The ID of the device to delete """ - await self.db_pool.simple_delete_one( - table="devices", - keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False}, - desc="delete_device", - ) - await self.db_pool.simple_delete( - table="device_inbox", - keyvalues={"user_id": user_id, "device_id": device_id}, - desc="delete_device_inbox", - ) + def _delete_device_txn(txn: LoggingTransaction) -> None: + self.db_pool.simple_delete_one( + table="devices", + keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False}, + desc="delete_device", + ) + self.db_pool.simple_delete( + table="device_inbox", + keyvalues={"user_id": user_id, "device_id": device_id}, + desc="delete_device_inbox", + ) + + return await self.db_pool.runInteraction("delete_device", _delete_device_txn) self.device_id_exists_cache.invalidate((user_id, device_id)) async def delete_devices(self, user_id: str, device_ids: List[str]) -> None: @@ -1216,22 +1219,25 @@ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None: user_id: The ID of the user which owns the devices device_ids: The IDs of the devices to delete """ - await self.db_pool.simple_delete_many( - table="devices", - column="device_id", - iterable=device_ids, - keyvalues={"user_id": user_id, "hidden": False}, - desc="delete_devices", - ) - await self.db_pool.simple_delete_many( - table="device_inbox", - column="device_id", - iterable=device_ids, - keyvalues={"user_id": user_id}, - desc="delete_devices_inbox", - ) + def _delete_devices_txn(txn: LoggingTransaction) -> None: + self.db_pool.simple_delete_many( + table="devices", + column="device_id", + iterable=device_ids, + keyvalues={"user_id": user_id, "hidden": False}, + desc="delete_devices", + ) + + self.db_pool.simple_delete_many( + table="device_inbox", + column="device_id", + iterable=device_ids, + keyvalues={"user_id": user_id}, + desc="delete_devices_inbox", + ) + return await self.db_pool.runInteraction("delete_devices", _delete_devices_txn) for device_id in device_ids: self.device_id_exists_cache.invalidate((user_id, device_id)) From 1ebfc7a507fc21df25f40e569c02e8e02b2ab09a Mon Sep 17 00:00:00 2001 From: dklimpel <5740567+dklimpel@users.noreply.github.com> Date: Tue, 5 Oct 2021 21:20:56 +0200 Subject: [PATCH 10/19] remove wrong `return` --- synapse/storage/databases/main/devices.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 910cff2d2171..e910a08af1ad 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1209,7 +1209,7 @@ def _delete_device_txn(txn: LoggingTransaction) -> None: desc="delete_device_inbox", ) - return await self.db_pool.runInteraction("delete_device", _delete_device_txn) + await self.db_pool.runInteraction("delete_device", _delete_device_txn) self.device_id_exists_cache.invalidate((user_id, device_id)) async def delete_devices(self, user_id: str, device_ids: List[str]) -> None: @@ -1237,7 +1237,7 @@ def _delete_devices_txn(txn: LoggingTransaction) -> None: desc="delete_devices_inbox", ) - return await self.db_pool.runInteraction("delete_devices", _delete_devices_txn) + await self.db_pool.runInteraction("delete_devices", _delete_devices_txn) for device_id in device_ids: self.device_id_exists_cache.invalidate((user_id, device_id)) From 38ca3c9cbcec9e6f5bde8892be7ed641937ea4e8 Mon Sep 17 00:00:00 2001 From: dklimpel <5740567+dklimpel@users.noreply.github.com> Date: Tue, 5 Oct 2021 21:27:30 +0200 Subject: [PATCH 11/19] remove old not needed sql file --- ...move_deleted_devices_from_device_inbox.sql | 21 ------------------- 1 file changed, 21 deletions(-) delete mode 100644 synapse/storage/schema/main/delta/59/14remove_deleted_devices_from_device_inbox.sql diff --git a/synapse/storage/schema/main/delta/59/14remove_deleted_devices_from_device_inbox.sql b/synapse/storage/schema/main/delta/59/14remove_deleted_devices_from_device_inbox.sql deleted file mode 100644 index 4b069bf0a629..000000000000 --- a/synapse/storage/schema/main/delta/59/14remove_deleted_devices_from_device_inbox.sql +++ /dev/null @@ -1,21 +0,0 @@ -/* Copyright 2021 The Matrix.org Foundation C.I.C - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - ---- Remove messages from the device_inbox table which where sent to an ---- allready deleted device. ---- This schould run as the last task, it may take a little bit longer ---- to finish. - -DELETE FROM device_inbox WHERE device_id NOT IN (SELECT device_id FROM devices); From ca72c963110bc6d5b9a60394419e0999b351ed68 Mon Sep 17 00:00:00 2001 From: dklimpel <5740567+dklimpel@users.noreply.github.com> Date: Tue, 5 Oct 2021 21:58:52 +0200 Subject: [PATCH 12/19] fix wrong db functions call --- synapse/storage/databases/main/devices.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index e910a08af1ad..257f670a878b 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1197,16 +1197,16 @@ async def delete_device(self, user_id: str, device_id: str) -> None: """ def _delete_device_txn(txn: LoggingTransaction) -> None: - self.db_pool.simple_delete_one( + self.db_pool.simple_delete_one_txn( + txn, table="devices", keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False}, - desc="delete_device", ) - self.db_pool.simple_delete( + self.db_pool.simple_delete_txn( + txn, table="device_inbox", keyvalues={"user_id": user_id, "device_id": device_id}, - desc="delete_device_inbox", ) await self.db_pool.runInteraction("delete_device", _delete_device_txn) @@ -1221,20 +1221,20 @@ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None: """ def _delete_devices_txn(txn: LoggingTransaction) -> None: - self.db_pool.simple_delete_many( + self.db_pool.simple_delete_many_txn( + txn, table="devices", column="device_id", iterable=device_ids, keyvalues={"user_id": user_id, "hidden": False}, - desc="delete_devices", ) - self.db_pool.simple_delete_many( + self.db_pool.simple_delete_many_txn( + txn, table="device_inbox", column="device_id", iterable=device_ids, keyvalues={"user_id": user_id}, - desc="delete_devices_inbox", ) await self.db_pool.runInteraction("delete_devices", _delete_devices_txn) From 33e366d16476efea468cc660e243d106385e3613 Mon Sep 17 00:00:00 2001 From: dklimpel <5740567+dklimpel@users.noreply.github.com> Date: Tue, 5 Oct 2021 22:13:59 +0200 Subject: [PATCH 13/19] fix error with attribut names --- synapse/storage/databases/main/devices.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 257f670a878b..cecac4b5b37a 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1225,7 +1225,7 @@ def _delete_devices_txn(txn: LoggingTransaction) -> None: txn, table="devices", column="device_id", - iterable=device_ids, + values=device_ids, keyvalues={"user_id": user_id, "hidden": False}, ) @@ -1233,7 +1233,7 @@ def _delete_devices_txn(txn: LoggingTransaction) -> None: txn, table="device_inbox", column="device_id", - iterable=device_ids, + values=device_ids, keyvalues={"user_id": user_id}, ) From e6784f207ab5906e788c1ae56d97691b1829beb5 Mon Sep 17 00:00:00 2001 From: dklimpel <5740567+dklimpel@users.noreply.github.com> Date: Wed, 6 Oct 2021 20:33:31 +0200 Subject: [PATCH 14/19] move bg task from `devices` to `deviceinbox` --- synapse/storage/databases/main/deviceinbox.py | 69 +++++++++++++++++++ synapse/storage/databases/main/devices.py | 68 ------------------ .../{test_devices.py => test_deviceinbox.py} | 10 +-- 3 files changed, 71 insertions(+), 76 deletions(-) rename tests/storage/databases/main/{test_devices.py => test_deviceinbox.py} (88%) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 3154906d45f6..94b98cf6afe0 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -552,6 +552,7 @@ def _add_messages_to_local_device_inbox_txn( class DeviceInboxBackgroundUpdateStore(SQLBaseStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" + REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox" def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) @@ -567,6 +568,11 @@ def __init__(self, database: DatabasePool, db_conn, hs): self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox ) + self.db_pool.updates.register_background_update_handler( + self.REMOVE_DELETED_DEVICES, + self._remove_deleted_devices_from_device_inbox, + ) + async def _background_drop_index_device_inbox(self, progress, batch_size): def reindex_txn(conn): txn = conn.cursor() @@ -579,6 +585,69 @@ def reindex_txn(conn): return 1 + async def _remove_deleted_devices_from_device_inbox( + self, progress: JsonDict, batch_size: int + ) -> int: + """A background update that deletes all device_inboxes for deleted devices. + + This should only need to be run once (when users upgrade to v1.45.0) + + Args: + progress: JsonDict used to store progress of this background update + batch_size: the maximum number of rows to retrieve in a single select query + + Returns: + The number of deleted rows + """ + + last_device_id = progress.get("device_id", "") + + def _remove_deleted_devices_from_device_inbox_txn( + txn: LoggingTransaction, + ) -> int: + + sql = """ + SELECT device_id + FROM device_inbox + WHERE device_id + NOT IN (SELECT device_id FROM devices) + AND device_id > ? + ORDER BY device_id ASC + LIMIT ?; + """ + + txn.execute(sql, (last_device_id, batch_size)) + device_ids_to_delete = [row[0] for row in txn] + + count_deleted_devices = self.db_pool.simple_delete_many_txn( + txn, + "device_inbox", + column="device_id", + values=device_ids_to_delete, + keyvalues={}, + ) + + if device_ids_to_delete: + self.db_pool.updates._background_update_progress_txn( + txn, + self.REMOVE_DELETED_DEVICES, + {"device_id": device_ids_to_delete[-1]}, + ) + + return count_deleted_devices + + number_deleted = await self.db_pool.runInteraction( + "_remove_deleted_devices_from_device_inbox", + _remove_deleted_devices_from_device_inbox_txn, + ) + + if number_deleted < batch_size: + await self.db_pool.updates._end_background_update( + self.REMOVE_DELETED_DEVICES + ) + + return number_deleted + class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): pass diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index cecac4b5b37a..7ad91b398a8b 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -955,11 +955,6 @@ def __init__(self, database: DatabasePool, db_conn, hs): self._remove_duplicate_outbound_pokes, ) - self.db_pool.updates.register_background_update_handler( - "remove_deleted_devices_from_device_inbox", - self._remove_deleted_devices_from_device_inbox, - ) - # a pair of background updates that were added during the 1.14 release cycle, # but replaced with 58/06dlols_unique_idx.py self.db_pool.updates.register_noop_background_update( @@ -1050,69 +1045,6 @@ def _txn(txn): return rows - async def _remove_deleted_devices_from_device_inbox( - self, progress: JsonDict, batch_size: int - ) -> int: - """A background update that deletes all device_inboxes for deleted devices. - - This should only need to be run once (when users upgrade to v1.45.0) - - Args: - progress: JsonDict used to store progress of this background update - batch_size: the maximum number of rows to retrieve in a single select query - - Returns: - The number of deleted rows - """ - - last_device_id = progress.get("device_id", "") - - def _remove_deleted_devices_from_device_inbox_txn( - txn: LoggingTransaction, - ) -> int: - - sql = """ - SELECT device_id - FROM device_inbox - WHERE device_id - NOT IN (SELECT device_id FROM devices) - AND device_id > ? - ORDER BY device_id ASC - LIMIT ?; - """ - - txn.execute(sql, (last_device_id, batch_size)) - device_ids_to_delete = [row[0] for row in txn] - - count_deleted_devices = self.db_pool.simple_delete_many_txn( - txn, - "device_inbox", - column="device_id", - values=device_ids_to_delete, - keyvalues={}, - ) - - if device_ids_to_delete: - self.db_pool.updates._background_update_progress_txn( - txn, - "remove_deleted_devices_from_device_inbox", - {"device_id": device_ids_to_delete[-1]}, - ) - - return count_deleted_devices - - number_deleted = await self.db_pool.runInteraction( - "_remove_deleted_devices_from_device_inbox", - _remove_deleted_devices_from_device_inbox_txn, - ) - - if number_deleted < batch_size: - await self.db_pool.updates._end_background_update( - "remove_deleted_devices_from_device_inbox" - ) - - return number_deleted - class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): def __init__(self, database: DatabasePool, db_conn, hs): diff --git a/tests/storage/databases/main/test_devices.py b/tests/storage/databases/main/test_deviceinbox.py similarity index 88% rename from tests/storage/databases/main/test_devices.py rename to tests/storage/databases/main/test_deviceinbox.py index 82f9384d2f95..4cfd2677f7ae 100644 --- a/tests/storage/databases/main/test_devices.py +++ b/tests/storage/databases/main/test_deviceinbox.py @@ -18,7 +18,7 @@ from tests.unittest import HomeserverTestCase -class DevicesBackgroundUpdateStoreTestCase(HomeserverTestCase): +class DeviceInboxBackgroundUpdateStoreTestCase(HomeserverTestCase): servlets = [ admin.register_servlets, @@ -75,13 +75,7 @@ def test_background_remove_deleted_devices_from_device_inbox(self): # ... and tell the DataStore that it hasn't finished all updates yet self.store.db_pool.updates._all_done = False - # Now let's actually drive the updates to completion - while not self.get_success( - self.store.db_pool.updates.has_completed_background_updates() - ): - self.get_success( - self.store.db_pool.updates.do_next_background_update(100), by=0.1 - ) + self.wait_for_background_updates() # Make sure the background task deleted old device_inbox res = self.get_success( From b3cd3422bad579a90e2422239dac76af6f7f76c7 Mon Sep 17 00:00:00 2001 From: dklimpel <5740567+dklimpel@users.noreply.github.com> Date: Wed, 6 Oct 2021 20:35:28 +0200 Subject: [PATCH 15/19] update `delete_device` --- synapse/storage/databases/main/devices.py | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 7ad91b398a8b..5abe20fc1a15 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1128,21 +1128,7 @@ async def delete_device(self, user_id: str, device_id: str) -> None: device_id: The ID of the device to delete """ - def _delete_device_txn(txn: LoggingTransaction) -> None: - self.db_pool.simple_delete_one_txn( - txn, - table="devices", - keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False}, - ) - - self.db_pool.simple_delete_txn( - txn, - table="device_inbox", - keyvalues={"user_id": user_id, "device_id": device_id}, - ) - - await self.db_pool.runInteraction("delete_device", _delete_device_txn) - self.device_id_exists_cache.invalidate((user_id, device_id)) + await self.delete_devices(user_id, [device_id]) async def delete_devices(self, user_id: str, device_ids: List[str]) -> None: """Deletes several devices. From 9a849bd7d6d99be3704c4f27cd3b8524fc0fd643 Mon Sep 17 00:00:00 2001 From: dklimpel <5740567+dklimpel@users.noreply.github.com> Date: Fri, 8 Oct 2021 16:59:31 +0200 Subject: [PATCH 16/19] update bg task to new sql --- synapse/storage/databases/main/deviceinbox.py | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 94b98cf6afe0..44c89d436acf 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -19,9 +19,10 @@ from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.replication.tcp.streams import ToDeviceStream from synapse.storage._base import SQLBaseStore, db_to_json -from synapse.storage.database import DatabasePool +from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator +from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -600,41 +601,40 @@ async def _remove_deleted_devices_from_device_inbox( The number of deleted rows """ - last_device_id = progress.get("device_id", "") - def _remove_deleted_devices_from_device_inbox_txn( txn: LoggingTransaction, ) -> int: sql = """ - SELECT device_id - FROM device_inbox - WHERE device_id - NOT IN (SELECT device_id FROM devices) - AND device_id > ? - ORDER BY device_id ASC - LIMIT ?; + WITH get_devices AS + (SELECT device_inbox.device_id, device_inbox.user_id + FROM device_inbox + WHERE (device_inbox.device_id, device_inbox.user_id) + NOT IN + (SELECT device_id, user_id FROM devices) + LIMIT ?) + SELECT DISTINCT * FROM get_devices; """ - txn.execute(sql, (last_device_id, batch_size)) - device_ids_to_delete = [row[0] for row in txn] + txn.execute(sql, (batch_size,)) + rows = txn.fetchall() - count_deleted_devices = self.db_pool.simple_delete_many_txn( - txn, - "device_inbox", - column="device_id", - values=device_ids_to_delete, - keyvalues={}, - ) + num_deleted = 0 + for row in rows: + num_deleted += self.db_pool.simple_delete_txn( + txn, + "device_inbox", + {"device_id": row[0], "user_id": row[1]}, + ) - if device_ids_to_delete: + if rows: self.db_pool.updates._background_update_progress_txn( txn, self.REMOVE_DELETED_DEVICES, - {"device_id": device_ids_to_delete[-1]}, + {"device_id": rows[-1][0], "user_id": rows[-1][1]}, ) - return count_deleted_devices + return num_deleted number_deleted = await self.db_pool.runInteraction( "_remove_deleted_devices_from_device_inbox", From c4e92f31ad6bb8530471fa62864f9172f071a6e9 Mon Sep 17 00:00:00 2001 From: dklimpel <5740567+dklimpel@users.noreply.github.com> Date: Fri, 8 Oct 2021 21:08:02 +0200 Subject: [PATCH 17/19] fix end condition of bg task --- synapse/storage/databases/main/deviceinbox.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 44c89d436acf..15d90bc31551 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -641,7 +641,12 @@ def _remove_deleted_devices_from_device_inbox_txn( _remove_deleted_devices_from_device_inbox_txn, ) - if number_deleted < batch_size: + # The task is finished when no more lines are deleted. + # The `batch_size` only specifies how many devices are cleaned per run. + # More than one line is deleted in the deviceinbox per run and device, + # so it is possible that the number of deleted lines is larger + # than the batch size. + if not number_deleted: await self.db_pool.updates._end_background_update( self.REMOVE_DELETED_DEVICES ) From c17eb78a056afcb9c6d76c9baa2c49b91a8a0444 Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Thu, 14 Oct 2021 16:06:31 +0200 Subject: [PATCH 18/19] Apply suggestions from code review Co-authored-by: Patrick Cloke --- .../64/02remove_deleted_devices_from_device_inbox.sql | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql b/synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql index c4d715500cd3..efe702f6210b 100644 --- a/synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql +++ b/synapse/storage/schema/main/delta/64/02remove_deleted_devices_from_device_inbox.sql @@ -14,10 +14,9 @@ */ ---- Remove messages from the device_inbox table which where sent to an ---- allready deleted device. ---- This schould run as background task, it may take a little bit longer ---- to finish. +-- Remove messages from the device_inbox table which were orphaned +-- when a device was deleted using Synapse earlier than 1.46.0. +-- This runs as background task, but may take a bit to finish. INSERT INTO background_updates (ordering, update_name, progress_json) VALUES (6402, 'remove_deleted_devices_from_device_inbox', '{}'); From 8e7f8fb9729e2d78c203658939a5ba9bc9bdf2a7 Mon Sep 17 00:00:00 2001 From: dklimpel <5740567+dklimpel@users.noreply.github.com> Date: Tue, 26 Oct 2021 12:32:31 +0200 Subject: [PATCH 19/19] update bg task after review to use `stream_id` --- synapse/storage/databases/main/deviceinbox.py | 48 ++++++++++++------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 15d90bc31551..42793e81f8ef 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -591,7 +591,7 @@ async def _remove_deleted_devices_from_device_inbox( ) -> int: """A background update that deletes all device_inboxes for deleted devices. - This should only need to be run once (when users upgrade to v1.45.0) + This should only need to be run once (when users upgrade to v1.46.0) Args: progress: JsonDict used to store progress of this background update @@ -604,19 +604,31 @@ async def _remove_deleted_devices_from_device_inbox( def _remove_deleted_devices_from_device_inbox_txn( txn: LoggingTransaction, ) -> int: + """stream_id is not unique + we need to use an inclusive `stream_id >= ?` clause, + since we might not have deleted all dead device messages for the stream_id + returned from the previous query + + Then delete only rows matching the `(user_id, device_id, stream_id)` tuple, + to avoid problems of deleting a large number of rows all at once + due to a single device having lots of device messages. + """ + + last_stream_id = progress.get("stream_id", 0) sql = """ - WITH get_devices AS - (SELECT device_inbox.device_id, device_inbox.user_id - FROM device_inbox - WHERE (device_inbox.device_id, device_inbox.user_id) - NOT IN - (SELECT device_id, user_id FROM devices) - LIMIT ?) - SELECT DISTINCT * FROM get_devices; + SELECT device_id, user_id, stream_id + FROM device_inbox + WHERE + stream_id >= ? + AND (device_id, user_id) NOT IN ( + SELECT device_id, user_id FROM devices + ) + ORDER BY stream_id + LIMIT ? """ - txn.execute(sql, (batch_size,)) + txn.execute(sql, (last_stream_id, batch_size)) rows = txn.fetchall() num_deleted = 0 @@ -624,14 +636,22 @@ def _remove_deleted_devices_from_device_inbox_txn( num_deleted += self.db_pool.simple_delete_txn( txn, "device_inbox", - {"device_id": row[0], "user_id": row[1]}, + {"device_id": row[0], "user_id": row[1], "stream_id": row[2]}, ) if rows: + # send more than stream_id to progress + # otherwise it can happen in large deployments that + # no change of status is visible in the log file + # it may be that the stream_id does not change in several runs self.db_pool.updates._background_update_progress_txn( txn, self.REMOVE_DELETED_DEVICES, - {"device_id": rows[-1][0], "user_id": rows[-1][1]}, + { + "device_id": rows[-1][0], + "user_id": rows[-1][1], + "stream_id": rows[-1][2], + }, ) return num_deleted @@ -642,10 +662,6 @@ def _remove_deleted_devices_from_device_inbox_txn( ) # The task is finished when no more lines are deleted. - # The `batch_size` only specifies how many devices are cleaned per run. - # More than one line is deleted in the deviceinbox per run and device, - # so it is possible that the number of deleted lines is larger - # than the batch size. if not number_deleted: await self.db_pool.updates._end_background_update( self.REMOVE_DELETED_DEVICES