Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve performance of remove_{hidden,deleted}_devices_from_device_inbox #11421

Merged
merged 19 commits into from
Nov 25, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11421.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance of various background database schema updates.
195 changes: 73 additions & 122 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox"
REMOVE_HIDDEN_DEVICES = "remove_hidden_devices_from_device_inbox"
REMOVE_DEVICES_FROM_INBOX = "remove_devices_from_device_inbox"

def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
super().__init__(database, db_conn, hs)
Expand All @@ -624,6 +625,11 @@ def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
self._remove_hidden_devices_from_device_inbox,
)

self.db_pool.updates.register_background_update_handler(
self.REMOVE_DEVICES_FROM_INBOX,
self._remove_devices_from_device_inbox,
)

async def _background_drop_index_device_inbox(self, progress, batch_size):
def reindex_txn(conn):
txn = conn.cursor()
Expand All @@ -639,9 +645,9 @@ def reindex_txn(conn):
async def _remove_deleted_devices_from_device_inbox(
self, progress: JsonDict, batch_size: int
) -> int:
"""A background update that deletes all device_inboxes for deleted devices.
"""No-op.
babolivier marked this conversation as resolved.
Show resolved Hide resolved

This should only need to be run once (when users upgrade to v1.47.0)
Used to be a background update that deletes all device_inboxes for deleted devices.

Args:
progress: JsonDict used to store progress of this background update
Expand All @@ -650,81 +656,16 @@ async def _remove_deleted_devices_from_device_inbox(
Returns:
The number of deleted rows
"""
await self.db_pool.updates._end_background_update(self.REMOVE_DELETED_DEVICES)

def _remove_deleted_devices_from_device_inbox_txn(
txn: LoggingTransaction,
) -> int:
"""stream_id is not unique
we need to use an inclusive `stream_id >= ?` clause,
since we might not have deleted all dead device messages for the stream_id
returned from the previous query

Then delete only rows matching the `(user_id, device_id, stream_id)` tuple,
to avoid problems of deleting a large number of rows all at once
due to a single device having lots of device messages.
"""

last_stream_id = progress.get("stream_id", 0)

sql = """
SELECT device_id, user_id, stream_id
FROM device_inbox
WHERE
stream_id >= ?
AND (device_id, user_id) NOT IN (
SELECT device_id, user_id FROM devices
)
ORDER BY stream_id
LIMIT ?
"""

txn.execute(sql, (last_stream_id, batch_size))
rows = txn.fetchall()

num_deleted = 0
for row in rows:
num_deleted += self.db_pool.simple_delete_txn(
txn,
"device_inbox",
{"device_id": row[0], "user_id": row[1], "stream_id": row[2]},
)

if rows:
# send more than stream_id to progress
# otherwise it can happen in large deployments that
# no change of status is visible in the log file
# it may be that the stream_id does not change in several runs
self.db_pool.updates._background_update_progress_txn(
txn,
self.REMOVE_DELETED_DEVICES,
{
"device_id": rows[-1][0],
"user_id": rows[-1][1],
"stream_id": rows[-1][2],
},
)

return num_deleted

number_deleted = await self.db_pool.runInteraction(
"_remove_deleted_devices_from_device_inbox",
_remove_deleted_devices_from_device_inbox_txn,
)

# The task is finished when no more lines are deleted.
if not number_deleted:
await self.db_pool.updates._end_background_update(
self.REMOVE_DELETED_DEVICES
)

return number_deleted
return 0

async def _remove_hidden_devices_from_device_inbox(
self, progress: JsonDict, batch_size: int
) -> int:
"""A background update that deletes all device_inboxes for hidden devices.
"""No-op.

This should only need to be run once (when users upgrade to v1.47.0)
Used to be a background update that deletes all device_inboxes for hidden devices.

Args:
progress: JsonDict used to store progress of this background update
Expand All @@ -733,74 +674,84 @@ async def _remove_hidden_devices_from_device_inbox(
Returns:
The number of deleted rows
"""
await self.db_pool.updates._end_background_update(self.REMOVE_HIDDEN_DEVICES)

return 0

async def _remove_devices_from_device_inbox(
    self,
    progress: JsonDict,
    batch_size: int,
) -> int:
    """A background update to remove devices that were either deleted or hidden from
    the device_inbox table.

    Each run sweeps one half-open window of stream IDs, `[start, stop)` with
    `stop = start + batch_size`, and deletes every `device_inbox` row in that
    window that has no matching non-hidden row in `devices`. The highest
    stream ID found on the first run is stored in the progress dict, and the
    update is ended once a window has moved strictly past it.

    Args:
        progress: The update's progress dict. `stream_id` is the start of the
            next window (defaults to 0); `max_stream_id` is the upper bound
            recorded on the first run.
        batch_size: The batch size for this update, used as the width of the
            stream ID window swept per run.

    Returns:
        The number of rows deleted.
    """

    def _remove_devices_from_device_inbox_txn(
        txn: LoggingTransaction,
    ) -> Tuple[int, bool]:
        """Sweep one window of stream IDs.

        Returns:
            A tuple of the number of rows deleted and whether the update has
            now covered every stream ID up to and including `max_stream_id`.
        """
        if "max_stream_id" in progress:
            max_stream_id = progress["max_stream_id"]
        else:
            txn.execute("SELECT max(stream_id) FROM device_inbox")
            # There's a type mismatch here between how we want to type the row and
            # what fetchone says it returns, but we silence it because we know that
            # res can't be None.
            res: Tuple[Optional[int]] = txn.fetchone()  # type: ignore[assignment]
            if res[0] is None:
                # The table is empty: nothing to delete, so we are finished.
                return 0, True
            max_stream_id = res[0]

        start = progress.get("stream_id", 0)
        stop = start + batch_size

        # Delete rows in `device_inbox` which do *not* correspond to a known,
        # non-hidden device. The window is inclusive of `start` and exclusive
        # of `stop`.
        sql = """
            DELETE FROM device_inbox AS di
            WHERE
                stream_id >= ? AND stream_id < ?
                AND NOT EXISTS (
                    SELECT * FROM devices AS d
                    WHERE
                        d.device_id=di.device_id
                        AND d.user_id=di.user_id
                        AND NOT hidden
                )
            """

        txn.execute(sql, (start, stop))
        num_deleted = txn.rowcount

        self.db_pool.updates._background_update_progress_txn(
            txn,
            self.REMOVE_DEVICES_FROM_INBOX,
            {
                "stream_id": stop,
                "max_stream_id": max_stream_id,
            },
        )

        # The window excludes `stop`, so rows at `max_stream_id` have only been
        # swept once `stop` is strictly greater than it. Using `>=` here would
        # end the update one batch early whenever `stop == max_stream_id`,
        # leaving the rows at the highest stream ID behind.
        return num_deleted, stop > max_stream_id

    num_deleted, finished = await self.db_pool.runInteraction(
        "_remove_devices_from_device_inbox_txn",
        _remove_devices_from_device_inbox_txn,
    )

    if finished:
        await self.db_pool.updates._end_background_update(
            self.REMOVE_DEVICES_FROM_INBOX,
        )

    return num_deleted
babolivier marked this conversation as resolved.
Show resolved Hide resolved


class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Schedule the `remove_devices_from_device_inbox` background update, which
-- clears the inboxes of hidden and deleted devices. It starts with an empty
-- progress dict.
INSERT INTO background_updates
    (ordering, update_name, progress_json)
VALUES
    (6508, 'remove_devices_from_device_inbox', '{}');
babolivier marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 2 additions & 2 deletions tests/storage/databases/main/test_deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_background_remove_deleted_devices_from_device_inbox(self):
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": "remove_deleted_devices_from_device_inbox",
"update_name": "remove_devices_from_device_inbox",
"progress_json": "{}",
},
)
Expand Down Expand Up @@ -140,7 +140,7 @@ def test_background_remove_hidden_devices_from_device_inbox(self):
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": "remove_hidden_devices_from_device_inbox",
"update_name": "remove_devices_from_device_inbox",
"progress_json": "{}",
},
)
Expand Down