diff --git a/CHANGELOG.md b/CHANGELOG.md index c70bcf366..96cf82f38 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ ### Breaking ### Features ### Improvements + +* a result object was added to processors and pipelines + * each processor returns an object including the processor name, generated extra_data, warnings + and errors + * the pipeline returns an object with the list of all processor result objects + ### Bugfix ## 13.0.0 diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py index 84f3d6e3f..fde55fe6e 100644 --- a/logprep/abc/processor.py +++ b/logprep/abc/processor.py @@ -3,7 +3,7 @@ import logging from abc import abstractmethod from pathlib import Path -from typing import TYPE_CHECKING, List, Optional, Tuple +from typing import TYPE_CHECKING, List, Optional from attr import define, field, validators @@ -13,6 +13,7 @@ from logprep.processor.base.exceptions import ( FieldExistsWarning, ProcessingCriticalError, + ProcessingError, ProcessingWarning, ) from logprep.util import getter @@ -30,6 +31,37 @@ logger = logging.getLogger("Processor") +@define(kw_only=True) +class ProcessorResult: + """ + Result object to be returned by every processor. It contains the processor name, created data + and errors (incl. warnings). + """ + + data: list = field(validator=validators.instance_of(list), factory=list) + """ The generated extra data """ + errors: list = field( + validator=validators.deep_iterable( + member_validator=validators.instance_of(ProcessingError), + iterable_validator=validators.instance_of(list), + ), + factory=list, + ) + """ The errors that occurred during processing """ + warnings: list = field( + validator=validators.deep_iterable( + member_validator=validators.instance_of(ProcessingWarning), + iterable_validator=validators.instance_of(list), + ), + factory=list, + ) + """ The warnings that occurred during processing """ + processor_name: str = field(validator=validators.instance_of(str)) + """ The name of the processor """ + event: dict = field(validator=validators.optional(validators.instance_of(dict)), default=None) + """ A reference to the event that was processed """ + + class Processor(Component): """Abstract Processor Class to define the Interface""" @@ -76,7 +108,7 @@ class Config(Component.Config): "_event", "_specific_tree", "_generic_tree", - "_extra_data", + "result", ] rule_class: "Rule" @@ -84,8 +116,8 @@ class Config(Component.Config): _event: dict _specific_tree: RuleTree _generic_tree: RuleTree - _extra_data: List[Tuple[dict, Tuple[dict]]] _strategy = None + result: ProcessorResult def __init__(self, name: str, configuration: "Processor.Config"): super().__init__(name, configuration) @@ -104,7 +136,7 @@ def __init__(self, name: str, configuration: "Processor.Config"): specific_rules_targets=self._config.specific_rules, ) self.has_custom_tests = False - self._extra_data = [] + self.result = None @property def _specific_rules(self): @@ -146,23 +178,28 @@ def metric_labels(self) -> dict: "name": self.name, } - def process(self, event: dict): - """Process a log event by calling the implemented `process` method of the - strategy object set in `_strategy` attribute. + def process(self, event: dict) -> ProcessorResult: + """Process a log event. Parameters ---------- event : dict A dictionary representing a log event. + Returns + ------- + ProcessorResult + A ProcessorResult object containing the processed event, errors, + extra data and a list of target outputs. 
+ """ - self._extra_data.clear() + self.result = ProcessorResult(processor_name=self.name, event=event) logger.debug(f"{self.describe()} processing event {event}") self._process_rule_tree(event, self._specific_tree) self._process_rule_tree(event, self._generic_tree) - return self._extra_data if self._extra_data else None + return self.result - def _process_rule_tree(self, event: dict, tree: "RuleTree"): + def _process_rule_tree(self, event: dict, tree: RuleTree): applied_rules = set() @Metric.measure_time() @@ -172,14 +209,14 @@ def _process_rule(rule, event): applied_rules.add(rule) return event - def _process_rule_tree_multiple_times(tree, event): + def _process_rule_tree_multiple_times(tree: RuleTree, event: dict): matching_rules = tree.get_matching_rules(event) while matching_rules: for rule in matching_rules: _process_rule(rule, event) matching_rules = set(tree.get_matching_rules(event)).difference(applied_rules) - def _process_rule_tree_once(tree, event): + def _process_rule_tree_once(tree: RuleTree, event: dict): matching_rules = tree.get_matching_rules(event) for rule in matching_rules: _process_rule(rule, event) @@ -195,9 +232,11 @@ def _apply_rules_wrapper(self, event: dict, rule: "Rule"): except ProcessingWarning as error: self._handle_warning_error(event, rule, error) except ProcessingCriticalError as error: - raise error # is needed to prevent wrapping it in itself - except BaseException as error: - raise ProcessingCriticalError(str(error), rule, event) from error + self.result.errors.append(error) # is needed to prevent wrapping it in itself + event.clear() + except Exception as error: + self.result.errors.append(ProcessingCriticalError(str(error), rule, event)) + event.clear() if not hasattr(rule, "delete_source_fields"): return if rule.delete_source_fields: @@ -285,9 +324,9 @@ def _handle_warning_error(self, event, rule, error, failure_tags=None): else: add_and_overwrite(event, "tags", sorted(list({*tags, *failure_tags}))) if isinstance(error, ProcessingWarning): - logger.warning(str(error)) + self.result.warnings.append(error) else: - logger.warning(str(ProcessingWarning(str(error), rule, event))) + self.result.warnings.append(ProcessingWarning(str(error), rule, event)) def _has_missing_values(self, event, rule, source_field_dict): missing_fields = list( diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index d8c0433ab..5e5ffd751 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -6,6 +6,7 @@ """ import copy +import itertools import logging import logging.handlers import multiprocessing @@ -16,11 +17,10 @@ from ctypes import c_bool from functools import cached_property, partial from importlib.metadata import version -from multiprocessing import Lock, Value, current_process -from typing import Any, List, Tuple +from multiprocessing import Value, current_process +from typing import Any, Generator, List, Tuple import attrs -import msgspec from logprep.abc.component import Component from logprep.abc.input import ( @@ -37,14 +37,61 @@ Output, OutputWarning, ) -from logprep.abc.processor import Processor +from logprep.abc.processor import Processor, ProcessorResult from logprep.factory import Factory from logprep.metrics.metrics import HistogramMetric, Metric -from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingWarning +from logprep.processor.base.exceptions import ProcessingError, ProcessingWarning from logprep.util.configuration import Configuration from logprep.util.pipeline_profiler import 
PipelineProfiler +@attrs.define(kw_only=True) +class PipelineResult: + """Result object to be returned after processing the event. + It contains all results of each processor of the pipeline.""" + + results: List[ProcessorResult] = attrs.field( + validator=[ + attrs.validators.instance_of((list, Generator)), + attrs.validators.deep_iterable( + member_validator=attrs.validators.instance_of(ProcessorResult) + ), + ] + ) + """List of ProcessorResults""" + event: dict = attrs.field(validator=attrs.validators.instance_of(dict)) + """The event that was processed""" + event_received: dict = attrs.field( + validator=attrs.validators.instance_of(dict), converter=copy.deepcopy + ) + """The event that was received""" + pipeline: list[Processor] + """The pipeline that processed the event""" + + @cached_property + def errors(self) -> List[ProcessingError]: + """Return all processing errors.""" + return list(itertools.chain(*[result.errors for result in self])) + + @cached_property + def warnings(self) -> List[ProcessingWarning]: + """Return all processing warnings.""" + return list(itertools.chain(*[result.warnings for result in self])) + + @cached_property + def data(self) -> List[Tuple[dict, dict]]: + """Return all extra data.""" + return list(itertools.chain(*[result.data for result in self])) + + def __attrs_post_init__(self): + self.results = list( + (processor.process(self.event) for processor in self.pipeline if self.event) + ) + + def __iter__(self): + return iter(self.results) + + def _handle_pipeline_error(func): def _inner(self: "Pipeline") -> Any: try: @@ -94,9 +141,6 @@ class Metrics(Component.Metrics): _continue_iterating: Value """ a flag to signal if iterating continues """ - _lock: Lock - """ the lock for the pipeline process """ - pipeline_index: int """ the index of this pipeline """ @@ -127,7 +171,7 @@ def metric_labels(self) -> dict: } @cached_property - def _pipeline(self) -> tuple: + def _pipeline(self) -> list[Processor]: self.logger.debug(f"Building '{self._process_name}'") pipeline = [self._create_processor(entry) for entry in self._logprep_config.pipeline] self.logger.debug("Finished building pipeline") @@ -156,19 +200,13 @@ def _input(self) -> Input: ) return Factory.create(input_connector_config) - def __init__( - self, config: Configuration, pipeline_index: int = None, lock: Lock = None - ) -> None: + def __init__(self, config: Configuration, pipeline_index: int = None) -> None: self.logger = logging.getLogger("Pipeline") self.logger.name = f"Pipeline{pipeline_index}" self._logprep_config = config self._timeout = config.timeout self._continue_iterating = Value(c_bool) - - self._lock = lock self.pipeline_index = pipeline_index - self._encoder = msgspec.msgpack.Encoder() - self._decoder = msgspec.msgpack.Decoder() if self._logprep_config.profile_pipelines: self.run = partial(PipelineProfiler.profile_function, self.run) @@ -205,35 +243,36 @@ def run(self) -> None: # pylint: disable=method-hidden self._continue_iterating.value = True assert self._input, "Pipeline should not be run without input connector" assert self._output, "Pipeline should not be run without output connector" - with self._lock: - with warnings.catch_warnings(): - warnings.simplefilter("default") - self._setup() + with warnings.catch_warnings(): + warnings.simplefilter("default") + self._setup() self.logger.debug("Start iterating") while self._continue_iterating.value: self.process_pipeline() self._shut_down() @_handle_pipeline_error - def process_pipeline(self) -> Tuple[dict, list]: + def 
process_pipeline(self) -> PipelineResult: """Retrieve next event, process event with full pipeline and store or return results""" Component.run_pending_tasks() - extra_outputs = [] - event = None - try: - event = self._get_event() - except CriticalInputParsingError as error: - input_data = error.raw_input - if isinstance(input_data, bytes): - input_data = input_data.decode("utf8") - error_event = self._encoder.encode({"invalid_json": input_data}) - self._store_failed_event(error, "", error_event) - self.logger.error(f"{error}, event was written to error output") - if event: - extra_outputs = self.process_event(event) - if event and self._output: - self._store_event(event) - return event, extra_outputs + + event = self._get_event() + if not event: + return None, None + result: PipelineResult = self.process_event(event) + if result.warnings: + self.logger.warning(",".join((str(warning) for warning in result.warnings))) + if result.errors: + self.logger.error(",".join((str(error) for error in result.errors))) + self._store_failed_event(result.errors, result.event_received, event) + return + if self._output: + result_data = [res.data for res in result if res.data] + if result_data: + self._store_extra_data(itertools.chain(*result_data)) + if event: + self._store_event(event) + return result def _store_event(self, event: dict) -> None: for output_name, output in self._output.items(): @@ -241,45 +280,37 @@ def _store_event(self, event: dict) -> None: output.store(event) self.logger.debug(f"Stored output in {output_name}") - def _store_failed_event(self, error, event, event_received): + def _store_failed_event(self, error, event_received, event): for _, output in self._output.items(): if output.default: - output.store_failed(str(error), self._decoder.decode(event_received), event) + output.store_failed(str(error), event_received, event) def _get_event(self) -> dict: - event, non_critical_error_msg = self._input.get_next(self._timeout) - if non_critical_error_msg and self._output: - for _, output in self._output.items(): - if output.default: - output.store_failed(non_critical_error_msg, event, None) - return event + try: + event, non_critical_error_msg = self._input.get_next(self._timeout) + if non_critical_error_msg and self._output: + self._store_failed_event(non_critical_error_msg, event, None) + return event + except CriticalInputParsingError as error: + input_data = error.raw_input + if isinstance(input_data, bytes): + input_data = input_data.decode("utf8") + self._store_failed_event(error, {"invalid_json": input_data}, "") @Metric.measure_time() def process_event(self, event: dict): """process all processors for one event""" + result = PipelineResult( + results=[], + event_received=event, + event=event, + pipeline=self._pipeline, + ) + return result - event_received = self._encoder.encode(event) - extra_outputs = [] - for processor in self._pipeline: - try: - if extra_data := processor.process(event): - if self._output: - self._store_extra_data(extra_data) - extra_outputs.append(extra_data) - except ProcessingWarning as error: - self.logger.warning(str(error)) - except ProcessingCriticalError as error: - self.logger.error(str(error)) - if self._output: - self._store_failed_event(error, copy.deepcopy(event), event_received) - event.clear() - if not event: - break - return extra_outputs - - def _store_extra_data(self, extra_data: List[tuple]) -> None: + def _store_extra_data(self, result_data: List | itertools.chain) -> None: self.logger.debug("Storing extra data") - for document, outputs in 
extra_data: + for document, outputs in result_data: for output in outputs: for output_name, target in output.items(): self._output[output_name].store_custom(document, target) diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index a0cf6181f..bba5ac540 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -64,7 +64,6 @@ def __init__(self, configuration: Configuration): self._pipelines: list[multiprocessing.Process] = [] self._configuration = configuration - self._lock = multiprocessing.Lock() prometheus_config = self._configuration.metrics if prometheus_config.enabled: self.prometheus_exporter = PrometheusExporter(prometheus_config) @@ -164,7 +163,7 @@ def restart(self): self.prometheus_exporter.run() def _create_pipeline(self, index) -> multiprocessing.Process: - pipeline = Pipeline(pipeline_index=index, config=self._configuration, lock=self._lock) + pipeline = Pipeline(pipeline_index=index, config=self._configuration) logger.info("Created new pipeline") process = multiprocessing.Process(target=pipeline.run, daemon=True) process.stop = pipeline.stop diff --git a/logprep/processor/generic_resolver/processor.py b/logprep/processor/generic_resolver/processor.py index 09f4e19a7..aaa1f2dad 100644 --- a/logprep/processor/generic_resolver/processor.py +++ b/logprep/processor/generic_resolver/processor.py @@ -28,18 +28,21 @@ import re from typing import Union -from logprep.processor.base.exceptions import FieldExistsWarning +from logprep.processor.base.exceptions import ( + FieldExistsWarning, + ProcessingCriticalError, +) from logprep.processor.field_manager.processor import FieldManager from logprep.processor.generic_resolver.rule import GenericResolverRule from logprep.util.getter import GetterFactory from logprep.util.helper import add_field_to, get_dotted_field_value -class GenericResolverError(BaseException): +class GenericResolverError(ProcessingCriticalError): """Base class for GenericResolver related exceptions.""" - def __init__(self, name: str, message: str): - super().__init__(f"GenericResolver ({name}): {message}") + def __init__(self, name: str, message: str, rule: GenericResolverRule, event: dict): + super().__init__(f"{name}: {message}", rule=rule, event=event) class GenericResolver(FieldManager): @@ -78,6 +81,8 @@ def _apply_rules(self, event, rule): raise GenericResolverError( self.name, "Mapping group is missing in mapping file pattern!", + rule=rule, + event=event, ) dest_val = replacements.get(mapping) if dest_val: @@ -145,9 +150,13 @@ def ensure_rules_from_file(self, rule): f"Additions file " f'\'{rule.resolve_from_file["path"]}\'' f" must be a dictionary with string values!", + rule=rule, + event=None, ) except FileNotFoundError as error: raise GenericResolverError( self.name, f'Additions file \'{rule.resolve_from_file["path"]}' f"' not found!", + rule=rule, + event=None, ) from error diff --git a/logprep/processor/hyperscan_resolver/processor.py b/logprep/processor/hyperscan_resolver/processor.py index 1c7d13039..d24b84321 100644 --- a/logprep/processor/hyperscan_resolver/processor.py +++ b/logprep/processor/hyperscan_resolver/processor.py @@ -39,6 +39,7 @@ from logprep.processor.base.exceptions import FieldExistsWarning, SkipImportError from logprep.processor.field_manager.processor import FieldManager +from logprep.processor.generic_resolver.processor import GenericResolverError from logprep.util.helper import add_field_to, get_dotted_field_value from logprep.util.validators import 
directory_validator @@ -56,12 +57,9 @@ # pylint: enable=ungrouped-imports -class HyperscanResolverError(BaseException): +class HyperscanResolverError(GenericResolverError): """Base class for HyperscanResolver related exceptions.""" - def __init__(self, name: str, message: str): - super().__init__(f"HyperscanResolver ({name}): {message}") - class HyperscanResolver(FieldManager): """Resolve values in documents by referencing a mapping list.""" diff --git a/logprep/processor/pre_detector/processor.py b/logprep/processor/pre_detector/processor.py index b322647e3..c44b32322 100644 --- a/logprep/processor/pre_detector/processor.py +++ b/logprep/processor/pre_detector/processor.py @@ -38,7 +38,7 @@ from logprep.abc.processor import Processor from logprep.processor.pre_detector.ip_alerter import IPAlerter from logprep.processor.pre_detector.rule import PreDetectorRule -from logprep.util.helper import get_dotted_field_value, add_field_to +from logprep.util.helper import add_field_to, get_dotted_field_value from logprep.util.time import TimeParser @@ -98,7 +98,7 @@ def _apply_rules(self, event, rule): and not self._ip_alerter.is_in_alerts_list(rule, event) ): self._get_detection_result(event, rule) - for detection, _ in self._extra_data: + for detection, _ in self.result.data: detection["creation_timestamp"] = TimeParser.now().isoformat() timestamp = get_dotted_field_value(event, "@timestamp") if timestamp is not None: @@ -111,7 +111,7 @@ def _get_detection_result(self, event: dict, rule: PreDetectorRule): add_field_to(event, "pre_detection_id", pre_detection_id) detection_result = self._generate_detection_result(pre_detection_id, event, rule) - self._extra_data.append((detection_result, self._config.outputs)) + self.result.data.append((detection_result, self._config.outputs)) @staticmethod def _generate_detection_result(pre_detection_id: str, event: dict, rule: PreDetectorRule): diff --git a/logprep/processor/pseudonymizer/processor.py b/logprep/processor/pseudonymizer/processor.py index d51316425..a7dc90f73 100644 --- a/logprep/processor/pseudonymizer/processor.py +++ b/logprep/processor/pseudonymizer/processor.py @@ -266,7 +266,7 @@ def _apply_rules(self, event: dict, rule: PseudonymizerRule): field_value = self._pseudonymize_field(rule, dotted_field, regex, field_value) _ = add_field_to(event, dotted_field, field_value, overwrite_output_field=True) if "@timestamp" in event: - for pseudonym, _ in self._extra_data: + for pseudonym, _ in self.result.data: pseudonym["@timestamp"] = event["@timestamp"] self._update_cache_metrics() @@ -297,8 +297,8 @@ def _pseudonymize_string(self, value: str) -> str: return value pseudonym_dict = self._get_pseudonym_dict_cached(value) extra = (pseudonym_dict, self._config.outputs) - if extra not in self._extra_data: - self._extra_data.append(extra) + if extra not in self.result.data: + self.result.data.append(extra) return self._wrap_hash(pseudonym_dict["pseudonym"]) def _pseudonymize(self, value): diff --git a/logprep/processor/selective_extractor/processor.py b/logprep/processor/selective_extractor/processor.py index 82cb91d3e..4656b5eb3 100644 --- a/logprep/processor/selective_extractor/processor.py +++ b/logprep/processor/selective_extractor/processor.py @@ -66,4 +66,4 @@ def _apply_rules(self, event: dict, rule: SelectiveExtractorRule): filtered_event = {} for field, content in flattened_fields.items(): add_field_to(filtered_event, field, content) - self._extra_data.append((filtered_event, rule.outputs)) + self.result.data.append((filtered_event, rule.outputs)) 
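(Editor's note, not part of the diff.) A minimal sketch of how calling code might consume the new `ProcessorResult` introduced in `logprep/abc/processor.py` above. The `generic_adder` configuration and inline rule are borrowed from the unit tests later in this change; the instance name and the rule filter are illustrative assumptions, and pipeline construction is elided.

```python
# Sketch only: assumes a processor built via Factory.create with an inline rule,
# as in the unit tests of this change. Attribute names (processor_name, event,
# data, warnings, errors) are taken from the ProcessorResult definition above.
from logprep.factory import Factory

processor = Factory.create(
    {
        "generic_adder": {
            "type": "generic_adder",
            "specific_rules": [
                {"filter": "some", "generic_adder": {"add": {"field": "foo"}}}
            ],
            "generic_rules": [],
        }
    }
)

event = {"some": "event"}
result = processor.process(event)  # now returns a ProcessorResult instead of the old extra-data list

print(result.processor_name)            # name of the processor instance ("generic_adder" here)
assert result.event is event             # the result keeps a reference to the processed event
for document, outputs in result.data:    # generated extra data as (document, outputs) tuples
    print(document, outputs)
for warning in result.warnings:          # ProcessingWarning instances, collected instead of logged by the processor
    print(str(warning))
for error in result.errors:              # ProcessingCriticalError instances, collected instead of raised
    print(str(error))

# A Pipeline aggregates these per-processor results into a PipelineResult whose
# .data, .warnings and .errors properties chain the values of all processors.
```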
diff --git a/logprep/util/auto_rule_tester/auto_rule_corpus_tester.py b/logprep/util/auto_rule_tester/auto_rule_corpus_tester.py index 87b8103a3..6e0692025 100644 --- a/logprep/util/auto_rule_tester/auto_rule_corpus_tester.py +++ b/logprep/util/auto_rule_tester/auto_rule_corpus_tester.py @@ -78,7 +78,7 @@ If one or more test cases fail this tester ends with an exit code of 1, otherwise 0. """ -import io +import itertools # pylint: enable=anomalous-backslash-in-string # pylint: disable=protected-access @@ -93,13 +93,13 @@ from json import JSONDecodeError from pathlib import Path from pprint import pprint -from typing import List +from typing import Dict, List from attr import Factory, define, field, validators from colorama import Fore, Style from deepdiff import DeepDiff, grep -from logprep.framework.pipeline import Pipeline +from logprep.framework.pipeline import Pipeline, PipelineResult from logprep.util.configuration import Configuration from logprep.util.helper import get_dotted_field_value from logprep.util.json_handling import parse_json @@ -107,20 +107,14 @@ logger = logging.getLogger("corpustester") -def align_extra_output_formats(extra_outputs): +def convert_extra_data_format(extra_outputs) -> List[Dict]: """ - Aligns the different output formats into one common format of the selective_extractor, - predetector and pseudonymizer. + Converts the format of the extra data outputs such that it is a list of dicts, where the + output target is the key and the values are the actual outputs. """ reformatted_extra_outputs = [] - for extra_output in extra_outputs: - if isinstance(extra_output, tuple): - documents, target = extra_output - for document in documents: - reformatted_extra_outputs.append({str(target): document}) - else: - for output in extra_output: - reformatted_extra_outputs.append({str(output[1]): output[0]}) + for value, key in extra_outputs: + reformatted_extra_outputs.append({str(key): value}) return reformatted_extra_outputs @@ -148,6 +142,7 @@ class TestCase: generated_extra_output: dict = field(validator=validators.instance_of(list), default=[]) failed: bool = field(validator=validators.instance_of(bool), default=False) report: List = Factory(list) + warnings: str = field(default="") @cached_property def _tmp_dir(self): @@ -191,7 +186,6 @@ def _pipeline(self): def __init__(self, config_paths: tuple[str], input_test_data_path: str): self._original_config_paths = config_paths self._input_test_data_path = input_test_data_path - self.log_capture_string = sys.stdout def run(self): """ @@ -216,10 +210,12 @@ def _run_pipeline_per_test_case(self): print(Style.BRIGHT + "# Test Cases Summary:" + Style.RESET_ALL) for test_case_id, test_case in self._test_cases.items(): _ = [processor.setup() for processor in self._pipeline._pipeline] - parsed_event, extra_outputs = self._pipeline.process_pipeline() - extra_outputs = align_extra_output_formats(extra_outputs) + result: PipelineResult = self._pipeline.process_pipeline() + parsed_event = result.event + extra_outputs = convert_extra_data_format(result.data) test_case.generated_output = parsed_event test_case.generated_extra_output = extra_outputs + test_case.warnings = result.warnings self._compare_logprep_outputs(test_case_id, parsed_event) self._compare_extra_data_output(test_case_id, extra_outputs) self._print_pass_fail_statements(test_case_id) @@ -330,6 +326,8 @@ def _print_pass_fail_statements(self, test_case_id): status = f"{Style.BRIGHT}{Fore.RESET} SKIPPED - (no expected output given)" elif len(test_case.report) > 0: status = 
f"{Style.BRIGHT}{Fore.RED} FAILED" + elif test_case.warnings: + status = f"{Style.BRIGHT}{Fore.YELLOW} PASSED - (with warnings)" print(f"{Fore.BLUE} Test Case: {Fore.CYAN}{test_case_id} {status}{Style.RESET_ALL}") def _print_test_reports(self): @@ -337,7 +335,7 @@ def _print_test_reports(self): return print(Style.BRIGHT + "# Test Cases Detailed Reports:" + Style.RESET_ALL) for test_case_id, test_case in self._test_cases.items(): - if test_case.report and test_case.expected_output: + if (test_case.warnings or test_case.report) and test_case.expected_output: self._print_long_test_result(test_case_id, test_case) print() @@ -345,6 +343,14 @@ def _print_long_test_result(self, test_case_id, test_case): report_title = f"test report for '{test_case_id}'" print(f"{Fore.RED}{Style.BRIGHT}↓ {report_title} ↓ {Style.RESET_ALL}") print_logprep_output = True + if test_case.warnings and not test_case.report: + print(Fore.GREEN + "Test passed, but with following warnings:" + Fore.RESET) + print(test_case.warnings) + print_logprep_output = False + if test_case.warnings and test_case.report: + print(Fore.RED + "Logprep Warnings:" + Fore.RESET) + for warning in test_case.warnings: + print(warning) for statement in test_case.report: if isinstance(statement, (dict, list)): pprint(statement) diff --git a/logprep/util/auto_rule_tester/auto_rule_tester.py b/logprep/util/auto_rule_tester/auto_rule_tester.py index 5f6b64cb6..99aeaf45e 100644 --- a/logprep/util/auto_rule_tester/auto_rule_tester.py +++ b/logprep/util/auto_rule_tester/auto_rule_tester.py @@ -358,7 +358,7 @@ def _eval_file_rule_test(self, rule_test: dict, processor: "Processor", r_idx: i continue try: - extra_output = processor.process(test["raw"]) + result = processor.process(test["raw"]) except BaseException as error: self._print_error_on_exception(error, rule_test, t_idx) self._success = False @@ -372,7 +372,7 @@ def _eval_file_rule_test(self, rule_test: dict, processor: "Processor", r_idx: i warnings = [] if isinstance(processor, PreDetector): - self._pd_extra.update_errors(processor, extra_output, errors, warnings) + self._pd_extra.update_errors(processor, result.data, errors, warnings) if print_diff or warnings or errors: self._print_filename(rule_test) diff --git a/logprep/util/rule_dry_runner.py b/logprep/util/rule_dry_runner.py index 3bccd50bf..0c6782fd6 100644 --- a/logprep/util/rule_dry_runner.py +++ b/logprep/util/rule_dry_runner.py @@ -45,9 +45,9 @@ from colorama import Back, Fore from ruamel.yaml import YAML -from logprep.framework.pipeline import Pipeline +from logprep.framework.pipeline import Pipeline, PipelineResult from logprep.util.auto_rule_tester.auto_rule_corpus_tester import ( - align_extra_output_formats, + convert_extra_data_format, ) from logprep.util.configuration import Configuration from logprep.util.getter import GetterFactory @@ -103,8 +103,9 @@ def run(self): transformed_cnt = 0 output_count = 0 for input_document in self._input_documents: - test_output, test_output_custom = self._pipeline.process_pipeline() - test_output_custom = align_extra_output_formats(test_output_custom) + result: PipelineResult = self._pipeline.process_pipeline() + test_output = result.event + test_output_custom = convert_extra_data_format(result.data) if test_output: output_count += 1 diff = self._print_output_results(input_document, test_output, test_output_custom) diff --git a/tests/unit/framework/test_pipeline.py b/tests/unit/framework/test_pipeline.py index 760958ebb..2f30825a6 100644 --- a/tests/unit/framework/test_pipeline.py +++ 
b/tests/unit/framework/test_pipeline.py @@ -24,9 +24,14 @@ Output, OutputWarning, ) +from logprep.abc.processor import ProcessorResult from logprep.factory import Factory -from logprep.framework.pipeline import Pipeline -from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingWarning +from logprep.framework.pipeline import Pipeline, PipelineResult +from logprep.processor.base.exceptions import ( + FieldExistsWarning, + ProcessingCriticalError, + ProcessingWarning, +) from logprep.processor.deleter.rule import DeleterRule from logprep.util.configuration import Configuration @@ -47,10 +52,22 @@ class ConfigurationForTests: "metrics": {"enabled": False}, } ) - lock = Lock() -@mock.patch("logprep.factory.Factory.create") +def get_mock_create(): + """ + Create a new mock_create magic mock with a default processor result. Is applied for every + test. + """ + mock_create = mock.MagicMock() + mock_component = mock.MagicMock() + mock_component.process = mock.MagicMock() + mock_component.process.return_value = ProcessorResult(processor_name="mock_processor") + mock_create.return_value = mock_component + return mock_create + + +@mock.patch("logprep.factory.Factory.create", new_callable=get_mock_create) class TestPipeline(ConfigurationForTests): def setup_method(self): self._check_failed_stored = None @@ -58,7 +75,6 @@ def setup_method(self): self.pipeline = Pipeline( pipeline_index=1, config=self.logprep_config, - lock=self.lock, ) def test_pipeline_property_returns_pipeline(self, mock_create): @@ -112,7 +128,16 @@ def test_empty_documents_are_not_forwarded_to_other_processors(self, _): {"filter": "delete_me", "deleter": {"delete": True}} ) deleter_processor._specific_tree.add_rule(deleter_rule) - self.pipeline._pipeline = [mock.MagicMock(), deleter_processor, mock.MagicMock()] + processor_with_mock_result = mock.MagicMock() + processor_with_mock_result.process = mock.MagicMock() + processor_with_mock_result.process.return_value = ProcessorResult( + processor_name="processor_with_mock_res" + ) + self.pipeline._pipeline = [ + processor_with_mock_result, + deleter_processor, + deepcopy(processor_with_mock_result), + ] logger = logging.getLogger("Pipeline") logger.setLevel(DEBUG) while self.pipeline._input._documents: @@ -131,7 +156,13 @@ def test_not_empty_documents_are_stored_in_the_output(self, _): assert self.pipeline._store_event.call_count == 1 def test_empty_documents_are_not_stored_in_the_output(self, _): - self.pipeline.process_event = lambda x: x.clear() + def mock_process_event(event): + event.clear() + return PipelineResult( + event=event, event_received=event, results=[], pipeline=self.pipeline._pipeline + ) + + self.pipeline.process_event = mock_process_event self.pipeline._setup() self.pipeline._input.get_next.return_value = ({"message": "test"}, None) self.pipeline._store_event = mock.MagicMock() @@ -214,15 +245,15 @@ def test_processor_warning_error_is_logged_but_processing_continues(self, mock_w self.pipeline._input.get_next.return_value = ({"message": "test"}, None) mock_rule = mock.MagicMock() processing_warning = ProcessingWarning("not so bad", mock_rule, {"message": "test"}) - - def raise_processing_warning(_): - raise processing_warning - - self.pipeline._pipeline[1].process.side_effect = raise_processing_warning + self.pipeline._pipeline[1].process.return_value = ProcessorResult( + processor_name="mock_processor", warnings=[processing_warning] + ) self.pipeline.process_pipeline() self.pipeline._input.get_next.return_value = ({"message": "test"}, None) - 
self.pipeline.process_pipeline() - mock_warning.assert_called_with(str(processing_warning)) + result = self.pipeline.process_pipeline() + assert processing_warning in result.results[0].warnings + mock_warning.assert_called() + assert "ProcessingWarning: not so bad" in mock_warning.call_args[0][0] assert self.pipeline._output["dummy"].store.call_count == 2, "all events are processed" @mock.patch("logging.Logger.error") @@ -234,39 +265,50 @@ def test_processor_critical_error_is_logged_event_is_stored_in_error_output( input_event2 = {"message": "second event"} self.pipeline._input.get_next.return_value = (input_event1, None) mock_rule = mock.MagicMock() - - def raise_critical_processing_error(event): - raise ProcessingCriticalError("really bad things happened", mock_rule, event) - - self.pipeline._pipeline[1].process.side_effect = raise_critical_processing_error + self.pipeline._pipeline[1].process.return_value = ProcessorResult( + processor_name="", + errors=[ProcessingCriticalError("really bad things happened", mock_rule, input_event1)], + ) self.pipeline.process_pipeline() self.pipeline._input.get_next.return_value = (input_event2, None) + self.pipeline._pipeline[1].process.return_value = ProcessorResult( + processor_name="", + errors=[ProcessingCriticalError("really bad things happened", mock_rule, input_event2)], + ) + self.pipeline.process_pipeline() assert self.pipeline._input.get_next.call_count == 2, "2 events gone into processing" - assert mock_error.call_count == 2, "two errors occurred" - - logger_calls = ( - mock.call( - str( - ProcessingCriticalError( - "really bad things happened", mock_rule, {"message": "first event"} - ) - ) - ), - mock.call( - str( - ProcessingCriticalError( - "really bad things happened", mock_rule, {"message": "second event"} - ) - ) - ), - ) - mock_error.assert_has_calls(logger_calls) + assert mock_error.call_count == 2, f"two errors occurred: {mock_error.mock_calls}" + assert "ProcessingCriticalError: really bad things happened" in mock_error.call_args[0][0] assert self.pipeline._output["dummy"].store.call_count == 0, "no event in output" assert ( self.pipeline._output["dummy"].store_failed.call_count == 2 ), "errored events are gone to connector error output handler" + @mock.patch("logging.Logger.error") + @mock.patch("logging.Logger.warning") + def test_processor_logs_processing_error_and_warnings_separately( + self, mock_warning, mock_error, _ + ): + self.pipeline._setup() + input_event1 = {"message": "first event"} + self.pipeline._input.get_next.return_value = (input_event1, None) + mock_rule = mock.MagicMock() + self.pipeline._pipeline[1] = deepcopy(self.pipeline._pipeline[0]) + warning = FieldExistsWarning(mock_rule, input_event1, ["foo"]) + self.pipeline._pipeline[0].process.return_value = ProcessorResult( + processor_name="", warnings=[warning] + ) + error = ProcessingCriticalError("really bad things happened", mock_rule, input_event1) + self.pipeline._pipeline[1].process.return_value = ProcessorResult( + processor_name="", errors=[error] + ) + self.pipeline.process_pipeline() + assert mock_error.call_count == 1, f"one error occurred: {mock_error.mock_calls}" + assert mock_warning.call_count == 1, f"one warning occurred: {mock_warning.mock_calls}" + mock_error.assert_called_with(str(error)) + mock_warning.assert_called_with(str(warning)) + @mock.patch("logging.Logger.error") def test_critical_input_error_is_logged_error_is_stored_in_failed_events(self, mock_error, _): self.pipeline._setup() @@ -383,10 +425,11 @@ def 
test_processor_fatal_output_error_in_setup_is_logged(self, mock_log_error, _ def test_extra_data_tuple_is_passed_to_store_custom(self, _): self.pipeline._setup() self.pipeline._input.get_next.return_value = ({"some": "event"}, None) - processor_with_extra_data = mock.MagicMock() - processor_with_extra_data.process = mock.MagicMock() - processor_with_extra_data.process.return_value = [({"foo": "bar"}, ({"dummy": "target"},))] - self.pipeline._pipeline = [mock.MagicMock(), processor_with_extra_data, mock.MagicMock()] + self.pipeline._pipeline[1] = deepcopy(self.pipeline._pipeline[0]) + self.pipeline._pipeline[1].process.return_value = ProcessorResult( + processor_name="", data=[({"foo": "bar"}, ({"dummy": "target"},))] + ) + self.pipeline._pipeline.append(deepcopy(self.pipeline._pipeline[0])) self.pipeline.process_pipeline() assert self.pipeline._input.get_next.call_count == 1 assert self.pipeline._output["dummy"].store_custom.call_count == 1 @@ -395,16 +438,18 @@ def test_extra_data_tuple_is_passed_to_store_custom(self, _): def test_store_custom_calls_all_defined_outputs(self, _): self.pipeline._output.update({"dummy1": mock.MagicMock()}) self.pipeline._setup() + self.pipeline._pipeline[1] = deepcopy(self.pipeline._pipeline[0]) + self.pipeline._pipeline[1].process.return_value = ProcessorResult( + processor_name="", + data=[ + ( + {"foo": "bar"}, + ({"dummy": "target"}, {"dummy1": "second_target"}), + ) + ], + ) + self.pipeline._pipeline.append(deepcopy(self.pipeline._pipeline[0])) self.pipeline._input.get_next.return_value = ({"some": "event"}, None) - processor_with_extra_data = mock.MagicMock() - processor_with_extra_data.process = mock.MagicMock() - processor_with_extra_data.process.return_value = [ - ( - {"foo": "bar"}, - ({"dummy": "target"}, {"dummy1": "second_target"}), - ) - ] - self.pipeline._pipeline = [mock.MagicMock(), processor_with_extra_data, mock.MagicMock()] self.pipeline.process_pipeline() assert self.pipeline._input.get_next.call_count == 1 assert self.pipeline._output["dummy"].store_custom.call_count == 1 @@ -416,11 +461,12 @@ def test_store_custom_calls_all_defined_outputs(self, _): def test_extra_data_list_is_passed_to_store_custom(self, _): self.pipeline._setup() + self.pipeline._pipeline[1] = deepcopy(self.pipeline._pipeline[0]) + self.pipeline._pipeline[1].process.return_value = ProcessorResult( + processor_name="", data=[({"foo": "bar"}, ({"dummy": "target"},))] + ) + self.pipeline._pipeline.append(deepcopy(self.pipeline._pipeline[0])) self.pipeline._input.get_next.return_value = ({"some": "event"}, None) - processor_with_extra_data = mock.MagicMock() - processor_with_extra_data.process = mock.MagicMock() - processor_with_extra_data.process.return_value = [({"foo": "bar"}, ({"dummy": "target"},))] - self.pipeline._pipeline = [mock.MagicMock(), processor_with_extra_data, mock.MagicMock()] self.pipeline.process_pipeline() assert self.pipeline._input.get_next.call_count == 1 assert self.pipeline._output["dummy"].store_custom.call_count == 1 @@ -558,6 +604,27 @@ def test_shutdown_logs_fatal_errors(self, mock_error, _): logger_call = f"Couldn't gracefully shut down pipeline due to: {error}" mock_error.assert_called_with(logger_call) + def test_pipeline_result_provides_event_received(self, _): + self.pipeline._setup() + event = {"some": "event"} + self.pipeline._input.get_next.return_value = (event, None) + generic_adder = original_create( + { + "generic_adder": { + "type": "generic_adder", + "specific_rules": [ + {"filter": "some", "generic_adder": {"add": {"field": 
"foo"}}} + ], + "generic_rules": [], + } + } + ) + self.pipeline._pipeline = [generic_adder] + result = self.pipeline.process_pipeline() + assert result.event_received is not event, "event_received is a copy" + assert result.event_received == {"some": "event"}, "received event is as expected" + assert result.event == {"some": "event", "field": "foo"}, "processed event is as expected" + class TestPipelineWithActualInput: def setup_method(self): @@ -576,10 +643,9 @@ def test_pipeline_without_output_connector_and_one_input_event_and_preprocessors self.config.input["test_input"]["documents"] = [{"applyrule": "yes"}] pipeline = Pipeline(config=self.config) assert pipeline._output is None - event, extra_outputs = pipeline.process_pipeline() - assert event["label"] == {"reporter": ["windows"]} - assert "arrival_time" in event - assert extra_outputs == [] + result = pipeline.process_pipeline() + assert result.event["label"] == {"reporter": ["windows"]} + assert "arrival_time" in result.event def test_process_event_processes_without_input_and_without_output(self): event = {"applyrule": "yes"} @@ -599,14 +665,12 @@ def test_pipeline_without_output_connector_and_two_input_events_and_preprocessor self.config.input["test_input"]["documents"] = input_events pipeline = Pipeline(config=self.config) assert pipeline._output is None - event, extra_outputs = pipeline.process_pipeline() - assert event["label"] == {"reporter": ["windows"]} - assert "arrival_time" in event - assert extra_outputs == [] - event, extra_outputs = pipeline.process_pipeline() - assert "pseudonym" in event.get("winlog", {}).get("event_data", {}).get("IpAddress") - assert "arrival_time" in event - assert len(extra_outputs) == 1 + result = pipeline.process_pipeline() + assert result.event["label"] == {"reporter": ["windows"]} + assert "arrival_time" in result.event + result = pipeline.process_pipeline() + assert "pseudonym" in result.event.get("winlog", {}).get("event_data", {}).get("IpAddress") + assert "arrival_time" in result.event def test_pipeline_hmac_error_message_without_output_connector(self): self.config.input["test_input"]["documents"] = [{"applyrule": "yes"}] @@ -615,8 +679,8 @@ def test_pipeline_hmac_error_message_without_output_connector(self): } pipeline = Pipeline(config=self.config) assert pipeline._output is None - event, _ = pipeline.process_pipeline() - assert event["hmac"]["hmac"] == "error" + result = pipeline.process_pipeline() + assert result.event["hmac"]["hmac"] == "error" def test_pipeline_run_raises_assertion_when_run_without_input(self): self.config.input = {} diff --git a/tests/unit/processor/amides/test_amides.py b/tests/unit/processor/amides/test_amides.py index 4438a3c29..4254cf71c 100644 --- a/tests/unit/processor/amides/test_amides.py +++ b/tests/unit/processor/amides/test_amides.py @@ -1,7 +1,6 @@ # pylint: disable=missing-docstring # pylint: disable=protected-access import hashlib -import logging import re from copy import deepcopy from multiprocessing import current_process @@ -157,7 +156,7 @@ def test_classification_results_from_cache(self): # end strange mock assert self.object.metrics.cached_results == 1 - def test_process_event_raise_duplication_error(self, caplog): + def test_process_event_raise_duplication_error(self): self.object.setup() document = { "winlog": { @@ -168,10 +167,12 @@ def test_process_event_raise_duplication_error(self, caplog): } self.object.process(document) assert document.get("amides") - with caplog.at_level(logging.WARNING): - self.object.process(document) - assert 
re.match(r".*missing source_fields: \['process.command_line'].*", caplog.messages[0]) - assert re.match(".*FieldExistsWarning.*", caplog.messages[1]) + result = self.object.process(document) + assert len(result.warnings) > 0 + assert re.match( + r".*missing source_fields: \['process.command_line'].*", str(result.warnings) + ) + assert re.match(".*FieldExistsWarning.*", str(result.warnings)) def test_setup_get_model_via_file_getter(self, tmp_path, monkeypatch): model_uri = "file://tests/testdata/unit/amides/model.zip" diff --git a/tests/unit/processor/base.py b/tests/unit/processor/base.py index 6fb4ea71b..d0adc908b 100644 --- a/tests/unit/processor/base.py +++ b/tests/unit/processor/base.py @@ -15,10 +15,12 @@ from attrs import asdict from ruamel.yaml import YAML -from logprep.abc.processor import Processor +from logprep.abc.processor import Processor, ProcessorResult from logprep.factory import Factory from logprep.framework.rule_tree.rule_tree import RuleTree from logprep.metrics.metrics import CounterMetric, HistogramMetric +from logprep.processor.base.exceptions import ProcessingCriticalError +from logprep.processor.base.rule import Rule from logprep.util.json_handling import list_json_files_in_directory from tests.unit.component.base import BaseComponentTestCase @@ -75,10 +77,12 @@ def set_rules(rules_dirs): rules.append(rule) return rules - def _load_specific_rule(self, rule): + def _load_specific_rule(self, rule: dict | Rule): self.object._generic_tree = RuleTree() self.object._specific_tree = RuleTree() - specific_rule = self.object.rule_class._create_from_dict(rule) + specific_rule = ( + self.object.rule_class._create_from_dict(rule) if isinstance(rule, dict) else rule + ) self.object._specific_tree.add_rule(specific_rule, self.logger) def setup_method(self) -> None: @@ -95,6 +99,28 @@ def setup_method(self) -> None: self.object = Factory.create(configuration=config) self.specific_rules = self.set_rules(self.specific_rules_dirs) self.generic_rules = self.set_rules(self.generic_rules_dirs) + self.match_all_event = { + "message": "event", + "winlog": { + "event_id": 1, + "provider_name": "Microsoft-Windows-Sysmon", + "event_data": {"IpAddress": "127.0.0.54"}, + }, + "field1": "foo", + "field2": "bar", + "another_random_field": "baz", + "@timestamp": "2021-01-01T00:00:00.000Z", + "delete_event": "does not matter", + "irrelevant": "does not matter", + "url": "http://example.local", + "drop_me": "does not matter", + "add_generic_test": "does not matter", + "anything": "does not matter", + "client": {"ip": "127.0.0.54"}, + "ips": ["127.0.0.54", "192.168.4.33"], + "applyrule": "yes", + "A": "foobarfoo", + } # this is an event that can be used in all processor tests, cause it matches everywhere def teardown_method(self) -> None: """teardown for all methods""" @@ -285,3 +311,26 @@ def test_no_metrics_with_same_name(self): pairs = itertools.combinations(metric_attributes.values(), 2) for metric1, metric2 in pairs: assert metric1.name != metric2.name, f"{metric1.name} == {metric2.name}" + + def test_process_return_result_object(self): + event = {"some": "event"} + result = self.object.process(event) + assert isinstance(result, ProcessorResult) + assert isinstance(result.data, list) + assert isinstance(result.errors, list) + assert result.processor_name == "Test Instance Name" + + def test_process_collects_errors_in_result_object(self): + with mock.patch.object( + self.object, + "_apply_rules", + side_effect=ProcessingCriticalError( + "side effect", rule=self.object.rules[0], 
event=self.match_all_event + ), + ): + result = self.object.process(self.match_all_event) + assert len(result.errors) > 0, "minimum one error should be in result object" + + def test_result_object_has_reference_to_event(self): + result = self.object.process(self.match_all_event) + assert result.event is self.match_all_event diff --git a/tests/unit/processor/calculator/test_calculator.py b/tests/unit/processor/calculator/test_calculator.py index 40acbeedf..be49fac5e 100644 --- a/tests/unit/processor/calculator/test_calculator.py +++ b/tests/unit/processor/calculator/test_calculator.py @@ -1,5 +1,4 @@ # pylint: disable=missing-docstring -import logging import math import re @@ -352,13 +351,12 @@ def test_testcases(self, testcase, rule, event, expected): # pylint: disable=un assert event == expected @pytest.mark.parametrize("testcase, rule, event, expected, error_message", failure_test_cases) - def test_testcases_failure_handling( - self, caplog, testcase, rule, event, expected, error_message - ): + def test_testcases_failure_handling(self, testcase, rule, event, expected, error_message): self._load_specific_rule(rule) - with caplog.at_level(logging.WARNING): - self.object.process(event) - assert re.match(rf".*{error_message}", caplog.text) + + result = self.object.process(event) + assert len(result.warnings) == 1 + assert re.match(rf".*{error_message}", str(result.warnings[0])) assert event == expected, testcase @pytest.mark.parametrize( diff --git a/tests/unit/processor/concatenator/test_concatenator.py b/tests/unit/processor/concatenator/test_concatenator.py index 3e6ad9979..e29f6a8e5 100644 --- a/tests/unit/processor/concatenator/test_concatenator.py +++ b/tests/unit/processor/concatenator/test_concatenator.py @@ -1,10 +1,9 @@ # pylint: disable=missing-docstring # pylint: disable=protected-access -import logging -import re import pytest +from logprep.processor.base.exceptions import FieldExistsWarning from tests.unit.processor.base import BaseProcessorTestCase @@ -171,8 +170,8 @@ def test_for_expected_output(self, test_case, rule, document, expected_output): self.object.process(document) assert document == expected_output, test_case - def test_process_raises_duplication_error_if_target_field_exists_and_should_not_be_overwritten( - self, caplog + def test_process_raises_field_exists_warning_if_target_field_exists_and_should_not_be_overwritten( + self, ): rule = { "filter": "field.a", @@ -186,9 +185,9 @@ def test_process_raises_duplication_error_if_target_field_exists_and_should_not_ } self._load_specific_rule(rule) document = {"field": {"a": "first", "b": "second"}, "target_field": "has already content"} - with caplog.at_level(logging.WARNING): - self.object.process(document) - assert re.match(".*FieldExistsWarning.*", caplog.text) + result = self.object.process(document) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert "target_field" in document assert document.get("target_field") == "has already content" assert document.get("tags") == ["_concatenator_failure"] diff --git a/tests/unit/processor/datetime_extractor/test_datetime_extractor.py b/tests/unit/processor/datetime_extractor/test_datetime_extractor.py index db1024aa9..2521194eb 100644 --- a/tests/unit/processor/datetime_extractor/test_datetime_extractor.py +++ b/tests/unit/processor/datetime_extractor/test_datetime_extractor.py @@ -2,11 +2,10 @@ # pylint: disable=missing-module-docstring # pylint: disable=wrong-import-position # pylint: disable=wrong-import-order -import logging 
-import re from dateutil.tz import tzutc +from logprep.processor.base.exceptions import FieldExistsWarning from logprep.processor.datetime_extractor.processor import DatetimeExtractor from tests.unit.processor.base import BaseProcessorTestCase @@ -169,7 +168,7 @@ def test_overwrite_target(self): } assert document == expected - def test_existing_target_raises_if_not_overwrite_target(self, caplog): + def test_existing_target_raises_if_not_overwrite_target(self): document = {"@timestamp": "2019-07-30T14:37:42.861+00:00", "winlog": {"event_id": 123}} rule = { "filter": "@timestamp", @@ -181,9 +180,9 @@ def test_existing_target_raises_if_not_overwrite_target(self, caplog): "description": "", } self._load_specific_rule(rule) - with caplog.at_level(logging.WARNING): - self.object.process(document) - assert re.match(".*FieldExistsWarning.*", caplog.text) + result = self.object.process(document) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) @staticmethod def _parse_local_tz(tz_local_name): diff --git a/tests/unit/processor/dissector/test_dissector.py b/tests/unit/processor/dissector/test_dissector.py index fb088d554..92c41fd96 100644 --- a/tests/unit/processor/dissector/test_dissector.py +++ b/tests/unit/processor/dissector/test_dissector.py @@ -1,9 +1,8 @@ # pylint: disable=missing-docstring -import logging -import re import pytest +from logprep.processor.base.exceptions import ProcessingWarning from tests.unit.processor.base import BaseProcessorTestCase test_cases = [ # testcase, rule, event, expected @@ -729,9 +728,9 @@ def test_testcases(self, testcase, rule, event, expected): # pylint: disable=un assert event == expected @pytest.mark.parametrize("testcase, rule, event, expected", failure_test_cases) - def test_testcases_failure_handling(self, caplog, testcase, rule, event, expected): + def test_testcases_failure_handling(self, testcase, rule, event, expected): self._load_specific_rule(rule) - with caplog.at_level(logging.WARNING): - self.object.process(event) - assert re.match(".*ProcessingWarning.*", caplog.text) + result = self.object.process(event) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], ProcessingWarning) assert event == expected, testcase diff --git a/tests/unit/processor/domain_label_extractor/test_domain_label_extractor.py b/tests/unit/processor/domain_label_extractor/test_domain_label_extractor.py index fa2c66cb9..fc0dcc308 100644 --- a/tests/unit/processor/domain_label_extractor/test_domain_label_extractor.py +++ b/tests/unit/processor/domain_label_extractor/test_domain_label_extractor.py @@ -2,9 +2,7 @@ # pylint: disable=missing-docstring import hashlib -import logging import os -import re import shutil import tempfile from pathlib import Path @@ -12,6 +10,7 @@ import responses from logprep.factory import Factory +from logprep.processor.base.exceptions import FieldExistsWarning from tests.unit.processor.base import BaseProcessorTestCase @@ -243,12 +242,11 @@ def test_domain_extraction_with_ipv6_target(self): self.object.process(document) assert document == expected_output - def test_domain_extraction_with_existing_output_field(self, caplog): + def test_domain_extraction_with_existing_output_field(self): document = {"url": {"domain": "test.domain.de", "subdomain": "exists already"}} - - with caplog.at_level(logging.WARNING): - self.object.process(document) - assert re.match(".*FieldExistsWarning.*", caplog.text) + result = self.object.process(document) + assert len(result.warnings) == 1 + assert 
isinstance(result.warnings[0], FieldExistsWarning) def test_domain_extraction_overwrites_target_field(self): document = {"url": {"domain": "test.domain.de", "subdomain": "exists already"}} @@ -316,7 +314,7 @@ def test_does_nothing_if_source_field_not_exits(self): self.object.process(document) assert document == expected - def test_raises_duplication_error_if_target_field_exits(self, caplog): + def test_raises_field_exists_warning_if_target_field_exits(self): document = {"url": {"domain": "test.domain.de", "subdomain": "exists already"}} expected = { "tags": ["_domain_label_extractor_failure"], @@ -337,9 +335,9 @@ def test_raises_duplication_error_if_target_field_exits(self, caplog): "description": "", } self._load_specific_rule(rule_dict) - with caplog.at_level(logging.WARNING): - self.object.process(document) - assert re.match(".*FieldExistsWarning.*", caplog.text) + result = self.object.process(document) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected @responses.activate diff --git a/tests/unit/processor/domain_resolver/test_domain_resolver.py b/tests/unit/processor/domain_resolver/test_domain_resolver.py index bf8962d6e..07e208d6d 100644 --- a/tests/unit/processor/domain_resolver/test_domain_resolver.py +++ b/tests/unit/processor/domain_resolver/test_domain_resolver.py @@ -1,9 +1,7 @@ # pylint: disable=missing-docstring # pylint: disable=protected-access import hashlib -import logging import os -import re import shutil import tempfile from copy import deepcopy @@ -15,7 +13,7 @@ import responses from logprep.factory import Factory -from logprep.processor.base.exceptions import ProcessingWarning +from logprep.processor.base.exceptions import FieldExistsWarning, ProcessingWarning from tests.unit.processor.base import BaseProcessorTestCase REL_TLD_LIST_PATH = "tests/testdata/external/public_suffix_list.dat" @@ -230,12 +228,12 @@ def test_configured_dotted_subfield(self, _): assert document == expected @mock.patch("socket.gethostbyname", return_value="1.2.3.4") - def test_duplication_error(self, _, caplog): + def test_field_exits_warning(self, _): document = {"client": "google.de"} - with caplog.at_level(logging.WARNING): - self.object.process(document) - assert re.match(".*FieldExistsWarning.*", caplog.text) + result = self.object.process(document) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) @mock.patch("socket.gethostbyname", return_value="1.2.3.4") def test_no_duplication_error(self, _): diff --git a/tests/unit/processor/field_manager/test_field_manager.py b/tests/unit/processor/field_manager/test_field_manager.py index 907466198..5a71fb849 100644 --- a/tests/unit/processor/field_manager/test_field_manager.py +++ b/tests/unit/processor/field_manager/test_field_manager.py @@ -4,6 +4,7 @@ import pytest +from logprep.processor.base.exceptions import FieldExistsWarning from tests.unit.processor.base import BaseProcessorTestCase test_cases = [ # testcase, rule, event, expected @@ -587,15 +588,15 @@ def test_testcases(self, testcase, rule, event, expected): # pylint: disable=un assert event == expected @pytest.mark.parametrize("testcase, rule, event, expected, error", failure_test_cases) - def test_testcases_failure_handling(self, testcase, rule, event, expected, error, caplog): + def test_testcases_failure_handling(self, testcase, rule, event, expected, error): self._load_specific_rule(rule) - with caplog.at_level(logging.WARNING): - self.object.process(event) - assert 
re.match(error, caplog.text) + result = self.object.process(event) + assert len(result.warnings) == 1 + assert re.match(error, str(result.warnings[0])) assert event == expected, testcase - def test_process_raises_duplication_error_if_target_field_exists_and_should_not_be_overwritten( - self, caplog + def test_process_raises_field_exists_warning_if_target_field_exists_and_should_not_be_overwritten( + self, ): rule = { "filter": "field.a", @@ -608,14 +609,13 @@ def test_process_raises_duplication_error_if_target_field_exists_and_should_not_ } self._load_specific_rule(rule) document = {"field": {"a": "first", "b": "second"}, "target_field": "has already content"} - with caplog.at_level(logging.WARNING): - self.object.process(document) - assert re.match(".*FieldExistsWarning.*", caplog.text) + result = self.object.process(document) + assert isinstance(result.warnings[0], FieldExistsWarning) assert "target_field" in document assert document.get("target_field") == "has already content" assert document.get("tags") == ["_field_manager_failure"] - def test_process_raises_processing_warning_with_missing_fields(self, caplog): + def test_process_raises_processing_warning_with_missing_fields(self): rule = { "filter": "field.a", "field_manager": { @@ -625,15 +625,14 @@ def test_process_raises_processing_warning_with_missing_fields(self, caplog): } self._load_specific_rule(rule) document = {"field": {"a": "first", "b": "second"}} - with caplog.at_level(logging.WARNING): - self.object.process(document) + result = self.object.process(document) + assert len(result.warnings) == 1 assert re.match( - r".*ProcessingWarning.*missing source_fields: \['does.not.exists'\]", caplog.text + r".*ProcessingWarning.*missing source_fields: \['does.not.exists'\]", + str(result.warnings[0]), ) - def test_process_raises_processing_warning_with_missing_fields_but_event_is_processed( - self, caplog - ): + def test_process_raises_processing_warning_with_missing_fields_but_event_is_processed(self): rule = { "filter": "field.a", "field_manager": { @@ -650,10 +649,11 @@ def test_process_raises_processing_warning_with_missing_fields_but_event_is_proc "target_field": "first", "tags": ["_field_manager_missing_field_warning"], } - with caplog.at_level(logging.WARNING): - self.object.process(document) + result = self.object.process(document) + assert len(result.warnings) == 1 assert re.match( - r".*ProcessingWarning.*missing source_fields: \['does.not.exists'\]", caplog.text + r".*ProcessingWarning.*missing source_fields: \['does.not.exists'\]", + str(result.warnings[0]), ) assert document == expected diff --git a/tests/unit/processor/generic_adder/test_generic_adder.py b/tests/unit/processor/generic_adder/test_generic_adder.py index 649f62195..5fe29930b 100644 --- a/tests/unit/processor/generic_adder/test_generic_adder.py +++ b/tests/unit/processor/generic_adder/test_generic_adder.py @@ -3,7 +3,6 @@ # pylint: disable=unused-argument # pylint: disable=too-many-arguments import json -import logging import os import re import tempfile @@ -15,7 +14,10 @@ from logprep.factory import Factory from logprep.factory_error import InvalidConfigurationError -from logprep.processor.base.exceptions import InvalidRuleDefinitionError +from logprep.processor.base.exceptions import ( + FieldExistsWarning, + InvalidRuleDefinitionError, +) from tests.unit.processor.base import BaseProcessorTestCase RULES_DIR_MISSING = "tests/testdata/unit/generic_adder/rules_missing" @@ -403,12 +405,12 @@ def test_generic_adder_testcases( 
@pytest.mark.parametrize("testcase, rule, event, expected, error_message", failure_test_cases) def test_generic_adder_testcases_failure_handling( - self, testcase, rule, event, expected, error_message, caplog + self, testcase, rule, event, expected, error_message ): self._load_specific_rule(rule) - with caplog.at_level(logging.WARNING): - self.object.process(event) - assert re.match(rf".*FieldExistsWarning.*{error_message}", caplog.text) + result = self.object.process(event) + assert len(result.warnings) == 1 + assert re.match(rf".*FieldExistsWarning.*{error_message}", str(result.warnings[0])) assert event == expected, testcase def test_add_generic_fields_from_file_missing_and_existing_with_all_required(self): @@ -608,9 +610,9 @@ def test_sql_database_raises_exception_on_duplicate(self, caplog): document = {"add_from_sql_db_table": "Test", "source": "TEST_0.test.123"} self.object.process(document) - with caplog.at_level(logging.WARNING): - self.object.process(document) - assert re.match(".*FieldExistsWarning.*", caplog.text) + result = self.object.process(document) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected diff --git a/tests/unit/processor/generic_resolver/test_generic_resolver.py b/tests/unit/processor/generic_resolver/test_generic_resolver.py index f59dfee66..de9237741 100644 --- a/tests/unit/processor/generic_resolver/test_generic_resolver.py +++ b/tests/unit/processor/generic_resolver/test_generic_resolver.py @@ -2,13 +2,12 @@ # pylint: disable=protected-access # pylint: disable=missing-docstring # pylint: disable=wrong-import-position -import logging -import re from collections import OrderedDict -import pytest - -from logprep.processor.base.exceptions import ProcessingCriticalError +from logprep.processor.base.exceptions import ( + FieldExistsWarning, + ProcessingCriticalError, +) from logprep.processor.generic_resolver.processor import GenericResolver from tests.unit.processor.base import BaseProcessorTestCase @@ -294,13 +293,9 @@ def test_resolve_dotted_field_no_conflict_match_from_file_group_mapping_does_not } self._load_specific_rule(rule) document = {"to_resolve": "ab"} - - with pytest.raises( - ProcessingCriticalError, - match=r"GenericResolver \(Test Instance Name\)\: Mapping group is missing in mapping " - r"file pattern!", - ): - self.object.process(document) + result = self.object.process(document) + assert isinstance(result.errors[0], ProcessingCriticalError) + assert "Mapping group is missing in mapping" in result.errors[0].args[0] def test_resolve_generic_match_from_file_and_file_does_not_exist(self): rule = { @@ -311,14 +306,10 @@ def test_resolve_generic_match_from_file_and_file_does_not_exist(self): }, } self._load_specific_rule(rule) - document = {"to": {"resolve": "something HELLO1"}} - - with pytest.raises( - ProcessingCriticalError, - match=r"GenericResolver \(Test Instance Name\)\: Additions file \'foo\' not found!", - ): - self.object.process(document) + result = self.object.process(document) + assert isinstance(result.errors[0], ProcessingCriticalError) + assert "Additions file 'foo' not found" in result.errors[0].args[0] def test_resolve_dotted_field_no_conflict_no_match(self): rule = { @@ -419,7 +410,6 @@ def test_resolve_dotted_src_and_dest_field_and_conflict_match(self, caplog): }, } self._load_specific_rule(rule) - document = { "to": {"resolve": "something HELLO1"}, "re": {"solved": "I already exist!"}, @@ -429,10 +419,9 @@ def 
test_resolve_dotted_src_and_dest_field_and_conflict_match(self, caplog): "to": {"resolve": "something HELLO1"}, "re": {"solved": "I already exist!"}, } - with caplog.at_level(logging.WARNING): - self.object.process(document) - assert re.match(".*FieldExistsWarning.*", caplog.text) - + result = self.object.process(document) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected def test_resolve_generic_and_multiple_match_first_only(self): diff --git a/tests/unit/processor/geoip_enricher/test_geoip_enricher.py b/tests/unit/processor/geoip_enricher/test_geoip_enricher.py index 68e1c8178..5d95772db 100644 --- a/tests/unit/processor/geoip_enricher/test_geoip_enricher.py +++ b/tests/unit/processor/geoip_enricher/test_geoip_enricher.py @@ -3,7 +3,6 @@ # pylint: disable=protected-access # pylint: disable=too-many-statements import hashlib -import logging import os import re import shutil @@ -117,17 +116,18 @@ def test_geoip_data_added_not_exists(self): def test_no_geoip_data_added_if_source_field_is_none(self): document = {"client": {"ip": None}} - self.object.process(document) - assert document.get("geoip") is None - def test_source_field_is_none_emits_missing_fields_warning(self, caplog): + def test_source_field_is_none_emits_missing_fields_warning(self): document = {"client": {"ip": None}} expected = {"client": {"ip": None}, "tags": ["_geoip_enricher_missing_field_warning"]} - with caplog.at_level(logging.WARNING): - self.object._apply_rules(document, self.object.rules[0]) - assert re.match(r".*missing source_fields: \['client\.ip'].*", caplog.text) + self._load_specific_rule(self.object.rules[0]) + self.object.process(document) + assert len(self.object.result.warnings) == 1 + assert re.match( + r".*missing source_fields: \['client\.ip'].*", str(self.object.result.warnings[0]) + ) assert document == expected def test_nothing_to_enrich(self): @@ -161,12 +161,11 @@ def test_enrich_an_event_geoip(self): assert geoip["properties"].get("country") == "MyCountry" assert geoip["properties"].get("accuracy_radius") == 1337 - def test_enrich_an_event_geoip_with_existing_differing_geoip(self, caplog): + def test_enrich_an_event_geoip_with_existing_differing_geoip(self): document = {"client": {"ip": "8.8.8.8"}, "geoip": {"type": "Feature"}} - - with caplog.at_level(logging.WARNING): - self.object.process(document) - assert re.match(".*FieldExistsWarning.*geoip.type", caplog.text) + result = self.object.process(document) + assert len(result.warnings) == 1 + assert re.match(".*FieldExistsWarning.*geoip.type", str(result.warnings[0])) def test_configured_dotted_output_field(self): document = {"source": {"ip": "8.8.8.8"}} diff --git a/tests/unit/processor/grokker/test_grokker.py b/tests/unit/processor/grokker/test_grokker.py index 73e717690..dc092a27b 100644 --- a/tests/unit/processor/grokker/test_grokker.py +++ b/tests/unit/processor/grokker/test_grokker.py @@ -1,7 +1,6 @@ # pylint: disable=missing-docstring # pylint: disable=protected-access # pylint: disable=line-too-long -import logging import re from copy import deepcopy from unittest import mock @@ -429,17 +428,17 @@ def test_testcases(self, testcase, rule, event, expected): assert event == expected, testcase @pytest.mark.parametrize("testcase, rule, event, expected, error", failure_test_cases) - def test_testcases_failure_handling(self, caplog, testcase, rule, event, expected, error): + def test_testcases_failure_handling(self, testcase, rule, event, expected, error): 
self._load_specific_rule(rule) self.object.setup() if isinstance(error, str): - with caplog.at_level(logging.WARNING): - self.object.process(event) - assert re.match(rf".*{error}", caplog.text) - assert event == expected, testcase + result = self.object.process(event) + assert len(result.warnings) == 1 + assert re.match(rf".*{error}", str(result.warnings[0])) + assert event == expected, testcase else: - with pytest.raises(error): - self.object.process(event) + result = self.object.process(event) + assert isinstance(result.errors[0], ProcessingCriticalError) def test_load_custom_patterns_from_http_as_zip_file(self): rule = { diff --git a/tests/unit/processor/hyperscan_resolver/test_hyperscan_resolver.py b/tests/unit/processor/hyperscan_resolver/test_hyperscan_resolver.py index 2b38b5cd8..f644c7cf5 100644 --- a/tests/unit/processor/hyperscan_resolver/test_hyperscan_resolver.py +++ b/tests/unit/processor/hyperscan_resolver/test_hyperscan_resolver.py @@ -2,14 +2,15 @@ # pylint: disable=missing-docstring # pylint: disable=wrong-import-position # pylint: disable=wrong-import-order -import logging -import re from collections import OrderedDict from copy import deepcopy import pytest -from logprep.processor.base.exceptions import ProcessingCriticalError +from logprep.processor.base.exceptions import ( + FieldExistsWarning, + ProcessingCriticalError, +) pytest.importorskip("hyperscan") @@ -21,9 +22,7 @@ pytest.importorskip("logprep.processor.hyperscan_resolver") -from logprep.processor.hyperscan_resolver.processor import ( - HyperscanResolver, -) +from logprep.processor.hyperscan_resolver.processor import HyperscanResolver class TestHyperscanResolverProcessor(BaseProcessorTestCase): @@ -363,7 +362,7 @@ def test_resolve_dotted_and_dest_field_no_conflict_match(self): assert document == expected - def test_resolve_dotted_and_dest_field_with_conflict_match(self, caplog): + def test_resolve_dotted_and_dest_field_with_conflict_match(self): rule = { "filter": "to.resolve", "hyperscan_resolver": { @@ -371,20 +370,16 @@ def test_resolve_dotted_and_dest_field_with_conflict_match(self, caplog): "resolve_list": {".*HELLO\\d": "Greeting"}, }, } - self._load_specific_rule(rule) - document = {"to": {"resolve": "something HELLO1"}, "re": {"solved": "I already exist!"}} expected = { "to": {"resolve": "something HELLO1"}, "re": {"solved": "I already exist!"}, "tags": ["_hyperscan_resolver_failure"], } - - with caplog.at_level(logging.WARNING): - self.object.process(document) - assert re.match(".*FieldExistsWarning.*", caplog.text) - + result = self.object.process(document) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected def test_resolve_with_multiple_match_first_only(self): @@ -641,15 +636,10 @@ def test_resolve_no_matching_pattern(self): "extend_target_list": True, }, } - self._load_specific_rule(rule) - document = {"to_resolve": "12ab34"} - - with pytest.raises( - ProcessingCriticalError, match=r"No patter to compile for hyperscan database!" 
- ): - self.object.process(document) + result = self.object.process(document) + assert isinstance(result.errors[0], ProcessingCriticalError) def test_resolve_no_conflict_from_file_and_list_has_conflict( self, diff --git a/tests/unit/processor/ip_informer/test_ip_informer.py b/tests/unit/processor/ip_informer/test_ip_informer.py index ecff57fe4..0ef5e25b1 100644 --- a/tests/unit/processor/ip_informer/test_ip_informer.py +++ b/tests/unit/processor/ip_informer/test_ip_informer.py @@ -1,10 +1,9 @@ # pylint: disable=missing-docstring # pylint: disable=line-too-long -import logging -import re import pytest +from logprep.processor.base.exceptions import ProcessingWarning from tests.unit.processor.base import BaseProcessorTestCase test_cases = [ @@ -423,9 +422,9 @@ def test_testcases(self, testcase, rule, event, expected): assert event == expected, testcase @pytest.mark.parametrize("testcase, rule, event, expected", failure_test_cases) - def test_testcases_failure_handling(self, testcase, rule, event, expected, caplog): + def test_testcases_failure_handling(self, testcase, rule, event, expected): self._load_specific_rule(rule) - with caplog.at_level(logging.WARNING): - self.object.process(event) - assert re.match(".*ProcessingWarning.*", caplog.text) + result = self.object.process(event) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], ProcessingWarning) assert event == expected, testcase diff --git a/tests/unit/processor/key_checker/test_key_checker.py b/tests/unit/processor/key_checker/test_key_checker.py index a316fd9e9..0391c3e82 100644 --- a/tests/unit/processor/key_checker/test_key_checker.py +++ b/tests/unit/processor/key_checker/test_key_checker.py @@ -1,11 +1,10 @@ # pylint: disable=missing-docstring # pylint: disable=protected-access # pylint: disable=import-error -import logging -import re import pytest +from logprep.processor.base.exceptions import FieldExistsWarning from tests.unit.processor.base import BaseProcessorTestCase test_cases = [ # testcase, rule, event, expected @@ -256,7 +255,7 @@ def test_testcases_positiv( self.object.process(event) assert event == expected - def test_raises_duplication_error(self, caplog): + def test_field_exists_warning(self): rule_dict = { "filter": "*", "key_checker": { @@ -273,6 +272,6 @@ def test_raises_duplication_error(self, caplog): "randomkey2": "randomvalue2", "missing_fields": ["i.exists.already"], } - with caplog.at_level(logging.WARNING): - self.object.process(document) - assert re.match(".*FieldExistsWarning.*", caplog.text) + result = self.object.process(document) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) diff --git a/tests/unit/processor/list_comparison/test_list_comparison.py b/tests/unit/processor/list_comparison/test_list_comparison.py index 9601421de..385227ea7 100644 --- a/tests/unit/processor/list_comparison/test_list_comparison.py +++ b/tests/unit/processor/list_comparison/test_list_comparison.py @@ -1,11 +1,10 @@ # pylint: disable=missing-docstring # pylint: disable=protected-access -import logging -import re import responses from logprep.factory import Factory +from logprep.processor.base.exceptions import FieldExistsWarning from tests.unit.processor.base import BaseProcessorTestCase @@ -139,7 +138,7 @@ def test_dotted_parent_field_exists_but_subfield_doesnt(self): len(document.get("dotted", {}).get("preexistent_output_field", {}).get("in_list")) == 1 ) - def test_target_field_exists_and_cant_be_extended(self, caplog): + def 
test_target_field_exists_and_cant_be_extended(self): document = {"dot_channel": "test", "user": "Franz", "dotted": "dotted_Franz"} expected = { "dot_channel": "test", @@ -159,12 +158,12 @@ def test_target_field_exists_and_cant_be_extended(self, caplog): } self._load_specific_rule(rule_dict) self.object.setup() - with caplog.at_level(logging.WARNING): - self.object.process(document) - assert re.match(".*FieldExistsWarning.*", caplog.text) + result = self.object.process(document) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected - def test_intermediate_output_field_is_wrong_type(self, caplog): + def test_intermediate_output_field_is_wrong_type(self): document = { "dot_channel": "test", "user": "Franz", @@ -188,9 +187,9 @@ def test_intermediate_output_field_is_wrong_type(self, caplog): } self._load_specific_rule(rule_dict) self.object.setup() - with caplog.at_level(logging.WARNING): - self.object.process(document) - assert re.match(".*FieldExistsWarning.*", caplog.text) + result = self.object.process(document) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected def test_check_in_dotted_subfield(self): @@ -229,7 +228,7 @@ def test_delete_source_field(self): self.object.process(document) assert document == expected - def test_overwrite_target_field(self, caplog): + def test_overwrite_target_field(self): document = {"user": "Franz"} expected = {"user": "Franz", "tags": ["_list_comparison_failure"]} rule_dict = { @@ -244,9 +243,9 @@ def test_overwrite_target_field(self, caplog): } self._load_specific_rule(rule_dict) self.object.setup() - with caplog.at_level(logging.WARNING): - self.object.process(document) - assert re.match(".*FieldExistsWarning.*", caplog.text) + result = self.object.process(document) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected @responses.activate diff --git a/tests/unit/processor/pre_detector/test_pre_detector.py b/tests/unit/processor/pre_detector/test_pre_detector.py index d3129cb3c..53da723cd 100644 --- a/tests/unit/processor/pre_detector/test_pre_detector.py +++ b/tests/unit/processor/pre_detector/test_pre_detector.py @@ -36,7 +36,7 @@ def test_perform_successful_pre_detection(self): ] detection_results = self.object.process(document) self._assert_equality_of_results( - document, expected, detection_results, expected_detection_results + document, expected, detection_results.data, expected_detection_results ) def test_perform_pre_detection_that_fails_if_filter_children_were_slots(self): @@ -58,12 +58,12 @@ def test_perform_pre_detection_that_fails_if_filter_children_were_slots(self): ] detection_results = self.object.process(document) self._assert_equality_of_results( - document, expected, detection_results, expected_detection_results + document, expected, detection_results.data, expected_detection_results ) document = {"A": "foo X bar Y baz"} detection_results = self.object.process(document) - assert detection_results is None + assert detection_results.data == [] def test_perform_successful_pre_detection_with_host_name(self): document = { @@ -88,7 +88,7 @@ def test_perform_successful_pre_detection_with_host_name(self): ] detection_results = self.object.process(document) self._assert_equality_of_results( - document, expected, detection_results, expected_detection_results + document, expected, detection_results.data, expected_detection_results ) def 
test_perform_successful_pre_detection_with_same_existing_pre_detection(self): @@ -112,7 +112,7 @@ def test_perform_successful_pre_detection_with_same_existing_pre_detection(self) document["pre_detection_id"] = "11fdfc1f-8e00-476e-b88f-753d92af989c" detection_results = self.object.process(document) self._assert_equality_of_results( - document, expected, detection_results, expected_detection_results + document, expected, detection_results.data, expected_detection_results ) def test_perform_successful_pre_detection_with_pre_detector_complex_rule_suceeds_msg_t1(self): @@ -135,7 +135,7 @@ def test_perform_successful_pre_detection_with_pre_detector_complex_rule_suceeds ] detection_results = self.object.process(document) self._assert_equality_of_results( - document, expected, detection_results, expected_detection_results + document, expected, detection_results.data, expected_detection_results ) def test_perform_successful_pre_detection_with_pre_detector_complex_rule_succeeds_msg_t2(self): @@ -158,7 +158,7 @@ def test_perform_successful_pre_detection_with_pre_detector_complex_rule_succeed ] detection_results = self.object.process(document) self._assert_equality_of_results( - document, expected, detection_results, expected_detection_results + document, expected, detection_results.data, expected_detection_results ) def test_perform_successful_pre_detection_with_two_rules(self): @@ -192,7 +192,7 @@ def test_perform_successful_pre_detection_with_two_rules(self): ] detection_results = self.object.process(document) self._assert_equality_of_results( - document, expected, detection_results, expected_detection_results + document, expected, detection_results.data, expected_detection_results ) def test_correct_star_wildcard_behavior(self): @@ -276,7 +276,7 @@ def test_ignores_case(self): ] detection_results = self.object.process(document) self._assert_equality_of_results( - document, expected, detection_results, expected_detection_results + document, expected, detection_results.data, expected_detection_results ) def test_ignores_case_list(self): @@ -298,7 +298,7 @@ def test_ignores_case_list(self): ] detection_results = self.object.process(document) self._assert_equality_of_results( - document, expected, detection_results, expected_detection_results + document, expected, detection_results.data, expected_detection_results ) def _assert_equality_of_results( @@ -333,4 +333,4 @@ def test_adds_timestamp_to_extra_data_if_provided_by_event(self): "winlog": {"event_id": 123, "event_data": {"ServiceName": "VERY BAD"}}, } detection_results = self.object.process(document) - assert detection_results[0][0].get("@timestamp") == "custom timestamp" + assert detection_results.data[0][0].get("@timestamp") == "custom timestamp" diff --git a/tests/unit/processor/pseudonymizer/test_pseudonymizer.py b/tests/unit/processor/pseudonymizer/test_pseudonymizer.py index 3ca0562c8..215ed622c 100644 --- a/tests/unit/processor/pseudonymizer/test_pseudonymizer.py +++ b/tests/unit/processor/pseudonymizer/test_pseudonymizer.py @@ -6,9 +6,11 @@ import re from copy import deepcopy from pathlib import Path +from unittest import mock import pytest +from logprep.abc.processor import ProcessorResult from logprep.factory import Factory from logprep.factory_error import InvalidConfigurationError from logprep.util.pseudo.encrypter import ( @@ -813,8 +815,9 @@ def test_replace_regex_keywords_by_regex_expression_is_idempotent(self): assert self.object._specific_tree.rules[0].pseudonyms == {"something": expected_pattern} def 
test_pseudonymize_string_adds_pseudonyms(self): + self.object.result = ProcessorResult(processor_name="test") assert self.object._pseudonymize_string("foo").startswith(" 0 - assert isinstance(tuple_list[0], tuple) + assert isinstance(tuple_list, ProcessorResult) + assert len(tuple_list.data) > 0 def test_process_returns_tuple_list_with_extraction_fields_from_rule(self): field_name = f"{uuid.uuid4()}" @@ -43,7 +43,7 @@ def test_process_returns_tuple_list_with_extraction_fields_from_rule(self): self.object._specific_tree.add_rule(rule) document = {field_name: "the value"} tuple_list = self.object.process(document) - for filtered_event, _ in tuple_list: + for filtered_event, _ in tuple_list.data: if field_name in filtered_event: break else: @@ -61,7 +61,7 @@ def test_process_returns_selective_extractor_target_topic(self): self._load_specific_rule(rule) document = {field_name: "test_message", "other": "field"} result = self.object.process(document) - output = result[0][1][0] + output = result.data[0][1][0] assert "my topic" in output.values() def test_process_returns_selective_extractor_target_output(self): @@ -76,7 +76,7 @@ def test_process_returns_selective_extractor_target_output(self): self._load_specific_rule(rule) document = {field_name: "test_message", "other": "field"} result = self.object.process(document) - output = result[0][1][0] + output = result.data[0][1][0] assert "opensearch" in output.keys() def test_process_returns_extracted_fields(self): @@ -90,7 +90,7 @@ def test_process_returns_extracted_fields(self): } self._load_specific_rule(rule) result = self.object.process(document) - for filtered_event, *_ in result: + for filtered_event, *_ in result.data: if filtered_event == {"message": "test_message"}: break else: @@ -99,7 +99,10 @@ def test_process_returns_extracted_fields(self): def test_process_returns_none_when_no_extraction_field_matches(self): document = {"nomessage": "test_message", "other": "field"} result = self.object.process(document) - assert result is None + assert isinstance(result, ProcessorResult) + assert result.data == [] + assert result.errors == [] + assert result.processor_name == "Test Instance Name" def test_gets_matching_rules_from_rules_trees(self): rule_trees = [self.object._generic_tree, self.object._specific_tree] @@ -128,19 +131,18 @@ def test_process_extracts_dotted_fields(self): document = {"message": "test_message", "other": {"message": "my message value"}} result = self.object.process(document) - for extracted_event, *_ in result: + for extracted_event, *_ in result.data: if extracted_event.get("other", {}).get("message") is not None: break else: assert False, f"other.message not in {result}" def test_process_clears_internal_filtered_events_list_before_every_event(self): - assert len(self.object._extra_data) == 0 document = {"message": "test_message", "other": {"message": "my message value"}} _ = self.object.process(document) - assert len(self.object._extra_data) == 1 + assert len(self.object.result.data) == 1 _ = self.object.process(document) - assert len(self.object._extra_data) == 1 + assert len(self.object.result.data) == 1 def test_process_extracts_dotted_fields_complains_on_missing_fields(self): rule = { diff --git a/tests/unit/processor/string_splitter/test_string_splitter.py b/tests/unit/processor/string_splitter/test_string_splitter.py index d09b127f4..3db351526 100644 --- a/tests/unit/processor/string_splitter/test_string_splitter.py +++ b/tests/unit/processor/string_splitter/test_string_splitter.py @@ -1,5 +1,4 @@ # pylint: 
disable=missing-docstring -import logging import re import pytest @@ -69,11 +68,9 @@ def test_testcases(self, testcase, rule, event, expected): # pylint: disable=un assert event == expected @pytest.mark.parametrize("testcase, rule, event, expected, error_message", failure_test_cases) - def test_testcases_failure_handling( - self, testcase, rule, event, expected, error_message, caplog - ): + def test_testcases_failure_handling(self, testcase, rule, event, expected, error_message): self._load_specific_rule(rule) - with caplog.at_level(logging.WARNING): - self.object.process(event) - assert re.match(error_message, caplog.text) + result = self.object.process(event) + assert len(result.warnings) == 1 + assert re.match(error_message, str(result.warnings[0])) assert event == expected, testcase diff --git a/tests/unit/processor/template_replacer/test_template_replacer.py b/tests/unit/processor/template_replacer/test_template_replacer.py index 468c79ade..2eca9f0fd 100644 --- a/tests/unit/processor/template_replacer/test_template_replacer.py +++ b/tests/unit/processor/template_replacer/test_template_replacer.py @@ -1,11 +1,10 @@ # pylint: disable=missing-module-docstring -import logging -import re from copy import deepcopy import pytest from logprep.factory import Factory +from logprep.processor.base.exceptions import FieldExistsWarning from logprep.processor.template_replacer.processor import TemplateReplacerError from tests.unit.processor.base import BaseProcessorTestCase @@ -140,7 +139,7 @@ def test_replace_existing_dotted_message_dict_via_template(self): assert document["dotted"].get("message") assert document["dotted"]["message"] == "Test %1 Test %2" - def test_replace_incompatible_existing_dotted_message_parent_via_template(self, caplog): + def test_replace_incompatible_existing_dotted_message_parent_via_template(self): config = deepcopy(self.CONFIG) config.get("pattern").update({"target_field": "dotted.message"}) template_replacer = self._create_template_replacer(config) @@ -148,10 +147,9 @@ def test_replace_incompatible_existing_dotted_message_parent_via_template(self, "winlog": {"channel": "System", "provider_name": "Test", "event_id": 123}, "dotted": "foo", } - - with caplog.at_level(logging.WARNING): - template_replacer.process(document) - assert re.match(".*FieldExistsWarning.*", caplog.text) + result = template_replacer.process(document) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) def test_replace_fails_with_invalid_template(self): config = deepcopy(self.CONFIG) diff --git a/tests/unit/processor/test_process.py b/tests/unit/processor/test_process.py index 392bd9883..5d985c99b 100644 --- a/tests/unit/processor/test_process.py +++ b/tests/unit/processor/test_process.py @@ -1,6 +1,6 @@ # pylint: disable=missing-docstring # pylint: disable=protected-access -from logging import getLogger +import re from unittest import mock from unittest.mock import call @@ -8,6 +8,7 @@ from logprep.factory import Factory from logprep.framework.pipeline import Pipeline +from logprep.processor.base.exceptions import FieldExistsWarning from logprep.processor.dissector.rule import DissectorRule from logprep.processor.generic_adder.rule import GenericAdderRule from logprep.util.configuration import Configuration @@ -121,8 +122,7 @@ def test_applies_rules_in_deterministic_order(self, execution_number): processor.process(event=event) mock_callback.assert_has_calls(expected_call_order, any_order=False) - @mock.patch("logging.Logger.warning") - def 
test_processes_generic_rules_after_processor_error_in_specific_rules(self, mock_warning): + def test_processes_generic_rules_after_processor_error_in_specific_rules(self): config = Configuration() config.pipeline = [ {"adder": {"type": "generic_adder", "specific_rules": [], "generic_rules": []}} @@ -157,10 +157,13 @@ def test_processes_generic_rules_after_processor_error_in_specific_rules(self, m pipeline._pipeline[0]._generic_tree.add_rule(generic_rule) pipeline._pipeline[0]._specific_tree.add_rule(specific_rule_two) pipeline._pipeline[0]._specific_tree.add_rule(specific_rule_one) - pipeline.process_event(event) - assert ( + res = pipeline.process_event(event) + assert len(res.results[0].warnings) == 1 + assert isinstance(res.results[0].warnings[0], FieldExistsWarning) + re.match( "The following fields could not be written, " - "because one or more subfields existed and could not be extended: first" - in mock_warning.call_args[0][0] + "because one or more subfields existed and could not be extended: first", + str(res.results[0].warnings[0]), ) + assert event == expected_event diff --git a/tests/unit/processor/timestamp_differ/test_timestamp_differ.py b/tests/unit/processor/timestamp_differ/test_timestamp_differ.py index a36b8b5d2..dd082562a 100644 --- a/tests/unit/processor/timestamp_differ/test_timestamp_differ.py +++ b/tests/unit/processor/timestamp_differ/test_timestamp_differ.py @@ -1,5 +1,4 @@ # pylint: disable=missing-docstring -import logging import re import pytest @@ -447,11 +446,9 @@ def test_testcases(self, testcase, rule, event, expected): assert event == expected, testcase @pytest.mark.parametrize("testcase, rule, event, expected, error_message", failure_test_cases) - def test_testcases_failure_handling( - self, testcase, rule, event, expected, error_message, caplog - ): + def test_testcases_failure_handling(self, testcase, rule, event, expected, error_message): self._load_specific_rule(rule) - with caplog.at_level(logging.WARNING): - self.object.process(event) - assert re.match(error_message, caplog.text) + result = self.object.process(event) + assert len(result.warnings) == 1 + assert re.match(error_message, str(result.warnings[0])) assert event == expected, testcase diff --git a/tests/unit/processor/timestamper/test_timestamper.py b/tests/unit/processor/timestamper/test_timestamper.py index 210b67aa3..228d5ffc9 100644 --- a/tests/unit/processor/timestamper/test_timestamper.py +++ b/tests/unit/processor/timestamper/test_timestamper.py @@ -1,5 +1,4 @@ # pylint: disable=missing-docstring -import logging import re import pytest @@ -318,11 +317,9 @@ def test_testcases(self, testcase, rule, event, expected): assert event == expected, testcase @pytest.mark.parametrize("testcase, rule, event, expected, error_message", failure_test_cases) - def test_testcases_failure_handling( - self, caplog, testcase, rule, event, expected, error_message - ): + def test_testcases_failure_handling(self, testcase, rule, event, expected, error_message): self._load_specific_rule(rule) - with caplog.at_level(logging.WARNING): - self.object.process(event) - assert re.match(rf".*{error_message}", caplog.text) + result = self.object.process(event) + assert len(result.warnings) == 1 + assert re.match(rf".*{error_message}", str(result.warnings[0])) assert event == expected, testcase diff --git a/tests/unit/util/test_auto_rule_corpus_tester.py b/tests/unit/util/test_auto_rule_corpus_tester.py index 8c0403c33..b0576678d 100644 --- a/tests/unit/util/test_auto_rule_corpus_tester.py +++ 
b/tests/unit/util/test_auto_rule_corpus_tester.py @@ -10,8 +10,9 @@ import pytest +from logprep.abc.processor import ProcessorResult +from logprep.framework.pipeline import PipelineResult from logprep.util.auto_rule_tester.auto_rule_corpus_tester import RuleCorpusTester -from logprep.util.configuration import Configuration from logprep.util.defaults import DEFAULT_LOG_CONFIG from logprep.util.getter import GetterFactory @@ -42,7 +43,6 @@ def prepare_corpus_tester(corpus_tester, tmp_path, test_data): class TestAutoRuleTester: - def setup_method(self): dictConfig(DEFAULT_LOG_CONFIG) @@ -320,7 +320,15 @@ def test_run_prints_expected_outputs_to_console( with mock.patch( "logprep.util.auto_rule_tester.auto_rule_corpus_tester.Pipeline.process_pipeline" ) as mock_process_pipeline: - mock_process_pipeline.return_value = mock_output + mock_process_pipeline.return_value = PipelineResult( + results=[], + event=mock_output[0], + event_received=mock_output[0], + pipeline=[], + ) + mock_process_pipeline.return_value.results = [ + ProcessorResult(processor_name="test", data=test_data["expected_extra_output"]) + ] corpus_tester.run() else: corpus_tester.run() @@ -470,3 +478,34 @@ def test_corpus_tests_dont_share_cache_between_runs_by_resetting_processors( for expected_print in expected_prints: assert expected_print in console_output mock_exit.assert_called_with(0) + + @mock.patch("logprep.util.auto_rule_tester.auto_rule_corpus_tester.sys.exit") + def test_warnings_are_printed_inside_the_detailed_reports(self, mock_exit, tmp_path, capsys): + test_case_data = { + "input": { + "field1": 2, + "field2": 2, + "new_field": "exists already", + }, + "expected_output": { + "field1": 2, + "field2": 2, + "new_field": "exists already", + }, + "expected_extra_output": [], + } + test_data_dir = tmp_path / "test_data" + os.makedirs(test_data_dir, exist_ok=True) + write_test_case_data_tmp_files(test_data_dir, "test_case_one", test_case_data) + config_path = ["tests/testdata/config/config.yml"] + corpus_tester = RuleCorpusTester(config_path, test_data_dir) + corpus_tester.run() + console_output, console_error = capsys.readouterr() + assert console_error == "" + warnings_inside_details_pattern = ( + r".*Test Cases Detailed Reports.*test_case_one.*" + r"Logprep Warnings.*FieldExistsWarning.*test_case_one.*" + r"Test Overview" + ) + assert re.match(warnings_inside_details_pattern, console_output, flags=re.DOTALL) + mock_exit.assert_called_with(1)
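
All of the test hunks above converge on the same pattern: instead of capturing log output with `caplog` or expecting `process()` to raise, a test now inspects the `ProcessorResult` returned by `process()` (warnings in `result.warnings`, critical errors in `result.errors`, extra data in `result.data`). The following sketch is not part of the patch; it condenses that pattern into one illustrative test. It assumes the existing `BaseProcessorTestCase` fixture (so `self.object` is a processor named "Test Instance Name" and `_load_specific_rule` loads a rule into its specific tree); the example class, its `CONFIG`, and the rule and event values are hypothetical placeholders, not code from the repository.

```python
# Illustrative sketch of the result-based assertion style used after this refactor.
# Assumes the BaseProcessorTestCase machinery from tests/unit/processor/base.py;
# the class, CONFIG, rule and document below are placeholders for demonstration.
from logprep.abc.processor import ProcessorResult
from logprep.processor.base.exceptions import FieldExistsWarning
from tests.unit.processor.base import BaseProcessorTestCase


class TestResultBasedAssertions(BaseProcessorTestCase):  # hypothetical example class
    CONFIG = {"type": "field_manager", "specific_rules": [], "generic_rules": []}

    def test_existing_target_field_is_reported_on_the_result(self):
        rule = {
            "filter": "field.a",
            "field_manager": {
                "source_fields": ["field.a"],
                "target_field": "target_field",
            },
        }
        self._load_specific_rule(rule)
        document = {"field": {"a": "first"}, "target_field": "has already content"}

        result = self.object.process(document)

        # process() now always returns a ProcessorResult instead of None or raw tuples
        assert isinstance(result, ProcessorResult)
        assert result.processor_name == "Test Instance Name"
        # warnings are collected on the result instead of being written to the log
        assert len(result.warnings) == 1
        assert isinstance(result.warnings[0], FieldExistsWarning)
        # critical errors would be collected in result.errors, extra data in result.data
        assert result.errors == []
        assert result.data == []
        # the event itself is still tagged and left untouched, as before
        assert document.get("target_field") == "has already content"
        assert document.get("tags") == ["_field_manager_failure"]
```

The same idea carries up one level: `Pipeline.process_pipeline` returns a `PipelineResult` whose `results` list holds one `ProcessorResult` per processor, which is exactly what the corpus-tester mock in the last hunk constructs and what `test_processes_generic_rules_after_processor_error_in_specific_rules` asserts against via `res.results[0].warnings`.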