Skip to content

Commit

Permalink
Put hashed videos into new queue table, move vptree into new separate step

Browse files Browse the repository at this point in the history
  • Loading branch information
ianwal committed Jul 21, 2024
1 parent 2ecf26c commit e188193
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 35 deletions.
57 changes: 47 additions & 10 deletions src/hydrusvideodeduplicator/db/DedupeDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,8 @@ def get_db_stats_old() -> DatabaseStats:

def get_db_stats(db: DedupeDb) -> DatabaseStats:
    """Get some database stats.

    Args:
        db: The dedupe database wrapper to query.

    Returns:
        DatabaseStats holding the number of perceptually hashed files
        (including those still waiting in the phashed_file_queue) and the
        on-disk size of the database file in bytes.
    """
    # Fix: the previous version also ran a redundant direct query against
    # shape_perceptual_hash_map whose result was immediately overwritten.
    # TODO: We don't need to get the file hashes. We just need the length.
    num_videos = len(db.get_phashed_files())
    file_size = os.path.getsize(get_db_file_path())
    return DatabaseStats(num_videos, file_size)

Expand Down Expand Up @@ -312,6 +309,7 @@ def create_tables(self):
# old:

# videos table (this is the sqlitedict schema)
# TODO: Remove this table.
self.execute("CREATE TABLE IF NOT EXISTS videos (key TEXT PRIMARY KEY, value BLOB)")

# new:
Expand Down Expand Up @@ -339,6 +337,16 @@ def create_tables(self):
self.execute(
"CREATE TABLE IF NOT EXISTS shape_search_cache ( hash_id INTEGER PRIMARY KEY, searched_distance INTEGER )"
)

# vptree insert queue. this is the list of files and their phashes that need to be inserted into the vptree.
# when entries are added to this queue they don't exist at all in the other tables. they don't have a hash_id
# or phash_id yet, unless those already exist from other files.
# this is just a table to store the phashes until they are properly inserted into the vptree, since inserting
# can take a while.
self.execute(
"CREATE TABLE IF NOT EXISTS phashed_file_queue ( file_hash BLOB_BYTES NOT NULL UNIQUE, phash BLOB_BYTES NOT NULL, PRIMARY KEY ( file_hash, phash ) )" # noqa: E501
)

# TODO: We don't need this I don't think.
# self.conn.execute(
# "CREATE TABLE IF NOT EXISTS pixel_hash_map ( hash_id INTEGER, pixel_hash_id INTEGER, PRIMARY KEY ( hash_id, pixel_hash_id ) )" # noqa: E501
Expand Down Expand Up @@ -388,10 +396,25 @@ def add_perceptual_hash(self, perceptual_hash: str) -> int:
assert isinstance(result, int)
return result

def add_to_phashed_files_queue(self, file_hash: str, perceptual_hash: str):
    """
    Add a file and its corresponding perceptual hash to the queue to be inserted into the vptree.

    The queue lives in its own table so that vptree operations never have to
    reason about half-inserted files: queued entries have no hash_id/phash_id
    yet (unless another file already created them).

    If the file hash is already queued, its perceptual hash is replaced.
    """
    parameters = {"file_hash": file_hash, "phash": perceptual_hash}
    sql = "REPLACE INTO phashed_file_queue ( file_hash, phash ) VALUES ( :file_hash, :phash )"
    self.execute(sql, parameters)

def associate_file_with_perceptual_hash(self, file_hash: str, perceptual_hash: str):
"""
Associate a file with a perceptual hash in the database. If the file already has a perceptual hash, it will be
overwritten.
Associate a file with a perceptual hash in the database.
This will insert the file into the VpTree.
If the file already has a perceptual hash, it will be overwritten.
Note:
Perceptual hashes are not unique for each file.
Expand Down Expand Up @@ -503,6 +526,18 @@ def get_file_hash(self, hash_id: str) -> str | None:
(file_hash,) = result
return file_hash

def get_phashed_files(self) -> list[str]:
    """Get the file hashes of all files that are phashed. This includes the files in the phashed_file_queue."""
    # Files already in the vptree, unioned with files still waiting in the queue.
    query = (
        "SELECT file_hash FROM files "
        "WHERE hash_id IN (SELECT hash_id FROM shape_perceptual_hash_map) "
        "UNION "
        "SELECT file_hash FROM phashed_file_queue"
    )
    return [row[0] for row in self.execute(query)]

"""
Misc
"""
Expand Down Expand Up @@ -558,6 +593,10 @@ def print_upgrade(version: str, new_version: str):
"CREATE TABLE IF NOT EXISTS shape_search_cache ( hash_id INTEGER PRIMARY KEY, searched_distance INTEGER )" # noqa: E501
)

self.execute(
"CREATE TABLE IF NOT EXISTS phashed_file_queue ( file_hash BLOB_BYTES NOT NULL UNIQUE, phash BLOB_BYTES NOT NULL, PRIMARY KEY ( file_hash, phash ) )" # noqa: E501
)

# Insert the files from the old videos table into the DB and the newly added vptree.
old_videos_data = []
with SqliteDict(
Expand All @@ -580,9 +619,7 @@ def print_upgrade(version: str, new_version: str):
for video_hash, perceptual_hash in old_videos_data:
# TODO: If these functions change this upgrade may not work! We need to be careful about updating them. # noqa: E501
# An upgrade cutoff at some point to prevent bitrot is a good idea, which is what Hydrus does.
self.add_file(video_hash)
self.add_perceptual_hash(perceptual_hash)
self.associate_file_with_perceptual_hash(video_hash, perceptual_hash)
self.add_to_phashed_files_queue(video_hash, perceptual_hash)
pbar.update(1)

self.set_version("0.6.9")
Expand Down
55 changes: 30 additions & 25 deletions src/hydrusvideodeduplicator/dedup.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ def deduplicate(
print(f"[blue] Found {len(video_hashes)} eligible files to perceptually hash.")
self.add_perceptual_hashes_to_db(video_hashes)

# Insert the perceptual hashed files into the vptree.
self.process_phashed_file_queue()

# Number of potential duplicates before adding more.
# This is just to print info for the user.
# Note: This will be inaccurate if the user searches for duplicates in the Hydrus client
Expand Down Expand Up @@ -173,26 +176,9 @@ def filter_unhashed(self, file_hashes: list[FileHash]) -> list[FileHash]:
"""
Get only the files that have not been perceptually hashed in the db from a list of files.
"""

# new:
all_phashed_files = self.db.execute(
"SELECT file_hash FROM files WHERE hash_id IN (SELECT hash_id FROM shape_perceptual_hash_map)"
).fetchall()

all_phashed_files = [row[0] for row in all_phashed_files]

all_phashed_files = self.db.get_phashed_files()
return [file_hash for file_hash in file_hashes if file_hash not in all_phashed_files]

# old:
# with SqliteDict(
# str(DedupeDB.get_db_file_path()), tablename="videos", flag="r", outer_stack=False
# ) as videos_table:
# return [
# file_hash
# for file_hash in file_hashes
# if file_hash not in videos_table or "perceptual_hash" not in videos_table[file_hash]
# ]

def add_perceptual_hashes_to_db(self, video_hashes: Sequence[str]) -> None:
"""
Retrieves the video from Hydrus,
Expand All @@ -204,7 +190,13 @@ def add_perceptual_hashes_to_db(self, video_hashes: Sequence[str]) -> None:
self.hydlog.info("Starting perceptual hash processing")
try:
with (
tqdm(total=len(video_hashes), dynamic_ncols=True, unit="video", colour="BLUE") as pbar,
tqdm(
total=len(video_hashes),
desc="Perceptually hashing files",
dynamic_ncols=True,
unit="video",
colour="BLUE",
) as pbar,
Parallel(n_jobs=self.job_count, return_as="generator_unordered") as parallel,
):
result_generator = parallel(
Expand All @@ -217,12 +209,7 @@ def add_perceptual_hashes_to_db(self, video_hashes: Sequence[str]) -> None:
failed_hash_count += 1
pbar.update(1)
continue
self.db.add_file(result.file_hash)
self.db.add_perceptual_hash(result.perceptual_hash)
self.db.associate_file_with_perceptual_hash(result.file_hash, result.perceptual_hash)
# We don't want files to exist in the database without a perceptual hash because we don't
# have proper error checking right now for this in vptree.
# So we need to wait to commit until after all the above is done.
self.db.add_to_phashed_files_queue(result.file_hash, result.perceptual_hash)
self.db.commit()

success_hash_count += 1
Expand Down Expand Up @@ -271,6 +258,24 @@ def mark_videos_as_duplicates(self, video1_hash: str, video2_hash: str):

self.client.client.set_file_relationships([new_relationship])

def process_phashed_file_queue(self):
    """
    Process the files in the phashed files queue.

    Drains phashed_file_queue: each entry is inserted into the files /
    perceptual-hash tables (which places the file into the vptree) and then
    removed from the queue. The whole drain is committed once at the end.
    """
    queue_entries = self.db.execute("SELECT file_hash, phash FROM phashed_file_queue").fetchall()
    progress = tqdm(
        queue_entries, dynamic_ncols=True, total=len(queue_entries), desc="Building vptree", unit="file", colour="BLUE"
    )
    for file_hash, perceptual_hash in progress:
        self.db.add_file(file_hash)
        self.db.add_perceptual_hash(perceptual_hash)
        self.db.associate_file_with_perceptual_hash(file_hash, perceptual_hash)
        # Entry is now fully represented in the main tables; drop it from the queue.
        self.db.execute(
            "DELETE FROM phashed_file_queue WHERE file_hash = :file_hash AND phash = :phash",
            {"file_hash": file_hash, "phash": perceptual_hash},
        )
    self.db.commit()

def find_potential_duplicates(
self,
) -> None:
Expand Down

0 comments on commit e188193

Please sign in to comment.