refactor pipeline_result
ekneg54 committed Jul 10, 2024
1 parent 9267242 commit 5cfb156
Showing 3 changed files with 47 additions and 53 deletions.
31 changes: 12 additions & 19 deletions logprep/abc/processor.py
@@ -42,32 +42,25 @@ class ProcessorResult:
     """ The generated extra data """
     errors: list = field(
         validator=validators.deep_iterable(
-            member_validator=validators.instance_of((ProcessingError, ProcessingWarning)),
+            member_validator=validators.instance_of(ProcessingError),
             iterable_validator=validators.instance_of(list),
         ),
         factory=list,
     )
-    """ The errors and warnings that occurred during processing """
+    """ The errors that occurred during processing """
+    warnings: list = field(
+        validator=validators.deep_iterable(
+            member_validator=validators.instance_of(ProcessingWarning),
+            iterable_validator=validators.instance_of(list),
+        ),
+        factory=list,
+    )
+    """ The warnings that occurred during processing """
     processor_name: str = field(validator=validators.instance_of(str))
     """ The name of the processor """
     event: dict = field(validator=validators.optional(validators.instance_of(dict)), default=None)
-
-    def __contains__(self, error_class):
-        return any(isinstance(item, error_class) for item in self.errors)
-
-    def get_warning_string(self):
-        """creates a string containing the warnings"""
-        return ", ".join(
-            [error.args[0] for error in self.errors if isinstance(error, ProcessingWarning)]
-        )
-
-    def get_error_string(self):
-        """creates a string containing the errors"""
-        return ", ".join(
-            [error.args[0] for error in self.errors if isinstance(error, ProcessingError)]
-        )
 
 
 class Processor(Component):
     """Abstract Processor Class to define the Interface"""
@@ -331,9 +324,9 @@ def _handle_warning_error(self, event, rule, error, failure_tags=None):
         else:
             add_and_overwrite(event, "tags", sorted(list({*tags, *failure_tags})))
         if isinstance(error, ProcessingWarning):
-            self.result.errors.append(error)
+            self.result.warnings.append(error)
         else:
-            self.result.errors.append(ProcessingWarning(str(error), rule, event))
+            self.result.warnings.append(ProcessingWarning(str(error), rule, event))
 
     def _has_missing_values(self, event, rule, source_field_dict):
         missing_fields = list(
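To make the new ProcessorResult shape concrete, here is a minimal usage sketch based on the diff above; the processor name, event, and rule are made up for illustration, and the import paths are assumed from the surrounding code.

from unittest import mock

from logprep.abc.processor import ProcessorResult
from logprep.processor.base.exceptions import ProcessingWarning

event = {"message": "test"}
rule = mock.MagicMock()  # stands in for a real rule object

result = ProcessorResult(processor_name="my_processor", event=event)
# warnings now live in their own list instead of being mixed into `errors`
result.warnings.append(ProcessingWarning("not so bad", rule, event))

assert result.errors == []  # `errors` only accepts ProcessingError instances
assert len(result.warnings) == 1

Note that the attrs validators run at construction time, so list mutations after init are not re-validated; the error/warning separation is enforced on freshly constructed results.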
37 changes: 26 additions & 11 deletions logprep/framework/pipeline.py
@@ -69,6 +69,26 @@ class PipelineResult:
     pipeline: list[Processor]
     """The pipeline that processed the event"""
 
+    @property
+    def has_processing_errors(self) -> bool:
+        """Check if any of the results has processing errors."""
+        return any(result.errors for result in self)
+
+    @property
+    def has_processing_warnings(self) -> bool:
+        """Check if any of the results has processing warnings."""
+        return any(result.warnings for result in self)
+
+    @property
+    def errors(self) -> List[ProcessingError]:
+        """Return all processing errors."""
+        return list(itertools.chain(*[result.errors for result in self]))
+
+    @property
+    def warnings(self) -> List[ProcessingWarning]:
+        """Return all processing warnings."""
+        return list(itertools.chain(*[result.warnings for result in self]))
+
     def __attrs_post_init__(self):
         self.results = list(
             (processor.process(self.event) for processor in self.pipeline if self.event)
@@ -256,17 +276,12 @@ def process_pipeline(self) -> PipelineResult:
         if not event:
             return None, None
         result: PipelineResult = self.process_event(event)
-        for processor_result in result:
-            if not processor_result.errors:
-                continue
-            if ProcessingWarning in processor_result:
-                self.logger.warning(processor_result.get_warning_string())
-            if ProcessingError in processor_result:
-                self.logger.error(processor_result.get_error_string())
-                if self._output:
-                    self._store_failed_event(processor_result.errors, result.event_received, event)
-                # pipeline is aborted on processing error
-                return
+        if result.has_processing_warnings:
+            self.logger.warning(",".join(str(warning) for warning in result.warnings))
+        if result.has_processing_errors:
+            self.logger.error(",".join(str(error) for error in result.errors))
+            self._store_failed_event(result.errors, result.event_received, event)
+            return
         if self._output:
             result_data = [res.data for res in result if res.data]
             if result_data:
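The new aggregation properties can be exercised with mocked processors, mirroring the unit tests below. The event and processor here are hypothetical; PipelineResult's __attrs_post_init__ fills `results` by running the event through the given pipeline, which is why an empty `results` list is passed at construction:

from unittest import mock

from logprep.abc.processor import ProcessorResult
from logprep.framework.pipeline import PipelineResult
from logprep.processor.base.exceptions import ProcessingWarning

event = {"message": "test"}
warning = ProcessingWarning("not so bad", mock.MagicMock(), event)

# a fake processor whose process() reports one warning and no errors
processor = mock.MagicMock()
processor.process.return_value = ProcessorResult(
    processor_name="mock_processor", warnings=[warning]
)

result = PipelineResult(
    event=event, event_received=event, results=[], pipeline=[processor]
)

assert result.has_processing_warnings
assert not result.has_processing_errors
print(",".join(str(w) for w in result.warnings))  # what process_pipeline would log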
32 changes: 9 additions & 23 deletions tests/unit/framework/test_pipeline.py
@@ -26,7 +26,7 @@
 )
 from logprep.abc.processor import ProcessorResult
 from logprep.factory import Factory
-from logprep.framework.pipeline import Pipeline
+from logprep.framework.pipeline import Pipeline, PipelineResult
 from logprep.processor.base.exceptions import (
     FieldExistsWarning,
     ProcessingCriticalError,
@@ -160,7 +160,9 @@ def test_not_empty_documents_are_stored_in_the_output(self, _):
     def test_empty_documents_are_not_stored_in_the_output(self, _):
         def mock_process_event(event):
             event.clear()
-            return [ProcessorResult(processor_name="")]
+            return PipelineResult(
+                event=event, event_received=event, results=[], pipeline=self.pipeline._pipeline
+            )
 
         self.pipeline.process_event = mock_process_event
         self.pipeline._setup()
@@ -246,13 +248,14 @@ def test_processor_warning_error_is_logged_but_processing_continues(self, mock_warning):
         mock_rule = mock.MagicMock()
         processing_warning = ProcessingWarning("not so bad", mock_rule, {"message": "test"})
         self.pipeline._pipeline[1].process.return_value = ProcessorResult(
-            processor_name="mock_processor", errors=[processing_warning]
+            processor_name="mock_processor", warnings=[processing_warning]
         )
         self.pipeline.process_pipeline()
         self.pipeline._input.get_next.return_value = ({"message": "test"}, None)
         result = self.pipeline.process_pipeline()
-        assert processing_warning in result.results[0].errors
+        assert processing_warning in result.results[0].warnings
         mock_warning.assert_called()
+        assert "ProcessingWarning: not so bad" in mock_warning.call_args[0][0]
         assert self.pipeline._output["dummy"].store.call_count == 2, "all events are processed"
 
     @mock.patch("logging.Logger.error")
@@ -278,24 +281,7 @@ def test_processor_critical_error_is_logged_event_is_stored_in_error_output(
         self.pipeline.process_pipeline()
         assert self.pipeline._input.get_next.call_count == 2, "2 events gone into processing"
         assert mock_error.call_count == 2, f"two errors occurred: {mock_error.mock_calls}"
-
-        logger_calls = (
-            mock.call(
-                str(
-                    ProcessingCriticalError(
-                        "really bad things happened", mock_rule, {"message": "first event"}
-                    )
-                )
-            ),
-            mock.call(
-                str(
-                    ProcessingCriticalError(
-                        "really bad things happened", mock_rule, {"message": "second event"}
-                    )
-                )
-            ),
-        )
-        mock_error.assert_has_calls(logger_calls)
+        assert "ProcessingCriticalError: really bad things happened" in mock_error.call_args[0][0]
         assert self.pipeline._output["dummy"].store.call_count == 0, "no event in output"
         assert (
             self.pipeline._output["dummy"].store_failed.call_count == 2
@@ -313,7 +299,7 @@ def test_processor_logs_processing_error_and_warnings_separately(
         self.pipeline._pipeline[1] = deepcopy(self.pipeline._pipeline[0])
         warning = FieldExistsWarning(mock_rule, input_event1, ["foo"])
         self.pipeline._pipeline[0].process.return_value = ProcessorResult(
-            processor_name="", errors=[warning]
+            processor_name="", warnings=[warning]
         )
         error = ProcessingCriticalError("really bad things happened", mock_rule, input_event1)
         self.pipeline._pipeline[1].process.return_value = ProcessorResult(
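The test updates also change how log output is asserted: instead of reconstructing exact ProcessingCriticalError instances for mock_error.assert_has_calls, the tests check that the formatted class-name prefix appears in the message of the last logger call. A minimal sketch of the pattern, assuming the logger method is patched with mock.patch as in the tests above:

import logging
from unittest import mock

with mock.patch("logging.Logger.error") as mock_error:
    logging.getLogger("Pipeline").error("ProcessingCriticalError: really bad things happened")

# call_args holds the most recent call; a substring check keeps the test
# independent of the exact error formatting
mock_error.assert_called()
assert "ProcessingCriticalError: really bad things happened" in mock_error.call_args[0][0]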
