
Commit

refactor pipeline_result
ekneg54 committed Jul 10, 2024
1 parent 3aa980d commit 9267242
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions logprep/framework/pipeline.py
@@ -18,7 +18,7 @@
 from functools import cached_property, partial
 from importlib.metadata import version
 from multiprocessing import Lock, Value, current_process
-from typing import Any, List, Optional, Tuple
+from typing import Any, Generator, List, Optional, Tuple
 
 import attrs
 import msgspec
@@ -53,7 +53,7 @@ class PipelineResult:
 
     results: List[ProcessorResult] = attrs.field(
         validator=[
-            attrs.validators.instance_of(list),
+            attrs.validators.instance_of((list, Generator)),
             attrs.validators.deep_iterable(
                 member_validator=attrs.validators.instance_of(ProcessorResult)
             ),
@@ -66,9 +66,14 @@ class PipelineResult:
         validator=attrs.validators.instance_of(dict), converter=copy.deepcopy
     )
     """The event that was received"""
-    pipeline: "Pipeline"
+    pipeline: list[Processor]
     """The pipeline that processed the event"""
 
+    def __attrs_post_init__(self):
+        self.results = list(
+            (processor.process(self.event) for processor in self.pipeline if self.event)
+        )
+
     def __iter__(self):
         return iter(self.results)
 
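Taken together, the PipelineResult hunks above move result computation into the class itself: instead of the pipeline filling in results after construction, __attrs_post_init__ runs every processor against the event as soon as the instance is created, and the pipeline field now holds the plain processor list rather than a back-reference to the Pipeline object. Below is a minimal, self-contained sketch of that attrs pattern; FakeProcessor, FakeResult, and Result are illustrative stand-ins, not logprep classes.

from typing import Any, List

import attrs


@attrs.define
class FakeResult:
    """Stand-in for logprep's ProcessorResult."""

    data: Any


class FakeProcessor:
    """Stand-in for a logprep Processor: tags the event and reports what it saw."""

    def __init__(self, name: str):
        self.name = name

    def process(self, event: dict) -> FakeResult:
        event[self.name] = True  # processors mutate the event in place
        return FakeResult(data=dict(event))


@attrs.define
class Result:
    """Sketch of the refactored PipelineResult: it computes its own results."""

    results: List[FakeResult]
    event: dict
    pipeline: list

    def __attrs_post_init__(self):
        # The generator re-checks the event before each processor, so if a
        # processor empties or deletes the event, the remaining ones are skipped.
        self.results = list(
            processor.process(self.event) for processor in self.pipeline if self.event
        )

    def __iter__(self):
        return iter(self.results)


result = Result(results=[], event={"msg": "x"}, pipeline=[FakeProcessor("a"), FakeProcessor("b")])
print([r.data for r in result])  # one FakeResult per processor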
@@ -243,14 +248,13 @@ def run(self) -> None:  # pylint: disable=method-hidden
         self._shut_down()
 
     @_handle_pipeline_error
-    def process_pipeline(self) -> Tuple[Optional[dict], Optional[PipelineResult]]:
+    def process_pipeline(self) -> PipelineResult:
         """Retrieve next event, process event with full pipeline and store or return results"""
         Component.run_pending_tasks()
 
         event = self._get_event()
         if not event:
             return None, None
-        event_received = copy.deepcopy(event)
         result: PipelineResult = self.process_event(event)
         for processor_result in result:
             if not processor_result.errors:
@@ -260,9 +264,9 @@ def process_pipeline(self) -> Tuple[Optional[dict], Optional[PipelineResult]]:
             if ProcessingError in processor_result:
                 self.logger.error(processor_result.get_error_string())
                 if self._output:
-                    self._store_failed_event(processor_result.errors, event_received, event)
+                    self._store_failed_event(processor_result.errors, result.event_received, event)
                 # pipeline is aborted on processing error
-                return result
+                return
         if self._output:
             result_data = [res.data for res in result if res.data]
             if result_data:
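The local deepcopy removed in the previous hunk is already performed by the converter on the event_received field: attrs applies converter=copy.deepcopy at construction time, so the result carries an untouched copy of the incoming event, which _store_failed_event now reads via result.event_received. A tiny standalone sketch of that converter behaviour follows (Snapshot is an assumed name, not logprep code).

import copy

import attrs


@attrs.define
class Snapshot:
    # the converter runs when the field is set, so the stored value is an
    # independent deep copy of whatever the caller passed in
    event_received: dict = attrs.field(converter=copy.deepcopy)


original = {"msg": "x"}
snap = Snapshot(event_received=original)
original["msg"] = "mutated later by a processor"
print(snap.event_received)  # {'msg': 'x'} - the snapshot is unaffected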
Expand Down Expand Up @@ -301,9 +305,8 @@ def process_event(self, event: dict):
results=[],
event_received=event,
event=event,
pipeline=self,
pipeline=self._pipeline,
)
result.results = [processor.process(event) for processor in self._pipeline if event]
return result

def _store_extra_data(self, result_data: List | itertools.chain) -> None:
Expand Down
