Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed upgrader script causing incorrect database relationships #8163

Merged
merged 1 commit into from
Sep 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 237 additions & 16 deletions src/tribler/upgrade_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
from typing import TYPE_CHECKING

from configobj import ConfigObj
from pony.orm import db_session

if TYPE_CHECKING:
from tribler.tribler_config import TriblerConfigManager

FROM: str = "7.14"
TO: str = "8.0"

# ruff: noqa: N802,RUF015,W291


def _copy_if_not_exist(src: str, dst: str) -> None:
"""
Expand Down Expand Up @@ -86,7 +89,226 @@ def _import_7_14_settings(src: str, dst: TriblerConfigManager) -> None:
_copy_if_exists(old, "tunnel_community/max_circuits", dst, "tunnel_community/max_circuits")


def _inject_7_14_tables(src_db: str, dst_db: str) -> None:
def _inject_StatementOp(abs_src_db: str, abs_dst_db: str) -> None:
"""
Import old StatementOp entries.
"""
src_con = sqlite3.connect(abs_src_db)
output = list(src_con.execute("""SELECT SubjectResource.name, SubjectResource.type, ObjectResource.name,
ObjectResource.type, Statement.added_count, Statement.removed_count, Statement.local_operation, Peer.public_key,
Peer.added_at, StatementOp.operation, StatementOp.clock, StatementOp.signature, StatementOp.updated_at,
StatementOp.auto_generated
FROM StatementOp
INNER JOIN Peer ON StatementOp.peer=Peer.id
INNER JOIN Statement ON StatementOp.statement=Statement.id
INNER JOIN Resource AS SubjectResource ON Statement.subject=SubjectResource.id
INNER JOIN Resource AS ObjectResource ON Statement.object=ObjectResource.id
;"""))
src_con.close()

dst_con = sqlite3.connect(abs_dst_db)
with db_session:
for (subject_name, subject_type,
object_name, object_type,
stmt_added_count, stmt_removed_count, stmt_local_operation,
peer_public_key, peer_added_at,
stmtop_operation, stmtop_clock, stmtop_signature, stmtop_updated_at, stmtop_auto_generated) in output:
dst_con.execute("BEGIN")
try:
# Insert subject
results = list(dst_con.execute("SELECT id FROM Resource WHERE name=? AND type=?",
(subject_name, subject_type)))
if not results:
cursor = dst_con.execute("INSERT INTO Resource "
"VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Resource), ?, ?)",
(subject_name, subject_type))
results = [(cursor.lastrowid, )]
subject_id, = results[0]

# Insert object
results = list(
dst_con.execute("SELECT id FROM Resource WHERE name=? AND type=?", (object_name, object_type)))
if not results:
cursor = dst_con.execute(
"INSERT INTO Resource VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Resource), ?, ?)",
(object_name, object_type)
)
results = [(cursor.lastrowid, )]
object_id, = results[0]

# Insert statement
results = list(dst_con.execute("SELECT id FROM Statement WHERE object=? AND subject=?",
(object_id, subject_id)))
if not results:
cursor = dst_con.execute(
"INSERT INTO Statement VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Statement), ?, ?, ?, ?, ?)",
(subject_id, object_id, stmt_added_count, stmt_removed_count, stmt_local_operation)
)
results = [(cursor.lastrowid, )]
statement_id, = results[0]

# Insert peer
results = list(
dst_con.execute("SELECT id, added_at FROM Peer WHERE public_key=?", (peer_public_key, )))
if results and results[0][1] >= peer_added_at:
dst_con.execute("UPDATE Peer SET added_at=? WHERE public_key=?", (peer_added_at, peer_public_key))
results = [(results[0][0],)]
elif not results:
cursor = dst_con.execute(
"INSERT INTO Peer VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Peer), ?, ?)",
(peer_public_key, peer_added_at)
)
results = [(cursor.lastrowid, )]
else:
results = [(results[0][0], )]
peer_id, = results[0]

# Insert statement op
results = list(dst_con.execute("SELECT id FROM StatementOp WHERE statement=? AND peer=?",
(statement_id, peer_id)))
if not results:
dst_con.execute(
"INSERT INTO StatementOp VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM StatementOp), "
"?, ?, ?, ?, ?, ?, ?)",
(statement_id, peer_id, stmtop_operation, stmtop_clock, stmtop_signature, stmtop_updated_at,
stmtop_auto_generated))

dst_con.execute("COMMIT")
except sqlite3.DatabaseError as e:
dst_con.execute("ROLLBACK")
logging.exception(e)
dst_con.commit()
dst_con.close()


def _inject_ChannelNode(abs_src_db: str, abs_dst_db: str) -> None:
"""
Import old ChannelNode entries.
"""
src_con = sqlite3.connect(abs_src_db)
output = list(src_con.execute("""SELECT ChannelNode.infohash, ChannelNode.size, ChannelNode.torrent_date,
ChannelNode.tracker_info, ChannelNode.title, ChannelNode.tags, ChannelNode.metadata_type, ChannelNode.reserved_flags,
ChannelNode.origin_id, ChannelNode.public_key, ChannelNode.id_, ChannelNode.timestamp, ChannelNode.signature,
ChannelNode.added_on, ChannelNode.status, ChannelNode.xxx, ChannelNode.tag_processor_version, TorrentState.seeders,
TorrentState.leechers, TorrentState.last_check, TorrentState.self_checked, TorrentState.has_data
FROM ChannelNode
INNER JOIN TorrentState ON ChannelNode.health=TorrentState.rowid
;"""))
src_con.close()

dst_con = sqlite3.connect(abs_dst_db)
with db_session:
for (infohash, size, torrent_date, tracker_info, title, tags, metadata_type, reserved_flags,
origin_id, public_key, id_, timestamp, signature, added_on, status, xxx, tag_processor_version,
seeders, leechers, last_check, self_checked, has_data) in output:
dst_con.execute("BEGIN")
try:
# Insert subject
results = list(dst_con.execute("SELECT rowid FROM TorrentState WHERE infohash=?", (infohash, )))
if not results:
cursor = dst_con.execute("INSERT INTO TorrentState "
"VALUES ((SELECT COALESCE(MAX(rowid),0)+1 FROM TorrentState), "
"?, ?, ?, ?, ?, ?)",
(infohash, seeders, leechers, last_check, self_checked, has_data))
results = [(cursor.lastrowid, )]
health_id, = results[0]

# Insert channel ChannelNode
results = list(dst_con.execute("SELECT rowid FROM ChannelNode WHERE public_key=? AND id_=?",
(public_key, id_)))
if not results:
dst_con.execute(
"INSERT INTO ChannelNode VALUES ((SELECT COALESCE(MAX(rowid),0)+1 FROM ChannelNode), "
"?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(infohash, size, torrent_date, tracker_info, title, tags, metadata_type, reserved_flags,
origin_id, public_key, id_, timestamp, signature, added_on, status, xxx, health_id,
tag_processor_version))

dst_con.execute("COMMIT")
except sqlite3.DatabaseError as e:
dst_con.execute("ROLLBACK")
logging.exception(e)
dst_con.commit()
dst_con.close()


def _inject_TrackerState(abs_src_db: str, abs_dst_db: str) -> None:
"""
Import old TrackerState entries.
"""
src_con = sqlite3.connect(abs_src_db)
output = list(src_con.execute("SELECT url, last_check, alive, failures FROM TrackerState;"))
src_con.close()

dst_con = sqlite3.connect(abs_dst_db)
with db_session:
for (url, last_check, alive, failures) in output:
dst_con.execute("BEGIN")
try:
results = list(dst_con.execute("SELECT rowid, last_check, alive, failures FROM TrackerState WHERE url=?", (url, )))
if results:
tracker_id, n_last_check, n_alive, n_failures = results[0]
s_last_check = max(n_last_check, last_check)
s_alive = alive if last_check > n_last_check else n_alive
s_failures = int(failures) + int(n_failures)
dst_con.execute(
"UPDATE TrackerState SET last_check=?, alive=?, failures=? WHERE rowid=?",
(s_last_check, s_alive, s_failures, tracker_id))
else:
dst_con.execute(
"INSERT INTO TrackerState VALUES ((SELECT COALESCE(MAX(rowid),0)+1 FROM TrackerState), "
"?, ?, ?, ?)",
(url, last_check, alive, failures))

dst_con.execute("COMMIT")
except sqlite3.DatabaseError as e:
dst_con.execute("ROLLBACK")
logging.exception(e)
dst_con.commit()
dst_con.close()


def _inject_TorrentState_TrackerState(abs_src_db: str, abs_dst_db: str) -> None:
"""
Import old TorrentState_TrackerState entries.
"""
src_con = sqlite3.connect(abs_src_db)
output = list(src_con.execute("""SELECT TorrentState.infohash, TrackerState.url
FROM TorrentState_TrackerState
INNER JOIN TorrentState ON TorrentState_TrackerState.torrentstate=TorrentState.rowid
INNER JOIN TrackerState ON TorrentState_TrackerState.trackerstate=TrackerState.rowid
;"""))
src_con.close()

dst_con = sqlite3.connect(abs_dst_db)
with db_session:
for (infohash, url) in output:
dst_con.execute("BEGIN")
try:
results = list(dst_con.execute("""SELECT TorrentState.infohash, TrackerState.url
FROM TorrentState_TrackerState
INNER JOIN TorrentState ON TorrentState_TrackerState.torrentstate=TorrentState.rowid
INNER JOIN TrackerState ON TorrentState_TrackerState.trackerstate=TrackerState.rowid
WHERE TorrentState.infohash=? AND TrackerState.url=?
;""", (infohash, url)))
if not results:
# Note: both the tracker and torrent state should've been imported already
torrent_state, = list(dst_con.execute("SELECT rowid FROM TorrentState WHERE infohash=?",
(infohash,)))[0]
tracker_state, = list(dst_con.execute("SELECT rowid FROM TrackerState WHERE url=?",
(url,)))[0]
dst_con.execute("INSERT INTO TorrentState_TrackerState VALUES (?, ?)",
(torrent_state, tracker_state))

dst_con.execute("COMMIT")
except sqlite3.DatabaseError as e:
dst_con.execute("ROLLBACK")
logging.exception(e)
dst_con.commit()
dst_con.close()


def _inject_7_14_tables(src_db: str, dst_db: str, db_format: str) -> None:
"""
Fetch data from the old database and attempt to insert it into a new one.
"""
Expand All @@ -100,20 +322,17 @@ def _inject_7_14_tables(src_db: str, dst_db: str) -> None:
return

# If they both exist, we have to inject data.
src_con = sqlite3.connect(os.path.abspath(src_db))
insert_script = list(src_con.iterdump())
src_con.close()
assert db_format in ["tribler.db", "metadata.db"]

from pony.orm import db_session
dst_con = sqlite3.connect(os.path.abspath(dst_db))
with db_session:
for line in insert_script:
try:
dst_con.execute(line)
except sqlite3.DatabaseError as e:
logging.exception(e)
dst_con.commit() # This should be part of the dump already but just to be sure.
dst_con.close()
abs_src_db = os.path.abspath(src_db)
abs_dst_db = os.path.abspath(dst_db)

if db_format == "tribler.db":
_inject_StatementOp(abs_src_db, abs_dst_db)
else:
_inject_ChannelNode(abs_src_db, abs_dst_db)
_inject_TrackerState(abs_src_db, abs_dst_db)
_inject_TorrentState_TrackerState(abs_src_db, abs_dst_db)


def upgrade(config: TriblerConfigManager, source: str, destination: str) -> None:
Expand Down Expand Up @@ -141,11 +360,13 @@ def upgrade(config: TriblerConfigManager, source: str, destination: str) -> None
# Step 3: Copy tribler db.
os.makedirs(os.path.join(destination, "sqlite"), exist_ok=True)
_inject_7_14_tables(os.path.join(source, "sqlite", "tribler.db"),
os.path.join(destination, "sqlite", "tribler.db"))
os.path.join(destination, "sqlite", "tribler.db"),
"tribler.db")

# Step 4: Copy metadata db.
_inject_7_14_tables(os.path.join(source, "sqlite", "metadata.db"),
os.path.join(destination, "sqlite", "metadata.db"))
os.path.join(destination, "sqlite", "metadata.db"),
"metadata.db")

# Step 5: Signal that our upgrade is done.
with open(os.path.join(config.get_version_state_dir(), ".upgraded"), "a"):
Expand Down