make process method always return a result object #609

Merged
merged 40 commits into from
Jul 12, 2024

Changes from 34 commits
Commits (40)
db44347
add first draft for processor result object and fix some tests
djkhl Jun 17, 2024
de4ead8
fix tests
dtrai2 Jun 18, 2024
5b560c6
handle errors and warnings
dtrai2 Jun 18, 2024
f766dfd
fix black
dtrai2 Jun 18, 2024
94555cc
fix warnings and update changelog
djkhl Jun 19, 2024
7063919
fix test
djkhl Jun 19, 2024
92c8dca
test ensure every processor returns a result object
dtrai2 Jun 19, 2024
b1a24cc
reintroduce warnings into the rulecorpustester
dtrai2 Jun 19, 2024
ee6345b
increase wait time for http_input acceptance test and split test work…
dtrai2 Jun 19, 2024
537e255
tryout random port
dtrai2 Jun 20, 2024
2273a8e
reset port and extract workflow tests into separate yaml file
dtrai2 Jun 20, 2024
81761dc
add print for debugging
dtrai2 Jun 20, 2024
a79ea59
improve debug log
dtrai2 Jun 20, 2024
f8b6bb3
improve debug log
dtrai2 Jun 20, 2024
3ac4cfe
use default logconfig for uvicorn
dtrai2 Jun 20, 2024
bafb673
print installed python modules
dtrai2 Jun 20, 2024
4a7ce9c
restore logprep logging config and fix python version to 3.12.3 in ci…
dtrai2 Jun 20, 2024
28d8f5b
remove debug prints for acceptance test
dtrai2 Jun 20, 2024
8389ec1
use daemon config again for http input connector thread
dtrai2 Jun 20, 2024
242e44e
rename extra_data and errors in processor result object
djkhl Jun 20, 2024
2461583
make result.errors one list containing errors and warnings
djkhl Jun 20, 2024
c53ad94
first draft of adding a pipeline result object
djkhl Jun 24, 2024
57c6ba7
fix some pipeline tests
djkhl Jun 24, 2024
643e9ac
fix pipeline tests
djkhl Jun 25, 2024
0ec5d15
fix corpus tests?
djkhl Jun 25, 2024
eda6965
update changelog
djkhl Jun 25, 2024
cbbeae7
add deep validator for pipeline result
djkhl Jun 27, 2024
6a6845c
refactor process pipeline and process event methods and fix tests
djkhl Jun 27, 2024
17cff96
add test for separately logging warnings and errors
djkhl Jun 27, 2024
f9a82d5
Update tests/unit/framework/test_pipeline.py
djkhl Jun 27, 2024
84f6c9f
fix test readability
djkhl Jun 27, 2024
7801e0f
Merge branch '512-make-processorprocess-always-return-a-result-object…
djkhl Jun 27, 2024
675ba57
Merge branch 'main' into 512-make-processorprocess-always-return-a-re…
djkhl Jun 27, 2024
2b99597
fix black
djkhl Jun 27, 2024
713df6e
implement review remarks
dtrai2 Jul 2, 2024
472448e
fix mysql-connector-python dependency to 8.4.0
dtrai2 Jul 2, 2024
1819608
improve mock_create in test_pipeline.py
dtrai2 Jul 2, 2024
92ef6da
simplify process_event method in pipeline.py
dtrai2 Jul 2, 2024
2c46f5e
add attributes to processor result (#630)
ekneg54 Jul 12, 2024
ed30612
Merge branch 'main' into 512-make-processorprocess-always-return-a-re…
ekneg54 Jul 12, 2024
1 change: 1 addition & 0 deletions .github/workflows/testing.yml
@@ -26,6 +26,7 @@ jobs:
run: |
pip install --upgrade pip wheel
pip install .[dev]
pip list
- name: Perform ${{ matrix.test-type }} test
env:
PYTEST_ADDOPTS: "--color=yes"
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,10 @@
### Breaking
### Features
### Improvements

* a result object was added which is returned by every processor
* includes the processor name, generated extra_data, warnings and errors

### Bugfix

## 13.0.0
51 changes: 41 additions & 10 deletions logprep/abc/processor.py
@@ -3,7 +3,7 @@
import logging
from abc import abstractmethod
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Tuple
from typing import TYPE_CHECKING, List, Optional

from attr import define, field, validators

@@ -13,6 +13,7 @@
from logprep.processor.base.exceptions import (
FieldExistsWarning,
ProcessingCriticalError,
ProcessingError,
ProcessingWarning,
)
from logprep.util import getter
@@ -30,6 +31,36 @@
logger = logging.getLogger("Processor")


@define(kw_only=True)
class ProcessorResult:
"""Result object to be returned by every processor. It contains all extra_data and errors."""

name: str = field(validator=validators.instance_of(str))
data: list = field(validator=validators.instance_of(list), factory=list)
errors: list = field(
validator=validators.deep_iterable(
member_validator=validators.instance_of((ProcessingError, ProcessingWarning)),
iterable_validator=validators.instance_of(list),
),
factory=list,
)

def __contains__(self, error_class):
return any(isinstance(item, error_class) for item in self.errors)

def get_warning_string(self):
"""creates a string containing the warnings"""
return ", ".join(
[error.args[0] for error in self.errors if isinstance(error, ProcessingWarning)]
)

def get_error_string(self):
"""creates a string containing the errors"""
return ", ".join(
[error.args[0] for error in self.errors if isinstance(error, ProcessingError)]
)


class Processor(Component):
"""Abstract Processor Class to define the Interface"""

@@ -76,16 +107,16 @@ class Config(Component.Config):
"_event",
"_specific_tree",
"_generic_tree",
"_extra_data",
"result",
]

rule_class: "Rule"
has_custom_tests: bool
_event: dict
_specific_tree: RuleTree
_generic_tree: RuleTree
_extra_data: List[Tuple[dict, Tuple[dict]]]
_strategy = None
result: ProcessorResult

def __init__(self, name: str, configuration: "Processor.Config"):
super().__init__(name, configuration)
@@ -104,7 +135,7 @@ def __init__(self, name: str, configuration: "Processor.Config"):
specific_rules_targets=self._config.specific_rules,
)
self.has_custom_tests = False
self._extra_data = []
self.result = ProcessorResult(name=self.name)

@property
def _specific_rules(self):
@@ -156,11 +187,11 @@ def process(self, event: dict):
A dictionary representing a log event.

"""
self._extra_data.clear()
self.result = ProcessorResult(name=self.name)
logger.debug(f"{self.describe()} processing event {event}")
self._process_rule_tree(event, self._specific_tree)
self._process_rule_tree(event, self._generic_tree)
return self._extra_data if self._extra_data else None
return self.result

def _process_rule_tree(self, event: dict, tree: "RuleTree"):
applied_rules = set()
@@ -195,9 +226,9 @@ def _apply_rules_wrapper(self, event: dict, rule: "Rule"):
except ProcessingWarning as error:
self._handle_warning_error(event, rule, error)
except ProcessingCriticalError as error:
raise error # is needed to prevent wrapping it in itself
self.result.errors.append(error) # is needed to prevent wrapping it in itself
except BaseException as error:
raise ProcessingCriticalError(str(error), rule, event) from error
self.result.errors.append(ProcessingCriticalError(str(error), rule, event))
if not hasattr(rule, "delete_source_fields"):
return
if rule.delete_source_fields:
@@ -285,9 +316,9 @@ def _handle_warning_error(self, event, rule, error, failure_tags=None):
else:
add_and_overwrite(event, "tags", sorted(list({*tags, *failure_tags})))
if isinstance(error, ProcessingWarning):
logger.warning(str(error))
self.result.errors.append(error)
else:
logger.warning(str(ProcessingWarning(str(error), rule, event)))
self.result.errors.append(ProcessingWarning(str(error), rule, event))

def _has_missing_values(self, event, rule, source_field_dict):
missing_fields = list(
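For orientation, here is a minimal usage sketch of the new `ProcessorResult` based only on the definition above; the processor name and the placeholder values are illustrative and not part of this PR.

```python
from logprep.abc.processor import ProcessorResult
from logprep.processor.base.exceptions import ProcessingError, ProcessingWarning

# Construct an empty result the same way Processor.__init__ now does.
# "my_processor" is a placeholder name used only for this sketch.
result = ProcessorResult(name="my_processor")
assert result.data == [] and result.errors == []

# In normal operation the result is returned by processor.process(event).
# Callers can then inspect it via the helpers defined above:
if ProcessingWarning in result:          # __contains__ checks against the error classes
    print(result.get_warning_string())   # comma-separated warning messages
if ProcessingError in result:
    print(result.get_error_string())     # comma-separated error messages
for document, outputs in result.data:    # extra data as (document, outputs) tuples
    print(document, outputs)
```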
110 changes: 66 additions & 44 deletions logprep/framework/pipeline.py
@@ -6,6 +6,7 @@
"""

import copy
import itertools
import logging
import logging.handlers
import multiprocessing
@@ -37,14 +38,32 @@
Output,
OutputWarning,
)
from logprep.abc.processor import Processor
from logprep.abc.processor import Processor, ProcessorResult
from logprep.factory import Factory
from logprep.metrics.metrics import HistogramMetric, Metric
from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingWarning
from logprep.processor.base.exceptions import ProcessingError, ProcessingWarning
from logprep.util.configuration import Configuration
from logprep.util.pipeline_profiler import PipelineProfiler


@attrs.define(kw_only=True)
class PipelineResult:
"""Result object to be returned after processing the event.
It contains all generated data and includes errors and warnings."""

results: List[ProcessorResult] = attrs.field(
validator=[
attrs.validators.instance_of(list),
attrs.validators.deep_iterable(
member_validator=attrs.validators.instance_of(ProcessorResult)
),
]
)

def __iter__(self):
return iter(self.results)


def _handle_pipeline_error(func):
def _inner(self: "Pipeline") -> Any:
try:
@@ -218,68 +237,71 @@ def run(self) -> None: # pylint: disable=method-hidden
def process_pipeline(self) -> Tuple[dict, list]:
"""Retrieve next event, process event with full pipeline and store or return results"""
Component.run_pending_tasks()
extra_outputs = []
event = None
try:
event = self._get_event()
except CriticalInputParsingError as error:
input_data = error.raw_input
if isinstance(input_data, bytes):
input_data = input_data.decode("utf8")
error_event = self._encoder.encode({"invalid_json": input_data})
self._store_failed_event(error, "", error_event)
self.logger.error(f"{error}, event was written to error output")
if event:
extra_outputs = self.process_event(event)
if event and self._output:
self._store_event(event)
return event, extra_outputs

event = self._get_event()
event_received = copy.deepcopy(event)
if not event:
return None, None
result: PipelineResult = self.process_event(event)
for processor_result in result:
if not processor_result.errors:
continue
if ProcessingWarning in processor_result:
self.logger.warning(processor_result.get_warning_string())
if ProcessingError in processor_result:
self.logger.error(processor_result.get_error_string())
if self._output:
self._store_failed_event(processor_result.errors, event_received, event)
# pipeline is aborted on processing error
return event, result
if self._output:
result_data = [res.data for res in result if res.data]
result_data = itertools.chain(*result_data)
if result_data:
self._store_extra_data(result_data)
if event:
self._store_event(event)
Contributor comment: I think we should document this behavior somewhere? The readme or the sphinx doc should describe when extra_data will be written to the output and that the pipeline is interrupted on errors, as well as the fact that extra_data is not stored when an error happens inside the pipeline.

return event, result

def _store_event(self, event: dict) -> None:
for output_name, output in self._output.items():
if output.default:
output.store(event)
self.logger.debug(f"Stored output in {output_name}")

def _store_failed_event(self, error, event, event_received):
def _store_failed_event(self, error, event_received, event):
for _, output in self._output.items():
if output.default:
output.store_failed(str(error), self._decoder.decode(event_received), event)
output.store_failed(str(error), event_received, event)

def _get_event(self) -> dict:
event, non_critical_error_msg = self._input.get_next(self._timeout)
if non_critical_error_msg and self._output:
for _, output in self._output.items():
if output.default:
output.store_failed(non_critical_error_msg, event, None)
return event
try:
event, non_critical_error_msg = self._input.get_next(self._timeout)
if non_critical_error_msg and self._output:
self._store_failed_event(non_critical_error_msg, event, None)
return event
except CriticalInputParsingError as error:
input_data = error.raw_input
if isinstance(input_data, bytes):
input_data = input_data.decode("utf8")
self._store_failed_event(error, {"invalid_json": input_data}, "")

@Metric.measure_time()
def process_event(self, event: dict):
"""process all processors for one event"""

event_received = self._encoder.encode(event)
extra_outputs = []
results = []
for processor in self._pipeline:
try:
if extra_data := processor.process(event):
if self._output:
self._store_extra_data(extra_data)
extra_outputs.append(extra_data)
except ProcessingWarning as error:
self.logger.warning(str(error))
except ProcessingCriticalError as error:
self.logger.error(str(error))
if self._output:
self._store_failed_event(error, copy.deepcopy(event), event_received)
event.clear()
result: ProcessorResult = processor.process(event)
results.append(result)
if ProcessingError in result:
event.clear()
if not event:
break
return extra_outputs
return PipelineResult(results=results)

def _store_extra_data(self, extra_data: List[tuple]) -> None:
def _store_extra_data(self, result_data: List) -> None:
self.logger.debug("Storing extra data")
for document, outputs in extra_data:
for document, outputs in result_data:
for output in outputs:
for output_name, target in output.items():
self._output[output_name].store_custom(document, target)
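As a rough sketch of how a `PipelineResult` is consumed, mirroring the logging logic in `process_pipeline` above; the function name `summarize` and the returned dict are illustrative only, not part of this PR.

```python
from logprep.framework.pipeline import PipelineResult
from logprep.processor.base.exceptions import ProcessingError, ProcessingWarning


def summarize(result: PipelineResult) -> dict:
    """Collect warning and error strings per processor result, as process_pipeline logs them."""
    summary = {"warnings": [], "errors": []}
    for processor_result in result:  # PipelineResult iterates over its ProcessorResults
        if not processor_result.errors:
            continue
        if ProcessingWarning in processor_result:
            summary["warnings"].append(processor_result.get_warning_string())
        if ProcessingError in processor_result:
            # process_pipeline additionally stores the failed event and aborts at this point
            summary["errors"].append(processor_result.get_error_string())
    return summary
```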
6 changes: 3 additions & 3 deletions logprep/processor/pre_detector/processor.py
@@ -38,7 +38,7 @@
from logprep.abc.processor import Processor
from logprep.processor.pre_detector.ip_alerter import IPAlerter
from logprep.processor.pre_detector.rule import PreDetectorRule
from logprep.util.helper import get_dotted_field_value, add_field_to
from logprep.util.helper import add_field_to, get_dotted_field_value
from logprep.util.time import TimeParser


@@ -98,7 +98,7 @@ def _apply_rules(self, event, rule):
and not self._ip_alerter.is_in_alerts_list(rule, event)
):
self._get_detection_result(event, rule)
for detection, _ in self._extra_data:
for detection, _ in self.result.data:
detection["creation_timestamp"] = TimeParser.now().isoformat()
timestamp = get_dotted_field_value(event, "@timestamp")
if timestamp is not None:
@@ -111,7 +111,7 @@ def _get_detection_result(self, event: dict, rule: PreDetectorRule):
add_field_to(event, "pre_detection_id", pre_detection_id)

detection_result = self._generate_detection_result(pre_detection_id, event, rule)
self._extra_data.append((detection_result, self._config.outputs))
self.result.data.append((detection_result, self._config.outputs))

@staticmethod
def _generate_detection_result(pre_detection_id: str, event: dict, rule: PreDetectorRule):
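The tuples appended to `result.data` here (and in the pseudonymizer below) have the shape that `Pipeline._store_extra_data` expects; a small illustration follows, with placeholder output names and targets that are not taken from any real configuration.

```python
# Illustrative only: one (document, outputs) entry as pre_detector builds it.
detection_result = {"pre_detection_id": "example-id", "creation_timestamp": "2024-07-12T00:00:00"}
outputs = [{"example_output": "example_target"}]  # list of {output_name: target} mappings

result_data = [(detection_result, outputs)]

# Pipeline._store_extra_data then dispatches each document to each configured output:
for document, outs in result_data:
    for output in outs:
        for output_name, target in output.items():
            # in the pipeline this calls self._output[output_name].store_custom(document, target)
            print(f"store_custom -> {output_name}/{target}: {document}")
```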
6 changes: 3 additions & 3 deletions logprep/processor/pseudonymizer/processor.py
@@ -266,7 +266,7 @@ def _apply_rules(self, event: dict, rule: PseudonymizerRule):
field_value = self._pseudonymize_field(rule, dotted_field, regex, field_value)
_ = add_field_to(event, dotted_field, field_value, overwrite_output_field=True)
if "@timestamp" in event:
for pseudonym, _ in self._extra_data:
for pseudonym, _ in self.result.data:
pseudonym["@timestamp"] = event["@timestamp"]
self._update_cache_metrics()

@@ -297,8 +297,8 @@ def _pseudonymize_string(self, value: str) -> str:
return value
pseudonym_dict = self._get_pseudonym_dict_cached(value)
extra = (pseudonym_dict, self._config.outputs)
if extra not in self._extra_data:
self._extra_data.append(extra)
if extra not in self.result.data:
self.result.data.append(extra)
return self._wrap_hash(pseudonym_dict["pseudonym"])

def _pseudonymize(self, value):
2 changes: 1 addition & 1 deletion logprep/processor/selective_extractor/processor.py
@@ -66,4 +66,4 @@ def _apply_rules(self, event: dict, rule: SelectiveExtractorRule):
filtered_event = {}
for field, content in flattened_fields.items():
add_field_to(filtered_event, field, content)
self._extra_data.append((filtered_event, rule.outputs))
self.result.data.append((filtered_event, rule.outputs))