Commit

simplify process_event method in pipeline.py
dtrai2 committed Jul 2, 2024
1 parent 1819608 commit 92ef6da
Showing 2 changed files with 8 additions and 12 deletions.
2 changes: 2 additions & 0 deletions logprep/abc/processor.py
@@ -230,8 +230,10 @@ def _apply_rules_wrapper(self, event: dict, rule: "Rule"):
             self._handle_warning_error(event, rule, error)
         except ProcessingCriticalError as error:
             self.result.errors.append(error)  # is needed to prevent wrapping it in itself
+            event.clear()
         except BaseException as error:
             self.result.errors.append(ProcessingCriticalError(str(error), rule, event))
+            event.clear()
         if not hasattr(rule, "delete_source_fields"):
             return
         if rule.delete_source_fields:
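
Aside (not part of the commit): the two added event.clear() calls empty the event dict in place instead of rebinding the name, so every other holder of a reference to that dict sees the event disappear. A minimal sketch of that mechanism, using hypothetical names:

    # Clearing mutates the one shared dict; `event = {}` would only rebind.
    event = {"message": "raised a critical error"}
    pipeline_ref = event        # e.g. the pipeline's reference to the same dict
    event.clear()               # in-place mutation, visible through every reference
    assert pipeline_ref == {}   # the pipeline now sees an empty event
    assert not pipeline_ref     # an empty dict is falsy -> "event was dropped"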
18 changes: 6 additions & 12 deletions logprep/framework/pipeline.py
@@ -146,7 +146,7 @@ def metric_labels(self) -> dict:
         }
 
     @cached_property
-    def _pipeline(self) -> tuple:
+    def _pipeline(self) -> list[Processor]:
         self.logger.debug(f"Building '{self._process_name}'")
         pipeline = [self._create_processor(entry) for entry in self._logprep_config.pipeline]
         self.logger.debug("Finished building pipeline")
@@ -255,9 +255,9 @@ def process_pipeline(self) -> Tuple[Optional[dict], Optional[PipelineResult]]:
             # pipeline is aborted on processing error
             return event, result
         if self._output:
-            result_data = itertools.chain(*[res.data for res in result if res.data])
+            result_data = [res.data for res in result if res.data]
             if result_data:
-                self._store_extra_data(result_data)
+                self._store_extra_data(itertools.chain(*result_data))
         if event:
             self._store_event(event)
         return event, result
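
Aside (not part of the commit): a plausible reason for deferring itertools.chain until the call site is that chain objects are always truthy, even when they would yield nothing, so the old `if result_data:` guard could never detect the empty case. With a list, the guard works. A quick sketch:

    import itertools

    # A chain object is truthy even when it yields nothing ...
    empty_chain = itertools.chain(*[])
    assert bool(empty_chain) is True
    assert list(empty_chain) == []

    # ... whereas an empty list is falsy, so the emptiness check is real.
    result_data = [data for data in [] if data]
    assert not result_data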
@@ -288,15 +288,9 @@ def _get_event(self) -> dict:
     @Metric.measure_time()
     def process_event(self, event: dict):
         """process all processors for one event"""
-        results = []
-        for processor in self._pipeline:
-            result: ProcessorResult = processor.process(event)
-            results.append(result)
-            if ProcessingError in result:
-                event.clear()
-            if not event:
-                break
-        return PipelineResult(results=results)
+        return PipelineResult(
+            results=[processor.process(event) for processor in self._pipeline if event]
+        )
 
     def _store_extra_data(self, result_data: List | itertools.chain) -> None:
         self.logger.debug("Storing extra data")
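
Aside (not part of the commit): the comprehension preserves the old early-exit behavior because its `if event` guard is re-evaluated before each processor runs, and the error-path event.clear() now lives in _apply_rules_wrapper (the processor.py change above). A sketch with hypothetical stand-in processors:

    def ok(event):
        return "ok"                 # leaves the event alone

    def clearing(event):
        event.clear()               # drops the event in place, as a critical error would
        return "cleared"

    def never(event):
        raise AssertionError("unreachable once the event is empty")

    event = {"message": "boom"}
    pipeline = [ok, clearing, never]
    # The guard runs before each call, so `never` is filtered out.
    results = [processor(event) for processor in pipeline if event]
    assert results == ["ok", "cleared"]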
