From 823246d042275ee04fd32d07d15793d6bdaae2a7 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 24 Nov 2021 14:47:48 +0000 Subject: [PATCH 01/17] Improve performance of remove_deleted_devices_from_device_inbox --- synapse/storage/databases/main/deviceinbox.py | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 7c0f95336561..b732550796b6 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -653,7 +653,7 @@ async def _remove_deleted_devices_from_device_inbox( def _remove_deleted_devices_from_device_inbox_txn( txn: LoggingTransaction, - ) -> int: + ) -> bool: """stream_id is not unique we need to use an inclusive `stream_id >= ?` clause, since we might not have deleted all dead device messages for the stream_id @@ -664,21 +664,28 @@ def _remove_deleted_devices_from_device_inbox_txn( due to a single device having lots of device messages. """ - last_stream_id = progress.get("stream_id", 0) + txn.execute("SELECT max(stream_id) FROM device_inbox") + row = txn.fetchone() + if row[0] is None: + max_stream_id = 0 + else: + max_stream_id = row[0] + + start = progress.get("stream_id", 0) + stop = start + batch_size sql = """ SELECT device_id, user_id, stream_id FROM device_inbox WHERE - stream_id >= ? + stream_id >= ? AND stream_id < ? AND (device_id, user_id) NOT IN ( SELECT device_id, user_id FROM devices ) ORDER BY stream_id - LIMIT ? """ - txn.execute(sql, (last_stream_id, batch_size)) + txn.execute(sql, (start, stop)) rows = txn.fetchall() num_deleted = 0 @@ -704,20 +711,20 @@ def _remove_deleted_devices_from_device_inbox_txn( }, ) - return num_deleted + return num_deleted, stop >= max_stream_id - number_deleted = await self.db_pool.runInteraction( + number_deleted, finished = await self.db_pool.runInteraction( "_remove_deleted_devices_from_device_inbox", _remove_deleted_devices_from_device_inbox_txn, ) # The task is finished when no more lines are deleted. - if not number_deleted: + if finished: await self.db_pool.updates._end_background_update( self.REMOVE_DELETED_DEVICES ) - return number_deleted + return (number_deleted) async def _remove_hidden_devices_from_device_inbox( self, progress: JsonDict, batch_size: int From e35157f8507f8ea4a6a20af8fde38265f7599457 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 24 Nov 2021 14:48:37 +0000 Subject: [PATCH 02/17] Changelog --- changelog.d/11421.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/11421.bugfix diff --git a/changelog.d/11421.bugfix b/changelog.d/11421.bugfix new file mode 100644 index 000000000000..94cf73e03da9 --- /dev/null +++ b/changelog.d/11421.bugfix @@ -0,0 +1 @@ +Improve performance of various background updates related to device inboxes. From 3dd04815f12bd8e98ecf55df8cd3d6da5695d0d5 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 24 Nov 2021 15:04:33 +0000 Subject: [PATCH 03/17] Lint --- synapse/storage/databases/main/deviceinbox.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index b732550796b6..79264a139a34 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -653,7 +653,7 @@ async def _remove_deleted_devices_from_device_inbox( def _remove_deleted_devices_from_device_inbox_txn( txn: LoggingTransaction, - ) -> bool: + ) -> Tuple[int, bool]: """stream_id is not unique we need to use an inclusive `stream_id >= ?` clause, since we might not have deleted all dead device messages for the stream_id @@ -666,7 +666,7 @@ def _remove_deleted_devices_from_device_inbox_txn( txn.execute("SELECT max(stream_id) FROM device_inbox") row = txn.fetchone() - if row[0] is None: + if row is None or row[0] is None: max_stream_id = 0 else: max_stream_id = row[0] @@ -724,7 +724,7 @@ def _remove_deleted_devices_from_device_inbox_txn( self.REMOVE_DELETED_DEVICES ) - return (number_deleted) + return number_deleted async def _remove_hidden_devices_from_device_inbox( self, progress: JsonDict, batch_size: int From 164111f1438edb59b235a7e500fbb03cd25c609f Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 24 Nov 2021 17:58:14 +0100 Subject: [PATCH 04/17] Update changelog.d/11421.bugfix Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- changelog.d/11421.bugfix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/11421.bugfix b/changelog.d/11421.bugfix index 94cf73e03da9..28ac65ea7ce3 100644 --- a/changelog.d/11421.bugfix +++ b/changelog.d/11421.bugfix @@ -1 +1 @@ -Improve performance of various background updates related to device inboxes. +Improve performance of various background database schema updates. From 23b0f5e543e4cd6375f7b6849e3276bba7d9e7db Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 24 Nov 2021 17:34:13 +0000 Subject: [PATCH 05/17] Incorporate review comments --- synapse/storage/databases/main/deviceinbox.py | 82 ++++++++++++------- 1 file changed, 51 insertions(+), 31 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 79264a139a34..cae5bbb93c06 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -654,47 +654,66 @@ async def _remove_deleted_devices_from_device_inbox( def _remove_deleted_devices_from_device_inbox_txn( txn: LoggingTransaction, ) -> Tuple[int, bool]: - """stream_id is not unique - we need to use an inclusive `stream_id >= ?` clause, - since we might not have deleted all dead device messages for the stream_id - returned from the previous query - - Then delete only rows matching the `(user_id, device_id, stream_id)` tuple, + """We delete only rows matching the `(user_id, device_id, stream_id)` tuple, to avoid problems of deleting a large number of rows all at once due to a single device having lots of device messages. """ - txn.execute("SELECT max(stream_id) FROM device_inbox") - row = txn.fetchone() - if row is None or row[0] is None: - max_stream_id = 0 + if "max_stream_id" in progress: + max_stream_id = progress["max_stream_id"] else: - max_stream_id = row[0] + txn.execute("SELECT max(stream_id) FROM device_inbox") + # There's a type mismatch here between how we want to type the row and + # what fetchone says it returns, but we silence it because we know that + # res can't be None. + res: Tuple[Optional[int]] = txn.fetchone() # type: ignore[assignment] + if res[0] is None: + return 0, True + else: + max_stream_id = res[0] start = progress.get("stream_id", 0) stop = start + batch_size - sql = """ - SELECT device_id, user_id, stream_id - FROM device_inbox - WHERE - stream_id >= ? AND stream_id < ? - AND (device_id, user_id) NOT IN ( - SELECT device_id, user_id FROM devices + if self.database_engine.supports_returning: + # If the database engine supports the RETURNING clause, use it and do + # everything in one go. + sql = """ + DELETE FROM device_inbox + WHERE + stream_id >= ? AND stream_id < ? + AND (device_id, user_id) NOT IN ( + SELECT device_id, user_id FROM devices + ) + RETURNING device_id, user_id, stream_id + """ + + txn.execute(sql, (start, stop)) + num_deleted = txn.rowcount + rows = txn.fetchall() + else: + # Otherwise do the select and delete separately. + sql = """ + SELECT device_id, user_id, stream_id + FROM device_inbox + WHERE + stream_id >= ? AND stream_id < ? + AND (device_id, user_id) NOT IN ( + SELECT device_id, user_id FROM devices + ) + ORDER BY stream_id + """ + + txn.execute(sql, (start, stop)) + rows = txn.fetchall() + + num_deleted = 0 + for row in rows: + num_deleted += self.db_pool.simple_delete_txn( + txn, + "device_inbox", + {"device_id": row[0], "user_id": row[1], "stream_id": row[2]}, ) - ORDER BY stream_id - """ - - txn.execute(sql, (start, stop)) - rows = txn.fetchall() - - num_deleted = 0 - for row in rows: - num_deleted += self.db_pool.simple_delete_txn( - txn, - "device_inbox", - {"device_id": row[0], "user_id": row[1], "stream_id": row[2]}, - ) if rows: # send more than stream_id to progress @@ -708,6 +727,7 @@ def _remove_deleted_devices_from_device_inbox_txn( "device_id": rows[-1][0], "user_id": rows[-1][1], "stream_id": rows[-1][2], + "max_stream_id": max_stream_id, }, ) From 9279e5dca090895a38a995574b473fbffb4815c7 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 24 Nov 2021 17:53:11 +0000 Subject: [PATCH 06/17] Do the same change for remove_hidden_devices_from_device_inbox --- synapse/storage/databases/main/deviceinbox.py | 236 ++++++++---------- 1 file changed, 106 insertions(+), 130 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index cae5bbb93c06..f7b6f85ec6d2 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -650,92 +650,12 @@ async def _remove_deleted_devices_from_device_inbox( Returns: The number of deleted rows """ - - def _remove_deleted_devices_from_device_inbox_txn( - txn: LoggingTransaction, - ) -> Tuple[int, bool]: - """We delete only rows matching the `(user_id, device_id, stream_id)` tuple, - to avoid problems of deleting a large number of rows all at once - due to a single device having lots of device messages. - """ - - if "max_stream_id" in progress: - max_stream_id = progress["max_stream_id"] - else: - txn.execute("SELECT max(stream_id) FROM device_inbox") - # There's a type mismatch here between how we want to type the row and - # what fetchone says it returns, but we silence it because we know that - # res can't be None. - res: Tuple[Optional[int]] = txn.fetchone() # type: ignore[assignment] - if res[0] is None: - return 0, True - else: - max_stream_id = res[0] - - start = progress.get("stream_id", 0) - stop = start + batch_size - - if self.database_engine.supports_returning: - # If the database engine supports the RETURNING clause, use it and do - # everything in one go. - sql = """ - DELETE FROM device_inbox - WHERE - stream_id >= ? AND stream_id < ? - AND (device_id, user_id) NOT IN ( - SELECT device_id, user_id FROM devices - ) - RETURNING device_id, user_id, stream_id - """ - - txn.execute(sql, (start, stop)) - num_deleted = txn.rowcount - rows = txn.fetchall() - else: - # Otherwise do the select and delete separately. - sql = """ - SELECT device_id, user_id, stream_id - FROM device_inbox - WHERE - stream_id >= ? AND stream_id < ? - AND (device_id, user_id) NOT IN ( - SELECT device_id, user_id FROM devices - ) - ORDER BY stream_id - """ - - txn.execute(sql, (start, stop)) - rows = txn.fetchall() - - num_deleted = 0 - for row in rows: - num_deleted += self.db_pool.simple_delete_txn( - txn, - "device_inbox", - {"device_id": row[0], "user_id": row[1], "stream_id": row[2]}, - ) - - if rows: - # send more than stream_id to progress - # otherwise it can happen in large deployments that - # no change of status is visible in the log file - # it may be that the stream_id does not change in several runs - self.db_pool.updates._background_update_progress_txn( - txn, - self.REMOVE_DELETED_DEVICES, - { - "device_id": rows[-1][0], - "user_id": rows[-1][1], - "stream_id": rows[-1][2], - "max_stream_id": max_stream_id, - }, - ) - - return num_deleted, stop >= max_stream_id - number_deleted, finished = await self.db_pool.runInteraction( "_remove_deleted_devices_from_device_inbox", - _remove_deleted_devices_from_device_inbox_txn, + self._remove_devices_from_device_inbox_txn, + self.REMOVE_DELETED_DEVICES, + progress, + batch_size, ) # The task is finished when no more lines are deleted. @@ -760,35 +680,103 @@ async def _remove_hidden_devices_from_device_inbox( Returns: The number of deleted rows """ + number_deleted, finished = await self.db_pool.runInteraction( + "_remove_hidden_devices_from_device_inbox_txn", + self._remove_devices_from_device_inbox_txn, + self.REMOVE_HIDDEN_DEVICES, + progress, + batch_size, + ) - def _remove_hidden_devices_from_device_inbox_txn( - txn: LoggingTransaction, - ) -> int: - """stream_id is not unique - we need to use an inclusive `stream_id >= ?` clause, - since we might not have deleted all hidden device messages for the stream_id - returned from the previous query - - Then delete only rows matching the `(user_id, device_id, stream_id)` tuple, - to avoid problems of deleting a large number of rows all at once - due to a single device having lots of device messages. - """ + # The task is finished when no more lines are deleted. + if finished: + await self.db_pool.updates._end_background_update( + self.REMOVE_HIDDEN_DEVICES + ) - last_stream_id = progress.get("stream_id", 0) + return number_deleted + + def _remove_devices_from_device_inbox_txn( + self, + txn: LoggingTransaction, + update_name: str, + progress: JsonDict, + batch_size: int, + ) -> Tuple[int, bool]: + """Remove devices that were either deleted or hidden from the device_inbox table. - sql = """ + Args: + update_name: The name of the update to run, used to determine how to read the + devices table. + progress: The update's progress dict. + batch_size: The batch size for this update. + + Returns: + The number of rows deleted, and whether the update should be ended. + """ + + if "max_stream_id" in progress: + max_stream_id = progress["max_stream_id"] + else: + txn.execute("SELECT max(stream_id) FROM device_inbox") + # There's a type mismatch here between how we want to type the row and + # what fetchone says it returns, but we silence it because we know that + # res can't be None. + res: Tuple[Optional[int]] = txn.fetchone() # type: ignore[assignment] + if res[0] is None: + return 0, True + else: + max_stream_id = res[0] + + start = progress.get("stream_id", 0) + stop = start + batch_size + + nested_select = "SELECT device_id, user_id FROM devices" + args = (start, stop) + if update_name == self.REMOVE_HIDDEN_DEVICES: + # If we want to remove hidden devices, select only rows from the devices table + # that have `hidden = TRUE`. + nested_select += " WHERE hidden = ?" + # We need to ignore mypy's error here, otherwise we can't use different + # arguments depending on whether we want to filter for hidden devices. + args += (True,) # type: ignore[assignment] + + if self.database_engine.supports_returning: + # If the database engine supports the RETURNING clause, use it and do + # everything in one go. + sql = ( + """ + DELETE FROM device_inbox + WHERE + stream_id >= ? AND stream_id < ? + AND (device_id, user_id) NOT IN ( + %s + ) + RETURNING device_id, user_id, stream_id + """ + % nested_select + ) + + txn.execute(sql, args) + num_deleted = txn.rowcount + rows = txn.fetchall() + else: + # Otherwise do the select and delete separately. + sql = ( + """ SELECT device_id, user_id, stream_id FROM device_inbox WHERE - stream_id >= ? - AND (device_id, user_id) IN ( - SELECT device_id, user_id FROM devices WHERE hidden = ? + stream_id >= ? AND stream_id < ? + AND (device_id, user_id) NOT IN ( + %s ) ORDER BY stream_id - LIMIT ? """ + % nested_select + ) - txn.execute(sql, (last_stream_id, True, batch_size)) + txn.execute(sql, args) rows = txn.fetchall() num_deleted = 0 @@ -799,35 +787,23 @@ def _remove_hidden_devices_from_device_inbox_txn( {"device_id": row[0], "user_id": row[1], "stream_id": row[2]}, ) - if rows: - # We don't just save the `stream_id` in progress as - # otherwise it can happen in large deployments that - # no change of status is visible in the log file, as - # it may be that the stream_id does not change in several runs - self.db_pool.updates._background_update_progress_txn( - txn, - self.REMOVE_HIDDEN_DEVICES, - { - "device_id": rows[-1][0], - "user_id": rows[-1][1], - "stream_id": rows[-1][2], - }, - ) - - return num_deleted - - number_deleted = await self.db_pool.runInteraction( - "_remove_hidden_devices_from_device_inbox", - _remove_hidden_devices_from_device_inbox_txn, - ) - - # The task is finished when no more lines are deleted. - if not number_deleted: - await self.db_pool.updates._end_background_update( - self.REMOVE_HIDDEN_DEVICES + if rows: + # send more than stream_id to progress + # otherwise it can happen in large deployments that + # no change of status is visible in the log file + # it may be that the stream_id does not change in several runs + self.db_pool.updates._background_update_progress_txn( + txn, + update_name, + { + "device_id": rows[-1][0], + "user_id": rows[-1][1], + "stream_id": rows[-1][2], + "max_stream_id": max_stream_id, + }, ) - return number_deleted + return num_deleted, stop >= max_stream_id class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): From 8853a4e1839993dbb090eba67658b61442ff98f7 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 24 Nov 2021 18:01:52 +0000 Subject: [PATCH 07/17] Don't use RETURNING --- synapse/storage/databases/main/deviceinbox.py | 64 ++++--------------- 1 file changed, 14 insertions(+), 50 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index f7b6f85ec6d2..1b7c5e85b180 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -741,64 +741,28 @@ def _remove_devices_from_device_inbox_txn( # arguments depending on whether we want to filter for hidden devices. args += (True,) # type: ignore[assignment] - if self.database_engine.supports_returning: - # If the database engine supports the RETURNING clause, use it and do - # everything in one go. - sql = ( - """ - DELETE FROM device_inbox - WHERE - stream_id >= ? AND stream_id < ? - AND (device_id, user_id) NOT IN ( - %s - ) - RETURNING device_id, user_id, stream_id - """ - % nested_select - ) - - txn.execute(sql, args) - num_deleted = txn.rowcount - rows = txn.fetchall() - else: - # Otherwise do the select and delete separately. - sql = ( - """ - SELECT device_id, user_id, stream_id - FROM device_inbox - WHERE - stream_id >= ? AND stream_id < ? - AND (device_id, user_id) NOT IN ( - %s - ) - ORDER BY stream_id + sql = ( """ - % nested_select - ) - - txn.execute(sql, args) - rows = txn.fetchall() - - num_deleted = 0 - for row in rows: - num_deleted += self.db_pool.simple_delete_txn( - txn, - "device_inbox", - {"device_id": row[0], "user_id": row[1], "stream_id": row[2]}, + DELETE FROM device_inbox + WHERE + stream_id >= ? AND stream_id < ? + AND (device_id, user_id) NOT IN ( + %s ) + """ + % nested_select + ) + + txn.execute(sql, args) + num_deleted = txn.rowcount + rows = txn.fetchall() if rows: - # send more than stream_id to progress - # otherwise it can happen in large deployments that - # no change of status is visible in the log file - # it may be that the stream_id does not change in several runs self.db_pool.updates._background_update_progress_txn( txn, update_name, { - "device_id": rows[-1][0], - "user_id": rows[-1][1], - "stream_id": rows[-1][2], + "stream_id": stop, "max_stream_id": max_stream_id, }, ) From 6f3f95a1c44931ffef13e4002e7efa75cf40cf9b Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 25 Nov 2021 11:44:20 +0000 Subject: [PATCH 08/17] Merge both background updates --- synapse/storage/databases/main/deviceinbox.py | 152 ++++++++---------- .../65/08_device_inbox_background_updates.sql | 18 +++ 2 files changed, 87 insertions(+), 83 deletions(-) create mode 100644 synapse/storage/schema/main/delta/65/08_device_inbox_background_updates.sql diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 1b7c5e85b180..2f54c12ee0ba 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -599,6 +599,7 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox" REMOVE_HIDDEN_DEVICES = "remove_hidden_devices_from_device_inbox" + REMOVE_DEVICES_FROM_INBOX = "remove_devices_from_device_inbox" def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) @@ -624,6 +625,11 @@ def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): self._remove_hidden_devices_from_device_inbox, ) + self.db_pool.updates.register_background_update_handler( + self.REMOVE_DEVICES_FROM_INBOX, + self._remove_devices_from_device_inbox, + ) + async def _background_drop_index_device_inbox(self, progress, batch_size): def reindex_txn(conn): txn = conn.cursor() @@ -650,21 +656,9 @@ async def _remove_deleted_devices_from_device_inbox( Returns: The number of deleted rows """ - number_deleted, finished = await self.db_pool.runInteraction( - "_remove_deleted_devices_from_device_inbox", - self._remove_devices_from_device_inbox_txn, - self.REMOVE_DELETED_DEVICES, - progress, - batch_size, - ) - - # The task is finished when no more lines are deleted. - if finished: - await self.db_pool.updates._end_background_update( - self.REMOVE_DELETED_DEVICES - ) + await self.db_pool.updates._end_background_update(self.REMOVE_DELETED_DEVICES) - return number_deleted + return 0 async def _remove_hidden_devices_from_device_inbox( self, progress: JsonDict, batch_size: int @@ -680,94 +674,86 @@ async def _remove_hidden_devices_from_device_inbox( Returns: The number of deleted rows """ - number_deleted, finished = await self.db_pool.runInteraction( - "_remove_hidden_devices_from_device_inbox_txn", - self._remove_devices_from_device_inbox_txn, - self.REMOVE_HIDDEN_DEVICES, - progress, - batch_size, - ) - - # The task is finished when no more lines are deleted. - if finished: - await self.db_pool.updates._end_background_update( - self.REMOVE_HIDDEN_DEVICES - ) + await self.db_pool.updates._end_background_update(self.REMOVE_HIDDEN_DEVICES) - return number_deleted + return 0 - def _remove_devices_from_device_inbox_txn( + async def _remove_devices_from_device_inbox( self, - txn: LoggingTransaction, - update_name: str, progress: JsonDict, batch_size: int, - ) -> Tuple[int, bool]: - """Remove devices that were either deleted or hidden from the device_inbox table. + ) -> int: + """A background update to remove devices that were either deleted or hidden from + the device_inbox table. Args: - update_name: The name of the update to run, used to determine how to read the - devices table. progress: The update's progress dict. batch_size: The batch size for this update. Returns: - The number of rows deleted, and whether the update should be ended. + The number of rows deleted. """ - if "max_stream_id" in progress: - max_stream_id = progress["max_stream_id"] - else: - txn.execute("SELECT max(stream_id) FROM device_inbox") - # There's a type mismatch here between how we want to type the row and - # what fetchone says it returns, but we silence it because we know that - # res can't be None. - res: Tuple[Optional[int]] = txn.fetchone() # type: ignore[assignment] - if res[0] is None: - return 0, True + def _remove_devices_from_device_inbox_txn( + txn: LoggingTransaction, + ) -> Tuple[int, bool]: + + if "max_stream_id" in progress: + max_stream_id = progress["max_stream_id"] else: - max_stream_id = res[0] - - start = progress.get("stream_id", 0) - stop = start + batch_size - - nested_select = "SELECT device_id, user_id FROM devices" - args = (start, stop) - if update_name == self.REMOVE_HIDDEN_DEVICES: - # If we want to remove hidden devices, select only rows from the devices table - # that have `hidden = TRUE`. - nested_select += " WHERE hidden = ?" - # We need to ignore mypy's error here, otherwise we can't use different - # arguments depending on whether we want to filter for hidden devices. - args += (True,) # type: ignore[assignment] - - sql = ( - """ - DELETE FROM device_inbox - WHERE - stream_id >= ? AND stream_id < ? - AND (device_id, user_id) NOT IN ( - %s + txn.execute("SELECT max(stream_id) FROM device_inbox") + # There's a type mismatch here between how we want to type the row and + # what fetchone says it returns, but we silence it because we know that + # res can't be None. + res: Tuple[Optional[int]] = txn.fetchone() # type: ignore[assignment] + if res[0] is None: + return 0, True + else: + max_stream_id = res[0] + + start = progress.get("stream_id", 0) + stop = start + batch_size + + sql = """ + DELETE FROM device_inbox di + WHERE + stream_id >= ? AND stream_id < ? + AND NOT EXISTS ( + SELECT * FROM devices d + WHERE + d.device_id=di.device_id + AND d.user_id=di.user_id + AND NOT hidden + ) + """ + + txn.execute(sql, (stop, start)) + num_deleted = txn.rowcount + rows = txn.fetchall() + + if rows: + self.db_pool.updates._background_update_progress_txn( + txn, + self.REMOVE_DEVICES_FROM_INBOX, + { + "stream_id": stop, + "max_stream_id": max_stream_id, + }, ) - """ - % nested_select - ) - txn.execute(sql, args) - num_deleted = txn.rowcount - rows = txn.fetchall() + return num_deleted, stop >= max_stream_id - if rows: - self.db_pool.updates._background_update_progress_txn( - txn, - update_name, - { - "stream_id": stop, - "max_stream_id": max_stream_id, - }, + num_deleted, finished = await self.db_pool.runInteraction( + "_remove_devices_from_device_inbox_txn", + _remove_devices_from_device_inbox_txn, + ) + + if finished: + await self.db_pool.updates._end_background_update( + self.REMOVE_DEVICES_FROM_INBOX, ) - return num_deleted, stop >= max_stream_id + return num_deleted class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): diff --git a/synapse/storage/schema/main/delta/65/08_device_inbox_background_updates.sql b/synapse/storage/schema/main/delta/65/08_device_inbox_background_updates.sql new file mode 100644 index 000000000000..24814e7f8af7 --- /dev/null +++ b/synapse/storage/schema/main/delta/65/08_device_inbox_background_updates.sql @@ -0,0 +1,18 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Background update to clear the inboxes of hidden and deleted devices. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (6508, 'remove_devices_from_device_inbox', '{}'); From 8547f722981a0edcbb2f7afd370327ef66a68ed6 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 25 Nov 2021 11:48:18 +0000 Subject: [PATCH 09/17] Fix query for SQLite --- synapse/storage/databases/main/deviceinbox.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 2f54c12ee0ba..4a6748c1d282 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -715,11 +715,11 @@ def _remove_devices_from_device_inbox_txn( stop = start + batch_size sql = """ - DELETE FROM device_inbox di + DELETE FROM device_inbox AS di WHERE stream_id >= ? AND stream_id < ? AND NOT EXISTS ( - SELECT * FROM devices d + SELECT * FROM devices AS d WHERE d.device_id=di.device_id AND d.user_id=di.user_id From 9fdaf52d441fea99dcb5bf7bed8cc8c75a2f51ff Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 25 Nov 2021 12:19:28 +0000 Subject: [PATCH 10/17] Fix docstring and simplify code --- synapse/storage/databases/main/deviceinbox.py | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 4a6748c1d282..84b126008bce 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -645,9 +645,9 @@ def reindex_txn(conn): async def _remove_deleted_devices_from_device_inbox( self, progress: JsonDict, batch_size: int ) -> int: - """A background update that deletes all device_inboxes for deleted devices. + """No-op. - This should only need to be run once (when users upgrade to v1.47.0) + Used to be a background update that deletes all device_inboxes for deleted devices. Args: progress: JsonDict used to store progress of this background update @@ -663,9 +663,9 @@ async def _remove_deleted_devices_from_device_inbox( async def _remove_hidden_devices_from_device_inbox( self, progress: JsonDict, batch_size: int ) -> int: - """A background update that deletes all device_inboxes for hidden devices. + """No-op. - This should only need to be run once (when users upgrade to v1.47.0) + Used to be a background update that deletes all device_inboxes for hidden devices. Args: progress: JsonDict used to store progress of this background update @@ -729,17 +729,15 @@ def _remove_devices_from_device_inbox_txn( txn.execute(sql, (stop, start)) num_deleted = txn.rowcount - rows = txn.fetchall() - if rows: - self.db_pool.updates._background_update_progress_txn( - txn, - self.REMOVE_DEVICES_FROM_INBOX, - { - "stream_id": stop, - "max_stream_id": max_stream_id, - }, - ) + self.db_pool.updates._background_update_progress_txn( + txn, + self.REMOVE_DEVICES_FROM_INBOX, + { + "stream_id": stop, + "max_stream_id": max_stream_id, + }, + ) return num_deleted, stop >= max_stream_id From 4236fcd0e8c7e783c15364bfb4e407bd734f9200 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 25 Nov 2021 12:28:20 +0000 Subject: [PATCH 11/17] Make tests use the right update --- tests/storage/databases/main/test_deviceinbox.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/storage/databases/main/test_deviceinbox.py b/tests/storage/databases/main/test_deviceinbox.py index 4b67bd15b75b..561d7bfc04f8 100644 --- a/tests/storage/databases/main/test_deviceinbox.py +++ b/tests/storage/databases/main/test_deviceinbox.py @@ -66,7 +66,7 @@ def test_background_remove_deleted_devices_from_device_inbox(self): self.store.db_pool.simple_insert( "background_updates", { - "update_name": "remove_deleted_devices_from_device_inbox", + "update_name": "remove_devices_from_device_inbox", "progress_json": "{}", }, ) @@ -140,7 +140,7 @@ def test_background_remove_hidden_devices_from_device_inbox(self): self.store.db_pool.simple_insert( "background_updates", { - "update_name": "remove_hidden_devices_from_device_inbox", + "update_name": "remove_devices_from_device_inbox", "progress_json": "{}", }, ) From 78cbee82f254520e061fb8be3aed469c526df424 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 25 Nov 2021 12:31:14 +0000 Subject: [PATCH 12/17] Fix arguments order --- synapse/storage/databases/main/deviceinbox.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 84b126008bce..867616fb7d79 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -727,7 +727,7 @@ def _remove_devices_from_device_inbox_txn( ) """ - txn.execute(sql, (stop, start)) + txn.execute(sql, (start, stop)) num_deleted = txn.rowcount self.db_pool.updates._background_update_progress_txn( From c294bbb3c777031b1c55bb9c0522b010bd622d1c Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 25 Nov 2021 12:50:51 +0000 Subject: [PATCH 13/17] Don't use table aliases on DELETE Because old sqlite doesn't support it --- synapse/storage/databases/main/deviceinbox.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 867616fb7d79..3d81fe430f17 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -715,14 +715,14 @@ def _remove_devices_from_device_inbox_txn( stop = start + batch_size sql = """ - DELETE FROM device_inbox AS di + DELETE FROM device_inbox WHERE stream_id >= ? AND stream_id < ? AND NOT EXISTS ( - SELECT * FROM devices AS d + SELECT * FROM devices d WHERE - d.device_id=di.device_id - AND d.user_id=di.user_id + d.device_id=device_inbox.device_id + AND d.user_id=device_inbox.user_id AND NOT hidden ) """ From 55d60fd380a56698c4ad3e4a6073130da2606f84 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 25 Nov 2021 13:57:24 +0100 Subject: [PATCH 14/17] Apply suggestions from code review Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/storage/databases/main/deviceinbox.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 3d81fe430f17..c59665dee8bc 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -707,6 +707,8 @@ def _remove_devices_from_device_inbox_txn( # res can't be None. res: Tuple[Optional[int]] = txn.fetchone() # type: ignore[assignment] if res[0] is None: + # this can only happen if the `device_inbox` table is empty, in which + # case we have no work to do. return 0, True else: max_stream_id = res[0] @@ -714,6 +716,8 @@ def _remove_devices_from_device_inbox_txn( start = progress.get("stream_id", 0) stop = start + batch_size + # delete rows in `device_inbox` which do *not* correspond to a known, + # unhidden device. sql = """ DELETE FROM device_inbox WHERE @@ -751,7 +755,7 @@ def _remove_devices_from_device_inbox_txn( self.REMOVE_DEVICES_FROM_INBOX, ) - return num_deleted + return batch_size class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): From 7e290a3482bfc1e6b45db08b567a73f3ae58f3aa Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 25 Nov 2021 13:02:15 +0000 Subject: [PATCH 15/17] Incorporate review comments --- synapse/storage/databases/main/deviceinbox.py | 67 +++++-------------- .../65/08_device_inbox_background_updates.sql | 2 +- .../databases/main/test_deviceinbox.py | 4 +- 3 files changed, 18 insertions(+), 55 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index c59665dee8bc..f624a2188775 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -599,7 +599,7 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox" REMOVE_HIDDEN_DEVICES = "remove_hidden_devices_from_device_inbox" - REMOVE_DEVICES_FROM_INBOX = "remove_devices_from_device_inbox" + REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox" def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) @@ -615,19 +615,18 @@ def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox ) - self.db_pool.updates.register_background_update_handler( - self.REMOVE_DELETED_DEVICES, - self._remove_deleted_devices_from_device_inbox, - ) - - self.db_pool.updates.register_background_update_handler( - self.REMOVE_HIDDEN_DEVICES, - self._remove_hidden_devices_from_device_inbox, + # Used to be a background update that deletes all device_inboxes for deleted + # devices. + self.db_pool.updates.register_noop_background_update( + self.REMOVE_DELETED_DEVICES ) + # Used to be a background update that deletes all device_inboxes for hidden + # devices. + self.db_pool.updates.register_noop_background_update(self.REMOVE_HIDDEN_DEVICES) self.db_pool.updates.register_background_update_handler( - self.REMOVE_DEVICES_FROM_INBOX, - self._remove_devices_from_device_inbox, + self.REMOVE_DEAD_DEVICES_FROM_INBOX, + self._remove_dead_devices_from_device_inbox, ) async def _background_drop_index_device_inbox(self, progress, batch_size): @@ -642,43 +641,7 @@ def reindex_txn(conn): return 1 - async def _remove_deleted_devices_from_device_inbox( - self, progress: JsonDict, batch_size: int - ) -> int: - """No-op. - - Used to be a background update that deletes all device_inboxes for deleted devices. - - Args: - progress: JsonDict used to store progress of this background update - batch_size: the maximum number of rows to retrieve in a single select query - - Returns: - The number of deleted rows - """ - await self.db_pool.updates._end_background_update(self.REMOVE_DELETED_DEVICES) - - return 0 - - async def _remove_hidden_devices_from_device_inbox( - self, progress: JsonDict, batch_size: int - ) -> int: - """No-op. - - Used to be a background update that deletes all device_inboxes for hidden devices. - - Args: - progress: JsonDict used to store progress of this background update - batch_size: the maximum number of rows to retrieve in a single select query - - Returns: - The number of deleted rows - """ - await self.db_pool.updates._end_background_update(self.REMOVE_HIDDEN_DEVICES) - - return 0 - - async def _remove_devices_from_device_inbox( + async def _remove_dead_devices_from_device_inbox( self, progress: JsonDict, batch_size: int, @@ -694,7 +657,7 @@ async def _remove_devices_from_device_inbox( The number of rows deleted. """ - def _remove_devices_from_device_inbox_txn( + def _remove_dead_devices_from_device_inbox_txn( txn: LoggingTransaction, ) -> Tuple[int, bool]: @@ -736,7 +699,7 @@ def _remove_devices_from_device_inbox_txn( self.db_pool.updates._background_update_progress_txn( txn, - self.REMOVE_DEVICES_FROM_INBOX, + self.REMOVE_DEAD_DEVICES_FROM_INBOX, { "stream_id": stop, "max_stream_id": max_stream_id, @@ -747,12 +710,12 @@ def _remove_devices_from_device_inbox_txn( num_deleted, finished = await self.db_pool.runInteraction( "_remove_devices_from_device_inbox_txn", - _remove_devices_from_device_inbox_txn, + _remove_dead_devices_from_device_inbox_txn, ) if finished: await self.db_pool.updates._end_background_update( - self.REMOVE_DEVICES_FROM_INBOX, + self.REMOVE_DEAD_DEVICES_FROM_INBOX, ) return batch_size diff --git a/synapse/storage/schema/main/delta/65/08_device_inbox_background_updates.sql b/synapse/storage/schema/main/delta/65/08_device_inbox_background_updates.sql index 24814e7f8af7..d79455c2cedc 100644 --- a/synapse/storage/schema/main/delta/65/08_device_inbox_background_updates.sql +++ b/synapse/storage/schema/main/delta/65/08_device_inbox_background_updates.sql @@ -15,4 +15,4 @@ -- Background update to clear the inboxes of hidden and deleted devices. INSERT INTO background_updates (ordering, update_name, progress_json) VALUES - (6508, 'remove_devices_from_device_inbox', '{}'); + (6508, 'remove_dead_devices_from_device_inbox', '{}'); diff --git a/tests/storage/databases/main/test_deviceinbox.py b/tests/storage/databases/main/test_deviceinbox.py index 561d7bfc04f8..36c933b9e91a 100644 --- a/tests/storage/databases/main/test_deviceinbox.py +++ b/tests/storage/databases/main/test_deviceinbox.py @@ -66,7 +66,7 @@ def test_background_remove_deleted_devices_from_device_inbox(self): self.store.db_pool.simple_insert( "background_updates", { - "update_name": "remove_devices_from_device_inbox", + "update_name": "remove_dead_devices_from_device_inbox", "progress_json": "{}", }, ) @@ -140,7 +140,7 @@ def test_background_remove_hidden_devices_from_device_inbox(self): self.store.db_pool.simple_insert( "background_updates", { - "update_name": "remove_devices_from_device_inbox", + "update_name": "remove_dead_devices_from_device_inbox", "progress_json": "{}", }, ) From 4d93444762e2b477b4c6f420a3aa2028cc319ac2 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 25 Nov 2021 13:05:36 +0000 Subject: [PATCH 16/17] num_deleted isn't used anymore --- synapse/storage/databases/main/deviceinbox.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index f624a2188775..c97b02cbfd4f 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -695,7 +695,6 @@ def _remove_dead_devices_from_device_inbox_txn( """ txn.execute(sql, (start, stop)) - num_deleted = txn.rowcount self.db_pool.updates._background_update_progress_txn( txn, @@ -706,9 +705,9 @@ def _remove_dead_devices_from_device_inbox_txn( }, ) - return num_deleted, stop >= max_stream_id + return stop >= max_stream_id - num_deleted, finished = await self.db_pool.runInteraction( + finished = await self.db_pool.runInteraction( "_remove_devices_from_device_inbox_txn", _remove_dead_devices_from_device_inbox_txn, ) From 1c901485df0faf885ac54e85e1bec72281a19e3c Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 25 Nov 2021 14:32:45 +0000 Subject: [PATCH 17/17] Fix finish condition --- synapse/storage/databases/main/deviceinbox.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index c97b02cbfd4f..ab8766c75b62 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -705,7 +705,7 @@ def _remove_dead_devices_from_device_inbox_txn( }, ) - return stop >= max_stream_id + return stop > max_stream_id finished = await self.db_pool.runInteraction( "_remove_devices_from_device_inbox_txn",