From e78af4d65a3125514e7bf73768b7a5ba012e1cc5 Mon Sep 17 00:00:00 2001 From: Quinten Stokkink Date: Wed, 18 Sep 2024 15:54:01 +0200 Subject: [PATCH] Fixed upgrader script causing incorrect database relationships --- src/tribler/upgrade_script.py | 253 +++++++++++++++++++++++++++++++--- 1 file changed, 237 insertions(+), 16 deletions(-) diff --git a/src/tribler/upgrade_script.py b/src/tribler/upgrade_script.py index 8a97dc1e13..2647220d37 100644 --- a/src/tribler/upgrade_script.py +++ b/src/tribler/upgrade_script.py @@ -17,6 +17,7 @@ from typing import TYPE_CHECKING from configobj import ConfigObj +from pony.orm import db_session if TYPE_CHECKING: from tribler.tribler_config import TriblerConfigManager @@ -24,6 +25,8 @@ FROM: str = "7.14" TO: str = "8.0" +# ruff: noqa: N802,RUF015,W291 + def _copy_if_not_exist(src: str, dst: str) -> None: """ @@ -86,7 +89,226 @@ def _import_7_14_settings(src: str, dst: TriblerConfigManager) -> None: _copy_if_exists(old, "tunnel_community/max_circuits", dst, "tunnel_community/max_circuits") -def _inject_7_14_tables(src_db: str, dst_db: str) -> None: +def _inject_StatementOp(abs_src_db: str, abs_dst_db: str) -> None: + """ + Import old StatementOp entries. + """ + src_con = sqlite3.connect(abs_src_db) + output = list(src_con.execute("""SELECT SubjectResource.name, SubjectResource.type, ObjectResource.name, +ObjectResource.type, Statement.added_count, Statement.removed_count, Statement.local_operation, Peer.public_key, +Peer.added_at, StatementOp.operation, StatementOp.clock, StatementOp.signature, StatementOp.updated_at, +StatementOp.auto_generated +FROM StatementOp +INNER JOIN Peer ON StatementOp.peer=Peer.id +INNER JOIN Statement ON StatementOp.statement=Statement.id +INNER JOIN Resource AS SubjectResource ON Statement.subject=SubjectResource.id +INNER JOIN Resource AS ObjectResource ON Statement.object=ObjectResource.id +;""")) + src_con.close() + + dst_con = sqlite3.connect(abs_dst_db) + with db_session: + for (subject_name, subject_type, + object_name, object_type, + stmt_added_count, stmt_removed_count, stmt_local_operation, + peer_public_key, peer_added_at, + stmtop_operation, stmtop_clock, stmtop_signature, stmtop_updated_at, stmtop_auto_generated) in output: + dst_con.execute("BEGIN") + try: + # Insert subject + results = list(dst_con.execute("SELECT id FROM Resource WHERE name=? AND type=?", + (subject_name, subject_type))) + if not results: + cursor = dst_con.execute("INSERT INTO Resource " + "VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Resource), ?, ?)", + (subject_name, subject_type)) + results = [(cursor.lastrowid, )] + subject_id, = results[0] + + # Insert object + results = list( + dst_con.execute("SELECT id FROM Resource WHERE name=? AND type=?", (object_name, object_type))) + if not results: + cursor = dst_con.execute( + "INSERT INTO Resource VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Resource), ?, ?)", + (object_name, object_type) + ) + results = [(cursor.lastrowid, )] + object_id, = results[0] + + # Insert statement + results = list(dst_con.execute("SELECT id FROM Statement WHERE object=? AND subject=?", + (object_id, subject_id))) + if not results: + cursor = dst_con.execute( + "INSERT INTO Statement VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Statement), ?, ?, ?, ?, ?)", + (subject_id, object_id, stmt_added_count, stmt_removed_count, stmt_local_operation) + ) + results = [(cursor.lastrowid, )] + statement_id, = results[0] + + # Insert peer + results = list( + dst_con.execute("SELECT id, added_at FROM Peer WHERE public_key=?", (peer_public_key, ))) + if results and results[0][1] >= peer_added_at: + dst_con.execute("UPDATE Peer SET added_at=? WHERE public_key=?", (peer_added_at, peer_public_key)) + results = [(results[0][0],)] + elif not results: + cursor = dst_con.execute( + "INSERT INTO Peer VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Peer), ?, ?)", + (peer_public_key, peer_added_at) + ) + results = [(cursor.lastrowid, )] + else: + results = [(results[0][0], )] + peer_id, = results[0] + + # Insert statement op + results = list(dst_con.execute("SELECT id FROM StatementOp WHERE statement=? AND peer=?", + (statement_id, peer_id))) + if not results: + dst_con.execute( + "INSERT INTO StatementOp VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM StatementOp), " + "?, ?, ?, ?, ?, ?, ?)", + (statement_id, peer_id, stmtop_operation, stmtop_clock, stmtop_signature, stmtop_updated_at, + stmtop_auto_generated)) + + dst_con.execute("COMMIT") + except sqlite3.DatabaseError as e: + dst_con.execute("ROLLBACK") + logging.exception(e) + dst_con.commit() + dst_con.close() + + +def _inject_ChannelNode(abs_src_db: str, abs_dst_db: str) -> None: + """ + Import old ChannelNode entries. + """ + src_con = sqlite3.connect(abs_src_db) + output = list(src_con.execute("""SELECT ChannelNode.infohash, ChannelNode.size, ChannelNode.torrent_date, +ChannelNode.tracker_info, ChannelNode.title, ChannelNode.tags, ChannelNode.metadata_type, ChannelNode.reserved_flags, +ChannelNode.origin_id, ChannelNode.public_key, ChannelNode.id_, ChannelNode.timestamp, ChannelNode.signature, +ChannelNode.added_on, ChannelNode.status, ChannelNode.xxx, ChannelNode.tag_processor_version, TorrentState.seeders, +TorrentState.leechers, TorrentState.last_check, TorrentState.self_checked, TorrentState.has_data +FROM ChannelNode +INNER JOIN TorrentState ON ChannelNode.health=TorrentState.rowid +;""")) + src_con.close() + + dst_con = sqlite3.connect(abs_dst_db) + with db_session: + for (infohash, size, torrent_date, tracker_info, title, tags, metadata_type, reserved_flags, + origin_id, public_key, id_, timestamp, signature, added_on, status, xxx, tag_processor_version, + seeders, leechers, last_check, self_checked, has_data) in output: + dst_con.execute("BEGIN") + try: + # Insert subject + results = list(dst_con.execute("SELECT rowid FROM TorrentState WHERE infohash=?", (infohash, ))) + if not results: + cursor = dst_con.execute("INSERT INTO TorrentState " + "VALUES ((SELECT COALESCE(MAX(rowid),0)+1 FROM TorrentState), " + "?, ?, ?, ?, ?, ?)", + (infohash, seeders, leechers, last_check, self_checked, has_data)) + results = [(cursor.lastrowid, )] + health_id, = results[0] + + # Insert channel ChannelNode + results = list(dst_con.execute("SELECT rowid FROM ChannelNode WHERE public_key=? AND id_=?", + (public_key, id_))) + if not results: + dst_con.execute( + "INSERT INTO ChannelNode VALUES ((SELECT COALESCE(MAX(rowid),0)+1 FROM ChannelNode), " + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + (infohash, size, torrent_date, tracker_info, title, tags, metadata_type, reserved_flags, + origin_id, public_key, id_, timestamp, signature, added_on, status, xxx, health_id, + tag_processor_version)) + + dst_con.execute("COMMIT") + except sqlite3.DatabaseError as e: + dst_con.execute("ROLLBACK") + logging.exception(e) + dst_con.commit() + dst_con.close() + + +def _inject_TrackerState(abs_src_db: str, abs_dst_db: str) -> None: + """ + Import old TrackerState entries. + """ + src_con = sqlite3.connect(abs_src_db) + output = list(src_con.execute("SELECT url, last_check, alive, failures FROM TrackerState;")) + src_con.close() + + dst_con = sqlite3.connect(abs_dst_db) + with db_session: + for (url, last_check, alive, failures) in output: + dst_con.execute("BEGIN") + try: + results = list(dst_con.execute("SELECT rowid, last_check, alive, failures FROM TrackerState WHERE url=?", (url, ))) + if results: + tracker_id, n_last_check, n_alive, n_failures = results[0] + s_last_check = max(n_last_check, last_check) + s_alive = alive if last_check > n_last_check else n_alive + s_failures = int(failures) + int(n_failures) + dst_con.execute( + "UPDATE TrackerState SET last_check=?, alive=?, failures=? WHERE rowid=?", + (s_last_check, s_alive, s_failures, tracker_id)) + else: + dst_con.execute( + "INSERT INTO TrackerState VALUES ((SELECT COALESCE(MAX(rowid),0)+1 FROM TrackerState), " + "?, ?, ?, ?)", + (url, last_check, alive, failures)) + + dst_con.execute("COMMIT") + except sqlite3.DatabaseError as e: + dst_con.execute("ROLLBACK") + logging.exception(e) + dst_con.commit() + dst_con.close() + + +def _inject_TorrentState_TrackerState(abs_src_db: str, abs_dst_db: str) -> None: + """ + Import old TorrentState_TrackerState entries. + """ + src_con = sqlite3.connect(abs_src_db) + output = list(src_con.execute("""SELECT TorrentState.infohash, TrackerState.url +FROM TorrentState_TrackerState +INNER JOIN TorrentState ON TorrentState_TrackerState.torrentstate=TorrentState.rowid +INNER JOIN TrackerState ON TorrentState_TrackerState.trackerstate=TrackerState.rowid +;""")) + src_con.close() + + dst_con = sqlite3.connect(abs_dst_db) + with db_session: + for (infohash, url) in output: + dst_con.execute("BEGIN") + try: + results = list(dst_con.execute("""SELECT TorrentState.infohash, TrackerState.url +FROM TorrentState_TrackerState +INNER JOIN TorrentState ON TorrentState_TrackerState.torrentstate=TorrentState.rowid +INNER JOIN TrackerState ON TorrentState_TrackerState.trackerstate=TrackerState.rowid +WHERE TorrentState.infohash=? AND TrackerState.url=? +;""", (infohash, url))) + if not results: + # Note: both the tracker and torrent state should've been imported already + torrent_state, = list(dst_con.execute("SELECT rowid FROM TorrentState WHERE infohash=?", + (infohash,)))[0] + tracker_state, = list(dst_con.execute("SELECT rowid FROM TrackerState WHERE url=?", + (url,)))[0] + dst_con.execute("INSERT INTO TorrentState_TrackerState VALUES (?, ?)", + (torrent_state, tracker_state)) + + dst_con.execute("COMMIT") + except sqlite3.DatabaseError as e: + dst_con.execute("ROLLBACK") + logging.exception(e) + dst_con.commit() + dst_con.close() + + +def _inject_7_14_tables(src_db: str, dst_db: str, db_format: str) -> None: """ Fetch data from the old database and attempt to insert it into a new one. """ @@ -100,20 +322,17 @@ def _inject_7_14_tables(src_db: str, dst_db: str) -> None: return # If they both exist, we have to inject data. - src_con = sqlite3.connect(os.path.abspath(src_db)) - insert_script = list(src_con.iterdump()) - src_con.close() + assert db_format in ["tribler.db", "metadata.db"] - from pony.orm import db_session - dst_con = sqlite3.connect(os.path.abspath(dst_db)) - with db_session: - for line in insert_script: - try: - dst_con.execute(line) - except sqlite3.DatabaseError as e: - logging.exception(e) - dst_con.commit() # This should be part of the dump already but just to be sure. - dst_con.close() + abs_src_db = os.path.abspath(src_db) + abs_dst_db = os.path.abspath(dst_db) + + if db_format == "tribler.db": + _inject_StatementOp(abs_src_db, abs_dst_db) + else: + _inject_ChannelNode(abs_src_db, abs_dst_db) + _inject_TrackerState(abs_src_db, abs_dst_db) + _inject_TorrentState_TrackerState(abs_src_db, abs_dst_db) def upgrade(config: TriblerConfigManager, source: str, destination: str) -> None: @@ -141,11 +360,13 @@ def upgrade(config: TriblerConfigManager, source: str, destination: str) -> None # Step 3: Copy tribler db. os.makedirs(os.path.join(destination, "sqlite"), exist_ok=True) _inject_7_14_tables(os.path.join(source, "sqlite", "tribler.db"), - os.path.join(destination, "sqlite", "tribler.db")) + os.path.join(destination, "sqlite", "tribler.db"), + "tribler.db") # Step 4: Copy metadata db. _inject_7_14_tables(os.path.join(source, "sqlite", "metadata.db"), - os.path.join(destination, "sqlite", "metadata.db")) + os.path.join(destination, "sqlite", "metadata.db"), + "metadata.db") # Step 5: Signal that our upgrade is done. with open(os.path.join(config.get_version_state_dir(), ".upgraded"), "a"):