Skip to content

Commit

Permalink
The data flow has been redesigned. Improved handler callbacks.
Browse files Browse the repository at this point in the history
  • Loading branch information
morington committed Feb 5, 2024
1 parent f981b74 commit 986b9ef
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 20 deletions.
3 changes: 0 additions & 3 deletions examples/setup_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@


class SetupLogger:
def __init__(self) -> None:
self.web_url = "http://127.0.0.1:3325/addlog"

def __str__(self) -> str:
return f"<{__class__.__name__} dev:{sys.stderr.isatty()}>"

Expand Down
12 changes: 10 additions & 2 deletions taskorbit/dispatching/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ async def listen(self, metadata: Metadata) -> None:
logger.error(e.args[0])

async def _metadata_processing(self, metadata: Metadata) -> None:
data = self.context_data.copy()

try:
call_processing: partial = await self.middleware.middleware_processing(handler=self._message_processing, metadata=metadata)
await call_processing(metadata=metadata, data=self.context_data)
await call_processing(metadata=metadata, data=data)
except Exception as e:
logger.error(f"{e.args[0]}")

Expand All @@ -71,8 +73,14 @@ async def _handler_processing(metadata: Message, data: dict[str, Any]) -> Any:

fields_cls: dict = get_list_parameters(handler.__call__, metadata=metadata, data=data)
fields_handle: dict = get_list_parameters(handler.handle, metadata=metadata, data=data, is_handler=True)
fields_execution_callback: dict = get_list_parameters(handler.on_execution_timeout, metadata=metadata, data=data, is_handler=True)
fields_close_callback: dict = get_list_parameters(handler.on_close, metadata=metadata, data=data, is_handler=True)

return await handler(**{**fields_cls, **fields_handle})
return await handler(**{
**fields_cls, **fields_handle,
'fields_execution_callback': fields_execution_callback,
'fields_close_callback': fields_close_callback
})

call_processing: partial | Callable = await self.inner_middleware.middleware_processing(handler=_handler_processing, metadata=metadata)

Expand Down
20 changes: 10 additions & 10 deletions taskorbit/dispatching/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from abc import ABC, abstractmethod
from types import NoneType
from typing import Callable, Awaitable, Optional, Union
from typing import Callable, Awaitable, Optional, Union, Any

from taskorbit.timer import TimerManager

Expand All @@ -18,10 +18,10 @@ def __init__(self) -> None:
self.uuid: Optional[str] = None

self.execution_timeout: Optional[int] = None
self.on_execution_timeout: Optional[Callable[[], Awaitable[None]]] = None
self.on_execution_timeout: Optional[Callable[[...], Awaitable[None]]] = None

self.close_timeout: Optional[int] = None
self.on_close: Optional[Callable[[], Awaitable[None]]] = None
self.on_close: Optional[Callable[[...], Awaitable[None]]] = None

if (
not isinstance(self.on_execution_timeout, Callable | NoneType) or
Expand All @@ -31,15 +31,15 @@ def __init__(self) -> None:
):
raise TypeError("The callback must be either a function or NoneType")

async def _execution(self) -> None:
async def _execution(self, **kwargs) -> None:
if self.on_execution_timeout is not None:
await self.on_execution_timeout()
await self.on_execution_timeout(**kwargs)
else:
logger.debug(f"Please wait, the task-{self.uuid} is still in progress...")

async def _close(self, *args, **kwargs) -> None:
async def _close(self, **kwargs) -> None:
if self.on_close is not None:
await self.on_close()
await self.on_close(**kwargs)

logger.debug("The timeout has expired and the task is being closed...")
if self.__task is not None:
Expand All @@ -55,13 +55,13 @@ def cancel(self, _) -> None:
@abstractmethod
async def handle(self, *args, **kwargs) -> None: ...

async def __call__(self, **kwargs) -> None:
async def __call__(self, fields_execution_callback: dict[str, Any], fields_close_callback: dict[str, Any], **kwargs) -> None:
try:
self.__task = asyncio.create_task(self.handle(**kwargs))
self.__task.add_done_callback(self.cancel)

await self._timer_manager.start_timer(self.execution_timeout, self._execution)
await self._timer_manager.start_timer(self.close_timeout, self._close)
await self._timer_manager.start_timer(self.execution_timeout, self._execution, **fields_execution_callback)
await self._timer_manager.start_timer(self.close_timeout, self._close, **fields_close_callback)
await self.__task
except Exception as e:
logger.debug(f"An error occurred: {e.args[0]}")
Expand Down
4 changes: 2 additions & 2 deletions taskorbit/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ class TimerManager:
def __init__(self):
self.timers: list[asyncio.Task] = []

async def start_timer(self, timeout: Optional[int], callback: Callable[[], Awaitable[None]]) -> Optional[asyncio.Task]:
async def start_timer(self, timeout: Optional[int], callback: Callable[[...], Awaitable[None]], **kwargs) -> Optional[asyncio.Task]:
async def timer() -> None:
await asyncio.sleep(timeout)
await callback()
await callback(**kwargs)

if timeout is not None:
task = asyncio.create_task(timer())
Expand Down
10 changes: 7 additions & 3 deletions taskorbit/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import inspect
import logging
from types import NoneType
from typing import Callable, Any

from magic_filter import MagicFilter, AttrDict
Expand Down Expand Up @@ -40,6 +41,9 @@ def validate_filters(filters: FilterType) -> tuple[FilterType]:


def get_list_parameters(func: Callable, metadata: Message, data: dict[str, Any], is_handler: bool = False) -> dict[str, Any]:
_sig = inspect.signature(func)
_data = {**data} if is_handler else {"data": data}
return {param: value for param, value in {"metadata": metadata, **_data}.items() if param in _sig.parameters}
if isinstance(func, NoneType):
return {}
else:
_sig = inspect.signature(func)
_data = {**data} if is_handler else {"data": data}
return {param: value for param, value in {"metadata": metadata, **_data}.items() if param in _sig.parameters}

0 comments on commit 986b9ef

Please sign in to comment.