From bc7f6426b3c979aa1bf30e3e9f4fd3feff1e69df Mon Sep 17 00:00:00 2001 From: Brian Helba Date: Fri, 27 Jan 2023 17:36:48 -0500 Subject: [PATCH] Simplify some internal redaction classes --- imagedephi/redact.py | 70 ++++++++++-------------- imagedephi/rules.py | 127 +++++++++++++++++++++++-------------------- 2 files changed, 98 insertions(+), 99 deletions(-) diff --git a/imagedephi/redact.py b/imagedephi/redact.py index a95d38bf..145a3812 100644 --- a/imagedephi/redact.py +++ b/imagedephi/redact.py @@ -2,6 +2,7 @@ from collections.abc import Generator import importlib.resources +from itertools import chain from pathlib import Path from typing import TYPE_CHECKING @@ -10,7 +11,7 @@ import tifftools.constants import yaml -from imagedephi.rules import RuleSet, RuleSource, TiffMetadataRule, build_ruleset +from imagedephi.rules import MetadataTiffRule, RuleSet, RuleSource, build_ruleset if TYPE_CHECKING: from tifftools.tifftools import IFD, TiffInfo @@ -24,11 +25,9 @@ class TiffMetadataRedactionPlan: images, and also executing the plan. """ - redaction_steps: dict[int, TiffMetadataRule] + tiff_info: TiffInfo + redaction_steps: dict[int, MetadataTiffRule] no_match_tags: list[tifftools.TiffTag] - image_data: TiffInfo - base_rules: list[TiffMetadataRule] - override_rules: list[TiffMetadataRule] @staticmethod def _iter_tiff_tag_entries( @@ -47,57 +46,48 @@ def _iter_tiff_tag_entries( for sub_ifds in entry.get("ifds", []): yield from TiffMetadataRedactionPlan._iter_tiff_tag_entries(sub_ifds) - def _add_tag_to_plan(self, tag: tifftools.TiffTag) -> None: - """Determine how to handle a given tag.""" - for rule in self.override_rules: - if rule.is_match(tag): - self.redaction_steps[tag.value] = rule - return - for rule in self.base_rules: - if rule.is_match(tag): - self.redaction_steps[tag.value] = rule - return - self.no_match_tags.append(tag) - def __init__( self, tiff_info: TiffInfo, - base_rules: list[TiffMetadataRule], - override_rules: list[TiffMetadataRule], + base_rules: list[MetadataTiffRule], + override_rules: list[MetadataTiffRule], ) -> None: + self.tiff_info = tiff_info + self.redaction_steps = {} self.no_match_tags = [] - self.image_data = tiff_info - self.base_rules = base_rules - self.override_rules = override_rules - ifds = self.image_data["ifds"] + ifds = self.tiff_info["ifds"] for tag, _ in self._iter_tiff_tag_entries(ifds): - self._add_tag_to_plan(tag) + # First iterate through overrides, then base + for rule in chain(override_rules, base_rules): + if rule.is_match(tag): + self.redaction_steps[tag.value] = rule + break + else: + # End of iteration, without "break"; no matching rule found anywhere + self.no_match_tags.append(tag) def report_missing_rules(self) -> None: - if len(self.no_match_tags) == 0: - click.echo("This redaction plan is comprehensive.") - else: + if self.no_match_tags: click.echo("The following tags could not be redacted given the current set of rules.") for tag in self.no_match_tags: click.echo(f"{tag.value} - {tag.name}") + else: + click.echo("This redaction plan is comprehensive.") def report_plan(self) -> None: click.echo("Tiff Metadata Redaction Plan\n") - for _key, rule in self.redaction_steps.items(): + for rule in self.redaction_steps.values(): click.echo(rule.get_description()) self.report_missing_rules() - def _redact_one_tag(self, ifd: IFD, tag: tifftools.TiffTag) -> None: - if tag.value in self.redaction_steps: - rule = self.redaction_steps[tag.value] - rule.apply(ifd) - def execute_plan(self) -> None: """Modify the image data according to the redaction rules.""" - ifds = self.image_data["ifds"] + ifds = self.tiff_info["ifds"] for tag, ifd in self._iter_tiff_tag_entries(ifds): - self._redact_one_tag(ifd, tag) + rule = self.redaction_steps.get(tag.value) + if rule is not None: + rule.apply(ifd) def _get_output_path(file_path: Path, output_dir: Path) -> Path: @@ -122,10 +112,10 @@ def redact_images(image_dir: Path, output_dir: Path, override_rules: RuleSet | N click.echo(f"Redacting {child.name}...") redaction_plan = TiffMetadataRedactionPlan( tiff_info, - base_rules.get_tiff_metadata_rules(), - override_rules.get_tiff_metadata_rules() if override_rules else [], + base_rules.get_metadata_tiff_rules(), + override_rules.get_metadata_tiff_rules() if override_rules else [], ) - if len(redaction_plan.no_match_tags): + if redaction_plan.no_match_tags: click.echo(f"Redaction could not be performed for {child.name}.") redaction_plan.report_missing_rules() else: @@ -139,7 +129,7 @@ def show_redaction_plan(image_path: click.Path, override_rules: RuleSet | None = tiff_info = tifftools.read_tiff(str(image_path)) redaction_plan = TiffMetadataRedactionPlan( tiff_info, - base_rules.get_tiff_metadata_rules(), - override_rules.get_tiff_metadata_rules() if override_rules else [], + base_rules.get_metadata_tiff_rules(), + override_rules.get_metadata_tiff_rules() if override_rules else [], ) redaction_plan.report_plan() diff --git a/imagedephi/rules.py b/imagedephi/rules.py index 24469a54..b6d047da 100644 --- a/imagedephi/rules.py +++ b/imagedephi/rules.py @@ -1,5 +1,6 @@ from __future__ import annotations +import abc from dataclasses import dataclass from enum import Enum from typing import TYPE_CHECKING @@ -30,100 +31,108 @@ class RuleSource(Enum): OVERRIDE = "override" -@dataclass -class Rule: +class Rule(abc.ABC): description: str | None redact_method: RedactMethod rule_type: RuleType rule_source: RuleSource - # Consider factory class fn here - # def make(...): - def get_description(self) -> str: """Generate a title for the rule.""" return self.description if self.description else "" -@dataclass -class TiffMetadataRule(Rule): +class TiffRule(Rule): + @abc.abstractmethod + def apply(self, ifd: IFD): + ... + + +class MetadataTiffRule(TiffRule): tag: tifftools.TiffTag - replace_value: str | bytes | list[int | float] | None + rule_type = RuleType.METADATA - @classmethod - def build(cls, rule_dict: dict, source: RuleSource) -> TiffMetadataRule: + def __init__(self, rule_spec: dict, rule_source: RuleSource) -> None: """Transform a rule from schema into an object.""" - tag = tifftools.constants.Tag[rule_dict["tag_name"]] - redact_method = RedactMethod[rule_dict["method"].upper()] - return TiffMetadataRule( - description=rule_dict.get("description", None), # this is optional - redact_method=redact_method, - rule_type=RuleType.METADATA, - rule_source=source, - tag=tag, - replace_value=rule_dict["new_value"] if redact_method == RedactMethod.REPLACE else None, - ) + self.description = rule_spec.get("description", None) # this is optional + self.redact_method = RedactMethod[rule_spec["method"].upper()] + self.rule_source = rule_source + self.tag = tifftools.constants.Tag[rule_spec["tag_name"]] def is_match(self, tag: tifftools.TiffTag) -> bool: return self.tag.value == tag.value + @abc.abstractmethod def apply(self, ifd: IFD): - if self.redact_method == RedactMethod.DELETE: - del ifd["tags"][self.tag.value] - elif self.redact_method == RedactMethod.REPLACE: - # If rules are constructed via make_rule, this should not be an issue - if self.replace_value is None: - raise RuntimeError( - f"A rule with redaction method {self.redact_method} " - "must have a valid replacement value." - ) - ifd["tags"][self.tag.value]["data"] = self.replace_value - elif self.redact_method == RedactMethod.KEEP: - pass + ... def get_description(self) -> str: if self.description: return self.description return f"Tag {self.tag.value} - {self.tag.name}: {self.redact_method} ({self.rule_source})" + @classmethod + def build(cls, rule_spec: dict, rule_source: RuleSource) -> MetadataTiffRule: + # TODO: some input validation here, in case an invalid "method" is provided + redact_method = RedactMethod[rule_spec["method"].upper()] + for rule_class in cls.__subclasses__(): + if rule_class.redact_method == redact_method: + return rule_class(rule_spec, rule_source) + else: + raise Exception("Unknown redact_method.") + + +class ReplaceMetadataTiffRule(MetadataTiffRule): + redact_method = RedactMethod.REPLACE + replace_value: str | bytes | list[int | float] + + def __init__(self, rule_spec: dict, source: RuleSource) -> None: + super().__init__(rule_spec, source) + self.replace_value = rule_spec["new_value"] + + def apply(self, ifd: IFD): + ifd["tags"][self.tag.value]["data"] = self.replace_value + + +class DeleteMetadataTiffRule(MetadataTiffRule): + redact_method = RedactMethod.DELETE + + def apply(self, ifd: IFD): + del ifd["tags"][self.tag.value] + + +class KeepMetadataTiffRule(MetadataTiffRule): + redact_method = RedactMethod.KEEP + + def apply(self, ifd: IFD): + pass + @dataclass class RuleSet: name: str description: str - rules: dict[FileFormat, dict[RuleType, list[Rule]]] + rules: dict[FileFormat, list[Rule]] - def get_tiff_metadata_rules(self) -> list[TiffMetadataRule]: - return [ - rule - for rule in self.rules[FileFormat.TIFF][RuleType.METADATA] - if isinstance(rule, TiffMetadataRule) - ] + def get_metadata_tiff_rules(self) -> list[MetadataTiffRule]: + return [rule for rule in self.rules[FileFormat.TIFF] if isinstance(rule, MetadataTiffRule)] -def _build_rule( - file_format: FileFormat, rule_type: RuleType, rule_dict: dict, source: RuleSource -) -> Rule | None: +def _build_rule(file_format: FileFormat, rule_spec: dict, rule_source: RuleSource) -> Rule: + rule_type = RuleType[rule_spec["type"].upper()] if file_format == FileFormat.TIFF: if rule_type == RuleType.METADATA: - return TiffMetadataRule.build(rule_dict, source) - return None + return MetadataTiffRule.build(rule_spec, rule_source) + + raise Exception("Unsupported rule.") -def build_ruleset(rules_dict: dict, rule_source: RuleSource) -> RuleSet: +def build_ruleset(ruleset_spec: dict, rule_source: RuleSource) -> RuleSet: """Read in metadata redaction rules from a file.""" rule_set_rules = {} - for file_format in rules_dict["rules"]: - format_key = FileFormat[file_format.upper()] - format_rules = rules_dict["rules"][file_format] - format_rule_objects: dict[RuleType, list[Rule]] = { - RuleType.IMAGE: [], - RuleType.METADATA: [], - } - for rule in format_rules: - rule_type = RuleType[rule["type"].upper()] - rule = _build_rule(format_key, rule_type, rule, rule_source) - if rule: - format_rule_objects[rule_type].append(rule) - rule_set_rules[format_key] = format_rule_objects - return RuleSet(rules_dict["name"], rules_dict["description"], rule_set_rules) + for file_format_key, rule_specs in ruleset_spec["rules"].items(): + file_format = FileFormat[file_format_key.upper()] + rule_set_rules[file_format] = [ + _build_rule(file_format, rule_spec, rule_source) for rule_spec in rule_specs + ] + return RuleSet(ruleset_spec["name"], ruleset_spec["description"], rule_set_rules)