make process method always return a result object #609

Merged
40 commits merged on Jul 12, 2024
Changes from all commits
40 commits
db44347
add first draft for processor result object and fix some tests
djkhl Jun 17, 2024
de4ead8
fix tests
dtrai2 Jun 18, 2024
5b560c6
handle errors and warnings
dtrai2 Jun 18, 2024
f766dfd
fix black
dtrai2 Jun 18, 2024
94555cc
fix warnings and update changelog
djkhl Jun 19, 2024
7063919
fix test
djkhl Jun 19, 2024
92c8dca
test ensure every processor returns a result object
dtrai2 Jun 19, 2024
b1a24cc
reintroduce warnings into the rulecorpustester
dtrai2 Jun 19, 2024
ee6345b
increase wait time for http_input acceptance test and split test work…
dtrai2 Jun 19, 2024
537e255
tryout random port
dtrai2 Jun 20, 2024
2273a8e
reset port and extract workflow tests into separate yaml file
dtrai2 Jun 20, 2024
81761dc
add print for debugging
dtrai2 Jun 20, 2024
a79ea59
improve debug log
dtrai2 Jun 20, 2024
f8b6bb3
improve debug log
dtrai2 Jun 20, 2024
3ac4cfe
use default logconfig for uvicorn
dtrai2 Jun 20, 2024
bafb673
print installed python modules
dtrai2 Jun 20, 2024
4a7ce9c
restore logprep logging config and fix python version to 3.12.3 in ci…
dtrai2 Jun 20, 2024
28d8f5b
remove debug prints for acceptance test
dtrai2 Jun 20, 2024
8389ec1
use daemon config again for http input connector thread
dtrai2 Jun 20, 2024
242e44e
rename extra_data and errors in processor result objcet
djkhl Jun 20, 2024
2461583
make result.errors one list containing errors and warnings
djkhl Jun 20, 2024
c53ad94
first draft of adding a pipeline result object
djkhl Jun 24, 2024
57c6ba7
fix some pipeline tests
djkhl Jun 24, 2024
643e9ac
fix pipeline tests
djkhl Jun 25, 2024
0ec5d15
fix corpus tests?
djkhl Jun 25, 2024
eda6965
update changelog
djkhl Jun 25, 2024
cbbeae7
add deep validator for pipeline result
djkhl Jun 27, 2024
6a6845c
refactor process pipeline and process event methods and fix tests
djkhl Jun 27, 2024
17cff96
add test for separately logging warnings and errors
djkhl Jun 27, 2024
f9a82d5
Update tests/unit/framework/test_pipeline.py
djkhl Jun 27, 2024
84f6c9f
fix test readability
djkhl Jun 27, 2024
7801e0f
Merge branch '512-make-processorprocess-always-return-a-result-object…
djkhl Jun 27, 2024
675ba57
Merge branch 'main' into 512-make-processorprocess-always-return-a-re…
djkhl Jun 27, 2024
2b99597
fix black
djkhl Jun 27, 2024
713df6e
implement review remarks
dtrai2 Jul 2, 2024
472448e
fix mysql-connector-python dependency to 8.4.0
dtrai2 Jul 2, 2024
1819608
improve mock_create in test_pipeline.py
dtrai2 Jul 2, 2024
92ef6da
simplify process_event method in pipeline.py
dtrai2 Jul 2, 2024
2c46f5e
add attributes to processor result (#630)
ekneg54 Jul 12, 2024
ed30612
Merge branch 'main' into 512-make-processorprocess-always-return-a-re…
ekneg54 Jul 12, 2024
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,12 @@
### Breaking
### Features
### Improvements

* a result object was added to processors and pipelines
* each processor returns an object including the processor name, generated extra_data, warnings
and errors
* the pipeline returns an object with the list of all processor result objects

### Bugfix

## 13.0.0
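
The changelog entries above describe the new result objects only at a high level. As a minimal sketch (not part of this PR's diff), downstream code might consume them roughly as follows; the ProcessorResult fields match the class introduced in logprep/abc/processor.py below, while the PipelineResult interface, assumed here to be an iterable of per-processor results, is inferred from the changelog wording and may differ from the actual implementation.

# Hypothetical consumer of the new result objects (sketch, not PR code).
def summarize(pipeline_result) -> None:
    for processor_result in pipeline_result:  # assumed: yields ProcessorResult objects
        name = processor_result.processor_name
        if processor_result.warnings:
            print(f"{name}: {len(processor_result.warnings)} warning(s)")
        if processor_result.errors:
            print(f"{name}: {len(processor_result.errors)} error(s)")
        for extra_document in processor_result.data:  # generated extra data
            print(f"{name} produced extra data: {extra_document}")
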
73 changes: 56 additions & 17 deletions logprep/abc/processor.py
@@ -3,7 +3,7 @@
import logging
from abc import abstractmethod
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Tuple
from typing import TYPE_CHECKING, List, Optional

from attr import define, field, validators

@@ -13,6 +13,7 @@
from logprep.processor.base.exceptions import (
FieldExistsWarning,
ProcessingCriticalError,
ProcessingError,
ProcessingWarning,
)
from logprep.util import getter
@@ -30,6 +31,37 @@
logger = logging.getLogger("Processor")


@define(kw_only=True)
class ProcessorResult:
"""
Result object to be returned by every processor. It contains the processor name, created data
and errors (incl. warnings).
"""

data: list = field(validator=validators.instance_of(list), factory=list)
""" The generated extra data """
errors: list = field(
validator=validators.deep_iterable(
member_validator=validators.instance_of(ProcessingError),
iterable_validator=validators.instance_of(list),
),
factory=list,
)
""" The errors that occurred during processing """
warnings: list = field(
validator=validators.deep_iterable(
member_validator=validators.instance_of(ProcessingWarning),
iterable_validator=validators.instance_of(list),
),
factory=list,
)
""" The warnings that occurred during processing """
processor_name: str = field(validator=validators.instance_of(str))
""" The name of the processor """
event: dict = field(validator=validators.optional(validators.instance_of(dict)), default=None)
""" A reference to the event that was processed """


class Processor(Component):
"""Abstract Processor Class to define the Interface"""

@@ -76,16 +108,16 @@ class Config(Component.Config):
"_event",
"_specific_tree",
"_generic_tree",
"_extra_data",
"result",
]

rule_class: "Rule"
has_custom_tests: bool
_event: dict
_specific_tree: RuleTree
_generic_tree: RuleTree
_extra_data: List[Tuple[dict, Tuple[dict]]]
_strategy = None
result: ProcessorResult

def __init__(self, name: str, configuration: "Processor.Config"):
super().__init__(name, configuration)
@@ -104,7 +136,7 @@ def __init__(self, name: str, configuration: "Processor.Config"):
specific_rules_targets=self._config.specific_rules,
)
self.has_custom_tests = False
self._extra_data = []
self.result = None

@property
def _specific_rules(self):
@@ -146,23 +178,28 @@ def metric_labels(self) -> dict:
"name": self.name,
}

def process(self, event: dict):
"""Process a log event by calling the implemented `process` method of the
strategy object set in `_strategy` attribute.
def process(self, event: dict) -> ProcessorResult:
"""Process a log event.

Parameters
----------
event : dict
A dictionary representing a log event.

Returns
-------
ProcessorResult
A ProcessorResult object containing the processed event, errors,
extra data and a list of target outputs.

"""
self._extra_data.clear()
self.result = ProcessorResult(processor_name=self.name, event=event)
logger.debug(f"{self.describe()} processing event {event}")
self._process_rule_tree(event, self._specific_tree)
self._process_rule_tree(event, self._generic_tree)
return self._extra_data if self._extra_data else None
return self.result

def _process_rule_tree(self, event: dict, tree: "RuleTree"):
def _process_rule_tree(self, event: dict, tree: RuleTree):
applied_rules = set()

@Metric.measure_time()
@@ -172,14 +209,14 @@ def _process_rule(rule, event):
applied_rules.add(rule)
return event

def _process_rule_tree_multiple_times(tree, event):
def _process_rule_tree_multiple_times(tree: RuleTree, event: dict):
matching_rules = tree.get_matching_rules(event)
while matching_rules:
for rule in matching_rules:
_process_rule(rule, event)
matching_rules = set(tree.get_matching_rules(event)).difference(applied_rules)

def _process_rule_tree_once(tree, event):
def _process_rule_tree_once(tree: RuleTree, event: dict):
matching_rules = tree.get_matching_rules(event)
for rule in matching_rules:
_process_rule(rule, event)
@@ -195,9 +232,11 @@ def _apply_rules_wrapper(self, event: dict, rule: "Rule"):
except ProcessingWarning as error:
self._handle_warning_error(event, rule, error)
except ProcessingCriticalError as error:
raise error # is needed to prevent wrapping it in itself
except BaseException as error:
raise ProcessingCriticalError(str(error), rule, event) from error
self.result.errors.append(error) # is needed to prevent wrapping it in itself
event.clear()
except Exception as error:
self.result.errors.append(ProcessingCriticalError(str(error), rule, event))
event.clear()
if not hasattr(rule, "delete_source_fields"):
return
if rule.delete_source_fields:
@@ -285,9 +324,9 @@ def _handle_warning_error(self, event, rule, error, failure_tags=None):
else:
add_and_overwrite(event, "tags", sorted(list({*tags, *failure_tags})))
if isinstance(error, ProcessingWarning):
logger.warning(str(error))
self.result.warnings.append(error)
else:
logger.warning(str(ProcessingWarning(str(error), rule, event)))
self.result.warnings.append(ProcessingWarning(str(error), rule, event))

def _has_missing_values(self, event, rule, source_field_dict):
missing_fields = list(
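
To make the behavioral change concrete, the following is a hypothetical call-site sketch based on the diff above: process() now always returns a ProcessorResult, critical errors are appended to result.errors (with the event cleared) instead of being raised, and warnings are collected on the result instead of being logged inside the processor. The processor instance and the handle_failed_event helper are placeholders, not part of the logprep API.

# Sketch of the new Processor.process() contract (not part of this PR's diff).
event = {"message": "user login failed", "tags": []}
result = processor.process(event)  # always a ProcessorResult now, never None or extra data

assert result.processor_name == processor.name
assert result.event is event  # the result keeps a reference to the processed event

if result.errors:
    # critical errors are collected on the result and the event has been cleared
    handle_failed_event(result)  # placeholder for error output handling
for warning in result.warnings:
    # warnings are collected instead of being logged by the processor itself
    print(f"warning from {result.processor_name}: {warning}")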