Skip to content

Commit

Permalink
Images name hashing implemented: #24
Browse files Browse the repository at this point in the history
  • Loading branch information
Artiom N. authored and artiomn committed Apr 27, 2024
1 parent 5752f21 commit d245977
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 17 deletions.
1 change: 1 addition & 0 deletions markdown_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def main(arguments):
deduplication_type=getattr(DeduplicationVariant, arguments.deduplication_type.upper()),
images_dirname=arguments.images_dirname,
save_hierarchy=arguments.prepend_images_with_path,
replace_image_names=arguments.replace_image_names,
)

processor.process()
Expand Down
3 changes: 3 additions & 0 deletions markdown_toolset/article_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(
deduplication_type: DeduplicationVariant = DeduplicationVariant.DISABLED,
images_dirname: Union[Path, str] = 'images',
save_hierarchy: bool = False,
replace_image_names: bool = False,
):
self._article_formatter = get_formatter(output_format, FORMATTERS)
self._article_downloader = ArticleDownloader(
Expand All @@ -60,6 +61,7 @@ def __init__(
self._save_hierarchy = save_hierarchy
self._img_downloader = None
self._running = False
self._replace_image_names = replace_image_names

def process(self):
try:
Expand Down Expand Up @@ -103,6 +105,7 @@ def process(self):
download_incorrect_mime_types=self._download_incorrect_mime,
downloading_timeout=self._downloading_timeout,
deduplicator=deduplicator,
replace_image_names=self._replace_image_names,
)

result = self._transform_article(article_path, self._input_formats, TRANSFORMERS)
Expand Down
53 changes: 41 additions & 12 deletions markdown_toolset/image_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

from .deduplicators.deduplicator import Deduplicator
from .out_path_maker import OutPathMaker
from .www_tools import download_from_url, get_filename_from_url, is_url, remove_protocol_prefix
from .www_tools import download_from_url, get_filename_from_url, is_url, remove_protocol_prefix, split_file_ext
from .string_tools import is_binary_same


class ImageLink:
Expand Down Expand Up @@ -68,6 +69,7 @@ def __init__(
download_incorrect_mime_types: bool = False,
downloading_timeout: float = -1,
deduplicator: Optional[Deduplicator] = None,
replace_image_names: bool = False,
):
"""
:parameter out_path_maker: image local path creating strategy.
Expand All @@ -78,6 +80,7 @@ def __init__(
`downloading_timeout` seconds.
:parameter download_incorrect_mime_types: download images even if MIME type can't be identified.
:parameter deduplicator: file deduplicator object.
:parameter replace_image_names: replace image names with hash.
"""

self._out_path_maker = out_path_maker
Expand All @@ -87,6 +90,7 @@ def __init__(
self._download_incorrect_mime_types = download_incorrect_mime_types
self._deduplicator = deduplicator
self._running = False
self._replace_image_names = replace_image_names

# pylint: disable=R0912(too-many-branches)
def download_images(self, images: List[Union[str, ImageLink]]) -> dict:
Expand Down Expand Up @@ -152,6 +156,13 @@ def download_images(self, images: List[Union[str, ImageLink]]) -> dict:
'Empty image filename, probably this is incorrect link: "%s".', image_download_url
)
continue

if self._replace_image_names:
_, image_ext = split_file_ext(image_filename)
image_content_hash = hashlib.sha384(image_content).hexdigest()
logging.debug('Image content hash: %s', image_filename)
image_filename = f'{image_content_hash}.{image_ext}'

except Exception as e:
if self._skip_all_errors:
logging.warning(
Expand All @@ -171,8 +182,24 @@ def download_images(self, images: List[Union[str, ImageLink]]) -> dict:
if not result:
continue

real_image_path = self._process_image_path(image_url, image_filename, replacement_mapping)
image_local_url, real_image_path = self._get_real_path(image_url, image_filename)

if self._replace_image_names and real_image_path.exists():
# Image by this content hash exists, but possibly this is a collision.
with open(real_image_path, 'rb') as real_file:
if not is_binary_same(real_file, BytesIO(image_content)):
# Fix collision, changing name.
img_num: int = 0
while real_image_path.exists():
numerated_image_filename = f'{image_num}{image_filename}'
real_image_path = self._out_path_maker.get_real_path(
image_local_url, numerated_image_filename
)
img_num += 1

image_filename = numerated_image_filename

self._update_mapping(image_url, image_local_url, image_filename, replacement_mapping)
self._write_image(real_image_path, image_content, image_link)
finally:
logging.info('Finished images downloading.')
Expand Down Expand Up @@ -205,26 +232,28 @@ def _resize_image(image_content: bytes, new_size, filename):
logging.debug('Saving resized image to the %s', filename)
img.save(filename)

def _process_image_path(self, image_url, image_filename, replacement_mapping):
"""Get real image path and update replacement mapping."""

def _get_real_path(self, image_url, image_filename):
    """
    Compute where the image will be stored locally.

    :parameter image_url: image URL as it was given to the downloader.
    :parameter image_filename: file name which will be used for the stored image.
    :return: ``(image_local_url, real_image_path)`` pair: the POSIX-style parent path of
        the URL without its protocol prefix, and the path produced by the out path maker
        for that parent and `image_filename`.
    """
    url_without_protocol = remove_protocol_prefix(image_url)
    # Only the "directory" part of the URL is passed to the path maker.
    image_local_url = Path(url_without_protocol).parent.as_posix()
    real_image_path = self._out_path_maker.get_real_path(image_local_url, image_filename)

    logging.debug('Real image path = "%s", image filename = "%s"', real_image_path, image_filename)

    return image_local_url, real_image_path

def _update_mapping(self, image_url, image_local_url, image_filename, replacement_mapping):
"""Update replacement mapping."""
document_img_path = self._out_path_maker.get_document_img_path(image_local_url, image_filename)
image_filename, document_img_path = self._fix_paths(
replacement_mapping, document_img_path, image_url, image_filename
)

real_image_path = self._out_path_maker.get_real_path(image_local_url, image_filename)
replacement_mapping.setdefault(image_url, '/'.join(document_img_path.parts))

logging.debug(
'Real image path = "%s", document image path = "%s", image filename = "%s"',
real_image_path,
'Document image path = "%s", image filename = "%s"',
document_img_path,
image_filename,
)
replacement_mapping.setdefault(image_url, '/'.join(document_img_path.parts))

return real_image_path

def _make_directories(self, path: Optional[Path] = None):
"""Create directories hierarchy, started from images directory."""
Expand Down
19 changes: 14 additions & 5 deletions markdown_toolset/www_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""
import logging

from typing import Optional
from typing import Optional, Tuple
from mimetypes import guess_extension
import re
from urllib.parse import urlparse, urlunparse
Expand Down Expand Up @@ -65,6 +65,18 @@ def download_from_url(url: str, timeout: float = None):
return response


def split_file_ext(file_name: str) -> Tuple[str, Optional[str]]:
    """
    Split a file name into its base name and extension.

    The split happens at the last dot only, so ``'a.tar.gz'`` gives ``('a.tar', 'gz')``.

    :parameter file_name: file name to split.
    :return: ``(name, extension)`` tuple. The extension has no leading dot; it is ``None``
        when `file_name` contains no dot at all and an empty string when the name ends
        with a dot.
    """
    name, dot, ext = file_name.rpartition('.')
    if not dot:
        # Without a separator `rpartition` puts the whole input into the last element,
        # so the original name has to be returned explicitly.
        return file_name, None

    return name, ext


def get_filename_from_url(req: requests.Response) -> Optional[str]:
"""
Get filename from url and, if not found, try to get from content-disposition.
Expand All @@ -90,10 +102,7 @@ def get_filename_from_url(req: requests.Response) -> Optional[str]:

result = file_name[0]

f_name, f_ext = (
(name_and_ext := result.rsplit('.', 1)),
(*name_and_ext, None) if len(name_and_ext) == 1 else name_and_ext,
)[1:][0]
f_name, f_ext = split_file_ext(result)

if f_name == '':
return None
Expand Down
19 changes: 19 additions & 0 deletions tests/test_image_downloader.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import hashlib
from pathlib import Path

import pytest
Expand Down Expand Up @@ -68,3 +69,21 @@ def test_resizing(self):
with Image.open(self._out_image_filepath) as img:
assert img.width == w
assert img.height == h

def test_names_replacing(self):
    """With `replace_image_names` enabled the image is saved as ``<sha384 of content>.png``."""
    downloader = ImageDownloader(
        out_path_maker=self._out_path_maker,
        skip_list=[],
        skip_all_errors=False,
        download_incorrect_mime_types=True,
        downloading_timeout=-1,
        deduplicator=None,
        replace_image_names=True,
    )

    # The expected file name is derived from the source image content.
    source_image_path = self._article_images_path / self._image_filename
    with open(source_image_path, 'rb') as source_image:
        source_content = source_image.read()
    expected_name = hashlib.sha384(source_content).hexdigest()

    downloader.download_images([self._image_in_relpath])

    expected_path = self._images_out_path / f'{expected_name}.png'
    assert expected_path.exists()

0 comments on commit d245977

Please sign in to comment.