Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix port script so that it can be run again after failure. #8755

Merged
merged 2 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8755.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix port script so that it can be run again after a failure. Broke in v1.21.0.
84 changes: 54 additions & 30 deletions scripts/synapse_port_db
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,18 @@ class Porter(object):
"create_port_table", create_port_table
)

# Step 2. Get tables.
# Step 2. Set up sequences
#
# We do this before porting the tables so that event if we fail half
# way through the postgres DB always have sequences that are greater
# than their respective tables. If we don't then creating the
# `DataStore` object will fail due to the inconsistency.
self.progress.set_state("Setting up sequence generators")
await self._setup_state_group_id_seq()
await self._setup_user_id_seq()
await self._setup_events_stream_seqs()

# Step 3. Get tables.
self.progress.set_state("Fetching tables")
sqlite_tables = await self.sqlite_store.db_pool.simple_select_onecol(
table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
Expand All @@ -634,7 +645,7 @@ class Porter(object):
tables = set(sqlite_tables) & set(postgres_tables)
logger.info("Found %d tables", len(tables))

# Step 3. Figure out what still needs copying
# Step 4. Figure out what still needs copying
self.progress.set_state("Checking on port progress")
setup_res = await make_deferred_yieldable(
defer.gatherResults(
Expand All @@ -651,7 +662,7 @@ class Porter(object):
# of: `postgres_size`, `table_size`, `forward_chunk`, `backward_chunk`.
tables_to_port_info_map = {r[0]: r[1:] for r in setup_res}

# Step 4. Do the copying.
# Step 5. Do the copying.
#
# This is slightly convoluted as we need to ensure tables are ported
# in the correct order due to foreign key constraints.
Expand Down Expand Up @@ -685,12 +696,6 @@ class Porter(object):

tables_ported.update(tables_to_port)

# Step 5. Set up sequences
self.progress.set_state("Setting up sequence generators")
await self._setup_state_group_id_seq()
await self._setup_user_id_seq()
await self._setup_events_stream_seqs()

self.progress.done()
except Exception as e:
global end_error_exec_info
Expand Down Expand Up @@ -848,43 +853,62 @@ class Porter(object):

return done, remaining + done

def _setup_state_group_id_seq(self):
async def _setup_state_group_id_seq(self):
curr_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True
)

if not curr_id:
return

def r(txn):
txn.execute("SELECT MAX(id) FROM state_groups")
curr_id = txn.fetchone()[0]
if not curr_id:
return
next_id = curr_id + 1
txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,))

return self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)
await self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)

async def _setup_user_id_seq(self):
curr_id = await self.sqlite_store.db_pool.runInteraction(
"setup_user_id_seq", find_max_generated_user_id_localpart
)

def _setup_user_id_seq(self):
def r(txn):
next_id = find_max_generated_user_id_localpart(txn) + 1
next_id = curr_id + 1
txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,))

return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)

def _setup_events_stream_seqs(self):
def r(txn):
txn.execute("SELECT MAX(stream_ordering) FROM events")
curr_id = txn.fetchone()[0]
if curr_id:
next_id = curr_id + 1
async def _setup_events_stream_seqs(self):
"""Set the event stream sequences to the correct values.
"""

# We get called before we've ported the events table, so we need to
# fetch the current positions from the SQLite store.
curr_forward_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="events", keyvalues={}, retcol="MAX(stream_ordering)", allow_none=True
)

curr_backward_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="events",
keyvalues={},
retcol="MAX(-MIN(stream_ordering), 1)",
allow_none=True,
)

def _setup_events_stream_seqs_set_pos(txn):
if curr_forward_id:
txn.execute(
"ALTER SEQUENCE events_stream_seq RESTART WITH %s", (next_id,)
"ALTER SEQUENCE events_stream_seq RESTART WITH %s",
(curr_forward_id + 1,),
)

txn.execute("SELECT GREATEST(-MIN(stream_ordering), 1) FROM events")
curr_id = txn.fetchone()[0]
next_id = curr_id + 1
txn.execute(
"ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s", (next_id,),
"ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s",
(curr_backward_id + 1,),
)

return self.postgres_store.db_pool.runInteraction(
"_setup_events_stream_seqs", r
return await self.postgres_store.db_pool.runInteraction(
"_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos,
)


Expand Down