refactor pipeline_result
ekneg54 committed Jul 10, 2024
1 parent b41a391 · commit 3aa980d
Showing 4 changed files with 55 additions and 24 deletions.
logprep/abc/processor.py (2 changes: 0 additions & 2 deletions)
@@ -48,8 +48,6 @@ class ProcessorResult:
         factory=list,
     )
     """ The errors and warnings that occurred during processing """
-    outputs: list = field(validator=validators.instance_of(list), factory=list)
-    """ The outputs of the processors extra data """
     processor_name: str = field(validator=validators.instance_of(str))
     """ The name of the processor """
     event: dict = field(validator=validators.optional(validators.instance_of(dict)), default=None)
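For orientation, a minimal sketch of constructing a ProcessorResult after this change. The class must be keyword-only given the attrs field order shown, and anything beyond the fields visible in this hunk is an assumption:

from logprep.abc.processor import ProcessorResult

result = ProcessorResult(processor_name="my_processor")  # processor_name is required
assert result.errors == []             # factory=list default
assert result.event is None            # optional, defaults to None
assert not hasattr(result, "outputs")  # the field this commit removes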
logprep/framework/pipeline.py (24 changes: 19 additions & 5 deletions)
@@ -18,7 +18,7 @@
 from functools import cached_property, partial
 from importlib.metadata import version
 from multiprocessing import Lock, Value, current_process
-from typing import Any, List, Tuple, Optional
+from typing import Any, List, Optional, Tuple
 
 import attrs
 import msgspec
@@ -59,6 +59,15 @@ class PipelineResult:
             ),
         ]
     )
+    """List of ProcessorResults"""
+    event: dict = attrs.field(validator=attrs.validators.instance_of(dict))
+    """The event that was processed"""
+    event_received: dict = attrs.field(
+        validator=attrs.validators.instance_of(dict), converter=copy.deepcopy
+    )
+    """The event that was received"""
+    pipeline: "Pipeline"
+    """The pipeline that processed the event"""
 
     def __iter__(self):
         return iter(self.results)
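A usage sketch of the container defined above. The `pipeline` stand-in below is hypothetical; the field is annotation-only, so attrs attaches no validator to it, and in practice Pipeline.process_event builds the object (see the hunk further down):

from logprep.framework.pipeline import PipelineResult

event = {"message": "test"}
pipeline = object()  # stand-in for a Pipeline instance
result = PipelineResult(results=[], event=event, event_received=event, pipeline=pipeline)

assert result.event is event               # plain reference, no converter
assert result.event_received is not event  # converter=copy.deepcopy snapshots it
for processor_result in result:            # __iter__ delegates to results
    print(processor_result.processor_name)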
@@ -253,14 +262,14 @@ def process_pipeline(self) -> Tuple[Optional[dict], Optional[PipelineResult]]:
                 if self._output:
                     self._store_failed_event(processor_result.errors, event_received, event)
                 # pipeline is aborted on processing error
-                return event, result
+                return result
         if self._output:
             result_data = [res.data for res in result if res.data]
             if result_data:
                 self._store_extra_data(itertools.chain(*result_data))
             if event:
                 self._store_event(event)
-        return event, result
+        return result
 
     def _store_event(self, event: dict) -> None:
         for output_name, output in self._output.items():
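Call sites drop the tuple unpacking accordingly; a hypothetical before/after, mirroring the test updates later in this commit (`pipeline` is assumed to be a configured Pipeline):

# before: event, result = pipeline.process_pipeline()
result = pipeline.process_pipeline()
processed_event = result.event  # the processed event now travels on the result object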
@@ -288,9 +297,14 @@ def _get_event(self) -> dict:
     @Metric.measure_time()
     def process_event(self, event: dict):
         """process all processors for one event"""
-        return PipelineResult(
-            results=[processor.process(event) for processor in self._pipeline if event]
+        result = PipelineResult(
+            results=[],
+            event_received=event,
+            event=event,
+            pipeline=self,
         )
+        result.results = [processor.process(event) for processor in self._pipeline if event]
+        return result
 
     def _store_extra_data(self, result_data: List | itertools.chain) -> None:
         self.logger.debug("Storing extra data")
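Constructing the still-empty PipelineResult before running the processors looks deliberate: the converter=copy.deepcopy on event_received fires at construction time, so the snapshot is taken before any processor mutates the event in place. A sketch of the resulting semantics, matching the new test below (pipeline stand-in as in the earlier sketch):

event = {"some": "event"}
result = PipelineResult(results=[], event=event, event_received=event, pipeline=object())
event["field"] = "foo"  # processors mutate the dict in place
assert result.event == {"some": "event", "field": "foo"}  # live reference
assert result.event_received == {"some": "event"}         # pre-processing snapshot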
tests/unit/framework/test_pipeline.py (52 changes: 36 additions & 16 deletions)
@@ -239,19 +239,20 @@ def test_output_warning_error_is_logged_but_processing_continues(self, mock_warn
         assert mock_warning.call_count == 1
         assert self.pipeline._output["dummy"].store.call_count == 3
 
-    def test_processor_warning_error_is_logged_but_processing_continues(self, _):
+    @mock.patch("logging.Logger.warning")
+    def test_processor_warning_error_is_logged_but_processing_continues(self, mock_warning, _):
         self.pipeline._setup()
         self.pipeline._input.get_next.return_value = ({"message": "test"}, None)
         mock_rule = mock.MagicMock()
         processing_warning = ProcessingWarning("not so bad", mock_rule, {"message": "test"})
         self.pipeline._pipeline[1].process.return_value = ProcessorResult(
             processor_name="mock_processor", errors=[processing_warning]
         )
 
         self.pipeline.process_pipeline()
         self.pipeline._input.get_next.return_value = ({"message": "test"}, None)
-        _, result = self.pipeline.process_pipeline()
+        result = self.pipeline.process_pipeline()
         assert processing_warning in result.results[0].errors
+        mock_warning.assert_called()
         assert self.pipeline._output["dummy"].store.call_count == 2, "all events are processed"
 
     @mock.patch("logging.Logger.error")
@@ -619,6 +620,27 @@ def test_shutdown_logs_fatal_errors(self, mock_error, _):
         logger_call = f"Couldn't gracefully shut down pipeline due to: {error}"
         mock_error.assert_called_with(logger_call)
 
+    def test_pipeline_result_provides_event_received(self, _):
+        self.pipeline._setup()
+        event = {"some": "event"}
+        self.pipeline._input.get_next.return_value = (event, None)
+        generic_adder = original_create(
+            {
+                "generic_adder": {
+                    "type": "generic_adder",
+                    "specific_rules": [
+                        {"filter": "some", "generic_adder": {"add": {"field": "foo"}}}
+                    ],
+                    "generic_rules": [],
+                }
+            }
+        )
+        self.pipeline._pipeline = [generic_adder]
+        result = self.pipeline.process_pipeline()
+        assert result.event_received is not event, "event_received is a copy"
+        assert result.event_received == {"some": "event"}, "received event is as expected"
+        assert result.event == {"some": "event", "field": "foo"}, "processed event is as expected"
+
 
 class TestPipelineWithActualInput:
     def setup_method(self):
@@ -637,10 +659,9 @@ def test_pipeline_without_output_connector_and_one_input_event_and_preprocessors
         self.config.input["test_input"]["documents"] = [{"applyrule": "yes"}]
         pipeline = Pipeline(config=self.config)
         assert pipeline._output is None
-        event, extra_outputs = pipeline.process_pipeline()
-        assert event["label"] == {"reporter": ["windows"]}
-        assert "arrival_time" in event
-        assert extra_outputs.results[0].data == []
+        result = pipeline.process_pipeline()
+        assert result.event["label"] == {"reporter": ["windows"]}
+        assert "arrival_time" in result.event
 
     def test_process_event_processes_without_input_and_without_output(self):
         event = {"applyrule": "yes"}
@@ -660,13 +681,12 @@ def test_pipeline_without_output_connector_and_two_input_events_and_preprocessor
         self.config.input["test_input"]["documents"] = input_events
         pipeline = Pipeline(config=self.config)
         assert pipeline._output is None
-        event, extra_outputs = pipeline.process_pipeline()
-        assert event["label"] == {"reporter": ["windows"]}
-        assert "arrival_time" in event
-        event, extra_outputs = pipeline.process_pipeline()
-        assert "pseudonym" in event.get("winlog", {}).get("event_data", {}).get("IpAddress")
-        assert "arrival_time" in event
-        assert len(extra_outputs.results) == len(pipeline._pipeline)
+        result = pipeline.process_pipeline()
+        assert result.event["label"] == {"reporter": ["windows"]}
+        assert "arrival_time" in result.event
+        result = pipeline.process_pipeline()
+        assert "pseudonym" in result.event.get("winlog", {}).get("event_data", {}).get("IpAddress")
+        assert "arrival_time" in result.event
 
     def test_pipeline_hmac_error_message_without_output_connector(self):
         self.config.input["test_input"]["documents"] = [{"applyrule": "yes"}]
@@ -675,8 +695,8 @@ def test_pipeline_hmac_error_message_without_output_connector(self):
         }
         pipeline = Pipeline(config=self.config)
         assert pipeline._output is None
-        event, _ = pipeline.process_pipeline()
-        assert event["hmac"]["hmac"] == "error"
+        result = pipeline.process_pipeline()
+        assert result.event["hmac"]["hmac"] == "error"
 
     def test_pipeline_run_raises_assertion_when_run_without_input(self):
         self.config.input = {}
tests/unit/processor/base.py (1 change: 0 additions & 1 deletion)
@@ -318,7 +318,6 @@ def test_process_return_result_object(self):
         result = self.object.process(event)
         assert isinstance(result, ProcessorResult)
         assert isinstance(result.data, list)
-        assert isinstance(result.outputs, list)
         assert isinstance(result.errors, list)
         assert result.processor_name == "Test Instance Name"
 
