Skip to content

Commit

Permalink
Add multi input
Browse files Browse the repository at this point in the history
  • Loading branch information
rnv812 committed Jun 4, 2024
1 parent c1acec3 commit a1e44dd
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 64 deletions.
12 changes: 8 additions & 4 deletions eventum_core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@


class ApplicationConfig(BaseModel, extra='forbid', frozen=True):
input: InputConfigMapping # type: ignore[valid-type]
input: tuple[InputConfigMapping] # type: ignore[valid-type]
event: JinjaEventConfig
output: list[OutputConfigMapping] # type: ignore[valid-type]
output: tuple[OutputConfigMapping] # type: ignore[valid-type]


class Application:
Expand All @@ -45,8 +45,12 @@ def __init__(

# For all queues: The None element indicates that no more new
# elements will be put in that queue
self._input_queue: Queue[NDArray[np.datetime64]] = Queue()
self._event_queue: Queue[NDArray[np.str_]] = Queue()
self._input_queue: Queue[NDArray[np.datetime64]] = Queue(
maxsize=settings.input_queue_max_size
)
self._event_queue: Queue[NDArray[np.str_]] = Queue(
maxsize=settings.event_queue_max_size
)

# Regardless of whether the process ended with an error or not
# this flag must be set at the end of its execution.
Expand Down
9 changes: 3 additions & 6 deletions eventum_core/batcher.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
from threading import Condition, RLock, Thread
from typing import Any, Callable

import numpy as np
from numpy.typing import NDArray
from typing import Any, Callable, Iterable


class Batcher:
Expand All @@ -14,7 +11,7 @@ def __init__(
self,
size: int,
timeout: float,
callback: Callable[[NDArray[Any]], Any]
callback: Callable[[Iterable], Any]
) -> None:
self._size = size
self._timeout = timeout
Expand Down Expand Up @@ -62,7 +59,7 @@ def _run_cycle(self):
def _flush_batch(self, batch):
"""Perform callback on current batch."""
if batch:
self._callback(np.array(batch))
self._callback(batch)

def add(self, element: Any) -> None:
"""Add element to current batch."""
Expand Down
16 changes: 9 additions & 7 deletions eventum_core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,25 @@ class Settings(BaseModel, extra='forbid', frozen=True):
# Time zone used in input plugins to generate timestamps.
timezone: str = 'UTC'

# The name of variable in template with timezone value (e.g. "+0300").
# The name of variable in template with timezone value (e.g. "+03:00")
timezone_field_name: str = 'tz'

# The name of variable in template with original event timestamp.
# The name of variable in template with original event timestamp
timestamp_field_name: str = Field('timestamp', min_length=1)

# Batch size / timeout (in seconds) for input-to-event plugins
# communication.
# Batch size / timeout (in seconds) for event plugin
events_batch_size: int = Field(1_000_000, ge=1)
events_batch_timeout: float = Field(1.0, ge=0)

# Batch size / timeout (in seconds) for output plugins.
# Batch size / timeout (in seconds) for output plugins
output_batch_size: int = Field(10_000, ge=1)
output_batch_timeout: float = Field(1.0, ge=0)

# Service name for keyring credentials storage
keyring_service_name: str = Field('eventum', min_length=1)
# Max size of input queue
input_queue_max_size: int = Field(10_000_000, ge=1)

# Max size of event queue
event_queue_max_size: int = Field(1_000_000, ge=1)

@field_validator('timezone')
def validate_timezone(cls, v: Any):
Expand Down
127 changes: 81 additions & 46 deletions eventum_core/subprocesses.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import asyncio
import logging
import signal
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime
from multiprocessing import Queue
from multiprocessing.sharedctypes import SynchronizedBase
from multiprocessing.synchronize import Event as EventClass
from typing import Callable, NoReturn, Optional
from typing import Callable, Iterable, NoReturn, Optional

import numpy as np
from eventum_plugins.event.base import (EventPluginConfigurationError,
Expand Down Expand Up @@ -60,62 +61,96 @@ def _terminate_subprocess(

@subprocess('input')
def start_input_subprocess(
config: MutexFieldsModel,
config: Iterable[MutexFieldsModel],
settings: Settings,
time_mode: TimeMode,
queue: Queue,
is_done: EventClass,
) -> None:
plugin_name = config.get_name()
input_conf = config.get_value()
plugins_list_fmt = ", ".join(
[f'"{item.get_name()}"' for item in config]
)

logger.info(f'Initializing "{plugin_name}" input plugin')
logger.info(f'Initializing [{plugins_list_fmt}] input plugins')

try:
plugin_class = load_input_plugin_class(plugin_name=plugin_name)
input_plugin = plugin_class(
config=input_conf,
tz=timezone(settings.timezone)
)
except ValueError as e:
logger.error(f'Failed to load input plugin: {e}')
_terminate_subprocess(is_done, 1, queue)
except InputPluginConfigurationError as e:
logger.error(f'Failed to initialize input plugin: {e}')
_terminate_subprocess(is_done, 1, queue)
except Exception as e:
logger.error(
'Unexpected error occurred during initializing '
f'input plugin: {e}'
)
_terminate_subprocess(is_done, 1, queue)
input_plugins: list[BaseOutputPlugin] = []
input_plugin_names: list[str] = []

logger.info('Input plugin is successfully initialized')
for item in config:
plugin_name = item.get_name()
input_conf = item.get_value()

try:
try:
plugin_class = load_input_plugin_class(plugin_name=plugin_name)
input_plugins.append(
plugin_class(
config=input_conf,
tz=timezone(settings.timezone)
)
)
input_plugin_names.append(plugin_name)
except ValueError as e:
logger.error(f'Failed to load "{plugin_name}" input plugin: {e}')
_terminate_subprocess(is_done, 1, queue)
except InputPluginConfigurationError as e:
logger.error(
f'Failed to initialize "{plugin_name}" input plugin: {e}'
)
_terminate_subprocess(is_done, 1, queue)
except Exception as e:
logger.error(
'Unexpected error occurred during initializing '
f'"{plugin_name}" input plugin: {e}'
)
_terminate_subprocess(is_done, 1, queue)

logger.info('Input plugins are successfully initialized')

plugin_tasks: list[Callable] = []
for plugin_name, plugin in zip(input_plugin_names, input_plugins):
if hasattr(plugin, time_mode.value):
plugin_tasks.append(plugin.__getattribute__(time_mode.value))
else:
logger.error(
f'"{plugin_name}" input plugin does not support '
f'"{time_mode}" mode'
)
_terminate_subprocess(is_done, 1, queue)

with ThreadPoolExecutor(max_workers=len(plugin_tasks)) as executor:
with Batcher(
size=settings.events_batch_size,
timeout=settings.events_batch_timeout,
callback=queue.put
callback=lambda batch: queue.put(np.array(batch))
) as batcher:
plugin_mode = input_plugin.__getattribute__(time_mode.value)
plugin_mode.__call__(on_event=batcher.add)
except AttributeError:
logger.error(
f'"{plugin_name}" input plugin does not support "{time_mode}" mode'
)
_terminate_subprocess(is_done, 1, queue)
except InputPluginRuntimeError as e:
logger.error(f'Error occurred during input plugin execution: {e}')
_terminate_subprocess(is_done, 1, queue)
except Exception as e:
logger.error(
f'Unexpected error occurred during input plugin execution: {e}'
)
_terminate_subprocess(is_done, 1, queue)
submitted_tasks: list[Future] = []
for task in plugin_tasks:
submitted_tasks.append(
executor.submit(task, on_event=batcher.add)
)

logger.info('Stopping input plugin')
_terminate_subprocess(is_done, 0, queue)
all_success = True
for plugin_name, task in zip(input_plugin_names, submitted_tasks):
try:
task.result()
except InputPluginRuntimeError as e:
logger.error(
f'Error occurred during "{plugin_name}" input plugin '
f'execution: {e}'
)
all_success = False
except Exception as e:
logger.error(
f'Unexpected error occurred during "{plugin_name}" '
f'input plugin execution: {e}'
)
all_success = False

if not all_success:
logger.info('Stopping input plugins')
_terminate_subprocess(is_done, 0, queue)
else:
_terminate_subprocess(is_done, 1, queue)


@subprocess('event')
Expand Down Expand Up @@ -150,7 +185,7 @@ def start_event_subprocess(
with Batcher(
size=settings.output_batch_size,
timeout=settings.output_batch_timeout,
callback=event_queue.put
callback=lambda batch: event_queue.put(batch)
) as batcher:
while True:
timestamps_batch = input_queue.get()
Expand Down Expand Up @@ -181,7 +216,7 @@ def start_event_subprocess(

@subprocess('output')
def start_output_subprocess(
config: list[MutexFieldsModel],
config: Iterable[MutexFieldsModel],
settings: Settings,
queue: Queue,
processed_events: SynchronizedBase,
Expand All @@ -203,7 +238,7 @@ def start_output_subprocess(
plugin_class = load_output_plugin_class(plugin_name=plugin_name)
output_plugins.append(plugin_class(config=output_conf))
except ValueError as e:
logger.error(f'Failed to load output plugin: {e}')
logger.error(f'Failed to load "{plugin_name}" output plugin: {e}')
_terminate_subprocess(is_done, 1)
except OutputPluginConfigurationError as e:
logger.error(
Expand Down
2 changes: 1 addition & 1 deletion eventum_core/tests/test_subprocesses.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def test_input_subprocess():
proc_input = Process(
target=start_input_subprocess,
args=(
InputConfigMapping(sample=SampleInputConfig(count=10)),
[InputConfigMapping(sample=SampleInputConfig(count=10))],
Settings(events_batch_timeout=0),
TimeMode.SAMPLE,
queue,
Expand Down

0 comments on commit a1e44dd

Please sign in to comment.