
Commit

implement review remarks
dtrai2 committed Jul 2, 2024
1 parent 2b99597 commit 713df6e
Showing 7 changed files with 24 additions and 16 deletions.
1 change: 0 additions & 1 deletion .github/workflows/testing.yml
@@ -26,7 +26,6 @@ jobs:
run: |
pip install --upgrade pip wheel
pip install .[dev]
- pip list
- name: Perform ${{ matrix.test-type }} test
env:
PYTEST_ADDOPTS: "--color=yes"
6 changes: 4 additions & 2 deletions CHANGELOG.md
@@ -5,8 +5,10 @@
### Features
### Improvements

- * a result object was added which is returned by every processor
- * includes the processor name, generated extra_data, warnings and errors
+ * a result object was added to processors and pipelines
+ * each processor returns an object including the processor name, generated extra_data, warnings
+   and errors
+ * the pipeline returns an object with the list of all processor result objects

### Bugfix

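To make the changelog entry above concrete, here is a minimal sketch of the two result objects as described there and in the diffs below — validators and other details of the real attrs classes are omitted, and the processor names and payloads are made-up examples:

```python
from typing import List

from attrs import define, field


@define(kw_only=True)
class ProcessorResult:
    """Per-processor result: processor name, generated extra data, errors and warnings."""

    name: str
    data: list = field(factory=list)    # extra_data generated while processing
    errors: list = field(factory=list)  # errors and warnings raised while processing


@define(kw_only=True)
class PipelineResult:
    """Pipeline-level result: one ProcessorResult per processor in the pipeline."""

    results: List[ProcessorResult] = field(factory=list)


# hypothetical example: two processors, one of which produced extra data
pipeline_result = PipelineResult(
    results=[
        ProcessorResult(name="dissector"),
        ProcessorResult(name="pre_detector", data=[({"alert": "example"}, [("kafka", "alerts")])]),
    ]
)
for processor_result in pipeline_result.results:
    print(processor_result.name, processor_result.data, processor_result.errors)
```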
5 changes: 4 additions & 1 deletion logprep/abc/processor.py
@@ -33,7 +33,10 @@

@define(kw_only=True)
class ProcessorResult:
"""Result object to be returned by every processor. It contains all extra_data and errors."""
"""
Result object to be returned by every processor. It contains the processor name, created data
and errors (incl. warnings).
"""

name: str = field(validator=validators.instance_of(str))
data: list = field(validator=validators.instance_of(list), factory=list)
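A hedged sketch of how a processor might fill and return such a result object — not the actual logprep base-class implementation; the `errors` field is not visible in this hunk, and the `(document, outputs)` payload shape is inferred from `_store_extra_data` in the pipeline diff below:

```python
from attrs import define, field


@define(kw_only=True)
class ProcessorResult:
    name: str
    data: list = field(factory=list)
    errors: list = field(factory=list)


class ExampleProcessor:
    """Toy processor: copies 'message' to 'msg_copy' and emits one extra-data tuple."""

    def __init__(self, name: str):
        self.name = name

    def process(self, event: dict) -> ProcessorResult:
        result = ProcessorResult(name=self.name)
        try:
            if "message" in event:
                event["msg_copy"] = event["message"]
                # (document, outputs) pairs are what the pipeline later stores as extra data
                result.data.append(({"copied_by": self.name}, [("output_name", "extra_topic")]))
        except Exception as error:  # deliberately broad; this is only a sketch
            result.errors.append(error)
        return result


result = ExampleProcessor("example instance").process({"message": "test"})
print(result.name, result.data, result.errors)
```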
13 changes: 6 additions & 7 deletions logprep/framework/pipeline.py
@@ -18,7 +18,7 @@
from functools import cached_property, partial
from importlib.metadata import version
from multiprocessing import Lock, Value, current_process
- from typing import Any, List, Tuple
+ from typing import Any, List, Tuple, Optional

import attrs
import msgspec
@@ -49,7 +49,7 @@
@attrs.define(kw_only=True)
class PipelineResult:
"""Result object to be returned after processing the event.
- It contains all generated data and includes errors and warnings."""
+ It contains all results of each processor of the pipeline."""

results: List[ProcessorResult] = attrs.field(
validator=[
@@ -234,14 +234,14 @@ def run(self) -> None: # pylint: disable=method-hidden
self._shut_down()

@_handle_pipeline_error
- def process_pipeline(self) -> Tuple[dict, list]:
+ def process_pipeline(self) -> Tuple[Optional[dict], Optional[PipelineResult]]:
"""Retrieve next event, process event with full pipeline and store or return results"""
Component.run_pending_tasks()

event = self._get_event()
- event_received = copy.deepcopy(event)
if not event:
return None, None
+ event_received = copy.deepcopy(event)
result: PipelineResult = self.process_event(event)
for processor_result in result:
if not processor_result.errors:
@@ -255,8 +255,7 @@ def process_pipeline(self) -> Tuple[dict, list]:
# pipeline is aborted on processing error
return event, result
if self._output:
- result_data = [res.data for res in result if res.data]
- result_data = itertools.chain(*result_data)
+ result_data = itertools.chain(*[res.data for res in result if res.data])
if result_data:
self._store_extra_data(result_data)
if event:
@@ -299,7 +298,7 @@ def process_event(self, event: dict):
break
return PipelineResult(results=results)

- def _store_extra_data(self, result_data: List) -> None:
+ def _store_extra_data(self, result_data: List | itertools.chain) -> None:
self.logger.debug("Storing extra data")
for document, outputs in result_data:
for output in outputs:
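Two behavioural points in this file's diff, shown as a small sketch rather than the real `Pipeline` class: `process_pipeline` now returns `(None, None)` when no event could be retrieved and `(event, PipelineResult)` otherwise, and the extra data of all processors is flattened lazily with a single `itertools.chain` call (all values below are made up):

```python
import itertools

# per-processor extra data, i.e. the ProcessorResult.data lists (hypothetical values)
per_processor_data = [
    [],  # this processor produced nothing
    [({"alert": 1}, ["output_a"]), ({"alert": 2}, ["output_b"])],
    [({"note": "x"}, ["output_a"])],
]

# one chain(...) call replaces the previous two-step list build plus chain
result_data = itertools.chain(*[data for data in per_processor_data if data])

# same iteration shape that _store_extra_data uses
for document, outputs in result_data:
    print(document, outputs)
```

Callers of `process_pipeline` therefore have to handle the `None` case before touching `result.results`.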
9 changes: 4 additions & 5 deletions tests/unit/framework/test_pipeline.py
@@ -3,7 +3,6 @@
# pylint: disable=attribute-defined-outside-init
import logging
import multiprocessing
- import re
from copy import deepcopy
from logging import DEBUG
from multiprocessing import Lock
@@ -27,7 +26,7 @@
)
from logprep.abc.processor import ProcessorResult
from logprep.factory import Factory
- from logprep.framework.pipeline import Pipeline, PipelineResult
+ from logprep.framework.pipeline import Pipeline
from logprep.processor.base.exceptions import (
FieldExistsWarning,
ProcessingCriticalError,
@@ -250,8 +249,8 @@ def test_processor_warning_error_is_logged_but_processing_continues(self, _):

self.pipeline.process_pipeline()
self.pipeline._input.get_next.return_value = ({"message": "test"}, None)
- event, res = self.pipeline.process_pipeline()
- re.match(str(processing_warning), str(res.results[0].errors))
+ _, result = self.pipeline.process_pipeline()
+ assert processing_warning in result.results[0].errors
assert self.pipeline._output["dummy"].store.call_count == 2, "all events are processed"

@mock.patch("logging.Logger.error")
@@ -418,7 +417,7 @@ def raise_fatal_input_error(event):
@mock.patch("logprep.framework.pipeline.Pipeline._shut_down")
@mock.patch("logging.Logger.error")
def test_processor_fatal_output_error_is_logged_pipeline_is_shutdown(
- self, mock_log_error, mock_shut_down, __
+ self, mock_log_error, mock_shut_down, _
):
self.pipeline._setup()
add_empty_processor_result_to_process_mocks(self.pipeline._pipeline)
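For context on the warning test change above: the removed `re.match(...)` call never asserted anything, so that line could not fail, whereas the new membership check does; a tiny illustration with a stand-in warning object:

```python
import re

warning = ValueError("processing warning")  # stand-in for a ProcessingWarning
errors = [warning]

# old style: the match result is discarded, so the test cannot fail here
re.match(str(warning), str(errors))

# new style: fails loudly if the warning did not end up in the error list
assert warning in errors
```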
3 changes: 3 additions & 0 deletions tests/unit/processor/base.py
@@ -290,3 +290,6 @@ def test_process_return_result_object(self):
event = {"some": "event"}
result = self.object.process(event)
assert isinstance(result, ProcessorResult)
+ assert result.data == []
+ assert result.errors == []
+ assert result.name == "Test Instance Name"
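These assertions rely on the result object defaulting to fresh, per-instance empty lists; `field(factory=list)` (as in the processor.py diff above) provides exactly that, in contrast to a shared mutable default. A minimal sketch, not the real class:

```python
from attrs import define, field


@define(kw_only=True)
class ResultSketch:
    name: str
    data: list = field(factory=list)
    errors: list = field(factory=list)


first = ResultSketch(name="Test Instance Name")
second = ResultSketch(name="other instance")
first.data.append("extra")

assert first.data == ["extra"]
assert second.data == []  # each instance gets its own fresh list
assert first.errors == [] and second.errors == []
```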
3 changes: 3 additions & 0 deletions (file name not shown in this view)
@@ -100,6 +100,9 @@ def test_process_returns_none_when_no_extraction_field_matches(self):
document = {"nomessage": "test_message", "other": "field"}
result = self.object.process(document)
assert isinstance(result, ProcessorResult)
+ assert result.data == []
+ assert result.errors == []
+ assert result.name == "Test Instance Name"

def test_gets_matching_rules_from_rules_trees(self):
rule_trees = [self.object._generic_tree, self.object._specific_tree]