Skip to content

Commit

Permalink
Simplify some internal redaction classes
Browse files Browse the repository at this point in the history
  • Loading branch information
brianhelba committed Jan 27, 2023
1 parent b93bc1f commit bc7f642
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 99 deletions.
70 changes: 30 additions & 40 deletions imagedephi/redact.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from collections.abc import Generator
import importlib.resources
from itertools import chain
from pathlib import Path
from typing import TYPE_CHECKING

Expand All @@ -10,7 +11,7 @@
import tifftools.constants
import yaml

from imagedephi.rules import RuleSet, RuleSource, TiffMetadataRule, build_ruleset
from imagedephi.rules import MetadataTiffRule, RuleSet, RuleSource, build_ruleset

if TYPE_CHECKING:
from tifftools.tifftools import IFD, TiffInfo
Expand All @@ -24,11 +25,9 @@ class TiffMetadataRedactionPlan:
images, and also executing the plan.
"""

redaction_steps: dict[int, TiffMetadataRule]
tiff_info: TiffInfo
redaction_steps: dict[int, MetadataTiffRule]
no_match_tags: list[tifftools.TiffTag]
image_data: TiffInfo
base_rules: list[TiffMetadataRule]
override_rules: list[TiffMetadataRule]

@staticmethod
def _iter_tiff_tag_entries(
Expand All @@ -47,57 +46,48 @@ def _iter_tiff_tag_entries(
for sub_ifds in entry.get("ifds", []):
yield from TiffMetadataRedactionPlan._iter_tiff_tag_entries(sub_ifds)

def _add_tag_to_plan(self, tag: tifftools.TiffTag) -> None:
    """
    Decide how a single tag is handled and record the decision on the plan.

    The first rule whose ``is_match(tag)`` is true is stored in
    ``self.redaction_steps`` under ``tag.value``. When no rule matches, the tag
    is appended to ``self.no_match_tags`` instead.
    """
    # Override rules are listed first so that they take precedence over base rules.
    for rule in (*self.override_rules, *self.base_rules):
        if rule.is_match(tag):
            self.redaction_steps[tag.value] = rule
            return
    self.no_match_tags.append(tag)

# Build the plan for one image: every tag yielded by _iter_tiff_tag_entries is
# either mapped to its first matching rule in redaction_steps (keyed by
# tag.value) or appended to no_match_tags.
# NOTE(review): this listing looks like a rendered diff with markers and
# indentation stripped, so superseded and replacement lines appear side by side
# (both parameter pairs, both `ifds = ...` assignments, the helper call and the
# inlined loop) — confirm against the repository before editing.
def __init__(
self,
tiff_info: TiffInfo,
base_rules: list[TiffMetadataRule],
override_rules: list[TiffMetadataRule],
base_rules: list[MetadataTiffRule],
override_rules: list[MetadataTiffRule],
) -> None:
self.tiff_info = tiff_info

self.redaction_steps = {}
self.no_match_tags = []
self.image_data = tiff_info
self.base_rules = base_rules
self.override_rules = override_rules
ifds = self.image_data["ifds"]
ifds = self.tiff_info["ifds"]
for tag, _ in self._iter_tiff_tag_entries(ifds):
self._add_tag_to_plan(tag)
# First iterate through overrides, then base
for rule in chain(override_rules, base_rules):
if rule.is_match(tag):
# The first matching rule wins; `break` skips the loop's `else` clause.
self.redaction_steps[tag.value] = rule
break
else:
# End of iteration, without "break"; no matching rule found anywhere
self.no_match_tags.append(tag)

# Print, via click.echo, either a confirmation that no tag was left without a
# rule, or a heading followed by one "<value> - <name>" line per entry in
# no_match_tags.
# NOTE(review): two variants of the branch appear below (a `len(...) == 0` test
# and a truthiness test with the branches swapped) — presumably the superseded
# and replacement lines of a rendered diff; confirm.
def report_missing_rules(self) -> None:
if len(self.no_match_tags) == 0:
click.echo("This redaction plan is comprehensive.")
else:
if self.no_match_tags:
click.echo("The following tags could not be redacted given the current set of rules.")
for tag in self.no_match_tags:
click.echo(f"{tag.value} - {tag.name}")
else:
click.echo("This redaction plan is comprehensive.")

# Print a header, then the description of every rule stored in
# redaction_steps, then the unmatched-tag report.
# NOTE(review): the two `for` headers below look like the superseded and the
# replacement form of one loop (rendered diff), not a nested loop — confirm.
def report_plan(self) -> None:
click.echo("Tiff Metadata Redaction Plan\n")
for _key, rule in self.redaction_steps.items():
for rule in self.redaction_steps.values():
click.echo(rule.get_description())
self.report_missing_rules()

def _redact_one_tag(self, ifd: IFD, tag: tifftools.TiffTag) -> None:
    """
    Apply the planned rule for ``tag`` to ``ifd``.

    Tags without an entry in ``self.redaction_steps`` are left untouched.
    """
    # One lookup instead of a membership test followed by a second lookup.
    rule = self.redaction_steps.get(tag.value)
    if rule is not None:
        rule.apply(ifd)

# Walks every (tag, ifd) pair yielded by _iter_tiff_tag_entries and applies the
# rule stored for that tag in redaction_steps, if there is one, to that IFD.
# NOTE(review): both `ifds = ...` assignments and both dispatch forms (helper
# call vs. inline `.get`) appear below — presumably superseded and replacement
# lines of a rendered diff; confirm.
def execute_plan(self) -> None:
"""Modify the image data according to the redaction rules."""
ifds = self.image_data["ifds"]
ifds = self.tiff_info["ifds"]
for tag, ifd in self._iter_tiff_tag_entries(ifds):
self._redact_one_tag(ifd, tag)
rule = self.redaction_steps.get(tag.value)
if rule is not None:
rule.apply(ifd)


def _get_output_path(file_path: Path, output_dir: Path) -> Path:
Expand All @@ -122,10 +112,10 @@ def redact_images(image_dir: Path, output_dir: Path, override_rules: RuleSet | N
click.echo(f"Redacting {child.name}...")
redaction_plan = TiffMetadataRedactionPlan(
tiff_info,
base_rules.get_tiff_metadata_rules(),
override_rules.get_tiff_metadata_rules() if override_rules else [],
base_rules.get_metadata_tiff_rules(),
override_rules.get_metadata_tiff_rules() if override_rules else [],
)
if len(redaction_plan.no_match_tags):
if redaction_plan.no_match_tags:
click.echo(f"Redaction could not be performed for {child.name}.")
redaction_plan.report_missing_rules()
else:
Expand All @@ -139,7 +129,7 @@ def show_redaction_plan(image_path: click.Path, override_rules: RuleSet | None =
tiff_info = tifftools.read_tiff(str(image_path))
redaction_plan = TiffMetadataRedactionPlan(
tiff_info,
base_rules.get_tiff_metadata_rules(),
override_rules.get_tiff_metadata_rules() if override_rules else [],
base_rules.get_metadata_tiff_rules(),
override_rules.get_metadata_tiff_rules() if override_rules else [],
)
redaction_plan.report_plan()
127 changes: 68 additions & 59 deletions imagedephi/rules.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import abc
from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -30,100 +31,108 @@ class RuleSource(Enum):
OVERRIDE = "override"


# Common base for redaction rules: declares the fields shared by every rule and
# a default description accessor.
# NOTE(review): two `class Rule` headers appear back to back below — presumably
# the superseded and the replacement line of a rendered diff; whether the
# decorator still applies to the new header cannot be told from here — confirm.
@dataclass
class Rule:
class Rule(abc.ABC):
description: str | None
redact_method: RedactMethod
rule_type: RuleType
rule_source: RuleSource

# Consider factory class fn here
# def make(...):

def get_description(self) -> str:
"""Generate a title for the rule."""
# Falls back to an empty string when description is None or empty.
return self.description if self.description else ""


# Rule that acts on a TIFF IFD. apply() is declared abstract with an empty
# (`...`) body, so concrete rule classes are expected to implement it.
# NOTE(review): the `TiffMetadataRule` and `TiffRule` headers below look like
# the superseded and the replacement line of a rendered diff — confirm.
@dataclass
class TiffMetadataRule(Rule):
class TiffRule(Rule):
@abc.abstractmethod
def apply(self, ifd: IFD):
...


# Rule bound to a single TIFF tag. It is built from a rule spec dict whose
# "tag_name" and "method" entries are looked up in tifftools.constants.Tag and
# RedactMethod respectively; "description" is optional.
# NOTE(review): this span interleaves what look like superseded and replacement
# lines of a rendered diff: an older `build` classmethod that returned a
# TiffMetadataRule next to a newer `__init__`, and an older if/elif `apply`
# body next to a newer abstract `apply` whose body is `...` — confirm against
# the repository before relying on either variant.
class MetadataTiffRule(TiffRule):
tag: tifftools.TiffTag
replace_value: str | bytes | list[int | float] | None
rule_type = RuleType.METADATA

@classmethod
def build(cls, rule_dict: dict, source: RuleSource) -> TiffMetadataRule:
def __init__(self, rule_spec: dict, rule_source: RuleSource) -> None:
"""Transform a rule from schema into an object."""
tag = tifftools.constants.Tag[rule_dict["tag_name"]]
redact_method = RedactMethod[rule_dict["method"].upper()]
return TiffMetadataRule(
description=rule_dict.get("description", None), # this is optional
redact_method=redact_method,
rule_type=RuleType.METADATA,
rule_source=source,
tag=tag,
replace_value=rule_dict["new_value"] if redact_method == RedactMethod.REPLACE else None,
)
self.description = rule_spec.get("description", None) # this is optional
# Enum lookup by upper-cased name; a name that is not a member raises KeyError.
self.redact_method = RedactMethod[rule_spec["method"].upper()]
self.rule_source = rule_source
self.tag = tifftools.constants.Tag[rule_spec["tag_name"]]

# A rule matches a tag when their numeric tag values are equal.
def is_match(self, tag: tifftools.TiffTag) -> bool:
return self.tag.value == tag.value

@abc.abstractmethod
def apply(self, ifd: IFD):
if self.redact_method == RedactMethod.DELETE:
del ifd["tags"][self.tag.value]
elif self.redact_method == RedactMethod.REPLACE:
# If rules are constructed via make_rule, this should not be an issue
if self.replace_value is None:
raise RuntimeError(
f"A rule with redaction method {self.redact_method} "
"must have a valid replacement value."
)
ifd["tags"][self.tag.value]["data"] = self.replace_value
elif self.redact_method == RedactMethod.KEEP:
pass
...

# Prefer the explicit description; otherwise synthesise one from the tag,
# the redaction method and the rule source.
def get_description(self) -> str:
if self.description:
return self.description
return f"Tag {self.tag.value} - {self.tag.name}: {self.redact_method} ({self.rule_source})"

@classmethod
def build(cls, rule_spec: dict, rule_source: RuleSource) -> MetadataTiffRule:
    """
    Create the rule subclass that implements the method named in ``rule_spec``.

    ``rule_spec["method"]`` is upper-cased and looked up in ``RedactMethod``. The
    first direct subclass of ``cls`` whose ``redact_method`` equals that member is
    instantiated with ``(rule_spec, rule_source)`` and returned.

    Raises:
        KeyError: ``rule_spec`` has no ``"method"`` entry, or its value is not the
            name of a ``RedactMethod`` member.
        ValueError: no direct subclass of ``cls`` handles the requested method.
    """
    # TODO: some input validation here, in case an invalid "method" is provided
    redact_method = RedactMethod[rule_spec["method"].upper()]
    # __subclasses__() only lists direct subclasses of cls.
    for rule_class in cls.__subclasses__():
        # getattr: a subclass that does not set ``redact_method`` is skipped
        # instead of aborting the search with an AttributeError.
        if getattr(rule_class, "redact_method", None) == redact_method:
            return rule_class(rule_spec, rule_source)
    # Reached only when the loop found nothing. ValueError is still an
    # Exception, so existing ``except Exception`` handlers keep working.
    raise ValueError("Unknown redact_method.")


class ReplaceMetadataTiffRule(MetadataTiffRule):
    """Metadata rule that overwrites the data of its tag with a replacement value."""

    redact_method = RedactMethod.REPLACE
    replace_value: str | bytes | list[int | float]

    def __init__(self, rule_spec: dict, source: RuleSource) -> None:
        """Set up the shared rule fields, then read the replacement from ``rule_spec["new_value"]``."""
        super().__init__(rule_spec, source)
        self.replace_value = rule_spec["new_value"]

    def apply(self, ifd: IFD):
        """Store the replacement value as the ``"data"`` of this rule's tag entry in ``ifd``."""
        tag_entry = ifd["tags"][self.tag.value]
        tag_entry["data"] = self.replace_value


class DeleteMetadataTiffRule(MetadataTiffRule):
    """Metadata rule that removes its tag from an IFD."""

    redact_method = RedactMethod.DELETE

    def apply(self, ifd: IFD):
        """Remove this rule's tag entry from ``ifd["tags"]``."""
        tags = ifd["tags"]
        del tags[self.tag.value]


class KeepMetadataTiffRule(MetadataTiffRule):
    """Metadata rule that leaves its tag exactly as it is."""

    redact_method = RedactMethod.KEEP

    def apply(self, ifd: IFD):
        """Do nothing: the tag is kept unchanged."""
        return None


# Named collection of rules grouped by file format.
# NOTE(review): two `rules` annotations and two accessor variants appear below
# (an older one indexed by RuleType, a newer flat list per format) — presumably
# superseded and replacement lines of a rendered diff; confirm.
@dataclass
class RuleSet:
name: str
description: str
rules: dict[FileFormat, dict[RuleType, list[Rule]]]
rules: dict[FileFormat, list[Rule]]

def get_tiff_metadata_rules(self) -> list[TiffMetadataRule]:
return [
rule
for rule in self.rules[FileFormat.TIFF][RuleType.METADATA]
if isinstance(rule, TiffMetadataRule)
]
# Returns only the TIFF rules that are MetadataTiffRule instances; indexing
# self.rules raises KeyError when no TIFF entry exists.
def get_metadata_tiff_rules(self) -> list[MetadataTiffRule]:
return [rule for rule in self.rules[FileFormat.TIFF] if isinstance(rule, MetadataTiffRule)]


# Build one rule object from its spec. Only the TIFF + METADATA combination is
# dispatched (to the `build` classmethod of the metadata rule class).
# NOTE(review): two signatures and two outcomes for unsupported input (`return
# None` vs. `raise Exception`) appear below — presumably superseded and
# replacement lines of a rendered diff; confirm.
def _build_rule(
file_format: FileFormat, rule_type: RuleType, rule_dict: dict, source: RuleSource
) -> Rule | None:
def _build_rule(file_format: FileFormat, rule_spec: dict, rule_source: RuleSource) -> Rule:
# The rule type is read from the spec's "type" entry, upper-cased, as a RuleType member name.
rule_type = RuleType[rule_spec["type"].upper()]
if file_format == FileFormat.TIFF:
if rule_type == RuleType.METADATA:
return TiffMetadataRule.build(rule_dict, source)
return None
return MetadataTiffRule.build(rule_spec, rule_source)

raise Exception("Unsupported rule.")


# Turn a ruleset specification dict into a RuleSet. The function receives an
# already-loaded dict (it does no file I/O itself) and reads its "name",
# "description" and "rules" entries; each key under "rules" is upper-cased and
# looked up as a FileFormat member name.
# NOTE(review): an older per-RuleType accumulation loop and a newer
# comprehension over `.items()` both appear below — presumably superseded and
# replacement lines of a rendered diff; confirm.
def build_ruleset(rules_dict: dict, rule_source: RuleSource) -> RuleSet:
def build_ruleset(ruleset_spec: dict, rule_source: RuleSource) -> RuleSet:
"""Read in metadata redaction rules from a file."""
rule_set_rules = {}
for file_format in rules_dict["rules"]:
format_key = FileFormat[file_format.upper()]
format_rules = rules_dict["rules"][file_format]
format_rule_objects: dict[RuleType, list[Rule]] = {
RuleType.IMAGE: [],
RuleType.METADATA: [],
}
for rule in format_rules:
rule_type = RuleType[rule["type"].upper()]
rule = _build_rule(format_key, rule_type, rule, rule_source)
if rule:
format_rule_objects[rule_type].append(rule)
rule_set_rules[format_key] = format_rule_objects
return RuleSet(rules_dict["name"], rules_dict["description"], rule_set_rules)
for file_format_key, rule_specs in ruleset_spec["rules"].items():
file_format = FileFormat[file_format_key.upper()]
rule_set_rules[file_format] = [
_build_rule(file_format, rule_spec, rule_source) for rule_spec in rule_specs
]
return RuleSet(ruleset_spec["name"], ruleset_spec["description"], rule_set_rules)

0 comments on commit bc7f642

Please sign in to comment.