From 2c46f5ec888dc3a9a8be2ae8070f757ce8c0c52f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Zimmermann?= <101292599+ekneg54@users.noreply.github.com> Date: Fri, 12 Jul 2024 15:31:06 +0200 Subject: [PATCH] add attributes to processor result (#630) * add outputs, event to processor_result and rename name to processor_name * add tests for collecting errors in result object * add tests for event reference * refactor pipeline_result * refactor result and remove lock from pipeline --- logprep/abc/processor.py | 59 +++++----- logprep/framework/pipeline.py | 84 ++++++++------ logprep/framework/pipeline_manager.py | 3 +- .../processor/generic_resolver/processor.py | 17 ++- .../processor/hyperscan_resolver/processor.py | 6 +- .../auto_rule_corpus_tester.py | 21 ++-- logprep/util/rule_dry_runner.py | 10 +- tests/unit/framework/test_pipeline.py | 106 ++++++++++-------- tests/unit/processor/amides/test_amides.py | 8 +- tests/unit/processor/base.py | 51 ++++++++- .../processor/calculator/test_calculator.py | 4 +- .../concatenator/test_concatenator.py | 6 +- .../test_datetime_extractor.py | 4 +- .../processor/dissector/test_dissector.py | 4 +- .../test_domain_label_extractor.py | 10 +- .../domain_resolver/test_domain_resolver.py | 6 +- .../field_manager/test_field_manager.py | 16 +-- .../generic_adder/test_generic_adder.py | 8 +- .../generic_resolver/test_generic_resolver.py | 4 +- .../geoip_enricher/test_geoip_enricher.py | 11 +- tests/unit/processor/grokker/test_grokker.py | 4 +- .../test_hyperscan_resolver.py | 4 +- .../processor/ip_informer/test_ip_informer.py | 4 +- .../processor/key_checker/test_key_checker.py | 6 +- .../list_comparison/test_list_comparison.py | 12 +- .../pseudonymizer/test_pseudonymizer.py | 3 + .../processor/requester/test_requester.py | 4 +- .../test_selective_extractor.py | 3 +- .../string_splitter/test_string_splitter.py | 4 +- .../test_template_replacer.py | 4 +- tests/unit/processor/test_process.py | 7 +- .../timestamp_differ/test_timestamp_differ.py | 4 +- .../processor/timestamper/test_timestamper.py | 4 +- .../unit/util/test_auto_rule_corpus_tester.py | 10 +- 34 files changed, 293 insertions(+), 218 deletions(-) diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py index 56bb533dd..fde55fe6e 100644 --- a/logprep/abc/processor.py +++ b/logprep/abc/processor.py @@ -38,30 +38,28 @@ class ProcessorResult: and errors (incl. warnings). 
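    A construction sketch under this patch (illustrative only, not part of
    the diff; assumes the attrs fields below are declared keyword-only, and
    `rule` stands in for a matched Rule instance):

        result = ProcessorResult(processor_name="my_processor", event={"message": "test"})
        result.warnings.append(ProcessingWarning("minor issue", rule, result.event))
        result.errors.append(ProcessingCriticalError("fatal issue", rule, result.event))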
""" - name: str = field(validator=validators.instance_of(str)) data: list = field(validator=validators.instance_of(list), factory=list) + """ The generated extra data """ errors: list = field( validator=validators.deep_iterable( - member_validator=validators.instance_of((ProcessingError, ProcessingWarning)), + member_validator=validators.instance_of(ProcessingError), iterable_validator=validators.instance_of(list), ), factory=list, ) - - def __contains__(self, error_class): - return any(isinstance(item, error_class) for item in self.errors) - - def get_warning_string(self): - """creates a string containing the warnings""" - return ", ".join( - [error.args[0] for error in self.errors if isinstance(error, ProcessingWarning)] - ) - - def get_error_string(self): - """creates a string containing the errors""" - return ", ".join( - [error.args[0] for error in self.errors if isinstance(error, ProcessingError)] - ) + """ The errors that occurred during processing """ + warnings: list = field( + validator=validators.deep_iterable( + member_validator=validators.instance_of(ProcessingWarning), + iterable_validator=validators.instance_of(list), + ), + factory=list, + ) + """ The warnings that occurred during processing """ + processor_name: str = field(validator=validators.instance_of(str)) + """ The name of the processor """ + event: dict = field(validator=validators.optional(validators.instance_of(dict)), default=None) + """ A reference to the event that was processed """ class Processor(Component): @@ -138,7 +136,7 @@ def __init__(self, name: str, configuration: "Processor.Config"): specific_rules_targets=self._config.specific_rules, ) self.has_custom_tests = False - self.result = ProcessorResult(name=self.name) + self.result = None @property def _specific_rules(self): @@ -180,23 +178,28 @@ def metric_labels(self) -> dict: "name": self.name, } - def process(self, event: dict): - """Process a log event by calling the implemented `process` method of the - strategy object set in `_strategy` attribute. + def process(self, event: dict) -> ProcessorResult: + """Process a log event. Parameters ---------- event : dict A dictionary representing a log event. + Returns + ------- + ProcessorResult + A ProcessorResult object containing the processed event, errors, + extra data and a list of target outputs. 
+ """ - self.result = ProcessorResult(name=self.name) + self.result = ProcessorResult(processor_name=self.name, event=event) logger.debug(f"{self.describe()} processing event {event}") self._process_rule_tree(event, self._specific_tree) self._process_rule_tree(event, self._generic_tree) return self.result - def _process_rule_tree(self, event: dict, tree: "RuleTree"): + def _process_rule_tree(self, event: dict, tree: RuleTree): applied_rules = set() @Metric.measure_time() @@ -206,14 +209,14 @@ def _process_rule(rule, event): applied_rules.add(rule) return event - def _process_rule_tree_multiple_times(tree, event): + def _process_rule_tree_multiple_times(tree: RuleTree, event: dict): matching_rules = tree.get_matching_rules(event) while matching_rules: for rule in matching_rules: _process_rule(rule, event) matching_rules = set(tree.get_matching_rules(event)).difference(applied_rules) - def _process_rule_tree_once(tree, event): + def _process_rule_tree_once(tree: RuleTree, event: dict): matching_rules = tree.get_matching_rules(event) for rule in matching_rules: _process_rule(rule, event) @@ -231,7 +234,7 @@ def _apply_rules_wrapper(self, event: dict, rule: "Rule"): except ProcessingCriticalError as error: self.result.errors.append(error) # is needed to prevent wrapping it in itself event.clear() - except BaseException as error: + except Exception as error: self.result.errors.append(ProcessingCriticalError(str(error), rule, event)) event.clear() if not hasattr(rule, "delete_source_fields"): @@ -321,9 +324,9 @@ def _handle_warning_error(self, event, rule, error, failure_tags=None): else: add_and_overwrite(event, "tags", sorted(list({*tags, *failure_tags}))) if isinstance(error, ProcessingWarning): - self.result.errors.append(error) + self.result.warnings.append(error) else: - self.result.errors.append(ProcessingWarning(str(error), rule, event)) + self.result.warnings.append(ProcessingWarning(str(error), rule, event)) def _has_missing_values(self, event, rule, source_field_dict): missing_fields = list( diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 6d79e277a..5e5ffd751 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -17,11 +17,10 @@ from ctypes import c_bool from functools import cached_property, partial from importlib.metadata import version -from multiprocessing import Lock, Value, current_process -from typing import Any, List, Tuple, Optional +from multiprocessing import Value, current_process +from typing import Any, Generator, List, Tuple import attrs -import msgspec from logprep.abc.component import Component from logprep.abc.input import ( @@ -53,12 +52,41 @@ class PipelineResult: results: List[ProcessorResult] = attrs.field( validator=[ - attrs.validators.instance_of(list), + attrs.validators.instance_of((list, Generator)), attrs.validators.deep_iterable( member_validator=attrs.validators.instance_of(ProcessorResult) ), ] ) + """List of ProcessorResults""" + event: dict = attrs.field(validator=attrs.validators.instance_of(dict)) + """The event that was processed""" + event_received: dict = attrs.field( + validator=attrs.validators.instance_of(dict), converter=copy.deepcopy + ) + """The event that was received""" + pipeline: list[Processor] + """The pipeline that processed the event""" + + @cached_property + def errors(self) -> List[ProcessingError]: + """Return all processing errors.""" + return list(itertools.chain(*[result.errors for result in self])) + + @cached_property + def warnings(self) -> 
List[ProcessingWarning]: + """Return all processing warnings.""" + return list(itertools.chain(*[result.warnings for result in self])) + + @cached_property + def data(self) -> List[Tuple[dict, dict]]: + """Return all extra data.""" + return list(itertools.chain(*[result.data for result in self])) + + def __attrs_post_init__(self): + self.results = list( + (processor.process(self.event) for processor in self.pipeline if self.event) + ) def __iter__(self): return iter(self.results) @@ -113,9 +141,6 @@ class Metrics(Component.Metrics): _continue_iterating: Value """ a flag to signal if iterating continues """ - _lock: Lock - """ the lock for the pipeline process """ - pipeline_index: int """ the index of this pipeline """ @@ -175,19 +200,13 @@ def _input(self) -> Input: ) return Factory.create(input_connector_config) - def __init__( - self, config: Configuration, pipeline_index: int = None, lock: Lock = None - ) -> None: + def __init__(self, config: Configuration, pipeline_index: int = None) -> None: self.logger = logging.getLogger("Pipeline") self.logger.name = f"Pipeline{pipeline_index}" self._logprep_config = config self._timeout = config.timeout self._continue_iterating = Value(c_bool) - - self._lock = lock self.pipeline_index = pipeline_index - self._encoder = msgspec.msgpack.Encoder() - self._decoder = msgspec.msgpack.Decoder() if self._logprep_config.profile_pipelines: self.run = partial(PipelineProfiler.profile_function, self.run) @@ -224,43 +243,36 @@ def run(self) -> None: # pylint: disable=method-hidden self._continue_iterating.value = True assert self._input, "Pipeline should not be run without input connector" assert self._output, "Pipeline should not be run without output connector" - with self._lock: - with warnings.catch_warnings(): - warnings.simplefilter("default") - self._setup() + with warnings.catch_warnings(): + warnings.simplefilter("default") + self._setup() self.logger.debug("Start iterating") while self._continue_iterating.value: self.process_pipeline() self._shut_down() @_handle_pipeline_error - def process_pipeline(self) -> Tuple[Optional[dict], Optional[PipelineResult]]: + def process_pipeline(self) -> PipelineResult: """Retrieve next event, process event with full pipeline and store or return results""" Component.run_pending_tasks() event = self._get_event() if not event: return None, None - event_received = copy.deepcopy(event) result: PipelineResult = self.process_event(event) - for processor_result in result: - if not processor_result.errors: - continue - if ProcessingWarning in processor_result: - self.logger.warning(processor_result.get_warning_string()) - if ProcessingError in processor_result: - self.logger.error(processor_result.get_error_string()) - if self._output: - self._store_failed_event(processor_result.errors, event_received, event) - # pipeline is aborted on processing error - return event, result + if result.warnings: + self.logger.warning(",".join((str(warning) for warning in result.warnings))) + if result.errors: + self.logger.error(",".join((str(error) for error in result.errors))) + self._store_failed_event(result.errors, result.event_received, event) + return if self._output: result_data = [res.data for res in result if res.data] if result_data: self._store_extra_data(itertools.chain(*result_data)) if event: self._store_event(event) - return event, result + return result def _store_event(self, event: dict) -> None: for output_name, output in self._output.items(): @@ -288,9 +300,13 @@ def _get_event(self) -> dict: @Metric.measure_time() def 
process_event(self, event: dict): """process all processors for one event""" - return PipelineResult( - results=[processor.process(event) for processor in self._pipeline if event] + result = PipelineResult( + results=[], + event_received=event, + event=event, + pipeline=self._pipeline, ) + return result def _store_extra_data(self, result_data: List | itertools.chain) -> None: self.logger.debug("Storing extra data") diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index a0cf6181f..bba5ac540 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -64,7 +64,6 @@ def __init__(self, configuration: Configuration): self._pipelines: list[multiprocessing.Process] = [] self._configuration = configuration - self._lock = multiprocessing.Lock() prometheus_config = self._configuration.metrics if prometheus_config.enabled: self.prometheus_exporter = PrometheusExporter(prometheus_config) @@ -164,7 +163,7 @@ def restart(self): self.prometheus_exporter.run() def _create_pipeline(self, index) -> multiprocessing.Process: - pipeline = Pipeline(pipeline_index=index, config=self._configuration, lock=self._lock) + pipeline = Pipeline(pipeline_index=index, config=self._configuration) logger.info("Created new pipeline") process = multiprocessing.Process(target=pipeline.run, daemon=True) process.stop = pipeline.stop diff --git a/logprep/processor/generic_resolver/processor.py b/logprep/processor/generic_resolver/processor.py index 09f4e19a7..aaa1f2dad 100644 --- a/logprep/processor/generic_resolver/processor.py +++ b/logprep/processor/generic_resolver/processor.py @@ -28,18 +28,21 @@ import re from typing import Union -from logprep.processor.base.exceptions import FieldExistsWarning +from logprep.processor.base.exceptions import ( + FieldExistsWarning, + ProcessingCriticalError, +) from logprep.processor.field_manager.processor import FieldManager from logprep.processor.generic_resolver.rule import GenericResolverRule from logprep.util.getter import GetterFactory from logprep.util.helper import add_field_to, get_dotted_field_value -class GenericResolverError(BaseException): +class GenericResolverError(ProcessingCriticalError): """Base class for GenericResolver related exceptions.""" - def __init__(self, name: str, message: str): - super().__init__(f"GenericResolver ({name}): {message}") + def __init__(self, name: str, message: str, rule: GenericResolverRule, event: dict): + super().__init__(f"{name}: {message}", rule=rule, event=event) class GenericResolver(FieldManager): @@ -78,6 +81,8 @@ def _apply_rules(self, event, rule): raise GenericResolverError( self.name, "Mapping group is missing in mapping file pattern!", + rule=rule, + event=event, ) dest_val = replacements.get(mapping) if dest_val: @@ -145,9 +150,13 @@ def ensure_rules_from_file(self, rule): f"Additions file " f'\'{rule.resolve_from_file["path"]}\'' f" must be a dictionary with string values!", + rule=rule, + event=None, ) except FileNotFoundError as error: raise GenericResolverError( self.name, f'Additions file \'{rule.resolve_from_file["path"]}' f"' not found!", + rule=rule, + event=None, ) from error diff --git a/logprep/processor/hyperscan_resolver/processor.py b/logprep/processor/hyperscan_resolver/processor.py index 1c7d13039..d24b84321 100644 --- a/logprep/processor/hyperscan_resolver/processor.py +++ b/logprep/processor/hyperscan_resolver/processor.py @@ -39,6 +39,7 @@ from logprep.processor.base.exceptions import FieldExistsWarning, SkipImportError from 
logprep.processor.field_manager.processor import FieldManager +from logprep.processor.generic_resolver.processor import GenericResolverError from logprep.util.helper import add_field_to, get_dotted_field_value from logprep.util.validators import directory_validator @@ -56,12 +57,9 @@ # pylint: enable=ungrouped-imports -class HyperscanResolverError(BaseException): +class HyperscanResolverError(GenericResolverError): """Base class for HyperscanResolver related exceptions.""" - def __init__(self, name: str, message: str): - super().__init__(f"HyperscanResolver ({name}): {message}") - class HyperscanResolver(FieldManager): """Resolve values in documents by referencing a mapping list.""" diff --git a/logprep/util/auto_rule_tester/auto_rule_corpus_tester.py b/logprep/util/auto_rule_tester/auto_rule_corpus_tester.py index c9f922de7..6e0692025 100644 --- a/logprep/util/auto_rule_tester/auto_rule_corpus_tester.py +++ b/logprep/util/auto_rule_tester/auto_rule_corpus_tester.py @@ -99,7 +99,7 @@ from colorama import Fore, Style from deepdiff import DeepDiff, grep -from logprep.framework.pipeline import Pipeline +from logprep.framework.pipeline import Pipeline, PipelineResult from logprep.util.configuration import Configuration from logprep.util.helper import get_dotted_field_value from logprep.util.json_handling import parse_json @@ -113,9 +113,8 @@ def convert_extra_data_format(extra_outputs) -> List[Dict]: output target is the key and the values are the actual outputs. """ reformatted_extra_outputs = [] - for extra_output in extra_outputs: - for output in extra_output: - reformatted_extra_outputs.append({str(output[1]): output[0]}) + for value, key in extra_outputs: + reformatted_extra_outputs.append({str(key): value}) return reformatted_extra_outputs @@ -211,18 +210,12 @@ def _run_pipeline_per_test_case(self): print(Style.BRIGHT + "# Test Cases Summary:" + Style.RESET_ALL) for test_case_id, test_case in self._test_cases.items(): _ = [processor.setup() for processor in self._pipeline._pipeline] - parsed_event, result = self._pipeline.process_pipeline() - extra_outputs = convert_extra_data_format( - result.results[processor_result].data - for processor_result in range(len(result.results)) - ) + result: PipelineResult = self._pipeline.process_pipeline() + parsed_event = result.event + extra_outputs = convert_extra_data_format(result.data) test_case.generated_output = parsed_event test_case.generated_extra_output = extra_outputs - test_case.warnings = [ - result.results[processor_result].errors - for processor_result in range(len(result.results)) - ] - test_case.warnings = list(itertools.chain(*test_case.warnings)) + test_case.warnings = result.warnings self._compare_logprep_outputs(test_case_id, parsed_event) self._compare_extra_data_output(test_case_id, extra_outputs) self._print_pass_fail_statements(test_case_id) diff --git a/logprep/util/rule_dry_runner.py b/logprep/util/rule_dry_runner.py index 66684a7e2..0c6782fd6 100644 --- a/logprep/util/rule_dry_runner.py +++ b/logprep/util/rule_dry_runner.py @@ -45,7 +45,7 @@ from colorama import Back, Fore from ruamel.yaml import YAML -from logprep.framework.pipeline import Pipeline +from logprep.framework.pipeline import Pipeline, PipelineResult from logprep.util.auto_rule_tester.auto_rule_corpus_tester import ( convert_extra_data_format, ) @@ -103,11 +103,9 @@ def run(self): transformed_cnt = 0 output_count = 0 for input_document in self._input_documents: - test_output, result = self._pipeline.process_pipeline() - test_output_custom = 
convert_extra_data_format( - result.results[processor_result].data - for processor_result in range(len(result.results)) - ) + result: PipelineResult = self._pipeline.process_pipeline() + test_output = result.event + test_output_custom = convert_extra_data_format(result.data) if test_output: output_count += 1 diff = self._print_output_results(input_document, test_output, test_output_custom) diff --git a/tests/unit/framework/test_pipeline.py b/tests/unit/framework/test_pipeline.py index eff1d2a95..2f30825a6 100644 --- a/tests/unit/framework/test_pipeline.py +++ b/tests/unit/framework/test_pipeline.py @@ -26,7 +26,7 @@ ) from logprep.abc.processor import ProcessorResult from logprep.factory import Factory -from logprep.framework.pipeline import Pipeline +from logprep.framework.pipeline import Pipeline, PipelineResult from logprep.processor.base.exceptions import ( FieldExistsWarning, ProcessingCriticalError, @@ -52,7 +52,6 @@ class ConfigurationForTests: "metrics": {"enabled": False}, } ) - lock = Lock() def get_mock_create(): @@ -63,7 +62,7 @@ def get_mock_create(): mock_create = mock.MagicMock() mock_component = mock.MagicMock() mock_component.process = mock.MagicMock() - mock_component.process.return_value = ProcessorResult(name="mock_processor") + mock_component.process.return_value = ProcessorResult(processor_name="mock_processor") mock_create.return_value = mock_component return mock_create @@ -76,7 +75,6 @@ def setup_method(self): self.pipeline = Pipeline( pipeline_index=1, config=self.logprep_config, - lock=self.lock, ) def test_pipeline_property_returns_pipeline(self, mock_create): @@ -133,7 +131,7 @@ def test_empty_documents_are_not_forwarded_to_other_processors(self, _): processor_with_mock_result = mock.MagicMock() processor_with_mock_result.process = mock.MagicMock() processor_with_mock_result.process.return_value = ProcessorResult( - name="processor_with_mock_res" + processor_name="processor_with_mock_res" ) self.pipeline._pipeline = [ processor_with_mock_result, @@ -160,7 +158,9 @@ def test_not_empty_documents_are_stored_in_the_output(self, _): def test_empty_documents_are_not_stored_in_the_output(self, _): def mock_process_event(event): event.clear() - return [ProcessorResult(name="")] + return PipelineResult( + event=event, event_received=event, results=[], pipeline=self.pipeline._pipeline + ) self.pipeline.process_event = mock_process_event self.pipeline._setup() @@ -239,19 +239,21 @@ def test_output_warning_error_is_logged_but_processing_continues(self, mock_warn assert mock_warning.call_count == 1 assert self.pipeline._output["dummy"].store.call_count == 3 - def test_processor_warning_error_is_logged_but_processing_continues(self, _): + @mock.patch("logging.Logger.warning") + def test_processor_warning_error_is_logged_but_processing_continues(self, mock_warning, _): self.pipeline._setup() self.pipeline._input.get_next.return_value = ({"message": "test"}, None) mock_rule = mock.MagicMock() processing_warning = ProcessingWarning("not so bad", mock_rule, {"message": "test"}) self.pipeline._pipeline[1].process.return_value = ProcessorResult( - name="mock_processor", errors=[processing_warning] + processor_name="mock_processor", warnings=[processing_warning] ) - self.pipeline.process_pipeline() self.pipeline._input.get_next.return_value = ({"message": "test"}, None) - _, result = self.pipeline.process_pipeline() - assert processing_warning in result.results[0].errors + result = self.pipeline.process_pipeline() + assert processing_warning in result.results[0].warnings + 
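        # With this patch, ProcessingWarnings are collected in result.warnings
        # instead of result.errors, and process_pipeline() logs them in a single
        # comma-joined logger.warning call.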
mock_warning.assert_called() + assert "ProcessingWarning: not so bad" in mock_warning.call_args[0][0] assert self.pipeline._output["dummy"].store.call_count == 2, "all events are processed" @mock.patch("logging.Logger.error") @@ -264,37 +266,20 @@ def test_processor_critical_error_is_logged_event_is_stored_in_error_output( self.pipeline._input.get_next.return_value = (input_event1, None) mock_rule = mock.MagicMock() self.pipeline._pipeline[1].process.return_value = ProcessorResult( - name="", + processor_name="", errors=[ProcessingCriticalError("really bad things happened", mock_rule, input_event1)], ) self.pipeline.process_pipeline() self.pipeline._input.get_next.return_value = (input_event2, None) self.pipeline._pipeline[1].process.return_value = ProcessorResult( - name="", + processor_name="", errors=[ProcessingCriticalError("really bad things happened", mock_rule, input_event2)], ) self.pipeline.process_pipeline() assert self.pipeline._input.get_next.call_count == 2, "2 events gone into processing" assert mock_error.call_count == 2, f"two errors occurred: {mock_error.mock_calls}" - - logger_calls = ( - mock.call( - str( - ProcessingCriticalError( - "really bad things happened", mock_rule, {"message": "first event"} - ) - ) - ), - mock.call( - str( - ProcessingCriticalError( - "really bad things happened", mock_rule, {"message": "second event"} - ) - ) - ), - ) - mock_error.assert_has_calls(logger_calls) + assert "ProcessingCriticalError: really bad things happened" in mock_error.call_args[0][0] assert self.pipeline._output["dummy"].store.call_count == 0, "no event in output" assert ( self.pipeline._output["dummy"].store_failed.call_count == 2 @@ -311,9 +296,13 @@ def test_processor_logs_processing_error_and_warnings_separately( mock_rule = mock.MagicMock() self.pipeline._pipeline[1] = deepcopy(self.pipeline._pipeline[0]) warning = FieldExistsWarning(mock_rule, input_event1, ["foo"]) - self.pipeline._pipeline[0].process.return_value = ProcessorResult(name="", errors=[warning]) + self.pipeline._pipeline[0].process.return_value = ProcessorResult( + processor_name="", warnings=[warning] + ) error = ProcessingCriticalError("really bad things happened", mock_rule, input_event1) - self.pipeline._pipeline[1].process.return_value = ProcessorResult(name="", errors=[error]) + self.pipeline._pipeline[1].process.return_value = ProcessorResult( + processor_name="", errors=[error] + ) self.pipeline.process_pipeline() assert mock_error.call_count == 1, f"one error occurred: {mock_error.mock_calls}" assert mock_warning.call_count == 1, f"one warning occurred: {mock_warning.mock_calls}" @@ -438,7 +427,7 @@ def test_extra_data_tuple_is_passed_to_store_custom(self, _): self.pipeline._input.get_next.return_value = ({"some": "event"}, None) self.pipeline._pipeline[1] = deepcopy(self.pipeline._pipeline[0]) self.pipeline._pipeline[1].process.return_value = ProcessorResult( - name="", data=[({"foo": "bar"}, ({"dummy": "target"},))] + processor_name="", data=[({"foo": "bar"}, ({"dummy": "target"},))] ) self.pipeline._pipeline.append(deepcopy(self.pipeline._pipeline[0])) self.pipeline.process_pipeline() @@ -451,7 +440,7 @@ def test_store_custom_calls_all_defined_outputs(self, _): self.pipeline._setup() self.pipeline._pipeline[1] = deepcopy(self.pipeline._pipeline[0]) self.pipeline._pipeline[1].process.return_value = ProcessorResult( - name="", + processor_name="", data=[ ( {"foo": "bar"}, @@ -474,7 +463,7 @@ def test_extra_data_list_is_passed_to_store_custom(self, _): self.pipeline._setup() 
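        # Each ProcessorResult.data entry is a (document, (targets, ...)) tuple;
        # process_pipeline() chains these together and hands each document to
        # store_custom on the named output targets.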
self.pipeline._pipeline[1] = deepcopy(self.pipeline._pipeline[0]) self.pipeline._pipeline[1].process.return_value = ProcessorResult( - name="", data=[({"foo": "bar"}, ({"dummy": "target"},))] + processor_name="", data=[({"foo": "bar"}, ({"dummy": "target"},))] ) self.pipeline._pipeline.append(deepcopy(self.pipeline._pipeline[0])) self.pipeline._input.get_next.return_value = ({"some": "event"}, None) @@ -615,6 +604,27 @@ def test_shutdown_logs_fatal_errors(self, mock_error, _): logger_call = f"Couldn't gracefully shut down pipeline due to: {error}" mock_error.assert_called_with(logger_call) + def test_pipeline_result_provides_event_received(self, _): + self.pipeline._setup() + event = {"some": "event"} + self.pipeline._input.get_next.return_value = (event, None) + generic_adder = original_create( + { + "generic_adder": { + "type": "generic_adder", + "specific_rules": [ + {"filter": "some", "generic_adder": {"add": {"field": "foo"}}} + ], + "generic_rules": [], + } + } + ) + self.pipeline._pipeline = [generic_adder] + result = self.pipeline.process_pipeline() + assert result.event_received is not event, "event_received is a copy" + assert result.event_received == {"some": "event"}, "received event is as expected" + assert result.event == {"some": "event", "field": "foo"}, "processed event is as expected" + class TestPipelineWithActualInput: def setup_method(self): @@ -633,10 +643,9 @@ def test_pipeline_without_output_connector_and_one_input_event_and_preprocessors self.config.input["test_input"]["documents"] = [{"applyrule": "yes"}] pipeline = Pipeline(config=self.config) assert pipeline._output is None - event, extra_outputs = pipeline.process_pipeline() - assert event["label"] == {"reporter": ["windows"]} - assert "arrival_time" in event - assert extra_outputs.results[0].data == [] + result = pipeline.process_pipeline() + assert result.event["label"] == {"reporter": ["windows"]} + assert "arrival_time" in result.event def test_process_event_processes_without_input_and_without_output(self): event = {"applyrule": "yes"} @@ -656,13 +665,12 @@ def test_pipeline_without_output_connector_and_two_input_events_and_preprocessor self.config.input["test_input"]["documents"] = input_events pipeline = Pipeline(config=self.config) assert pipeline._output is None - event, extra_outputs = pipeline.process_pipeline() - assert event["label"] == {"reporter": ["windows"]} - assert "arrival_time" in event - event, extra_outputs = pipeline.process_pipeline() - assert "pseudonym" in event.get("winlog", {}).get("event_data", {}).get("IpAddress") - assert "arrival_time" in event - assert len(extra_outputs.results) == len(pipeline._pipeline) + result = pipeline.process_pipeline() + assert result.event["label"] == {"reporter": ["windows"]} + assert "arrival_time" in result.event + result = pipeline.process_pipeline() + assert "pseudonym" in result.event.get("winlog", {}).get("event_data", {}).get("IpAddress") + assert "arrival_time" in result.event def test_pipeline_hmac_error_message_without_output_connector(self): self.config.input["test_input"]["documents"] = [{"applyrule": "yes"}] @@ -671,8 +679,8 @@ def test_pipeline_hmac_error_message_without_output_connector(self): } pipeline = Pipeline(config=self.config) assert pipeline._output is None - event, _ = pipeline.process_pipeline() - assert event["hmac"]["hmac"] == "error" + result = pipeline.process_pipeline() + assert result.event["hmac"]["hmac"] == "error" def test_pipeline_run_raises_assertion_when_run_without_input(self): self.config.input = {} diff --git 
a/tests/unit/processor/amides/test_amides.py b/tests/unit/processor/amides/test_amides.py index 7ffc25cfe..4254cf71c 100644 --- a/tests/unit/processor/amides/test_amides.py +++ b/tests/unit/processor/amides/test_amides.py @@ -168,9 +168,11 @@ def test_process_event_raise_duplication_error(self): self.object.process(document) assert document.get("amides") result = self.object.process(document) - assert len(result.errors) > 0 - assert re.match(r".*missing source_fields: \['process.command_line'].*", str(result.errors)) - assert re.match(".*FieldExistsWarning.*", str(result.errors)) + assert len(result.warnings) > 0 + assert re.match( + r".*missing source_fields: \['process.command_line'].*", str(result.warnings) + ) + assert re.match(".*FieldExistsWarning.*", str(result.warnings)) def test_setup_get_model_via_file_getter(self, tmp_path, monkeypatch): model_uri = "file://tests/testdata/unit/amides/model.zip" diff --git a/tests/unit/processor/base.py b/tests/unit/processor/base.py index 5c3b7d519..d0adc908b 100644 --- a/tests/unit/processor/base.py +++ b/tests/unit/processor/base.py @@ -19,6 +19,8 @@ from logprep.factory import Factory from logprep.framework.rule_tree.rule_tree import RuleTree from logprep.metrics.metrics import CounterMetric, HistogramMetric +from logprep.processor.base.exceptions import ProcessingCriticalError +from logprep.processor.base.rule import Rule from logprep.util.json_handling import list_json_files_in_directory from tests.unit.component.base import BaseComponentTestCase @@ -75,10 +77,12 @@ def set_rules(rules_dirs): rules.append(rule) return rules - def _load_specific_rule(self, rule): + def _load_specific_rule(self, rule: dict | Rule): self.object._generic_tree = RuleTree() self.object._specific_tree = RuleTree() - specific_rule = self.object.rule_class._create_from_dict(rule) + specific_rule = ( + self.object.rule_class._create_from_dict(rule) if isinstance(rule, dict) else rule + ) self.object._specific_tree.add_rule(specific_rule, self.logger) def setup_method(self) -> None: @@ -95,6 +99,28 @@ def setup_method(self) -> None: self.object = Factory.create(configuration=config) self.specific_rules = self.set_rules(self.specific_rules_dirs) self.generic_rules = self.set_rules(self.generic_rules_dirs) + self.match_all_event = { + "message": "event", + "winlog": { + "event_id": 1, + "provider_name": "Microsoft-Windows-Sysmon", + "event_data": {"IpAddress": "127.0.0.54"}, + }, + "field1": "foo", + "field2": "bar", + "another_random_field": "baz", + "@timestamp": "2021-01-01T00:00:00.000Z", + "delete_event": "does not matter", + "irrelevant": "does not matter", + "url": "http://example.local", + "drop_me": "does not matter", + "add_generic_test": "does not matter", + "anything": "does not matter", + "client": {"ip": "127.0.0.54"}, + "ips": ["127.0.0.54", "192.168.4.33"], + "applyrule": "yes", + "A": "foobarfoo", + } # this is an event that can be used in all processor tests, cause it matches everywhere def teardown_method(self) -> None: """teardown for all methods""" @@ -290,6 +316,21 @@ def test_process_return_result_object(self): event = {"some": "event"} result = self.object.process(event) assert isinstance(result, ProcessorResult) - assert result.data == [] - assert result.errors == [] - assert result.name == "Test Instance Name" + assert isinstance(result.data, list) + assert isinstance(result.errors, list) + assert result.processor_name == "Test Instance Name" + + def test_process_collects_errors_in_result_object(self): + with mock.patch.object( + self.object, + 
"_apply_rules", + side_effect=ProcessingCriticalError( + "side effect", rule=self.object.rules[0], event=self.match_all_event + ), + ): + result = self.object.process(self.match_all_event) + assert len(result.errors) > 0, "minimum one error should be in result object" + + def test_result_object_has_reference_to_event(self): + result = self.object.process(self.match_all_event) + assert result.event is self.match_all_event diff --git a/tests/unit/processor/calculator/test_calculator.py b/tests/unit/processor/calculator/test_calculator.py index bb9dc07e4..be49fac5e 100644 --- a/tests/unit/processor/calculator/test_calculator.py +++ b/tests/unit/processor/calculator/test_calculator.py @@ -355,8 +355,8 @@ def test_testcases_failure_handling(self, testcase, rule, event, expected, error self._load_specific_rule(rule) result = self.object.process(event) - assert len(result.errors) == 1 - assert re.match(rf".*{error_message}", str(result.errors[0])) + assert len(result.warnings) == 1 + assert re.match(rf".*{error_message}", str(result.warnings[0])) assert event == expected, testcase @pytest.mark.parametrize( diff --git a/tests/unit/processor/concatenator/test_concatenator.py b/tests/unit/processor/concatenator/test_concatenator.py index c6c3df208..e29f6a8e5 100644 --- a/tests/unit/processor/concatenator/test_concatenator.py +++ b/tests/unit/processor/concatenator/test_concatenator.py @@ -170,7 +170,7 @@ def test_for_expected_output(self, test_case, rule, document, expected_output): self.object.process(document) assert document == expected_output, test_case - def test_process_raises_duplication_error_if_target_field_exists_and_should_not_be_overwritten( + def test_process_raises_field_exists_warning_if_target_field_exists_and_should_not_be_overwritten( self, ): rule = { @@ -186,8 +186,8 @@ def test_process_raises_duplication_error_if_target_field_exists_and_should_not_ self._load_specific_rule(rule) document = {"field": {"a": "first", "b": "second"}, "target_field": "has already content"} result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert "target_field" in document assert document.get("target_field") == "has already content" assert document.get("tags") == ["_concatenator_failure"] diff --git a/tests/unit/processor/datetime_extractor/test_datetime_extractor.py b/tests/unit/processor/datetime_extractor/test_datetime_extractor.py index 39aa9e344..2521194eb 100644 --- a/tests/unit/processor/datetime_extractor/test_datetime_extractor.py +++ b/tests/unit/processor/datetime_extractor/test_datetime_extractor.py @@ -181,8 +181,8 @@ def test_existing_target_raises_if_not_overwrite_target(self): } self._load_specific_rule(rule) result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) @staticmethod def _parse_local_tz(tz_local_name): diff --git a/tests/unit/processor/dissector/test_dissector.py b/tests/unit/processor/dissector/test_dissector.py index f0c833e4e..92c41fd96 100644 --- a/tests/unit/processor/dissector/test_dissector.py +++ b/tests/unit/processor/dissector/test_dissector.py @@ -731,6 +731,6 @@ def test_testcases(self, testcase, rule, event, expected): # pylint: disable=un def test_testcases_failure_handling(self, testcase, rule, event, expected): 
self._load_specific_rule(rule) result = self.object.process(event) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], ProcessingWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], ProcessingWarning) assert event == expected, testcase diff --git a/tests/unit/processor/domain_label_extractor/test_domain_label_extractor.py b/tests/unit/processor/domain_label_extractor/test_domain_label_extractor.py index 3a0a6b311..fc0dcc308 100644 --- a/tests/unit/processor/domain_label_extractor/test_domain_label_extractor.py +++ b/tests/unit/processor/domain_label_extractor/test_domain_label_extractor.py @@ -245,8 +245,8 @@ def test_domain_extraction_with_ipv6_target(self): def test_domain_extraction_with_existing_output_field(self): document = {"url": {"domain": "test.domain.de", "subdomain": "exists already"}} result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) def test_domain_extraction_overwrites_target_field(self): document = {"url": {"domain": "test.domain.de", "subdomain": "exists already"}} @@ -314,7 +314,7 @@ def test_does_nothing_if_source_field_not_exits(self): self.object.process(document) assert document == expected - def test_raises_duplication_error_if_target_field_exits(self): + def test_raises_field_exists_warning_if_target_field_exits(self): document = {"url": {"domain": "test.domain.de", "subdomain": "exists already"}} expected = { "tags": ["_domain_label_extractor_failure"], @@ -336,8 +336,8 @@ def test_raises_duplication_error_if_target_field_exits(self): } self._load_specific_rule(rule_dict) result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected @responses.activate diff --git a/tests/unit/processor/domain_resolver/test_domain_resolver.py b/tests/unit/processor/domain_resolver/test_domain_resolver.py index ea49466a8..07e208d6d 100644 --- a/tests/unit/processor/domain_resolver/test_domain_resolver.py +++ b/tests/unit/processor/domain_resolver/test_domain_resolver.py @@ -228,12 +228,12 @@ def test_configured_dotted_subfield(self, _): assert document == expected @mock.patch("socket.gethostbyname", return_value="1.2.3.4") - def test_duplication_error(self, _): + def test_field_exits_warning(self, _): document = {"client": "google.de"} result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) @mock.patch("socket.gethostbyname", return_value="1.2.3.4") def test_no_duplication_error(self, _): diff --git a/tests/unit/processor/field_manager/test_field_manager.py b/tests/unit/processor/field_manager/test_field_manager.py index 0a12d0ffc..5a71fb849 100644 --- a/tests/unit/processor/field_manager/test_field_manager.py +++ b/tests/unit/processor/field_manager/test_field_manager.py @@ -591,11 +591,11 @@ def test_testcases(self, testcase, rule, event, expected): # pylint: disable=un def test_testcases_failure_handling(self, testcase, rule, event, expected, error): self._load_specific_rule(rule) result = self.object.process(event) - assert len(result.errors) == 1 - assert re.match(error, str(result.errors[0])) + assert 
len(result.warnings) == 1 + assert re.match(error, str(result.warnings[0])) assert event == expected, testcase - def test_process_raises_duplication_error_if_target_field_exists_and_should_not_be_overwritten( + def test_process_raises_field_exists_warning_if_target_field_exists_and_should_not_be_overwritten( self, ): rule = { @@ -610,7 +610,7 @@ def test_process_raises_duplication_error_if_target_field_exists_and_should_not_ self._load_specific_rule(rule) document = {"field": {"a": "first", "b": "second"}, "target_field": "has already content"} result = self.object.process(document) - assert isinstance(result.errors[0], FieldExistsWarning) + assert isinstance(result.warnings[0], FieldExistsWarning) assert "target_field" in document assert document.get("target_field") == "has already content" assert document.get("tags") == ["_field_manager_failure"] @@ -626,10 +626,10 @@ def test_process_raises_processing_warning_with_missing_fields(self): self._load_specific_rule(rule) document = {"field": {"a": "first", "b": "second"}} result = self.object.process(document) - assert len(result.errors) == 1 + assert len(result.warnings) == 1 assert re.match( r".*ProcessingWarning.*missing source_fields: \['does.not.exists'\]", - str(result.errors[0]), + str(result.warnings[0]), ) def test_process_raises_processing_warning_with_missing_fields_but_event_is_processed(self): @@ -650,10 +650,10 @@ def test_process_raises_processing_warning_with_missing_fields_but_event_is_proc "tags": ["_field_manager_missing_field_warning"], } result = self.object.process(document) - assert len(result.errors) == 1 + assert len(result.warnings) == 1 assert re.match( r".*ProcessingWarning.*missing source_fields: \['does.not.exists'\]", - str(result.errors[0]), + str(result.warnings[0]), ) assert document == expected diff --git a/tests/unit/processor/generic_adder/test_generic_adder.py b/tests/unit/processor/generic_adder/test_generic_adder.py index c657d4ea4..5fe29930b 100644 --- a/tests/unit/processor/generic_adder/test_generic_adder.py +++ b/tests/unit/processor/generic_adder/test_generic_adder.py @@ -409,8 +409,8 @@ def test_generic_adder_testcases_failure_handling( ): self._load_specific_rule(rule) result = self.object.process(event) - assert len(result.errors) == 1 - assert re.match(rf".*FieldExistsWarning.*{error_message}", str(result.errors[0])) + assert len(result.warnings) == 1 + assert re.match(rf".*FieldExistsWarning.*{error_message}", str(result.warnings[0])) assert event == expected, testcase def test_add_generic_fields_from_file_missing_and_existing_with_all_required(self): @@ -611,8 +611,8 @@ def test_sql_database_raises_exception_on_duplicate(self, caplog): self.object.process(document) result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected diff --git a/tests/unit/processor/generic_resolver/test_generic_resolver.py b/tests/unit/processor/generic_resolver/test_generic_resolver.py index 1e95ea3c2..de9237741 100644 --- a/tests/unit/processor/generic_resolver/test_generic_resolver.py +++ b/tests/unit/processor/generic_resolver/test_generic_resolver.py @@ -420,8 +420,8 @@ def test_resolve_dotted_src_and_dest_field_and_conflict_match(self, caplog): "re": {"solved": "I already exist!"}, } result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert 
len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected def test_resolve_generic_and_multiple_match_first_only(self): diff --git a/tests/unit/processor/geoip_enricher/test_geoip_enricher.py b/tests/unit/processor/geoip_enricher/test_geoip_enricher.py index 911cbcff9..5d95772db 100644 --- a/tests/unit/processor/geoip_enricher/test_geoip_enricher.py +++ b/tests/unit/processor/geoip_enricher/test_geoip_enricher.py @@ -122,10 +122,11 @@ def test_no_geoip_data_added_if_source_field_is_none(self): def test_source_field_is_none_emits_missing_fields_warning(self): document = {"client": {"ip": None}} expected = {"client": {"ip": None}, "tags": ["_geoip_enricher_missing_field_warning"]} - self.object._apply_rules(document, self.object.rules[0]) - assert len(self.object.result.errors) == 1 + self._load_specific_rule(self.object.rules[0]) + self.object.process(document) + assert len(self.object.result.warnings) == 1 assert re.match( - r".*missing source_fields: \['client\.ip'].*", str(self.object.result.errors[0]) + r".*missing source_fields: \['client\.ip'].*", str(self.object.result.warnings[0]) ) assert document == expected @@ -163,8 +164,8 @@ def test_enrich_an_event_geoip(self): def test_enrich_an_event_geoip_with_existing_differing_geoip(self): document = {"client": {"ip": "8.8.8.8"}, "geoip": {"type": "Feature"}} result = self.object.process(document) - assert len(result.errors) == 1 - assert re.match(".*FieldExistsWarning.*geoip.type", str(result.errors[0])) + assert len(result.warnings) == 1 + assert re.match(".*FieldExistsWarning.*geoip.type", str(result.warnings[0])) def test_configured_dotted_output_field(self): document = {"source": {"ip": "8.8.8.8"}} diff --git a/tests/unit/processor/grokker/test_grokker.py b/tests/unit/processor/grokker/test_grokker.py index 3b0665cdc..dc092a27b 100644 --- a/tests/unit/processor/grokker/test_grokker.py +++ b/tests/unit/processor/grokker/test_grokker.py @@ -433,8 +433,8 @@ def test_testcases_failure_handling(self, testcase, rule, event, expected, error self.object.setup() if isinstance(error, str): result = self.object.process(event) - assert len(result.errors) == 1 - assert re.match(rf".*{error}", str(result.errors[0])) + assert len(result.warnings) == 1 + assert re.match(rf".*{error}", str(result.warnings[0])) assert event == expected, testcase else: result = self.object.process(event) diff --git a/tests/unit/processor/hyperscan_resolver/test_hyperscan_resolver.py b/tests/unit/processor/hyperscan_resolver/test_hyperscan_resolver.py index 217be9214..f644c7cf5 100644 --- a/tests/unit/processor/hyperscan_resolver/test_hyperscan_resolver.py +++ b/tests/unit/processor/hyperscan_resolver/test_hyperscan_resolver.py @@ -378,8 +378,8 @@ def test_resolve_dotted_and_dest_field_with_conflict_match(self): "tags": ["_hyperscan_resolver_failure"], } result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected def test_resolve_with_multiple_match_first_only(self): diff --git a/tests/unit/processor/ip_informer/test_ip_informer.py b/tests/unit/processor/ip_informer/test_ip_informer.py index 815830c75..0ef5e25b1 100644 --- a/tests/unit/processor/ip_informer/test_ip_informer.py +++ b/tests/unit/processor/ip_informer/test_ip_informer.py @@ -425,6 +425,6 @@ def test_testcases(self, testcase, rule, event, expected): def 
test_testcases_failure_handling(self, testcase, rule, event, expected): self._load_specific_rule(rule) result = self.object.process(event) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], ProcessingWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], ProcessingWarning) assert event == expected, testcase diff --git a/tests/unit/processor/key_checker/test_key_checker.py b/tests/unit/processor/key_checker/test_key_checker.py index 1a73e1c32..0391c3e82 100644 --- a/tests/unit/processor/key_checker/test_key_checker.py +++ b/tests/unit/processor/key_checker/test_key_checker.py @@ -255,7 +255,7 @@ def test_testcases_positiv( self.object.process(event) assert event == expected - def test_raises_duplication_error(self): + def test_field_exists_warning(self): rule_dict = { "filter": "*", "key_checker": { @@ -273,5 +273,5 @@ def test_raises_duplication_error(self): "missing_fields": ["i.exists.already"], } result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) diff --git a/tests/unit/processor/list_comparison/test_list_comparison.py b/tests/unit/processor/list_comparison/test_list_comparison.py index 6fb49c5c1..385227ea7 100644 --- a/tests/unit/processor/list_comparison/test_list_comparison.py +++ b/tests/unit/processor/list_comparison/test_list_comparison.py @@ -159,8 +159,8 @@ def test_target_field_exists_and_cant_be_extended(self): self._load_specific_rule(rule_dict) self.object.setup() result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected def test_intermediate_output_field_is_wrong_type(self): @@ -188,8 +188,8 @@ def test_intermediate_output_field_is_wrong_type(self): self._load_specific_rule(rule_dict) self.object.setup() result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected def test_check_in_dotted_subfield(self): @@ -244,8 +244,8 @@ def test_overwrite_target_field(self): self._load_specific_rule(rule_dict) self.object.setup() result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected @responses.activate diff --git a/tests/unit/processor/pseudonymizer/test_pseudonymizer.py b/tests/unit/processor/pseudonymizer/test_pseudonymizer.py index 79a2df622..215ed622c 100644 --- a/tests/unit/processor/pseudonymizer/test_pseudonymizer.py +++ b/tests/unit/processor/pseudonymizer/test_pseudonymizer.py @@ -6,6 +6,7 @@ import re from copy import deepcopy from pathlib import Path +from unittest import mock import pytest @@ -814,6 +815,7 @@ def test_replace_regex_keywords_by_regex_expression_is_idempotent(self): assert self.object._specific_tree.rules[0].pseudonyms == {"something": expected_pattern} def test_pseudonymize_string_adds_pseudonyms(self): + self.object.result = ProcessorResult(processor_name="test") assert self.object._pseudonymize_string("foo").startswith("
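A minimal consumer sketch of the reworked result objects (illustrative only, not part of the patch; assumes a configured Pipeline instance named `pipeline` and a module-level `logger`):

    result = pipeline.process_event({"message": "test"})
    print(result.event)           # the same dict, mutated in place by the processors
    print(result.event_received)  # deep copy taken before any processor ran
    for warning in result.warnings:        # aggregated across all ProcessorResults
        logger.warning(str(warning))
    for document, targets in result.data:  # extra data plus its output targets
        logger.info("extra output %s -> %s", document, targets)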