diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py index ade4f5447..fde55fe6e 100644 --- a/logprep/abc/processor.py +++ b/logprep/abc/processor.py @@ -42,32 +42,25 @@ class ProcessorResult: """ The generated extra data """ errors: list = field( validator=validators.deep_iterable( - member_validator=validators.instance_of((ProcessingError, ProcessingWarning)), + member_validator=validators.instance_of(ProcessingError), iterable_validator=validators.instance_of(list), ), factory=list, ) - """ The errors and warnings that occurred during processing """ + """ The errors that occurred during processing """ + warnings: list = field( + validator=validators.deep_iterable( + member_validator=validators.instance_of(ProcessingWarning), + iterable_validator=validators.instance_of(list), + ), + factory=list, + ) + """ The warnings that occurred during processing """ processor_name: str = field(validator=validators.instance_of(str)) """ The name of the processor """ event: dict = field(validator=validators.optional(validators.instance_of(dict)), default=None) """ A reference to the event that was processed """ - def __contains__(self, error_class): - return any(isinstance(item, error_class) for item in self.errors) - - def get_warning_string(self): - """creates a string containing the warnings""" - return ", ".join( - [error.args[0] for error in self.errors if isinstance(error, ProcessingWarning)] - ) - - def get_error_string(self): - """creates a string containing the errors""" - return ", ".join( - [error.args[0] for error in self.errors if isinstance(error, ProcessingError)] - ) - class Processor(Component): """Abstract Processor Class to define the Interface""" @@ -331,9 +324,9 @@ def _handle_warning_error(self, event, rule, error, failure_tags=None): else: add_and_overwrite(event, "tags", sorted(list({*tags, *failure_tags}))) if isinstance(error, ProcessingWarning): - self.result.errors.append(error) + self.result.warnings.append(error) else: - 
self.result.errors.append(ProcessingWarning(str(error), rule, event)) + self.result.warnings.append(ProcessingWarning(str(error), rule, event)) def _has_missing_values(self, event, rule, source_field_dict): missing_fields = list( diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index cb235979d..e911f450e 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -69,6 +69,26 @@ class PipelineResult: pipeline: list[Processor] """The pipeline that processed the event""" + @property + def has_processing_errors(self) -> bool: + """Check if any of the results has processing errors.""" + return any(result.errors for result in self) + + @property + def has_processing_warnings(self) -> bool: + """Check if any of the results has processing warnings.""" + return any(result.warnings for result in self) + + @property + def errors(self) -> List[ProcessingError]: + """Return all processing errors.""" + return itertools.chain(*[result.errors for result in self]) + + @property + def warnings(self) -> List[ProcessingWarning]: + """Return all processing warnings.""" + return itertools.chain(*[result.warnings for result in self]) + def __attrs_post_init__(self): self.results = list( (processor.process(self.event) for processor in self.pipeline if self.event) ) @@ -256,17 +276,12 @@ def process_pipeline(self) -> PipelineResult: if not event: return None, None result: PipelineResult = self.process_event(event) - for processor_result in result: - if not processor_result.errors: - continue - if ProcessingWarning in processor_result: - self.logger.warning(processor_result.get_warning_string()) - if ProcessingError in processor_result: - self.logger.error(processor_result.get_error_string()) - if self._output: - self._store_failed_event(processor_result.errors, result.event_received, event) - # pipeline is aborted on processing error - return + if result.has_processing_warnings: + self.logger.warning(",".join((str(warning) for warning in 
result.warnings))) + if result.has_processing_errors: + self.logger.error(",".join((str(error) for error in result.errors))) + self._store_failed_event(result.errors, result.event_received, event) + return if self._output: result_data = [res.data for res in result if res.data] if result_data: diff --git a/tests/unit/framework/test_pipeline.py b/tests/unit/framework/test_pipeline.py index 213dbe289..10a836872 100644 --- a/tests/unit/framework/test_pipeline.py +++ b/tests/unit/framework/test_pipeline.py @@ -26,7 +26,7 @@ ) from logprep.abc.processor import ProcessorResult from logprep.factory import Factory -from logprep.framework.pipeline import Pipeline +from logprep.framework.pipeline import Pipeline, PipelineResult from logprep.processor.base.exceptions import ( FieldExistsWarning, ProcessingCriticalError, @@ -160,7 +160,9 @@ def test_not_empty_documents_are_stored_in_the_output(self, _): def test_empty_documents_are_not_stored_in_the_output(self, _): def mock_process_event(event): event.clear() - return [ProcessorResult(processor_name="")] + return PipelineResult( + event=event, event_received=event, results=[], pipeline=self.pipeline._pipeline + ) self.pipeline.process_event = mock_process_event self.pipeline._setup() @@ -246,13 +248,14 @@ def test_processor_warning_error_is_logged_but_processing_continues(self, mock_w mock_rule = mock.MagicMock() processing_warning = ProcessingWarning("not so bad", mock_rule, {"message": "test"}) self.pipeline._pipeline[1].process.return_value = ProcessorResult( - processor_name="mock_processor", errors=[processing_warning] + processor_name="mock_processor", warnings=[processing_warning] ) self.pipeline.process_pipeline() self.pipeline._input.get_next.return_value = ({"message": "test"}, None) result = self.pipeline.process_pipeline() - assert processing_warning in result.results[0].errors + assert processing_warning in result.results[0].warnings mock_warning.assert_called() + assert "ProcessingWarning: not so bad" in 
mock_warning.call_args[0][0] assert self.pipeline._output["dummy"].store.call_count == 2, "all events are processed" @mock.patch("logging.Logger.error") @@ -278,24 +281,7 @@ def test_processor_critical_error_is_logged_event_is_stored_in_error_output( self.pipeline.process_pipeline() assert self.pipeline._input.get_next.call_count == 2, "2 events gone into processing" assert mock_error.call_count == 2, f"two errors occurred: {mock_error.mock_calls}" - - logger_calls = ( - mock.call( - str( - ProcessingCriticalError( - "really bad things happened", mock_rule, {"message": "first event"} - ) - ) - ), - mock.call( - str( - ProcessingCriticalError( - "really bad things happened", mock_rule, {"message": "second event"} - ) - ) - ), - ) - mock_error.assert_has_calls(logger_calls) + assert "ProcessingCriticalError: really bad things happened" in mock_error.call_args[0][0] assert self.pipeline._output["dummy"].store.call_count == 0, "no event in output" assert ( self.pipeline._output["dummy"].store_failed.call_count == 2 @@ -313,7 +299,7 @@ def test_processor_logs_processing_error_and_warnings_separately( self.pipeline._pipeline[1] = deepcopy(self.pipeline._pipeline[0]) warning = FieldExistsWarning(mock_rule, input_event1, ["foo"]) self.pipeline._pipeline[0].process.return_value = ProcessorResult( - processor_name="", errors=[warning] + processor_name="", warnings=[warning] ) error = ProcessingCriticalError("really bad things happened", mock_rule, input_event1) self.pipeline._pipeline[1].process.return_value = ProcessorResult(