Add multithreaded single file hashing with accelerators
ianwal committed Jul 28, 2024
1 parent 6dff17e commit f034530
Showing 3 changed files with 54 additions and 30 deletions.
pyproject.toml: 5 changes, 5 additions & 0 deletions
@@ -32,8 +32,13 @@ dependencies = [
# Below is for vpdqpy
"pillow",
"pyav<12",
"hvdaccelerators @ git+https://github.com/hydrusvideodeduplicator/hvdaccelerators.git",
]

# PyPI doesn't allow this, so we need to remove it and upload hvdaccelerators to PyPI before release.
[tool.hatch.metadata]
allow-direct-references = true

[project.urls]
Documentation = "https://github.com/hydrusvideodeduplicator/hydrus-video-deduplicator#readme"
Issues = "https://github.com/hydrusvideodeduplicator/hydrus-video-deduplicator/issues"
src/hydrusvideodeduplicator/dedup.py: 48 changes, 24 additions & 24 deletions
@@ -5,7 +5,6 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING

from joblib import Parallel, delayed
from rich import print
from tqdm import tqdm

@@ -14,6 +13,8 @@

FileHash = str

import gc

import hydrusvideodeduplicator.hydrus_api as hydrus_api

from .client import HVDClient
@@ -280,32 +281,31 @@ def add_perceptual_hashes_to_db(self, video_hashes: Sequence[str]) -> Perceptual
colour="BLUE",
) as pbar:
filehasher = FileHasher(self.client)
with Parallel(n_jobs=self.job_count, return_as="generator_unordered") as parallel:
# Note: joblib actually copies the entire filehasher into a new process, including the client.
result_generator = parallel(
delayed(filehasher.fetch_and_phash_file)(video_hash) for video_hash in video_hashes
)
for result in result_generator:
if isinstance(result, FailedPerceptuallyHashedFile):
# We only want to add the failure to the page if the file was the actual cause of failure.
if isinstance(result.exc, HydrusApiException):
stats.failed_from_api_errors_count += 1
print("[red] Hydrus API error during perceptual hashing:")
print(f"{result.exc}")
else:
stats.failed_from_phash_count += 1
print("[red] Failed to perceptually hash a file.")
print(f"Failed file SHA256 hash: {result.file_hash}")
print(f"{result.exc}")
if self.page_logger:
self.page_logger.add_failed_video(result.file_hash)
for video_hash in video_hashes:
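# Files are hashed one at a time; multithreading now happens inside the accelerator-backed hasher rather than across joblib worker processes.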
result = filehasher.fetch_and_phash_file(video_hash)
if isinstance(result, FailedPerceptuallyHashedFile):
# We only want to add the failure to the page if the file was the actual cause of failure.
if isinstance(result.exc, HydrusApiException):
stats.failed_from_api_errors_count += 1
print("[red] Hydrus API error during perceptual hashing:")
print(f"{result.exc}")
else:
stats.success_hash_count += 1
self.db.add_to_phashed_files_queue(result.file_hash, result.perceptual_hash)

pbar.update(1)
stats.failed_from_phash_count += 1
print("[red] Failed to perceptually hash a file.")
print(f"Failed file SHA256 hash: {result.file_hash}")
print(f"{result.exc}")
if self.page_logger:
self.page_logger.add_failed_video(result.file_hash)
else:
stats.success_hash_count += 1
self.db.add_to_phashed_files_queue(result.file_hash, result.perceptual_hash)

# Collect garbage now to avoid huge memory usage from the video files and frames.
gc.collect()
pbar.update(1)
except KeyboardInterrupt:
raise CancelledPerceptualHashException(stats)
gc.collect()
return stats

def mark_videos_as_duplicates(self, video1_hash: str, video2_hash: str):
src/hydrusvideodeduplicator/vpdqpy/vpdqpy.py: 31 changes, 25 additions & 6 deletions
@@ -8,9 +8,9 @@
from typing import TYPE_CHECKING

import av
from hvdaccelerators import stuff
from PIL import Image

from ..pdqhashing.hasher.pdq_hasher import PDQHasher
from ..pdqhashing.pdq_types.hash256 import Hash256

if TYPE_CHECKING:
@@ -161,14 +161,33 @@ def computeHash(
if video is None:
raise ValueError

pdq = PDQHasher()
# pdq = PDQHasher()
features: VpdqHash = []

hasher = None
for second, frame in enumerate(Vpdq.frame_extract_pyav(video)):
pdq_hash_and_quality = pdq.fromBufferedImage(frame.to_image())
pdq_frame = VpdqFeature(pdq_hash_and_quality.getHash(), pdq_hash_and_quality.getQuality(), second)
features.append(pdq_frame)

# TODO: This uses SO MUCH memory if hashing gets behind decoding, since there will be
# lots of raw frames in the queue, which are HUGE. Add a max size for the queue.
# ... or I have a memory leak :(
im = frame.to_image()
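# PIL's thumbnail() resizes in place, preserves aspect ratio, and only ever shrinks, keeping the frame handed to the hasher small.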
im.thumbnail((512, 512))
if not hasher:
# TODO: Fix this to get the average fps from frame_extract_pyav or a new method.
# Although this doesn't appear to actually do anything. Exact hashing tests pass...
average_fps = 1
hasher = stuff.Hasher(average_fps, im.width, im.height)
rgb_image = im.convert("RGB")
# result = stuff.hash_frame(rgb_image.tobytes(), im.width, im.height)
hasher.hash_frame(rgb_image.tobytes())
# (pdq_hash, pdq_quality) = result
# pdq_hash = str(pdq_hash, encoding="utf-8")
# pdq_frame = VpdqFeature(Hash256.fromHexString(pdq_hash), pdq_quality, second)
# features.append(pdq_frame)
features = hasher.finish()
features = [
VpdqFeature(Hash256.fromHexString(feature.get_hash()), feature.get_quality(), feature.get_frame_number())
for feature in features
]
deduped_features = Vpdq.dedupe_features(features)
return deduped_features

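For context on the memory TODO above: decoding can outrun hashing, so raw frames accumulate. Below is a minimal, stdlib-only sketch of the bounded queue the TODO asks for; it is not part of this commit or of the hvdaccelerators API, and bounded_hash, frame_iter, hash_frame, and max_pending are hypothetical names.

import queue
import threading


def bounded_hash(frame_iter, hash_frame, max_pending: int = 8):
    """Hash every frame from frame_iter, holding at most max_pending raw frames in memory."""
    frames: queue.Queue = queue.Queue(maxsize=max_pending)  # caps how many decoded frames can pile up
    done = object()  # sentinel marking the end of the stream

    def produce():
        for frame in frame_iter:
            frames.put(frame)  # blocks once the queue is full, throttling the decoder
        frames.put(done)

    producer = threading.Thread(target=produce, daemon=True)
    producer.start()
    while (frame := frames.get()) is not done:
        hash_frame(frame)  # hashing runs while the producer decodes the next frames
    producer.join()

Whether the bound belongs here in Python or inside the accelerator is an open design choice; the sketch only shows the shape of the fix the TODO suggests.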
