Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #6469 from matrix-org/erikj/make_database_class
Browse files Browse the repository at this point in the history
Create a Database class and move methods out of SQLBaseStore
  • Loading branch information
erikjohnston authored Dec 6, 2019
2 parents 649b6bc + 2ace775 commit f3ea2f5
Show file tree
Hide file tree
Showing 68 changed files with 2,685 additions and 2,514 deletions.
1 change: 1 addition & 0 deletions changelog.d/6469.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move per database functionality out of the data stores and into a dedicated `Database` class.
17 changes: 7 additions & 10 deletions scripts-dev/update_database
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ if __name__ == "__main__":
" on it."
)
)
parser.add_argument("-v", action='store_true')
parser.add_argument("-v", action="store_true")
parser.add_argument(
"--database-config",
type=argparse.FileType('r'),
type=argparse.FileType("r"),
required=True,
help="A database config file for either a SQLite3 database or a PostgreSQL one.",
)
Expand Down Expand Up @@ -101,24 +101,21 @@ if __name__ == "__main__":

# Instantiate and initialise the homeserver object.
hs = MockHomeserver(
config,
database_engine,
db_conn,
db_config=config.database_config,
config, database_engine, db_conn, db_config=config.database_config,
)
# setup instantiates the store within the homeserver object.
hs.setup()
store = hs.get_datastore()

@defer.inlineCallbacks
def run_background_updates():
yield store.run_background_updates(sleep=False)
yield store.db.updates.run_background_updates(sleep=False)
# Stop the reactor to exit the script once every background update is run.
reactor.stop()

# Apply all background updates on the database.
reactor.callWhenRunning(lambda: run_as_background_process(
"background_updates", run_background_updates
))
reactor.callWhenRunning(
lambda: run_as_background_process("background_updates", run_background_updates)
)

reactor.run()
83 changes: 44 additions & 39 deletions scripts/synapse_port_db
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,14 @@ class Store(
return (yield self.db_pool.runWithConnection(r))

def execute(self, f, *args, **kwargs):
return self.runInteraction(f.__name__, f, *args, **kwargs)
return self.db.runInteraction(f.__name__, f, *args, **kwargs)

def execute_sql(self, sql, *args):
def r(txn):
txn.execute(sql, args)
return txn.fetchall()

return self.runInteraction("execute_sql", r)
return self.db.runInteraction("execute_sql", r)

def insert_many_txn(self, txn, table, headers, rows):
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
Expand Down Expand Up @@ -223,7 +223,7 @@ class Porter(object):
def setup_table(self, table):
if table in APPEND_ONLY_TABLES:
# It's safe to just carry on inserting.
row = yield self.postgres_store.simple_select_one(
row = yield self.postgres_store.db.simple_select_one(
table="port_from_sqlite3",
keyvalues={"table_name": table},
retcols=("forward_rowid", "backward_rowid"),
Expand All @@ -233,12 +233,14 @@ class Porter(object):
total_to_port = None
if row is None:
if table == "sent_transactions":
forward_chunk, already_ported, total_to_port = (
yield self._setup_sent_transactions()
)
(
forward_chunk,
already_ported,
total_to_port,
) = yield self._setup_sent_transactions()
backward_chunk = 0
else:
yield self.postgres_store.simple_insert(
yield self.postgres_store.db.simple_insert(
table="port_from_sqlite3",
values={
"table_name": table,
Expand Down Expand Up @@ -268,7 +270,7 @@ class Porter(object):

yield self.postgres_store.execute(delete_all)

yield self.postgres_store.simple_insert(
yield self.postgres_store.db.simple_insert(
table="port_from_sqlite3",
values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0},
)
Expand Down Expand Up @@ -322,7 +324,7 @@ class Porter(object):
if table == "user_directory_stream_pos":
# We need to make sure there is a single row, `(X, null), as that is
# what synapse expects to be there.
yield self.postgres_store.simple_insert(
yield self.postgres_store.db.simple_insert(
table=table, values={"stream_id": None}
)
self.progress.update(table, table_size) # Mark table as done
Expand Down Expand Up @@ -363,7 +365,9 @@ class Porter(object):

return headers, forward_rows, backward_rows

headers, frows, brows = yield self.sqlite_store.runInteraction("select", r)
headers, frows, brows = yield self.sqlite_store.db.runInteraction(
"select", r
)

if frows or brows:
if frows:
Expand All @@ -377,7 +381,7 @@ class Porter(object):
def insert(txn):
self.postgres_store.insert_many_txn(txn, table, headers[1:], rows)

self.postgres_store.simple_update_one_txn(
self.postgres_store.db.simple_update_one_txn(
txn,
table="port_from_sqlite3",
keyvalues={"table_name": table},
Expand Down Expand Up @@ -416,7 +420,7 @@ class Porter(object):

return headers, rows

headers, rows = yield self.sqlite_store.runInteraction("select", r)
headers, rows = yield self.sqlite_store.db.runInteraction("select", r)

if rows:
forward_chunk = rows[-1][0] + 1
Expand All @@ -433,8 +437,8 @@ class Porter(object):
rows_dict = []
for row in rows:
d = dict(zip(headers, row))
if "\0" in d['value']:
logger.warning('dropping search row %s', d)
if "\0" in d["value"]:
logger.warning("dropping search row %s", d)
else:
rows_dict.append(d)

Expand All @@ -454,7 +458,7 @@ class Porter(object):
],
)

self.postgres_store.simple_update_one_txn(
self.postgres_store.db.simple_update_one_txn(
txn,
table="port_from_sqlite3",
keyvalues={"table_name": "event_search"},
Expand Down Expand Up @@ -504,35 +508,34 @@ class Porter(object):
self.progress.set_state("Preparing %s" % config["name"])
conn = self.setup_db(config, engine)

db_pool = adbapi.ConnectionPool(
config["name"], **config["args"]
)
db_pool = adbapi.ConnectionPool(config["name"], **config["args"])

hs = MockHomeserver(self.hs_config, engine, conn, db_pool)

store = Store(conn, hs)

yield store.runInteraction(
"%s_engine.check_database" % config["name"],
engine.check_database,
yield store.db.runInteraction(
"%s_engine.check_database" % config["name"], engine.check_database,
)

return store

@defer.inlineCallbacks
def run_background_updates_on_postgres(self):
# Manually apply all background updates on the PostgreSQL database.
postgres_ready = yield self.postgres_store.has_completed_background_updates()
postgres_ready = (
yield self.postgres_store.db.updates.has_completed_background_updates()
)

if not postgres_ready:
# Only say that we're running background updates when there are background
# updates to run.
self.progress.set_state("Running background updates on PostgreSQL")

while not postgres_ready:
yield self.postgres_store.do_next_background_update(100)
yield self.postgres_store.db.updates.do_next_background_update(100)
postgres_ready = yield (
self.postgres_store.has_completed_background_updates()
self.postgres_store.db.updates.has_completed_background_updates()
)

@defer.inlineCallbacks
Expand All @@ -541,7 +544,9 @@ class Porter(object):
self.sqlite_store = yield self.build_db_store(self.sqlite_config)

# Check if all background updates are done, abort if not.
updates_complete = yield self.sqlite_store.has_completed_background_updates()
updates_complete = (
yield self.sqlite_store.db.updates.has_completed_background_updates()
)
if not updates_complete:
sys.stderr.write(
"Pending background updates exist in the SQLite3 database."
Expand Down Expand Up @@ -582,22 +587,22 @@ class Porter(object):
)

try:
yield self.postgres_store.runInteraction("alter_table", alter_table)
yield self.postgres_store.db.runInteraction("alter_table", alter_table)
except Exception:
# On Error Resume Next
pass

yield self.postgres_store.runInteraction(
yield self.postgres_store.db.runInteraction(
"create_port_table", create_port_table
)

# Step 2. Get tables.
self.progress.set_state("Fetching tables")
sqlite_tables = yield self.sqlite_store.simple_select_onecol(
sqlite_tables = yield self.sqlite_store.db.simple_select_onecol(
table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
)

postgres_tables = yield self.postgres_store.simple_select_onecol(
postgres_tables = yield self.postgres_store.db.simple_select_onecol(
table="information_schema.tables",
keyvalues={},
retcol="distinct table_name",
Expand Down Expand Up @@ -687,11 +692,11 @@ class Porter(object):
rows = txn.fetchall()
headers = [column[0] for column in txn.description]

ts_ind = headers.index('ts')
ts_ind = headers.index("ts")

return headers, [r for r in rows if r[ts_ind] < yesterday]

headers, rows = yield self.sqlite_store.runInteraction("select", r)
headers, rows = yield self.sqlite_store.db.runInteraction("select", r)

rows = self._convert_rows("sent_transactions", headers, rows)

Expand Down Expand Up @@ -724,7 +729,7 @@ class Porter(object):
next_chunk = yield self.sqlite_store.execute(get_start_id)
next_chunk = max(max_inserted_rowid + 1, next_chunk)

yield self.postgres_store.simple_insert(
yield self.postgres_store.db.simple_insert(
table="port_from_sqlite3",
values={
"table_name": "sent_transactions",
Expand All @@ -737,7 +742,7 @@ class Porter(object):
txn.execute(
"SELECT count(*) FROM sent_transactions" " WHERE ts >= ?", (yesterday,)
)
size, = txn.fetchone()
(size,) = txn.fetchone()
return int(size)

remaining_count = yield self.sqlite_store.execute(get_sent_table_size)
Expand Down Expand Up @@ -790,7 +795,7 @@ class Porter(object):
next_id = curr_id + 1
txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,))

return self.postgres_store.runInteraction("setup_state_group_id_seq", r)
return self.postgres_store.db.runInteraction("setup_state_group_id_seq", r)


##############################################
Expand Down Expand Up @@ -871,7 +876,7 @@ class CursesProgress(Progress):
duration = int(now) - int(self.start_time)

minutes, seconds = divmod(duration, 60)
duration_str = '%02dm %02ds' % (minutes, seconds)
duration_str = "%02dm %02ds" % (minutes, seconds)

if self.finished:
status = "Time spent: %s (Done!)" % (duration_str,)
Expand All @@ -881,7 +886,7 @@ class CursesProgress(Progress):
left = float(self.total_remaining) / self.total_processed

est_remaining = (int(now) - self.start_time) * left
est_remaining_str = '%02dm %02ds remaining' % divmod(est_remaining, 60)
est_remaining_str = "%02dm %02ds remaining" % divmod(est_remaining, 60)
else:
est_remaining_str = "Unknown"
status = "Time spent: %s (est. remaining: %s)" % (
Expand Down Expand Up @@ -967,7 +972,7 @@ if __name__ == "__main__":
description="A script to port an existing synapse SQLite database to"
" a new PostgreSQL database."
)
parser.add_argument("-v", action='store_true')
parser.add_argument("-v", action="store_true")
parser.add_argument(
"--sqlite-database",
required=True,
Expand All @@ -976,12 +981,12 @@ if __name__ == "__main__":
)
parser.add_argument(
"--postgres-config",
type=argparse.FileType('r'),
type=argparse.FileType("r"),
required=True,
help="The database config file for the PostgreSQL database",
)
parser.add_argument(
"--curses", action='store_true', help="display a curses based progress UI"
"--curses", action="store_true", help="display a curses based progress UI"
)

parser.add_argument(
Expand Down
2 changes: 1 addition & 1 deletion synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ def handle_sighup(*args, **kwargs):

# It is now safe to start your Synapse.
hs.start_listening(listeners)
hs.get_datastore().start_profiling()
hs.get_datastore().db.start_profiling()

setup_sentry(hs)
setup_sdnotify(hs)
Expand Down
2 changes: 1 addition & 1 deletion synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ def start():
_base.start(hs, config.listeners)

hs.get_pusherpool().start()
hs.get_datastore().start_doing_background_updates()
hs.get_datastore().db.updates.start_doing_background_updates()
except Exception:
# Print the exception and bail out.
print("Error during startup:", file=sys.stderr)
Expand Down
2 changes: 1 addition & 1 deletion synapse/app/user_dir.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def __init__(self, db_conn, hs):
super(UserDirectorySlaveStore, self).__init__(db_conn, hs)

events_max = self._stream_id_gen.get_current_token()
curr_state_delta_prefill, min_curr_state_delta_id = self.get_cache_dict(
curr_state_delta_prefill, min_curr_state_delta_id = self.db.get_cache_dict(
db_conn,
"current_state_delta_stream",
entity_column="room_id",
Expand Down
2 changes: 1 addition & 1 deletion synapse/module_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,4 @@ def run_db_interaction(self, desc, func, *args, **kwargs):
Returns:
Deferred[object]: result of func
"""
return self._store.runInteraction(desc, func, *args, **kwargs)
return self._store.db.runInteraction(desc, func, *args, **kwargs)
2 changes: 1 addition & 1 deletion synapse/rest/media/v1/preview_url_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ def _expire_url_cache_data(self):

logger.info("Running url preview cache expiry")

if not (yield self.store.has_completed_background_updates()):
if not (yield self.store.db.updates.has_completed_background_updates()):
logger.info("Still running DB updates; skipping expiry")
return

Expand Down
8 changes: 4 additions & 4 deletions synapse/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
"""
The storage layer is split up into multiple parts to allow Synapse to run
against different configurations of databases (e.g. single or multiple
databases). The `data_stores` are classes that talk directly to a single
database and have associated schemas, background updates, etc. On top of those
there are (or will be) classes that provide high level interfaces that combine
calls to multiple `data_stores`.
databases). The `Database` class represents a single physical database. The
`data_stores` are classes that talk directly to a `Database` instance and have
associated schemas, background updates, etc. On top of those there are classes
that provide high level interfaces that combine calls to multiple `data_stores`.
There are also schemas that get applied to every database, regardless of the
data stores associated with them (e.g. the schema version tables), which are
Expand Down
Loading

0 comments on commit f3ea2f5

Please sign in to comment.