From 74d41de5c15b4ea762b3471c3ecb2629520e9b21 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 19 Feb 2021 10:27:02 -0500 Subject: [PATCH 1/5] Abstract code for porting sequences. --- scripts/synapse_port_db | 39 +++++++++++++++++++-------------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 69bf9110a6d3..1f9cb0d0ae05 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -22,7 +22,7 @@ import logging import sys import time import traceback -from typing import Dict, Optional, Set +from typing import Dict, Iterable, Optional, Set import yaml @@ -629,7 +629,9 @@ class Porter(object): await self._setup_state_group_id_seq() await self._setup_user_id_seq() await self._setup_events_stream_seqs() - await self._setup_device_inbox_seq() + await self._setup_sequence( + "device_inbox_sequence", ("device_inbox", "device_federation_outbox") + ) # Step 3. Get tables. self.progress.set_state("Fetching tables") @@ -912,31 +914,28 @@ class Porter(object): "_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos, ) - async def _setup_device_inbox_seq(self): - """Set the device inbox sequence to the correct value. + async def _setup_sequence(self, sequence_name: str, stream_id_tables: Iterable[str]) -> None: + """Set a sequence to the correct value. """ - curr_local_id = await self.sqlite_store.db_pool.simple_select_one_onecol( - table="device_inbox", - keyvalues={}, - retcol="COALESCE(MAX(stream_id), 1)", - allow_none=True, - ) - - curr_federation_id = await self.sqlite_store.db_pool.simple_select_one_onecol( - table="device_federation_outbox", - keyvalues={}, - retcol="COALESCE(MAX(stream_id), 1)", - allow_none=True, - ) + current_stream_ids = [] + for stream_id_table in stream_id_tables: + max_stream_id = await self.sqlite_store.db_pool.simple_select_one_onecol( + table=stream_id_table, + keyvalues={}, + retcol="COALESCE(MAX(stream_id), 1)", + allow_none=True, + ) + current_stream_ids.append(max_stream_id) - next_id = max(curr_local_id, curr_federation_id) + 1 + next_id = max(current_stream_ids) + 1 def r(txn): txn.execute( - "ALTER SEQUENCE device_inbox_sequence RESTART WITH %s", (next_id,) + "ALTER SEQUENCE %s RESTART WITH %s", (sequence_name, next_id,) ) - return self.postgres_store.db_pool.runInteraction("_setup_device_inbox_seq", r) + await self.postgres_store.db_pool.runInteraction("_setup_%s" % (sequence_name,), r) + ############################################## From 840dadbc4217f0680abe2b9ec1bb2f6728a7e1ff Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 19 Feb 2021 10:29:20 -0500 Subject: [PATCH 2/5] Add additional seequences. --- scripts/synapse_port_db | 3 +++ 1 file changed, 3 insertions(+) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 1f9cb0d0ae05..e17a2b87969c 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -632,6 +632,9 @@ class Porter(object): await self._setup_sequence( "device_inbox_sequence", ("device_inbox", "device_federation_outbox") ) + await self._setup_sequence( + "account_data_sequence", ("room_account_data", "room_tags_revisions", "account_data")) + await self._setup_sequence("receipts_sequence", ("receipts_linearized", )) # Step 3. Get tables. self.progress.set_state("Fetching tables") From b287a452af9195c687f8d5df136da6bf6caeb223 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 19 Feb 2021 10:31:01 -0500 Subject: [PATCH 3/5] Ensure runInteraction calls are awaited. --- scripts/synapse_port_db | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index e17a2b87969c..42b10f475ae2 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -859,7 +859,7 @@ class Porter(object): return done, remaining + done - async def _setup_state_group_id_seq(self): + async def _setup_state_group_id_seq(self) -> None: curr_id = await self.sqlite_store.db_pool.simple_select_one_onecol( table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True ) @@ -873,7 +873,7 @@ class Porter(object): await self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r) - async def _setup_user_id_seq(self): + async def _setup_user_id_seq(self) -> None: curr_id = await self.sqlite_store.db_pool.runInteraction( "setup_user_id_seq", find_max_generated_user_id_localpart ) @@ -882,9 +882,9 @@ class Porter(object): next_id = curr_id + 1 txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,)) - return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r) + await self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r) - async def _setup_events_stream_seqs(self): + async def _setup_events_stream_seqs(self) -> None: """Set the event stream sequences to the correct values. """ @@ -913,7 +913,7 @@ class Porter(object): (curr_backward_id + 1,), ) - return await self.postgres_store.db_pool.runInteraction( + await self.postgres_store.db_pool.runInteraction( "_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos, ) From 4506517323df92319e19c7d47d81103ee32151ab Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 19 Feb 2021 10:37:37 -0500 Subject: [PATCH 4/5] Port the auth chain sequence and check it for consistency. --- scripts/synapse_port_db | 19 +++++++++++++++++-- synapse/storage/databases/__init__.py | 2 +- synapse/storage/databases/main/events.py | 13 ++++++++++++- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 42b10f475ae2..d2aaea08f55c 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -635,6 +635,7 @@ class Porter(object): await self._setup_sequence( "account_data_sequence", ("room_account_data", "room_tags_revisions", "account_data")) await self._setup_sequence("receipts_sequence", ("receipts_linearized", )) + await self._setup_auth_chain_sequence() # Step 3. Get tables. self.progress.set_state("Fetching tables") @@ -932,12 +933,26 @@ class Porter(object): next_id = max(current_stream_ids) + 1 + def r(txn): + sql = "ALTER SEQUENCE %s RESTART WITH" % (sequence_name, ) + txn.execute(sql + " %s", (next_id, )) + + await self.postgres_store.db_pool.runInteraction("_setup_%s" % (sequence_name,), r) + + async def _setup_auth_chain_sequence(self) -> None: + curr_chain_id = await self.sqlite_store.db_pool.simple_select_one_onecol( + table="event_auth_chains", keyvalues={}, retcol="MAX(chain_id)", allow_none=True + ) + def r(txn): txn.execute( - "ALTER SEQUENCE %s RESTART WITH %s", (sequence_name, next_id,) + "ALTER SEQUENCE event_auth_chain_id RESTART WITH %s", + (curr_chain_id,), ) - await self.postgres_store.db_pool.runInteraction("_setup_%s" % (sequence_name,), r) + await self.postgres_store.db_pool.runInteraction( + "_setup_event_auth_chain_id", r, + ) diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py index e84f8b42f734..379c78bb83bb 100644 --- a/synapse/storage/databases/__init__.py +++ b/synapse/storage/databases/__init__.py @@ -79,7 +79,7 @@ def __init__(self, main_store_class, hs): # If we're on a process that can persist events also # instantiate a `PersistEventsStore` if hs.get_instance_name() in hs.config.worker.writers.events: - persist_events = PersistEventsStore(hs, database, main) + persist_events = PersistEventsStore(hs, database, main, db_conn) if "state" in database_config.databases: logger.info( diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 287606cb4f07..a7a11a5bc049 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -42,6 +42,7 @@ from synapse.storage._base import db_to_json, make_in_list_sql_clause from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.search import SearchEntry +from synapse.storage.types import Connection from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.types import StateMap, get_domain_from_id from synapse.util import json_encoder @@ -90,7 +91,11 @@ class PersistEventsStore: """ def __init__( - self, hs: "HomeServer", db: DatabasePool, main_data_store: "DataStore" + self, + hs: "HomeServer", + db: DatabasePool, + main_data_store: "DataStore", + db_conn: Connection, ): self.hs = hs self.db_pool = db @@ -109,6 +114,12 @@ def __init__( ) # type: MultiWriterIdGenerator self._stream_id_gen = self.store._stream_id_gen # type: MultiWriterIdGenerator + # The consistency of this cannot be checked when the ID generator is + # created since the database might not yet be up-to-date. + self.db_pool.event_chain_id_gen.check_consistency( + db_conn, "event_auth_chains", "chain_id" # type: ignore + ) + # This should only exist on instances that are configured to write assert ( hs.get_instance_name() in hs.config.worker.writers.events From 6c152baf15163a29d695a881ecbdfe5a789b7e3b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 19 Feb 2021 10:41:07 -0500 Subject: [PATCH 5/5] Newsfragment --- changelog.d/9449.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/9449.bugfix diff --git a/changelog.d/9449.bugfix b/changelog.d/9449.bugfix new file mode 100644 index 000000000000..54214a7e4a07 --- /dev/null +++ b/changelog.d/9449.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in v1.26.0 where some sequences were not properly configured when running `synapse_port_db`.