Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
User directory background update speedup
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Apr 14, 2023
1 parent dabbb94 commit d7fca7b
Showing 1 changed file with 41 additions and 46 deletions.
87 changes: 41 additions & 46 deletions synapse/storage/databases/main/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,45 +102,31 @@ async def _populate_user_directory_createtables(
) -> int:
# Get all the rooms that we want to process.
def _make_staging_area(txn: LoggingTransaction) -> None:
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
)
txn.execute(sql)

sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_position(position TEXT NOT NULL)"
)
sql = f"""
CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_rooms AS(
SELECT room_id, count(*) AS events
FROM current_state_events
GROUP BY room_id
)
"""
txn.execute(sql)
txn.execute(f"CREATE INDEX ON {TEMP_TABLE}_rooms(rooms)")
txn.execute(f"CREATE INDEX ON {TEMP_TABLE}_rooms(events)")

# Get rooms we want to process from the database
sql = """
SELECT room_id, count(*) FROM current_state_events
GROUP BY room_id
sql = f"""
CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_position (
position TEXT NOT NULL
)
"""
txn.execute(sql)
rooms = list(txn.fetchall())
self.db_pool.simple_insert_many_txn(
txn, TEMP_TABLE + "_rooms", keys=("room_id", "events"), values=rooms
)
del rooms

sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_users(user_id TEXT NOT NULL)"
)
sql = f"""
CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_users AS(
SELECT name AS user_id FROM users
)
"""
txn.execute(sql)

txn.execute("SELECT name FROM users")
users = list(txn.fetchall())

self.db_pool.simple_insert_many_txn(
txn, TEMP_TABLE + "_users", keys=("user_id",), values=users
)
txn.execute(f"CREATE INDEX ON {TEMP_TABLE}_users(user_id)")

new_pos = await self.get_max_stream_id_in_current_state_deltas()
await self.db_pool.runInteraction(
Expand Down Expand Up @@ -222,11 +208,12 @@ def _get_next_batch(
if not rooms_to_work_on:
return None

# Get how many are left to process, so we can give status on how
# far we are in processing
txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
result = txn.fetchone()
assert result is not None
if "remaining" not in progress:
# Get how many are left to process, so we can give status on how
# far we are in processing
txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
result = txn.fetchone()
assert result is not None
progress["remaining"] = result[0]

return rooms_to_work_on
Expand Down Expand Up @@ -332,7 +319,14 @@ def _get_next_batch(

if processed_event_count > batch_size:
# Don't process any more rooms, we've hit our batch size.
return processed_event_count
break

await self.db_pool.runInteraction(
"populate_user_directory",
self.db_pool.updates._background_update_progress_txn,
"populate_user_directory_process_rooms",
progress,
)

return processed_event_count

Expand All @@ -356,13 +350,14 @@ def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:

users_to_work_on = [x[0] for x in user_result]

# Get how many are left to process, so we can give status on how
# far we are in processing
sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
txn.execute(sql)
count_result = txn.fetchone()
assert count_result is not None
progress["remaining"] = count_result[0]
if "remaining" not in progress:
# Get how many are left to process, so we can give status on how
# far we are in processing
sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
txn.execute(sql)
count_result = txn.fetchone()
assert count_result is not None
progress["remaining"] = count_result[0]

return users_to_work_on

Expand Down

0 comments on commit d7fca7b

Please sign in to comment.