Skip to content

Commit

Permalink
refactor result and remove lock from pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Jul 11, 2024
1 parent 201d769 commit 12d6443
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 37 deletions.
49 changes: 14 additions & 35 deletions logprep/framework/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
from ctypes import c_bool
from functools import cached_property, partial
from importlib.metadata import version
from multiprocessing import Lock, Value, current_process
from typing import Any, Generator, List, Optional, Tuple
from multiprocessing import Value, current_process
from typing import Any, Generator, List, Tuple

import attrs
import msgspec

from logprep.abc.component import Component
from logprep.abc.input import (
Expand Down Expand Up @@ -69,30 +68,20 @@ class PipelineResult:
pipeline: list[Processor]
"""The pipeline that processed the event"""

@property
def has_processing_errors(self) -> bool:
"""Check if any of the results has processing errors."""
return any(result.errors for result in self)

@property
def has_processing_warnings(self) -> bool:
"""Check if any of the results has processing warnings."""
return any(result.warnings for result in self)

@property
@cached_property
def errors(self) -> List[ProcessingError]:
"""Return all processing errors."""
return itertools.chain(*[result.errors for result in self])
return list(itertools.chain(*[result.errors for result in self]))

@property
@cached_property
def warnings(self) -> List[ProcessingWarning]:
"""Return all processing warnings."""
return itertools.chain(*[result.warnings for result in self])
return list(itertools.chain(*[result.warnings for result in self]))

@property
@cached_property
def data(self) -> List[Tuple[dict, dict]]:
"""Return all extra data."""
return itertools.chain(*[result.data for result in self])
return list(itertools.chain(*[result.data for result in self]))

def __attrs_post_init__(self):
self.results = list(
Expand Down Expand Up @@ -152,9 +141,6 @@ class Metrics(Component.Metrics):
_continue_iterating: Value
""" a flag to signal if iterating continues """

_lock: Lock
""" the lock for the pipeline process """

pipeline_index: int
""" the index of this pipeline """

Expand Down Expand Up @@ -214,19 +200,13 @@ def _input(self) -> Input:
)
return Factory.create(input_connector_config)

def __init__(
self, config: Configuration, pipeline_index: int = None, lock: Lock = None
) -> None:
def __init__(self, config: Configuration, pipeline_index: int = None) -> None:
self.logger = logging.getLogger("Pipeline")
self.logger.name = f"Pipeline{pipeline_index}"
self._logprep_config = config
self._timeout = config.timeout
self._continue_iterating = Value(c_bool)

self._lock = lock
self.pipeline_index = pipeline_index
self._encoder = msgspec.msgpack.Encoder()
self._decoder = msgspec.msgpack.Decoder()
if self._logprep_config.profile_pipelines:
self.run = partial(PipelineProfiler.profile_function, self.run)

Expand Down Expand Up @@ -263,10 +243,9 @@ def run(self) -> None: # pylint: disable=method-hidden
self._continue_iterating.value = True
assert self._input, "Pipeline should not be run without input connector"
assert self._output, "Pipeline should not be run without output connector"
with self._lock:
with warnings.catch_warnings():
warnings.simplefilter("default")
self._setup()
with warnings.catch_warnings():
warnings.simplefilter("default")
self._setup()
self.logger.debug("Start iterating")
while self._continue_iterating.value:
self.process_pipeline()
Expand All @@ -281,9 +260,9 @@ def process_pipeline(self) -> PipelineResult:
if not event:
return None, None
result: PipelineResult = self.process_event(event)
if result.has_processing_warnings:
if result.warnings:
self.logger.warning(",".join((str(warning) for warning in result.warnings)))
if result.has_processing_errors:
if result.errors:
self.logger.error(",".join((str(error) for error in result.errors)))
self._store_failed_event(result.errors, result.event_received, event)
return
Expand Down
3 changes: 1 addition & 2 deletions logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ def __init__(self, configuration: Configuration):
self._pipelines: list[multiprocessing.Process] = []
self._configuration = configuration

self._lock = multiprocessing.Lock()
prometheus_config = self._configuration.metrics
if prometheus_config.enabled:
self.prometheus_exporter = PrometheusExporter(prometheus_config)
Expand Down Expand Up @@ -164,7 +163,7 @@ def restart(self):
self.prometheus_exporter.run()

def _create_pipeline(self, index) -> multiprocessing.Process:
pipeline = Pipeline(pipeline_index=index, config=self._configuration, lock=self._lock)
pipeline = Pipeline(pipeline_index=index, config=self._configuration)
logger.info("Created new pipeline")
process = multiprocessing.Process(target=pipeline.run, daemon=True)
process.stop = pipeline.stop
Expand Down

0 comments on commit 12d6443

Please sign in to comment.