Create a Database class and move methods out of SQLBaseStore #6469

Merged · 7 commits · Dec 6, 2019
1 change: 1 addition & 0 deletions changelog.d/6469.misc
@@ -0,0 +1 @@
+Move per database functionality out of the data stores and into a dedicated `Database` class.
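Every call-site change in this diff follows the same pattern: per-database helpers (`runInteraction`, the `simple_*` row helpers, caching and profiling hooks) move off the store classes onto a `Database` object reachable as `store.db`, and the background-update machinery moves under `store.db.updates`. A minimal sketch of the resulting shape, with illustrative stand-in classes rather than the real Synapse code:

```python
# Illustrative stand-ins only -- not the actual Synapse classes. The point
# is the delegation shape this PR introduces: stores no longer inherit
# per-database helpers from SQLBaseStore; they reach them via `self.db`.


class BackgroundUpdater:
    """Stand-in for the background-update machinery (lives at db.updates)."""

    def has_completed_background_updates(self) -> bool:
        return True  # the real code consults the background_updates table


class Database:
    """Represents a single physical database and its helper methods."""

    def __init__(self) -> None:
        self.updates = BackgroundUpdater()

    def runInteraction(self, desc, func, *args, **kwargs):
        # The real method runs `func` in a transaction on a connection
        # pool; calling it directly is enough to show the delegation.
        return func(*args, **kwargs)


class SQLBaseStore:
    """Stores now hold a Database reference instead of the helpers."""

    def __init__(self, database: Database) -> None:
        self.db = database
```

So `store.runInteraction(...)` becomes `store.db.runInteraction(...)` and `store.has_completed_background_updates()` becomes `store.db.updates.has_completed_background_updates()`, which is exactly the substitution repeated throughout the hunks below.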
17 changes: 7 additions & 10 deletions scripts-dev/update_database
@@ -58,10 +58,10 @@ if __name__ == "__main__":
" on it."
)
)
parser.add_argument("-v", action='store_true')
parser.add_argument("-v", action="store_true")
parser.add_argument(
"--database-config",
-type=argparse.FileType('r'),
+type=argparse.FileType("r"),
required=True,
help="A database config file for either a SQLite3 database or a PostgreSQL one.",
)
@@ -101,24 +101,21 @@ if __name__ == "__main__":

# Instantiate and initialise the homeserver object.
hs = MockHomeserver(
-config,
-database_engine,
-db_conn,
-db_config=config.database_config,
+config, database_engine, db_conn, db_config=config.database_config,
)
# setup instantiates the store within the homeserver object.
hs.setup()
store = hs.get_datastore()

@defer.inlineCallbacks
def run_background_updates():
-yield store.run_background_updates(sleep=False)
+yield store.db.updates.run_background_updates(sleep=False)
# Stop the reactor to exit the script once every background update is run.
reactor.stop()

# Apply all background updates on the database.
-reactor.callWhenRunning(lambda: run_as_background_process(
-"background_updates", run_background_updates
-))
+reactor.callWhenRunning(
+lambda: run_as_background_process("background_updates", run_background_updates)
+)

reactor.run()
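One detail worth calling out in the hunk above: `reactor.callWhenRunning` invokes its callable with no arguments, so the call to `run_as_background_process(...)` has to be wrapped in a `lambda`. A minimal hedged sketch of the same wiring, with a placeholder task standing in for the real background-update run:

```python
# Sketch only: the placeholder task stands in for run_background_updates().
from twisted.internet import defer, reactor


@defer.inlineCallbacks
def task():
    yield defer.succeed(None)  # stand-in for the actual database work
    reactor.stop()             # exit the script once the work is done


# callWhenRunning passes no arguments, hence the lambda wrapper when the
# target call needs arguments (as run_as_background_process does above).
reactor.callWhenRunning(lambda: task())
reactor.run()
```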
83 changes: 44 additions & 39 deletions scripts/synapse_port_db
@@ -173,14 +173,14 @@ class Store(
return (yield self.db_pool.runWithConnection(r))

def execute(self, f, *args, **kwargs):
-return self.runInteraction(f.__name__, f, *args, **kwargs)
+return self.db.runInteraction(f.__name__, f, *args, **kwargs)

def execute_sql(self, sql, *args):
def r(txn):
txn.execute(sql, args)
return txn.fetchall()

return self.runInteraction("execute_sql", r)
return self.db.runInteraction("execute_sql", r)

def insert_many_txn(self, txn, table, headers, rows):
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
@@ -223,7 +223,7 @@ class Porter(object):
def setup_table(self, table):
if table in APPEND_ONLY_TABLES:
# It's safe to just carry on inserting.
-row = yield self.postgres_store.simple_select_one(
+row = yield self.postgres_store.db.simple_select_one(
table="port_from_sqlite3",
keyvalues={"table_name": table},
retcols=("forward_rowid", "backward_rowid"),
@@ -233,12 +233,14 @@
total_to_port = None
if row is None:
if table == "sent_transactions":
-forward_chunk, already_ported, total_to_port = (
-yield self._setup_sent_transactions()
-)
+(
+forward_chunk,
+already_ported,
+total_to_port,
+) = yield self._setup_sent_transactions()
backward_chunk = 0
else:
-yield self.postgres_store.simple_insert(
+yield self.postgres_store.db.simple_insert(
table="port_from_sqlite3",
values={
"table_name": table,
@@ -268,7 +270,7 @@ class Porter(object):

yield self.postgres_store.execute(delete_all)

-yield self.postgres_store.simple_insert(
+yield self.postgres_store.db.simple_insert(
table="port_from_sqlite3",
values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0},
)
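The two hunks above show the porting bookkeeping: a `port_from_sqlite3` table holds one row per ported table, recording `forward_rowid` (and `backward_rowid` for tables also walked backwards) so a restarted port can resume. Append-only tables keep their rows and resume from the recorded positions; all other tables are wiped and reset to `(1, 0)`. A simplified sketch of that resume logic (the `db` argument stands in for `postgres_store.db`, and the real helpers take more options than shown):

```python
# Simplified resume logic, not the real Porter.setup_table.
def setup_table_bookkeeping(db, table, append_only):
    if append_only:
        # Safe to keep existing rows: resume from the recorded positions.
        row = db.simple_select_one(
            table="port_from_sqlite3",
            keyvalues={"table_name": table},
            retcols=("forward_rowid", "backward_rowid"),
        )
        if row is not None:
            return row
    # Otherwise start from scratch: forwards from rowid 1, nothing backwards.
    db.simple_insert(
        table="port_from_sqlite3",
        values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0},
    )
    return {"forward_rowid": 1, "backward_rowid": 0}
```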
@@ -322,7 +324,7 @@ class Porter(object):
if table == "user_directory_stream_pos":
# We need to make sure there is a single row, `(X, null), as that is
# what synapse expects to be there.
-yield self.postgres_store.simple_insert(
+yield self.postgres_store.db.simple_insert(
table=table, values={"stream_id": None}
)
self.progress.update(table, table_size) # Mark table as done
@@ -363,7 +365,9 @@ class Porter(object):

return headers, forward_rows, backward_rows

headers, frows, brows = yield self.sqlite_store.runInteraction("select", r)
headers, frows, brows = yield self.sqlite_store.db.runInteraction(
"select", r
)

if frows or brows:
if frows:
@@ -377,7 +381,7 @@
def insert(txn):
self.postgres_store.insert_many_txn(txn, table, headers[1:], rows)

-self.postgres_store.simple_update_one_txn(
+self.postgres_store.db.simple_update_one_txn(
txn,
table="port_from_sqlite3",
keyvalues={"table_name": table},
@@ -416,7 +420,7 @@ class Porter(object):

return headers, rows

headers, rows = yield self.sqlite_store.runInteraction("select", r)
headers, rows = yield self.sqlite_store.db.runInteraction("select", r)

if rows:
forward_chunk = rows[-1][0] + 1
@@ -433,8 +437,8 @@
rows_dict = []
for row in rows:
d = dict(zip(headers, row))
if "\0" in d['value']:
logger.warning('dropping search row %s', d)
if "\0" in d["value"]:
logger.warning("dropping search row %s", d)
else:
rows_dict.append(d)

@@ -454,7 +458,7 @@
],
)

-self.postgres_store.simple_update_one_txn(
+self.postgres_store.db.simple_update_one_txn(
txn,
table="port_from_sqlite3",
keyvalues={"table_name": "event_search"},
@@ -504,35 +508,34 @@ class Porter(object):
self.progress.set_state("Preparing %s" % config["name"])
conn = self.setup_db(config, engine)

-db_pool = adbapi.ConnectionPool(
-config["name"], **config["args"]
-)
+db_pool = adbapi.ConnectionPool(config["name"], **config["args"])

hs = MockHomeserver(self.hs_config, engine, conn, db_pool)

store = Store(conn, hs)

-yield store.runInteraction(
-"%s_engine.check_database" % config["name"],
-engine.check_database,
+yield store.db.runInteraction(
+"%s_engine.check_database" % config["name"], engine.check_database,
)

return store

@defer.inlineCallbacks
def run_background_updates_on_postgres(self):
# Manually apply all background updates on the PostgreSQL database.
-postgres_ready = yield self.postgres_store.has_completed_background_updates()
+postgres_ready = (
+yield self.postgres_store.db.updates.has_completed_background_updates()
+)

if not postgres_ready:
# Only say that we're running background updates when there are background
# updates to run.
self.progress.set_state("Running background updates on PostgreSQL")

while not postgres_ready:
-yield self.postgres_store.do_next_background_update(100)
+yield self.postgres_store.db.updates.do_next_background_update(100)
postgres_ready = yield (
-self.postgres_store.has_completed_background_updates()
+self.postgres_store.db.updates.has_completed_background_updates()
)

@defer.inlineCallbacks
@@ -541,7 +544,9 @@ class Porter(object):
self.sqlite_store = yield self.build_db_store(self.sqlite_config)

# Check if all background updates are done, abort if not.
-updates_complete = yield self.sqlite_store.has_completed_background_updates()
+updates_complete = (
+yield self.sqlite_store.db.updates.has_completed_background_updates()
+)
if not updates_complete:
sys.stderr.write(
"Pending background updates exist in the SQLite3 database."
@@ -582,22 +587,22 @@ class Porter(object):
)

try:
yield self.postgres_store.runInteraction("alter_table", alter_table)
yield self.postgres_store.db.runInteraction("alter_table", alter_table)
except Exception:
# On Error Resume Next
pass

-yield self.postgres_store.runInteraction(
+yield self.postgres_store.db.runInteraction(
"create_port_table", create_port_table
)

# Step 2. Get tables.
self.progress.set_state("Fetching tables")
-sqlite_tables = yield self.sqlite_store.simple_select_onecol(
+sqlite_tables = yield self.sqlite_store.db.simple_select_onecol(
table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
)

-postgres_tables = yield self.postgres_store.simple_select_onecol(
+postgres_tables = yield self.postgres_store.db.simple_select_onecol(
table="information_schema.tables",
keyvalues={},
retcol="distinct table_name",
@@ -687,11 +692,11 @@ class Porter(object):
rows = txn.fetchall()
headers = [column[0] for column in txn.description]

-ts_ind = headers.index('ts')
+ts_ind = headers.index("ts")

return headers, [r for r in rows if r[ts_ind] < yesterday]

headers, rows = yield self.sqlite_store.runInteraction("select", r)
headers, rows = yield self.sqlite_store.db.runInteraction("select", r)

rows = self._convert_rows("sent_transactions", headers, rows)

@@ -724,7 +729,7 @@ class Porter(object):
next_chunk = yield self.sqlite_store.execute(get_start_id)
next_chunk = max(max_inserted_rowid + 1, next_chunk)

-yield self.postgres_store.simple_insert(
+yield self.postgres_store.db.simple_insert(
table="port_from_sqlite3",
values={
"table_name": "sent_transactions",
@@ -737,7 +742,7 @@ class Porter(object):
txn.execute(
"SELECT count(*) FROM sent_transactions" " WHERE ts >= ?", (yesterday,)
)
-size, = txn.fetchone()
+(size,) = txn.fetchone()
return int(size)

remaining_count = yield self.sqlite_store.execute(get_sent_table_size)
@@ -790,7 +795,7 @@ class Porter(object):
next_id = curr_id + 1
txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,))

return self.postgres_store.runInteraction("setup_state_group_id_seq", r)
return self.postgres_store.db.runInteraction("setup_state_group_id_seq", r)


##############################################
@@ -871,7 +876,7 @@ class CursesProgress(Progress):
duration = int(now) - int(self.start_time)

minutes, seconds = divmod(duration, 60)
-duration_str = '%02dm %02ds' % (minutes, seconds)
+duration_str = "%02dm %02ds" % (minutes, seconds)

if self.finished:
status = "Time spent: %s (Done!)" % (duration_str,)
@@ -881,7 +886,7 @@ class CursesProgress(Progress):
left = float(self.total_remaining) / self.total_processed

est_remaining = (int(now) - self.start_time) * left
-est_remaining_str = '%02dm %02ds remaining' % divmod(est_remaining, 60)
+est_remaining_str = "%02dm %02ds remaining" % divmod(est_remaining, 60)
else:
est_remaining_str = "Unknown"
status = "Time spent: %s (est. remaining: %s)" % (
@@ -967,7 +972,7 @@ if __name__ == "__main__":
description="A script to port an existing synapse SQLite database to"
" a new PostgreSQL database."
)
parser.add_argument("-v", action='store_true')
parser.add_argument("-v", action="store_true")
parser.add_argument(
"--sqlite-database",
required=True,
@@ -976,12 +981,12 @@
)
parser.add_argument(
"--postgres-config",
-type=argparse.FileType('r'),
+type=argparse.FileType("r"),
required=True,
help="The database config file for the PostgreSQL database",
)
parser.add_argument(
"--curses", action='store_true', help="display a curses based progress UI"
"--curses", action="store_true", help="display a curses based progress UI"
)

parser.add_argument(
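Throughout this file the porter now reaches the generic row helpers through `store.db`. From the call sites above, their contracts are: `simple_select_one` returns the requested `retcols` of the single row matching `keyvalues` (or `None`), `simple_insert` inserts one row from a `values` dict, and `simple_update_one_txn` does the matching update inside an existing transaction. A rough, hedged sketch of what the first two reduce to in SQL terms (the real implementations on the `Database` class add transaction management, logging, and more options):

```python
# Rough functional equivalents of the helpers used above, for orientation only.
import sqlite3


def simple_select_one(conn, table, keyvalues, retcols):
    where = " AND ".join("%s = ?" % k for k in keyvalues)
    sql = "SELECT %s FROM %s WHERE %s" % (", ".join(retcols), table, where)
    row = conn.execute(sql, tuple(keyvalues.values())).fetchone()
    return dict(zip(retcols, row)) if row else None


def simple_insert(conn, table, values):
    sql = "INSERT INTO %s (%s) VALUES (%s)" % (
        table, ", ".join(values), ", ".join("?" for _ in values),
    )
    conn.execute(sql, tuple(values.values()))


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE port_from_sqlite3 "
    "(table_name TEXT, forward_rowid INT, backward_rowid INT)"
)
simple_insert(
    conn,
    "port_from_sqlite3",
    {"table_name": "events", "forward_rowid": 1, "backward_rowid": 0},
)
print(
    simple_select_one(
        conn,
        "port_from_sqlite3",
        {"table_name": "events"},
        ("forward_rowid", "backward_rowid"),
    )
)
# -> {'forward_rowid': 1, 'backward_rowid': 0}
```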
2 changes: 1 addition & 1 deletion synapse/app/_base.py
@@ -269,7 +269,7 @@ def handle_sighup(*args, **kwargs):

# It is now safe to start your Synapse.
hs.start_listening(listeners)
-hs.get_datastore().start_profiling()
+hs.get_datastore().db.start_profiling()

setup_sentry(hs)
setup_sdnotify(hs)
2 changes: 1 addition & 1 deletion synapse/app/homeserver.py
@@ -436,7 +436,7 @@ def start():
_base.start(hs, config.listeners)

hs.get_pusherpool().start()
-hs.get_datastore().start_doing_background_updates()
+hs.get_datastore().db.updates.start_doing_background_updates()
except Exception:
# Print the exception and bail out.
print("Error during startup:", file=sys.stderr)
2 changes: 1 addition & 1 deletion synapse/app/user_dir.py
@@ -64,7 +64,7 @@ def __init__(self, db_conn, hs):
super(UserDirectorySlaveStore, self).__init__(db_conn, hs)

events_max = self._stream_id_gen.get_current_token()
-curr_state_delta_prefill, min_curr_state_delta_id = self.get_cache_dict(
+curr_state_delta_prefill, min_curr_state_delta_id = self.db.get_cache_dict(
db_conn,
"current_state_delta_stream",
entity_column="room_id",
2 changes: 1 addition & 1 deletion synapse/module_api/__init__.py
@@ -175,4 +175,4 @@ def run_db_interaction(self, desc, func, *args, **kwargs):
Returns:
Deferred[object]: result of func
"""
-return self._store.runInteraction(desc, func, *args, **kwargs)
+return self._store.db.runInteraction(desc, func, *args, **kwargs)
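The module-facing behaviour is unchanged by this refactor: a module still passes a description plus a function that receives a transaction (database cursor), and gets a Deferred back. A hypothetical usage sketch (the hook name and table access are invented for illustration):

```python
# Hypothetical module code; `module_api` is the synapse.module_api.ModuleApi
# instance a module is constructed with.
def count_users(module_api):
    def _count_txn(txn):
        # `txn` is a cursor scoped to a single transaction.
        txn.execute("SELECT count(*) FROM users")
        (count,) = txn.fetchone()
        return count

    # Returns a Deferred firing with _count_txn's return value,
    # per the docstring above.
    return module_api.run_db_interaction("count_users", _count_txn)
```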
2 changes: 1 addition & 1 deletion synapse/rest/media/v1/preview_url_resource.py
@@ -402,7 +402,7 @@ def _expire_url_cache_data(self):

logger.info("Running url preview cache expiry")

-if not (yield self.store.has_completed_background_updates()):
+if not (yield self.store.db.updates.has_completed_background_updates()):
logger.info("Still running DB updates; skipping expiry")
return

8 changes: 4 additions & 4 deletions synapse/storage/__init__.py
@@ -17,10 +17,10 @@
"""
The storage layer is split up into multiple parts to allow Synapse to run
against different configurations of databases (e.g. single or multiple
-databases). The `data_stores` are classes that talk directly to a single
-database and have associated schemas, background updates, etc. On top of those
-there are (or will be) classes that provide high level interfaces that combine
-calls to multiple `data_stores`.
+databases). The `Database` class represents a single physical database. The
+`data_stores` are classes that talk directly to a `Database` instance and have
+associated schemas, background updates, etc. On top of those there are classes
+that provide high level interfaces that combine calls to multiple `data_stores`.

There are also schemas that get applied to every database, regardless of the
data stores associated with them (e.g. the schema version tables), which are
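The revised docstring pins down the layering this PR introduces. Read bottom-up: a `Database` owns exactly one physical database (connections, transactions, schema bookkeeping); each data store talks to its `Database` through `self.db`; and higher-level interfaces may combine several data stores. A sketch of what a data store method looks like under that layering (the store, table, and columns here are illustrative, not taken from this diff):

```python
# Illustrative data store; `database` is the Database for its physical DB.
class ExampleRoomStore:
    def __init__(self, database):
        self.db = database  # one store, one Database

    def get_room_creator(self, room_id):
        # Per-database plumbing (transactions, retries, logging) lives on
        # self.db; the store contributes only the domain-level query.
        return self.db.simple_select_one(
            table="rooms",
            keyvalues={"room_id": room_id},
            retcols=("creator",),
        )
```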