From e188193696e15e3d82038424dba75837a4fd1a8e Mon Sep 17 00:00:00 2001
From: ianwal <52143079+ianwal@users.noreply.github.com>
Date: Sun, 21 Jul 2024 14:32:42 -0700
Subject: [PATCH] Put hashed videos into new queue table, move vptree into new
 separate step

---
 src/hydrusvideodeduplicator/db/DedupeDB.py | 57 ++++++++++++++++++----
 src/hydrusvideodeduplicator/dedup.py       | 55 +++++++++++----------
 2 files changed, 77 insertions(+), 35 deletions(-)

diff --git a/src/hydrusvideodeduplicator/db/DedupeDB.py b/src/hydrusvideodeduplicator/db/DedupeDB.py
index 1f75591..640c9b0 100644
--- a/src/hydrusvideodeduplicator/db/DedupeDB.py
+++ b/src/hydrusvideodeduplicator/db/DedupeDB.py
@@ -221,11 +221,8 @@ def get_db_stats_old() -> DatabaseStats:
 
 def get_db_stats(db: DedupeDb) -> DatabaseStats:
     """Get some database stats."""
-    num_videos = len(
-        db.execute(
-            "SELECT hash_id FROM files WHERE hash_id IN (SELECT hash_id FROM shape_perceptual_hash_map)"
-        ).fetchall()
-    )
+    # TODO: We don't need to get the file hashes. We just need the length.
+    num_videos = len(db.get_phashed_files())
     file_size = os.path.getsize(get_db_file_path())
     return DatabaseStats(num_videos, file_size)
 
@@ -312,6 +309,7 @@ def create_tables(self):
 
         # old:
         # videos table (this is the sqlitedict schema)
+        # TODO: Remove this table.
         self.execute("CREATE TABLE IF NOT EXISTS videos (key TEXT PRIMARY KEY, value BLOB)")
 
         # new:
@@ -339,6 +337,16 @@ def create_tables(self):
         self.execute(
             "CREATE TABLE IF NOT EXISTS shape_search_cache ( hash_id INTEGER PRIMARY KEY, searched_distance INTEGER )"
         )
+
+        # vptree insert queue. This is the list of files and their phashes that need to be inserted into the vptree.
+        # When entries are added to this queue, they don't exist at all in the other tables: they don't have a hash_id
+        # or phash_id yet, unless those already exist from other files.
+        # This table just stores the phashes until they are properly inserted into the vptree, since inserting
+        # can take a while.
+        self.execute(
+            "CREATE TABLE IF NOT EXISTS phashed_file_queue ( file_hash BLOB_BYTES NOT NULL UNIQUE, phash BLOB_BYTES NOT NULL, PRIMARY KEY ( file_hash, phash ) )"  # noqa: E501
+        )
+
         # TODO: We don't need this I don't think.
         # self.conn.execute(
         #     "CREATE TABLE IF NOT EXISTS pixel_hash_map ( hash_id INTEGER, pixel_hash_id INTEGER, PRIMARY KEY ( hash_id, pixel_hash_id ) )"  # noqa: E501
@@ -388,10 +396,25 @@ def add_perceptual_hash(self, perceptual_hash: str) -> int:
         assert isinstance(result, int)
         return result
 
+    def add_to_phashed_files_queue(self, file_hash: str, perceptual_hash: str):
+        """
+        Add a file and its corresponding perceptual hash to the queue of files to be inserted into the vptree.
+
+        We keep this queue in a separate table to avoid any potential issues with assumptions about what needs
+        to exist when and where for vptree operations.
+
+        If the file hash is already in the queue, it will be replaced with the new perceptual hash.
+        """
+        self.execute(
+            "REPLACE INTO phashed_file_queue ( file_hash, phash ) VALUES ( :file_hash, :phash )",
+            {"file_hash": file_hash, "phash": perceptual_hash},
+        )
+
     def associate_file_with_perceptual_hash(self, file_hash: str, perceptual_hash: str):
         """
-        Associate a file with a perceptual hash in the database. If the file already has a perceptual hash, it will be
-        overwritten.
+        Associate a file with a perceptual hash in the database.
+        This will insert the file into the vptree.
+        If the file already has a perceptual hash, it will be overwritten.
 
         Note: Perceptual hashes are not unique for each file.
 
@@ -503,6 +526,18 @@ def get_file_hash(self, hash_id: str) -> str | None:
         (file_hash,) = result
         return file_hash
 
+    def get_phashed_files(self) -> list[str]:
+        """Get the file hashes of all files that are phashed. This includes the files in the phashed_file_queue."""
+        all_phashed_files_query = (
+            "SELECT file_hash FROM files "
+            "WHERE hash_id IN (SELECT hash_id FROM shape_perceptual_hash_map) "
+            "UNION "
+            "SELECT file_hash FROM phashed_file_queue"
+        )
+        all_phashed_files = self.execute(all_phashed_files_query)
+        all_phashed_files = [row[0] for row in all_phashed_files]
+        return all_phashed_files
+
     """
     Misc
     """
@@ -558,6 +593,10 @@ def print_upgrade(version: str, new_version: str):
                 "CREATE TABLE IF NOT EXISTS shape_search_cache ( hash_id INTEGER PRIMARY KEY, searched_distance INTEGER )"  # noqa: E501
             )
 
+            self.execute(
+                "CREATE TABLE IF NOT EXISTS phashed_file_queue ( file_hash BLOB_BYTES NOT NULL UNIQUE, phash BLOB_BYTES NOT NULL, PRIMARY KEY ( file_hash, phash ) )"  # noqa: E501
+            )
+
             # Insert the files from the old videos table into the DB and the newly added vptree.
             old_videos_data = []
             with SqliteDict(
@@ -580,9 +619,7 @@
             for video_hash, perceptual_hash in old_videos_data:
                 # TODO: If these functions change this upgrade may not work! We need to be careful about updating them.  # noqa: E501
                 # An upgrade cutoff at some point to prevent bitrot is a good idea, which is what Hydrus does.
-                self.add_file(video_hash)
-                self.add_perceptual_hash(perceptual_hash)
-                self.associate_file_with_perceptual_hash(video_hash, perceptual_hash)
+                self.add_to_phashed_files_queue(video_hash, perceptual_hash)
                 pbar.update(1)
 
         self.set_version("0.6.9")
diff --git a/src/hydrusvideodeduplicator/dedup.py b/src/hydrusvideodeduplicator/dedup.py
index 833926c..628a577 100644
--- a/src/hydrusvideodeduplicator/dedup.py
+++ b/src/hydrusvideodeduplicator/dedup.py
@@ -106,6 +106,9 @@ def deduplicate(
         print(f"[blue] Found {len(video_hashes)} eligible files to perceptually hash.")
         self.add_perceptual_hashes_to_db(video_hashes)
 
+        # Insert the perceptually hashed files into the vptree.
+        self.process_phashed_file_queue()
+
         # Number of potential duplicates before adding more.
         # This is just to print info for the user.
         # Note: This will be inaccurate if the user searches for duplicates in the Hydrus client
@@ -173,26 +176,9 @@ def filter_unhashed(self, file_hashes: list[FileHash]) -> list[FileHash]:
         """
         Get only the files that have not been perceptually hashed in the db from a list of files.
""" - - # new: - all_phashed_files = self.db.execute( - "SELECT file_hash FROM files WHERE hash_id IN (SELECT hash_id FROM shape_perceptual_hash_map)" - ).fetchall() - - all_phashed_files = [row[0] for row in all_phashed_files] - + all_phashed_files = self.db.get_phashed_files() return [file_hash for file_hash in file_hashes if file_hash not in all_phashed_files] - # old: - # with SqliteDict( - # str(DedupeDB.get_db_file_path()), tablename="videos", flag="r", outer_stack=False - # ) as videos_table: - # return [ - # file_hash - # for file_hash in file_hashes - # if file_hash not in videos_table or "perceptual_hash" not in videos_table[file_hash] - # ] - def add_perceptual_hashes_to_db(self, video_hashes: Sequence[str]) -> None: """ Retrieves the video from Hydrus, @@ -204,7 +190,13 @@ def add_perceptual_hashes_to_db(self, video_hashes: Sequence[str]) -> None: self.hydlog.info("Starting perceptual hash processing") try: with ( - tqdm(total=len(video_hashes), dynamic_ncols=True, unit="video", colour="BLUE") as pbar, + tqdm( + total=len(video_hashes), + desc="Perceptually hashing files", + dynamic_ncols=True, + unit="video", + colour="BLUE", + ) as pbar, Parallel(n_jobs=self.job_count, return_as="generator_unordered") as parallel, ): result_generator = parallel( @@ -217,12 +209,7 @@ def add_perceptual_hashes_to_db(self, video_hashes: Sequence[str]) -> None: failed_hash_count += 1 pbar.update(1) continue - self.db.add_file(result.file_hash) - self.db.add_perceptual_hash(result.perceptual_hash) - self.db.associate_file_with_perceptual_hash(result.file_hash, result.perceptual_hash) - # We don't want files to exist in the database without a perceptual hash because we don't - # have proper error checking right now for this in vptree. - # So we need to wait to commit until after all the above is done. + self.db.add_to_phashed_files_queue(result.file_hash, result.perceptual_hash) self.db.commit() success_hash_count += 1 @@ -271,6 +258,24 @@ def mark_videos_as_duplicates(self, video1_hash: str, video2_hash: str): self.client.client.set_file_relationships([new_relationship]) + def process_phashed_file_queue(self): + """ + Process the files in the phashed files queue. + This inserts the queue entries into their respective tables and then inserts the file into the vptree. + """ + results = self.db.execute("SELECT file_hash, phash FROM phashed_file_queue").fetchall() + for file_hash, perceptual_hash in tqdm( + results, dynamic_ncols=True, total=len(results), desc="Building vptree", unit="file", colour="BLUE" + ): + self.db.add_file(file_hash) + self.db.add_perceptual_hash(perceptual_hash) + self.db.associate_file_with_perceptual_hash(file_hash, perceptual_hash) + self.db.execute( + "DELETE FROM phashed_file_queue WHERE file_hash = :file_hash AND phash = :phash", + {"file_hash": file_hash, "phash": perceptual_hash}, + ) + self.db.commit() + def find_potential_duplicates( self, ) -> None: