Commit
add attributes to processor result (#630)
* add outputs, event to processor_result and rename name to processor_name
* add tests for collecting errors in result object
* add tests for event reference
* refactor pipeline_result
* refactor result and remove lock from pipeline
ekneg54 committed Jul 12, 2024
1 parent 92ef6da commit 2c46f5e
Showing 34 changed files with 293 additions and 218 deletions.
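
In short: the result object's name attribute becomes processor_name, warnings move out of the shared error list into their own attribute, and the result now carries a reference to the processed event. A rough sketch of the new shape, not taken from the commit itself; field names and the constructor call follow the processor.py diff below, and the values are placeholders:

# sketch only; import path as shown in the changed file logprep/abc/processor.py
from logprep.abc.processor import ProcessorResult

# before: ProcessorResult(name=self.name), with warnings mixed into result.errors
# after:  the processor name is explicit and the processed event travels with the result
result = ProcessorResult(processor_name="my_processor", event={"message": "test"})
result.warnings   # ProcessingWarning instances, now collected separately
result.errors     # ProcessingError instances only
result.data       # extra data generated by the processor
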
59 changes: 31 additions & 28 deletions logprep/abc/processor.py
@@ -38,30 +38,28 @@ class ProcessorResult:
and errors (incl. warnings).
"""

name: str = field(validator=validators.instance_of(str))
data: list = field(validator=validators.instance_of(list), factory=list)
""" The generated extra data """
errors: list = field(
validator=validators.deep_iterable(
member_validator=validators.instance_of((ProcessingError, ProcessingWarning)),
member_validator=validators.instance_of(ProcessingError),
iterable_validator=validators.instance_of(list),
),
factory=list,
)

def __contains__(self, error_class):
return any(isinstance(item, error_class) for item in self.errors)

def get_warning_string(self):
"""creates a string containing the warnings"""
return ", ".join(
[error.args[0] for error in self.errors if isinstance(error, ProcessingWarning)]
)

def get_error_string(self):
"""creates a string containing the errors"""
return ", ".join(
[error.args[0] for error in self.errors if isinstance(error, ProcessingError)]
)
""" The errors that occurred during processing """
warnings: list = field(
validator=validators.deep_iterable(
member_validator=validators.instance_of(ProcessingWarning),
iterable_validator=validators.instance_of(list),
),
factory=list,
)
""" The warnings that occurred during processing """
processor_name: str = field(validator=validators.instance_of(str))
""" The name of the processor """
event: dict = field(validator=validators.optional(validators.instance_of(dict)), default=None)
""" A reference to the event that was processed """


class Processor(Component):
@@ -138,7 +136,7 @@ def __init__(self, name: str, configuration: "Processor.Config"):
specific_rules_targets=self._config.specific_rules,
)
self.has_custom_tests = False
self.result = ProcessorResult(name=self.name)
self.result = None

@property
def _specific_rules(self):
@@ -180,23 +178,28 @@ def metric_labels(self) -> dict:
"name": self.name,
}

def process(self, event: dict):
"""Process a log event by calling the implemented `process` method of the
strategy object set in `_strategy` attribute.
def process(self, event: dict) -> ProcessorResult:
"""Process a log event.
Parameters
----------
event : dict
A dictionary representing a log event.
Returns
-------
ProcessorResult
A ProcessorResult object containing the processed event, errors,
extra data and a list of target outputs.
"""
self.result = ProcessorResult(name=self.name)
self.result = ProcessorResult(processor_name=self.name, event=event)
logger.debug(f"{self.describe()} processing event {event}")
self._process_rule_tree(event, self._specific_tree)
self._process_rule_tree(event, self._generic_tree)
return self.result

def _process_rule_tree(self, event: dict, tree: "RuleTree"):
def _process_rule_tree(self, event: dict, tree: RuleTree):
applied_rules = set()

@Metric.measure_time()
@@ -206,14 +209,14 @@ def _process_rule(rule, event):
applied_rules.add(rule)
return event

def _process_rule_tree_multiple_times(tree, event):
def _process_rule_tree_multiple_times(tree: RuleTree, event: dict):
matching_rules = tree.get_matching_rules(event)
while matching_rules:
for rule in matching_rules:
_process_rule(rule, event)
matching_rules = set(tree.get_matching_rules(event)).difference(applied_rules)

def _process_rule_tree_once(tree, event):
def _process_rule_tree_once(tree: RuleTree, event: dict):
matching_rules = tree.get_matching_rules(event)
for rule in matching_rules:
_process_rule(rule, event)
Expand All @@ -231,7 +234,7 @@ def _apply_rules_wrapper(self, event: dict, rule: "Rule"):
except ProcessingCriticalError as error:
self.result.errors.append(error) # is needed to prevent wrapping it in itself
event.clear()
except BaseException as error:
except Exception as error:
self.result.errors.append(ProcessingCriticalError(str(error), rule, event))
event.clear()
if not hasattr(rule, "delete_source_fields"):
@@ -321,9 +324,9 @@ def _handle_warning_error(self, event, rule, error, failure_tags=None):
else:
add_and_overwrite(event, "tags", sorted(list({*tags, *failure_tags})))
if isinstance(error, ProcessingWarning):
self.result.errors.append(error)
self.result.warnings.append(error)
else:
self.result.errors.append(ProcessingWarning(str(error), rule, event))
self.result.warnings.append(ProcessingWarning(str(error), rule, event))

def _has_missing_values(self, event, rule, source_field_dict):
missing_fields = list(
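
With this change, Processor.process builds the result per call and returns it instead of filling a result created in __init__. A minimal caller sketch, not part of the commit; processor stands for any configured Processor instance, and the exact shape of the extra-data entries is an assumption:

result = processor.process({"message": "something to enrich"})

result.processor_name                # name of the processor that produced the result
result.event                         # reference to the (possibly modified) input event
for warning in result.warnings:      # ProcessingWarning instances, no longer mixed into errors
    print(warning)
for error in result.errors:          # ProcessingError instances only
    print(error)
for item in result.data:             # extra data generated while applying the rules
    print(item)
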
84 changes: 50 additions & 34 deletions logprep/framework/pipeline.py
@@ -17,11 +17,10 @@
from ctypes import c_bool
from functools import cached_property, partial
from importlib.metadata import version
from multiprocessing import Lock, Value, current_process
from typing import Any, List, Tuple, Optional
from multiprocessing import Value, current_process
from typing import Any, Generator, List, Tuple

import attrs
import msgspec

from logprep.abc.component import Component
from logprep.abc.input import (
@@ -53,12 +52,41 @@ class PipelineResult:

results: List[ProcessorResult] = attrs.field(
validator=[
attrs.validators.instance_of(list),
attrs.validators.instance_of((list, Generator)),
attrs.validators.deep_iterable(
member_validator=attrs.validators.instance_of(ProcessorResult)
),
]
)
"""List of ProcessorResults"""
event: dict = attrs.field(validator=attrs.validators.instance_of(dict))
"""The event that was processed"""
event_received: dict = attrs.field(
validator=attrs.validators.instance_of(dict), converter=copy.deepcopy
)
"""The event that was received"""
pipeline: list[Processor]
"""The pipeline that processed the event"""

@cached_property
def errors(self) -> List[ProcessingError]:
"""Return all processing errors."""
return list(itertools.chain(*[result.errors for result in self]))

@cached_property
def warnings(self) -> List[ProcessingWarning]:
"""Return all processing warnings."""
return list(itertools.chain(*[result.warnings for result in self]))

@cached_property
def data(self) -> List[Tuple[dict, dict]]:
"""Return all extra data."""
return list(itertools.chain(*[result.data for result in self]))

def __attrs_post_init__(self):
self.results = list(
(processor.process(self.event) for processor in self.pipeline if self.event)
)

def __iter__(self):
return iter(self.results)
@@ -113,9 +141,6 @@ class Metrics(Component.Metrics):
_continue_iterating: Value
""" a flag to signal if iterating continues """

_lock: Lock
""" the lock for the pipeline process """

pipeline_index: int
""" the index of this pipeline """

@@ -175,19 +200,13 @@ def _input(self) -> Input:
)
return Factory.create(input_connector_config)

def __init__(
self, config: Configuration, pipeline_index: int = None, lock: Lock = None
) -> None:
def __init__(self, config: Configuration, pipeline_index: int = None) -> None:
self.logger = logging.getLogger("Pipeline")
self.logger.name = f"Pipeline{pipeline_index}"
self._logprep_config = config
self._timeout = config.timeout
self._continue_iterating = Value(c_bool)

self._lock = lock
self.pipeline_index = pipeline_index
self._encoder = msgspec.msgpack.Encoder()
self._decoder = msgspec.msgpack.Decoder()
if self._logprep_config.profile_pipelines:
self.run = partial(PipelineProfiler.profile_function, self.run)

@@ -224,43 +243,36 @@ def run(self) -> None: # pylint: disable=method-hidden
self._continue_iterating.value = True
assert self._input, "Pipeline should not be run without input connector"
assert self._output, "Pipeline should not be run without output connector"
with self._lock:
with warnings.catch_warnings():
warnings.simplefilter("default")
self._setup()
with warnings.catch_warnings():
warnings.simplefilter("default")
self._setup()
self.logger.debug("Start iterating")
while self._continue_iterating.value:
self.process_pipeline()
self._shut_down()

@_handle_pipeline_error
def process_pipeline(self) -> Tuple[Optional[dict], Optional[PipelineResult]]:
def process_pipeline(self) -> PipelineResult:
"""Retrieve next event, process event with full pipeline and store or return results"""
Component.run_pending_tasks()

event = self._get_event()
if not event:
return None, None
event_received = copy.deepcopy(event)
result: PipelineResult = self.process_event(event)
for processor_result in result:
if not processor_result.errors:
continue
if ProcessingWarning in processor_result:
self.logger.warning(processor_result.get_warning_string())
if ProcessingError in processor_result:
self.logger.error(processor_result.get_error_string())
if self._output:
self._store_failed_event(processor_result.errors, event_received, event)
# pipeline is aborted on processing error
return event, result
if result.warnings:
self.logger.warning(",".join((str(warning) for warning in result.warnings)))
if result.errors:
self.logger.error(",".join((str(error) for error in result.errors)))
self._store_failed_event(result.errors, result.event_received, event)
return
if self._output:
result_data = [res.data for res in result if res.data]
if result_data:
self._store_extra_data(itertools.chain(*result_data))
if event:
self._store_event(event)
return event, result
return result

def _store_event(self, event: dict) -> None:
for output_name, output in self._output.items():
@@ -288,9 +300,13 @@ def _get_event(self) -> dict:
@Metric.measure_time()
def process_event(self, event: dict):
"""process all processors for one event"""
return PipelineResult(
results=[processor.process(event) for processor in self._pipeline if event]
result = PipelineResult(
results=[],
event_received=event,
event=event,
pipeline=self._pipeline,
)
return result

def _store_extra_data(self, result_data: List | itertools.chain) -> None:
self.logger.debug("Storing extra data")
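
The refactored PipelineResult now runs the processors itself in __attrs_post_init__ and aggregates their output through cached properties. A sketch mirroring process_event above, not from the commit; event is a placeholder dict and pipeline is assumed to be the list of configured Processor instances:

result = PipelineResult(
    results=[],            # filled in __attrs_post_init__ by calling processor.process(event)
    event=event,
    event_received=event,  # deep-copied by the converter declared on the field
    pipeline=pipeline,
)

result.errors                     # every ProcessingError collected across all processors
result.warnings                   # every ProcessingWarning collected across all processors
result.data                       # chained extra data of all processor results
for processor_result in result:   # __iter__ yields the individual ProcessorResults
    print(processor_result.processor_name)
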
3 changes: 1 addition & 2 deletions logprep/framework/pipeline_manager.py
@@ -64,7 +64,6 @@ def __init__(self, configuration: Configuration):
self._pipelines: list[multiprocessing.Process] = []
self._configuration = configuration

self._lock = multiprocessing.Lock()
prometheus_config = self._configuration.metrics
if prometheus_config.enabled:
self.prometheus_exporter = PrometheusExporter(prometheus_config)
@@ -164,7 +163,7 @@ def restart(self):
self.prometheus_exporter.run()

def _create_pipeline(self, index) -> multiprocessing.Process:
pipeline = Pipeline(pipeline_index=index, config=self._configuration, lock=self._lock)
pipeline = Pipeline(pipeline_index=index, config=self._configuration)
logger.info("Created new pipeline")
process = multiprocessing.Process(target=pipeline.run, daemon=True)
process.stop = pipeline.stop
17 changes: 13 additions & 4 deletions logprep/processor/generic_resolver/processor.py
@@ -28,18 +28,21 @@
import re
from typing import Union

from logprep.processor.base.exceptions import FieldExistsWarning
from logprep.processor.base.exceptions import (
FieldExistsWarning,
ProcessingCriticalError,
)
from logprep.processor.field_manager.processor import FieldManager
from logprep.processor.generic_resolver.rule import GenericResolverRule
from logprep.util.getter import GetterFactory
from logprep.util.helper import add_field_to, get_dotted_field_value


class GenericResolverError(BaseException):
class GenericResolverError(ProcessingCriticalError):
"""Base class for GenericResolver related exceptions."""

def __init__(self, name: str, message: str):
super().__init__(f"GenericResolver ({name}): {message}")
def __init__(self, name: str, message: str, rule: GenericResolverRule, event: dict):
super().__init__(f"{name}: {message}", rule=rule, event=event)


class GenericResolver(FieldManager):
@@ -78,6 +81,8 @@ def _apply_rules(self, event, rule):
raise GenericResolverError(
self.name,
"Mapping group is missing in mapping file pattern!",
rule=rule,
event=event,
)
dest_val = replacements.get(mapping)
if dest_val:
@@ -145,9 +150,13 @@ def ensure_rules_from_file(self, rule):
f"Additions file "
f'\'{rule.resolve_from_file["path"]}\''
f" must be a dictionary with string values!",
rule=rule,
event=None,
)
except FileNotFoundError as error:
raise GenericResolverError(
self.name,
f'Additions file \'{rule.resolve_from_file["path"]}' f"' not found!",
rule=rule,
event=None,
) from error
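
Because GenericResolverError now derives from ProcessingCriticalError instead of BaseException, a failing lookup raised during rule application is caught by the processor's rule wrapper and collected into the result object rather than escaping the pipeline. A hedged sketch of the caller side; generic_resolver stands for a configured GenericResolver instance:

result = generic_resolver.process(event)
for error in result.errors:   # e.g. a missing additions file surfaces here as a GenericResolverError
    print(error)
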
6 changes: 2 additions & 4 deletions logprep/processor/hyperscan_resolver/processor.py
@@ -39,6 +39,7 @@

from logprep.processor.base.exceptions import FieldExistsWarning, SkipImportError
from logprep.processor.field_manager.processor import FieldManager
from logprep.processor.generic_resolver.processor import GenericResolverError
from logprep.util.helper import add_field_to, get_dotted_field_value
from logprep.util.validators import directory_validator

@@ -56,12 +57,9 @@
# pylint: enable=ungrouped-imports


class HyperscanResolverError(BaseException):
class HyperscanResolverError(GenericResolverError):
"""Base class for HyperscanResolver related exceptions."""

def __init__(self, name: str, message: str):
super().__init__(f"HyperscanResolver ({name}): {message}")


class HyperscanResolver(FieldManager):
"""Resolve values in documents by referencing a mapping list."""
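
HyperscanResolverError now simply inherits the GenericResolverError behaviour, so raising it takes the same name, message, rule and event arguments. A minimal sketch; the message text is a placeholder, not a string from the codebase:

raise HyperscanResolverError(self.name, "resolve mapping could not be built", rule=rule, event=event)
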
