Skip to content

Commit

Permalink
make process method always return a result object (#609)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: dtrai2 <d.trai282@gmail.com>
Co-authored-by: Jörg Zimmermann <101292599+ekneg54@users.noreply.github.com>
  • Loading branch information
3 people committed Jul 12, 2024
1 parent 7cc1e57 commit 8c1a340
Show file tree
Hide file tree
Showing 40 changed files with 673 additions and 466 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
### Breaking
### Features
### Improvements

* a result object was added to processors and pipelines
* each processor returns a result object containing the processor name, the generated extra_data, and any warnings and errors
* the pipeline returns a result object with the list of all processor result objects

### Bugfix

## 13.0.0
Expand Down
73 changes: 56 additions & 17 deletions logprep/abc/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from abc import abstractmethod
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Tuple
from typing import TYPE_CHECKING, List, Optional

from attr import define, field, validators

Expand All @@ -13,6 +13,7 @@
from logprep.processor.base.exceptions import (
FieldExistsWarning,
ProcessingCriticalError,
ProcessingError,
ProcessingWarning,
)
from logprep.util import getter
Expand All @@ -30,6 +31,37 @@
logger = logging.getLogger("Processor")


@define(kw_only=True)
class ProcessorResult:
    """
    Result object to be returned by every processor. It contains the processor name, created data
    and errors (incl. warnings).

    All fields are keyword-only (``kw_only=True``), so ``processor_name`` can
    appear after fields with defaults. The attrs validators enforce the list
    element types at construction time.
    """

    # Extra data generated while applying rules; elements are appended by the processor.
    data: list = field(validator=validators.instance_of(list), factory=list)
    """ The generated extra data """
    # Critical errors collected instead of being raised, so a pipeline can inspect them.
    errors: List[ProcessingError] = field(
        validator=validators.deep_iterable(
            member_validator=validators.instance_of(ProcessingError),
            iterable_validator=validators.instance_of(list),
        ),
        factory=list,
    )
    """ The errors that occurred during processing """
    # Non-fatal processing warnings; collected rather than logged directly.
    warnings: List[ProcessingWarning] = field(
        validator=validators.deep_iterable(
            member_validator=validators.instance_of(ProcessingWarning),
            iterable_validator=validators.instance_of(list),
        ),
        factory=list,
    )
    """ The warnings that occurred during processing """
    # Name of the processor that produced this result (no default — must be given).
    processor_name: str = field(validator=validators.instance_of(str))
    """ The name of the processor """
    # Optional back-reference to the processed event; None when no event was attached.
    event: Optional[dict] = field(
        validator=validators.optional(validators.instance_of(dict)), default=None
    )
    """ A reference to the event that was processed """


class Processor(Component):
"""Abstract Processor Class to define the Interface"""

Expand Down Expand Up @@ -76,16 +108,16 @@ class Config(Component.Config):
"_event",
"_specific_tree",
"_generic_tree",
"_extra_data",
"result",
]

rule_class: "Rule"
has_custom_tests: bool
_event: dict
_specific_tree: RuleTree
_generic_tree: RuleTree
_extra_data: List[Tuple[dict, Tuple[dict]]]
_strategy = None
result: ProcessorResult

def __init__(self, name: str, configuration: "Processor.Config"):
super().__init__(name, configuration)
Expand All @@ -104,7 +136,7 @@ def __init__(self, name: str, configuration: "Processor.Config"):
specific_rules_targets=self._config.specific_rules,
)
self.has_custom_tests = False
self._extra_data = []
self.result = None

@property
def _specific_rules(self):
Expand Down Expand Up @@ -146,23 +178,28 @@ def metric_labels(self) -> dict:
"name": self.name,
}

def process(self, event: dict) -> ProcessorResult:
    """Process a log event with all rules of this processor.

    Runs the event through the specific rule tree first and then through the
    generic rule tree. A fresh :class:`ProcessorResult` is created per call
    (so data from a previous event never leaks into the next one), stored on
    ``self.result`` for later inspection, and returned.

    Parameters
    ----------
    event : dict
        A dictionary representing a log event; it may be mutated in place
        while rules are applied.

    Returns
    -------
    ProcessorResult
        Result object carrying the processor name, a reference to the
        processed event, the generated extra data, and the warnings and
        errors that occurred during processing.
    """
    self.result = ProcessorResult(processor_name=self.name, event=event)
    # Lazy %-style arguments avoid formatting the event when DEBUG is disabled.
    logger.debug("%s processing event %s", self.describe(), event)
    self._process_rule_tree(event, self._specific_tree)
    self._process_rule_tree(event, self._generic_tree)
    return self.result

def _process_rule_tree(self, event: dict, tree: "RuleTree"):
def _process_rule_tree(self, event: dict, tree: RuleTree):
applied_rules = set()

@Metric.measure_time()
Expand All @@ -172,14 +209,14 @@ def _process_rule(rule, event):
applied_rules.add(rule)
return event

def _process_rule_tree_multiple_times(tree, event):
def _process_rule_tree_multiple_times(tree: RuleTree, event: dict):
matching_rules = tree.get_matching_rules(event)
while matching_rules:
for rule in matching_rules:
_process_rule(rule, event)
matching_rules = set(tree.get_matching_rules(event)).difference(applied_rules)

def _process_rule_tree_once(tree, event):
def _process_rule_tree_once(tree: RuleTree, event: dict):
matching_rules = tree.get_matching_rules(event)
for rule in matching_rules:
_process_rule(rule, event)
Expand All @@ -195,9 +232,11 @@ def _apply_rules_wrapper(self, event: dict, rule: "Rule"):
except ProcessingWarning as error:
self._handle_warning_error(event, rule, error)
except ProcessingCriticalError as error:
raise error # is needed to prevent wrapping it in itself
except BaseException as error:
raise ProcessingCriticalError(str(error), rule, event) from error
self.result.errors.append(error) # is needed to prevent wrapping it in itself
event.clear()
except Exception as error:
self.result.errors.append(ProcessingCriticalError(str(error), rule, event))
event.clear()
if not hasattr(rule, "delete_source_fields"):
return
if rule.delete_source_fields:
Expand Down Expand Up @@ -285,9 +324,9 @@ def _handle_warning_error(self, event, rule, error, failure_tags=None):
else:
add_and_overwrite(event, "tags", sorted(list({*tags, *failure_tags})))
if isinstance(error, ProcessingWarning):
logger.warning(str(error))
self.result.warnings.append(error)
else:
logger.warning(str(ProcessingWarning(str(error), rule, event)))
self.result.warnings.append(ProcessingWarning(str(error), rule, event))

def _has_missing_values(self, event, rule, source_field_dict):
missing_fields = list(
Expand Down
Loading

0 comments on commit 8c1a340

Please sign in to comment.