From d7fca7b68af8aa65316208e7be2f9408051e59b2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Apr 2023 14:22:39 +0100 Subject: [PATCH] User directory background update speedup --- .../storage/databases/main/user_directory.py | 87 +++++++++---------- 1 file changed, 41 insertions(+), 46 deletions(-) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 9fced4b99718..11c399311c72 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -102,45 +102,31 @@ async def _populate_user_directory_createtables( ) -> int: # Get all the rooms that we want to process. def _make_staging_area(txn: LoggingTransaction) -> None: - sql = ( - "CREATE TABLE IF NOT EXISTS " - + TEMP_TABLE - + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)" - ) - txn.execute(sql) - - sql = ( - "CREATE TABLE IF NOT EXISTS " - + TEMP_TABLE - + "_position(position TEXT NOT NULL)" - ) + sql = f""" + CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_rooms AS( + SELECT room_id, count(*) AS events + FROM current_state_events + GROUP BY room_id + ) + """ txn.execute(sql) + txn.execute(f"CREATE INDEX ON {TEMP_TABLE}_rooms(rooms)") + txn.execute(f"CREATE INDEX ON {TEMP_TABLE}_rooms(events)") - # Get rooms we want to process from the database - sql = """ - SELECT room_id, count(*) FROM current_state_events - GROUP BY room_id + sql = f""" + CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_position ( + position TEXT NOT NULL + ) """ txn.execute(sql) - rooms = list(txn.fetchall()) - self.db_pool.simple_insert_many_txn( - txn, TEMP_TABLE + "_rooms", keys=("room_id", "events"), values=rooms - ) - del rooms - sql = ( - "CREATE TABLE IF NOT EXISTS " - + TEMP_TABLE - + "_users(user_id TEXT NOT NULL)" - ) + sql = f""" + CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_users AS( + SELECT name AS user_id FROM users + ) + """ txn.execute(sql) - - txn.execute("SELECT name FROM users") - users = list(txn.fetchall()) - - self.db_pool.simple_insert_many_txn( - txn, TEMP_TABLE + "_users", keys=("user_id",), values=users - ) + txn.execute(f"CREATE INDEX ON {TEMP_TABLE}_users(user_id)") new_pos = await self.get_max_stream_id_in_current_state_deltas() await self.db_pool.runInteraction( @@ -222,11 +208,12 @@ def _get_next_batch( if not rooms_to_work_on: return None - # Get how many are left to process, so we can give status on how - # far we are in processing - txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms") - result = txn.fetchone() - assert result is not None + if "remaining" not in progress: + # Get how many are left to process, so we can give status on how + # far we are in processing + txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms") + result = txn.fetchone() + assert result is not None progress["remaining"] = result[0] return rooms_to_work_on @@ -332,7 +319,14 @@ def _get_next_batch( if processed_event_count > batch_size: # Don't process any more rooms, we've hit our batch size. - return processed_event_count + break + + await self.db_pool.runInteraction( + "populate_user_directory", + self.db_pool.updates._background_update_progress_txn, + "populate_user_directory_process_rooms", + progress, + ) return processed_event_count @@ -356,13 +350,14 @@ def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]: users_to_work_on = [x[0] for x in user_result] - # Get how many are left to process, so we can give status on how - # far we are in processing - sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users" - txn.execute(sql) - count_result = txn.fetchone() - assert count_result is not None - progress["remaining"] = count_result[0] + if "remaining" not in progress: + # Get how many are left to process, so we can give status on how + # far we are in processing + sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users" + txn.execute(sql) + count_result = txn.fetchone() + assert count_result is not None + progress["remaining"] = count_result[0] return users_to_work_on