Skip to content

Commit

Permalink
Fix concurrent hashing
Browse files Browse the repository at this point in the history
This was broken due to the newly added db member variable. I moved everything to a FileHasher class that is able to be serialized by joblib.
  • Loading branch information
ianwal committed Jul 22, 2024
1 parent e188193 commit 7f7bee9
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 79 deletions.
4 changes: 0 additions & 4 deletions src/hydrusvideodeduplicator/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ def main(
):
# Fix mypy errors from optional parameters
assert overwrite is not None and threshold is not None and skip_hashing is not None and job_count is not None
if job_count != 1:
print(f"[yellow] Job count was {job_count} but was overriden to '1' for development right now.")
print("Don't worry. Multithreaded hashing will be added back soon before the next release.")
job_count = 1

# CLI debug parameter sets log level to info or debug
loglevel = logging.INFO
Expand Down
187 changes: 112 additions & 75 deletions src/hydrusvideodeduplicator/dedup.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import time
from dataclasses import dataclass
from itertools import islice
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -41,14 +42,77 @@ class FailedPerceptuallyHashedFile:
"""Class for failed perceptually hashed files."""

file_hash: FileHash
exc: Exception | str


class HydrusApiException(Exception):
    """
    Wrapper around hydrus_api.HydrusAPIException.

    Raised in place of the hydrus_api exception so that code outside of FileHasher
    can detect Hydrus API failures without depending on hydrus_api directly.
    """


class FailedPerceptualHashException(Exception):
    """
    Exception raised when a file fails to be perceptually hashed.

    Attributes:
        file_hash: Hash of the file that could not be perceptually hashed.
        other_exc: Text describing the underlying failure; empty if unknown.
    """

    def __init__(self, file_hash: FileHash, other_exc: str = ""):
        # Forward both values to Exception so that ``args`` mirrors the constructor
        # signature. With empty ``args`` the exception cannot be re-created when it
        # is unpickled, because ``file_hash`` is a required parameter.
        super().__init__(file_hash, other_exc)
        self.file_hash = file_hash
        self.other_exc = other_exc

    def __str__(self) -> str:
        # Show only the failure description. The default would render the whole
        # ``args`` tuple, and callers convert this exception with str() to report
        # why hashing failed.
        return self.other_exc


class FileHasher:
    """
    A class to fetch a file from Hydrus and phash it.

    Note: This class was created because HydrusVideoDeduplicator is not pickleable by
    joblib due to the sqlite db member variable.
    """

    def __init__(self, client: HVDClient):
        self.client = client

    def _fetch_file(self, file_hash: str) -> bytes:
        """
        Download the content of the file with the given hash from Hydrus.

        Raises HydrusApiException if the Hydrus API call fails.
        """
        try:
            response = self.client.client.get_file(hash_=file_hash)
        except hydrus_api.HydrusAPIException as exc:
            # Chain explicitly so the original API error stays attached as the cause.
            raise HydrusApiException(exc) from exc
        return response.content

    def _phash_file(self, file: bytes) -> str:
        """
        Compute the perceptual hash of the file content and return it encoded as a str.

        Raises FailedPerceptualHashException, with the failure text in ``other_exc``,
        if hashing raises or produces an empty result. The file hash is not known
        here, so the exception's ``file_hash`` is left empty.
        """
        try:
            phash = compute_phash(file)
            phash_str: str = encode_phash_to_str(phash)
        except Exception as exc:
            raise FailedPerceptualHashException("", str(exc)) from exc

        # sanity check
        if not phash_str or phash_str == "[]":
            raise FailedPerceptualHashException("", "phash_str was None or empty or [].")

        return phash_str

    def fetch_and_phash_file(self, file_hash: str) -> PerceptuallyHashedFile | FailedPerceptuallyHashedFile:
        """
        Retrieves the file from Hydrus and calculates its perceptual hash and returns the result.
        Returns FailedPerceptuallyHashedFile with the failed video hash if there are any errors.
        """
        try:
            file = self._fetch_file(file_hash)
        except HydrusApiException as exc:
            # Add a delay before returning so that if there is some transient issue
            # the next file to be hashed won't also immediately error.
            # This is a hack. There should probably be some counter in the result generator
            # for the number of failures before hashing is stopped entirely.
            time.sleep(3)
            return FailedPerceptuallyHashedFile(file_hash, exc)

        try:
            phash = self._phash_file(file)
        except FailedPerceptualHashException as exc:
            # Note: Joblib can't serialize unless this exc is a str. It may be because
            # it may contain exceptions from other modules like pyav or something.
            # Take the text from other_exc, which is where _phash_file stores the
            # failure description, rather than relying on str(exc) to contain it.
            return FailedPerceptuallyHashedFile(file_hash, exc.other_exc)

        return PerceptuallyHashedFile(file_hash, phash)


class HydrusVideoDeduplicator:
Expand Down Expand Up @@ -115,9 +179,6 @@ def deduplicate(
# while this is running.
pre_dedupe_count = self.client.get_potential_duplicate_count_hydrus()

# old:
# self.find_potential_duplicates_old()

# new:
self.find_potential_duplicates()

Expand All @@ -131,47 +192,6 @@ def deduplicate(

self.hydlog.info("Deduplication done.")

def fetch_and_hash_file_exception_safe(
    self, file_hash: str
) -> PerceptuallyHashedFile | FailedPerceptuallyHashedFile:
    """
    Exception-free wrapper around fetch_and_hash_file for use with joblib.

    Joblib can't handle exceptions, so a FailedPerceptualHashException raised while
    hashing is converted into a FailedPerceptuallyHashedFile result instead.
    """
    # NOTE(review): the failure result is built from the file hash alone - confirm
    # this matches the fields FailedPerceptuallyHashedFile currently requires.
    try:
        outcome = self.fetch_and_hash_file(file_hash)
    except FailedPerceptualHashException as failure:
        outcome = FailedPerceptuallyHashedFile(failure.file_hash)
    return outcome

def fetch_and_hash_file(self, video_hash: str) -> PerceptuallyHashedFile:
    """
    Retrieves the video from Hydrus and calculates its perceptual hash.

    Returns a PerceptuallyHashedFile pairing the video hash with its encoded
    perceptual hash.

    Raises FailedPerceptualHashException with the failed video hash if the file
    cannot be fetched from Hydrus, if hashing raises, or if the encoded hash is
    empty.
    """
    try:
        video_response = self.client.client.get_file(hash_=video_hash)
    except hydrus_api.HydrusAPIException as exc:
        print("[red] Failed to get video from Hydrus.")
        self.hydlog.error("Error getting video from Hydrus.")
        # Chain the cause so the original API error is not lost.
        raise FailedPerceptualHashException(video_hash) from exc

    # Calculate perceptual_hash
    try:
        phash = compute_phash(video_response.content)
        phash_str: str = encode_phash_to_str(phash)
    except Exception as exc:
        print("[red] Failed to calculate a perceptual hash.")
        self.hydlog.exception(exc)
        self.hydlog.error(f"Errored file hash: {video_hash}")
        raise FailedPerceptualHashException(video_hash) from exc

    # "just in case" error checking; an empty string is rejected as well so the
    # check agrees with the one in FileHasher._phash_file.
    if not phash_str or phash_str == "[]":
        raise FailedPerceptualHashException(video_hash)

    return PerceptuallyHashedFile(video_hash, phash_str)

def filter_unhashed(self, file_hashes: list[FileHash]) -> list[FileHash]:
"""
Get only the files that have not been perceptually hashed in the db from a list of files.
Expand All @@ -186,34 +206,43 @@ def add_perceptual_hashes_to_db(self, video_hashes: Sequence[str]) -> None:
and then add it to the database.
"""
success_hash_count = 0
failed_hash_count = 0
failed_from_api_errors_count = 0
failed_from_phash_count = 0
self.hydlog.info("Starting perceptual hash processing")
try:
with (
tqdm(
total=len(video_hashes),
desc="Perceptually hashing files",
dynamic_ncols=True,
unit="video",
colour="BLUE",
) as pbar,
Parallel(n_jobs=self.job_count, return_as="generator_unordered") as parallel,
):
result_generator = parallel(
delayed(self.fetch_and_hash_file_exception_safe)(video_hash) for video_hash in video_hashes
)
for result in result_generator:
if isinstance(result, FailedPerceptuallyHashedFile):
if self.page_logger:
self.page_logger.add_failed_video(result.file_hash)
failed_hash_count += 1
pbar.update(1)
continue
self.db.add_to_phashed_files_queue(result.file_hash, result.perceptual_hash)
self.db.commit()
with tqdm(
total=len(video_hashes),
desc="Perceptually hashing files",
dynamic_ncols=True,
unit="video",
colour="BLUE",
) as pbar:
filehasher = FileHasher(self.client)
with Parallel(n_jobs=self.job_count, return_as="generator_unordered") as parallel:
# Note: joblib actually copies the entire filehasher into a new process, including the client.
result_generator = parallel(
delayed(filehasher.fetch_and_phash_file)(video_hash) for video_hash in video_hashes
)
for result in result_generator:
if isinstance(result, FailedPerceptuallyHashedFile):
# We only want to add the failure to the page if the file was the actual cause of failure.
if isinstance(result.exc, HydrusApiException):
failed_from_api_errors_count += 1
print("[red] Hydrus API error during perceptual hashing:")
print(f"{result.exc}")
else:
failed_from_phash_count += 1
print("[red] Failed to perceptually hash a file.")
print(f"Failed file SHA256 hash: {result.file_hash}")
print(f"{result.exc}")
if self.page_logger:
self.page_logger.add_failed_video(result.file_hash)
else:
success_hash_count += 1
self.db.add_to_phashed_files_queue(result.file_hash, result.perceptual_hash)
self.db.commit()

success_hash_count += 1
pbar.update(1)
pbar.update(1)

except KeyboardInterrupt:
print("[yellow] Perceptual hash processing was interrupted!")
Expand All @@ -222,12 +251,20 @@ def add_perceptual_hashes_to_db(self, video_hashes: Sequence[str]) -> None:
print("[green] Finished perceptual hash processing.")

finally:
if failed_hash_count > 0:
print(f"[yellow] Perceptual hash processing had {failed_hash_count} failed files.")
if self.page_logger is None:
total_failures = failed_from_api_errors_count + failed_from_phash_count
if total_failures > 0:
print(f"[yellow] Perceptual hash processing had {total_failures} total failed files.")
if failed_from_api_errors_count > 0:
print(
f"[yellow] {failed_from_api_errors_count} failures were due to API errors. Ensure Hydrus is running and accessible before trying again." # noqa: E501
)
if failed_from_phash_count > 0:
print(
f"[yellow] {failed_from_phash_count} failures were from an error during perceptual hashing. Are the files corrupted?" # noqa: E501
)
print(
"\nTip: You can see what files failed directly in Hydrus by "
"creating a page with the name 'failed' and "
"\nTip: You could have seen which files failed directly in Hydrus by "
"creating a Hydrus page with the name 'failed' and "
"running the program with '--failed-page-name=failed'\n"
)
print(f"[green] Added {success_hash_count} new videos to the perceptual hash database.")
Expand Down

0 comments on commit 7f7bee9

Please sign in to comment.