From 7f7bee91705aafcc706fe2324d4905916dc7d39d Mon Sep 17 00:00:00 2001 From: ianwal <52143079+ianwal@users.noreply.github.com> Date: Sun, 21 Jul 2024 17:12:13 -0700 Subject: [PATCH] Fix concurrent hashing This was broken due to the newly added db member variable. I moved everything to a FileHasher class that is able to be serialized by joblib. --- src/hydrusvideodeduplicator/__main__.py | 4 - src/hydrusvideodeduplicator/dedup.py | 187 ++++++++++++++---------- 2 files changed, 112 insertions(+), 79 deletions(-) diff --git a/src/hydrusvideodeduplicator/__main__.py b/src/hydrusvideodeduplicator/__main__.py index 1a303f3..83896b4 100644 --- a/src/hydrusvideodeduplicator/__main__.py +++ b/src/hydrusvideodeduplicator/__main__.py @@ -73,10 +73,6 @@ def main( ): # Fix mypy errors from optional parameters assert overwrite is not None and threshold is not None and skip_hashing is not None and job_count is not None - if job_count != 1: - print(f"[yellow] Job count was {job_count} but was overriden to '1' for development right now.") - print("Don't worry. 
Multithreaded hashing will be added back soon before the next release.") - job_count = 1 # CLI debug parameter sets log level to info or debug loglevel = logging.INFO diff --git a/src/hydrusvideodeduplicator/dedup.py b/src/hydrusvideodeduplicator/dedup.py index 628a577..52149ca 100644 --- a/src/hydrusvideodeduplicator/dedup.py +++ b/src/hydrusvideodeduplicator/dedup.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import time from dataclasses import dataclass from itertools import islice from typing import TYPE_CHECKING @@ -41,14 +42,77 @@ class FailedPerceptuallyHashedFile: """Class for failed perceptually hashed files.""" file_hash: FileHash + exc: Exception | str + + +class HydrusApiException(Exception): + """Wrapper around hydrus_api.HydrusAPIException to avoid some coupling with hydrus_api outside of FileHasher.""" class FailedPerceptualHashException(Exception): """Exception for when files are failed to be perceptually hashed.""" - def __init__(self, file_hash: FileHash): + def __init__(self, file_hash: FileHash, other_exc: str = ""): super().__init__() self.file_hash = file_hash + self.other_exc = other_exc + + +class FileHasher: + """ + A class to fetch a file from Hydrus and phash it. + + Note: This class was created because HydrusVideoDeduplicator is not pickleable by + joblib due to the sqlite db member variable. 
+ """ + + def __init__(self, client: HVDClient): + self.client = client + + def _fetch_file(self, file_hash: str): + try: + response = self.client.client.get_file(hash_=file_hash) + except hydrus_api.HydrusAPIException as exc: + raise HydrusApiException(exc) + return response.content + + def _phash_file(self, file: bytes) -> str: + try: + phash = compute_phash(file) + phash_str: str = encode_phash_to_str(phash) + except Exception as exc: + raise FailedPerceptualHashException("", str(exc)) + + # sanity check + if phash_str is None or phash_str == "[]" or phash_str == "": + raise FailedPerceptualHashException("", "phash_str was None or empty or [].") + + return phash_str + + def fetch_and_phash_file(self, file_hash: str) -> PerceptuallyHashedFile | FailedPerceptuallyHashedFile: + """ + Retrieves the file from Hydrus and calculates its perceptual hash and returns the result. + + Returns FailedPerceptuallyHashedFile with the failed video hash if there are any errors. + """ + try: + file = self._fetch_file(file_hash) + except HydrusApiException as exc: + # Add a delay before returning so that if there is some transient issue + # the next file to be hashed won't also immediately error. + # This is a hack. There should probably be some counter in the result generator + # for the number of failures before hashing is stopped entirely. + time.sleep(3) + return FailedPerceptuallyHashedFile(file_hash, exc) + + try: + phash = self._phash_file(file) + except FailedPerceptualHashException as exc: + # Note: Joblib can't serialize unless this exc is a str. It may be because + # it may contain exceptions from other modules like pyav or something. + return FailedPerceptuallyHashedFile(file_hash, str(exc)) + + return PerceptuallyHashedFile(file_hash, phash) class HydrusVideoDeduplicator: @@ -115,9 +179,6 @@ def deduplicate( # while this is running. 
pre_dedupe_count = self.client.get_potential_duplicate_count_hydrus() - # old: - # self.find_potential_duplicates_old() - # new: self.find_potential_duplicates() @@ -131,47 +192,6 @@ def deduplicate( self.hydlog.info("Deduplication done.") - def fetch_and_hash_file_exception_safe( - self, file_hash: str - ) -> PerceptuallyHashedFile | FailedPerceptuallyHashedFile: - """ - Joblib can't handle exceptions, so this is used to wrap fetch_and_hash_file and - convert any exceptions to the failed file class. - """ - try: - return self.fetch_and_hash_file(file_hash) - except FailedPerceptualHashException as exc: - return FailedPerceptuallyHashedFile(exc.file_hash) - - def fetch_and_hash_file(self, video_hash: str) -> PerceptuallyHashedFile | FailedPerceptualHashException: - """ - Retrieves the video from Hydrus and calculates its perceptual hash. - - Throws FailedPerceptualHashException with the failed video hash if there's any errors. - """ - try: - video_response = self.client.client.get_file(hash_=video_hash) - except hydrus_api.HydrusAPIException: - print("[red] Failed to get video from Hydrus.") - self.hydlog.error("Error getting video from Hydrus.") - raise FailedPerceptualHashException(video_hash) - - # Calculate perceptual_hash - try: - phash = compute_phash(video_response.content) - phash_str: str = encode_phash_to_str(phash) - except Exception as exc: - print("[red] Failed to calculate a perceptual hash.") - self.hydlog.exception(exc) - self.hydlog.error(f"Errored file hash: {video_hash}") - raise FailedPerceptualHashException(video_hash) - else: - # "just in case" error checking - if phash_str is None or phash_str == "[]": - raise FailedPerceptualHashException(video_hash) - - return PerceptuallyHashedFile(video_hash, phash_str) - def filter_unhashed(self, file_hashes: list[FileHash]) -> list[FileHash]: """ Get only the files that have not been perceptually hashed in the db from a list of files. 
@@ -186,34 +206,43 @@ def add_perceptual_hashes_to_db(self, video_hashes: Sequence[str]) -> None: and then add it to the database. """ success_hash_count = 0 - failed_hash_count = 0 + failed_from_api_errors_count = 0 + failed_from_phash_count = 0 self.hydlog.info("Starting perceptual hash processing") try: - with ( - tqdm( - total=len(video_hashes), - desc="Perceptually hashing files", - dynamic_ncols=True, - unit="video", - colour="BLUE", - ) as pbar, - Parallel(n_jobs=self.job_count, return_as="generator_unordered") as parallel, - ): - result_generator = parallel( - delayed(self.fetch_and_hash_file_exception_safe)(video_hash) for video_hash in video_hashes - ) - for result in result_generator: - if isinstance(result, FailedPerceptuallyHashedFile): - if self.page_logger: - self.page_logger.add_failed_video(result.file_hash) - failed_hash_count += 1 - pbar.update(1) - continue - self.db.add_to_phashed_files_queue(result.file_hash, result.perceptual_hash) - self.db.commit() + with tqdm( + total=len(video_hashes), + desc="Perceptually hashing files", + dynamic_ncols=True, + unit="video", + colour="BLUE", + ) as pbar: + filehasher = FileHasher(self.client) + with Parallel(n_jobs=self.job_count, return_as="generator_unordered") as parallel: + # Note: joblib actually copies the entire filehasher into a new process, including the client. + result_generator = parallel( + delayed(filehasher.fetch_and_phash_file)(video_hash) for video_hash in video_hashes + ) + for result in result_generator: + if isinstance(result, FailedPerceptuallyHashedFile): + # We only want to add the failure to the page if the file was the actual cause of failure. 
+ if isinstance(result.exc, HydrusApiException): + failed_from_api_errors_count += 1 + print("[red] Hydrus API error during perceptual hashing:") + print(f"{result.exc}") + else: + failed_from_phash_count += 1 + print("[red] Failed to perceptually hash a file.") + print(f"Failed file SHA256 hash: {result.file_hash}") + print(f"{result.exc}") + if self.page_logger: + self.page_logger.add_failed_video(result.file_hash) + else: + success_hash_count += 1 + self.db.add_to_phashed_files_queue(result.file_hash, result.perceptual_hash) + self.db.commit() - success_hash_count += 1 - pbar.update(1) + pbar.update(1) except KeyboardInterrupt: print("[yellow] Perceptual hash processing was interrupted!") @@ -222,12 +251,20 @@ def add_perceptual_hashes_to_db(self, video_hashes: Sequence[str]) -> None: print("[green] Finished perceptual hash processing.") finally: - if failed_hash_count > 0: - print(f"[yellow] Perceptual hash processing had {failed_hash_count} failed files.") - if self.page_logger is None: + total_failures = failed_from_api_errors_count + failed_from_phash_count + if total_failures > 0: + print(f"[yellow] Perceptual hash processing had {total_failures} total failed files.") + if failed_from_api_errors_count > 0: + print( + f"[yellow] {failed_from_api_errors_count} failures were due to API errors. Ensure Hydrus is running and accessible before trying again." # noqa: E501 + ) + if failed_from_phash_count > 0: + print( + f"[yellow] {failed_from_phash_count} failures were from an error during perceptual hashing. Are the files corrupted?" # noqa: E501 + ) print( - "\nTip: You can see what files failed directly in Hydrus by " - "creating a page with the name 'failed' and " + "\nTip: You could have seen which files failed directly in Hydrus by " + "creating a Hydrus page with the name 'failed' and " "running the program with '--failed-page-name=failed'\n" ) print(f"[green] Added {success_hash_count} new videos to the perceptual hash database.")