diff --git a/pyproject.toml b/pyproject.toml index ed11e53..f06e373 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,7 @@ dependencies = [ "typer >= 0.4.0, < 0.5.0", ] name = "aiotaskq" -version = "0.0.11" +version = "0.0.12" readme = "README.md" description = "A simple asynchronous task queue" authors = [ diff --git a/src/aiotaskq/exceptions.py b/src/aiotaskq/exceptions.py index 4f58a4f..2a5037c 100644 --- a/src/aiotaskq/exceptions.py +++ b/src/aiotaskq/exceptions.py @@ -15,3 +15,7 @@ class UrlNotSupported(Exception): class ConcurrencyTypeNotSupported(Exception): """This concurrency type is currently not supported.""" + + +class InvalidArgument(Exception): + """A task is applied with invalid arguments.""" diff --git a/src/aiotaskq/task.py b/src/aiotaskq/task.py index 1fa5639..90e8b94 100644 --- a/src/aiotaskq/task.py +++ b/src/aiotaskq/task.py @@ -8,7 +8,7 @@ import uuid from .constants import REDIS_URL, RESULTS_CHANNEL_TEMPLATE, TASKS_CHANNEL -from .exceptions import ModuleInvalidForTask +from .exceptions import InvalidArgument, ModuleInvalidForTask from .interfaces import IPubSub, PollResponse from .pubsub import PubSub @@ -104,6 +104,9 @@ async def apply_async(self, *args: P.args, **kwargs: P.kwargs) -> RT: 5. The worker process will publish the result of the task to Results Channel 6. The main process (the caller) will pick up the result and return the result. DONE """ + # Raise error if arguments provided are invalid, before enything + self._validate_arguments(task_args=args, task_kwargs=kwargs) + task_id: str = self.generate_task_id() message: str = json.dumps( { @@ -125,6 +128,15 @@ async def apply_async(self, *args: P.args, **kwargs: P.kwargs) -> RT: return result + def _validate_arguments(self, task_args: tuple, task_kwargs: dict): + try: + func_sig: "inspect.Signature" = inspect.signature(self.func) + func_sig.bind(*task_args, **task_kwargs) + except TypeError as exc: + raise InvalidArgument( + f"These arguments are invalid: args={task_args}, kwargs={task_kwargs}" + ) from exc + def task(func: t.Callable[P, RT]) -> Task[P, RT]: """Decorator to convert a callable into an aiotaskq Task instance.""" diff --git a/src/aiotaskq/worker.py b/src/aiotaskq/worker.py index e1e7f76..6d60a2d 100755 --- a/src/aiotaskq/worker.py +++ b/src/aiotaskq/worker.py @@ -151,13 +151,13 @@ def _handle_murder_signals(self): self.concurrency_manager.terminate() async def _main_loop(self): - self._logger.info("Started main loop") + self._logger.info("[%s] Started main loop", self._pid) async with self.pubsub as pubsub: # pylint: disable=not-async-context-manager counter = -1 await pubsub.subscribe(TASKS_CHANNEL) while True: - self._logger.debug("Polling for a new task until it's available") + self._logger.debug("[%s] Polling for a new task until it's available", self._pid) message = await pubsub.poll() # A new task is now available @@ -200,7 +200,7 @@ async def _pre_run(self): pass async def _main_loop(self): - self._logger.debug("Started main loop") + self._logger.debug("[%s] Started main loop", self._pid) channel: str = self._get_child_worker_tasks_channel(pid=self._pid) batch_size = self._worker_rate_limit if self._worker_rate_limit != -1 else 99 diff --git a/src/tests/test_task.py b/src/tests/test_task.py new file mode 100644 index 0000000..171a16c --- /dev/null +++ b/src/tests/test_task.py @@ -0,0 +1,55 @@ +from typing import TYPE_CHECKING + +import pytest + +from aiotaskq.exceptions import InvalidArgument +from tests.apps import simple_app + +if TYPE_CHECKING: # pragma: no cover + from aiotaskq.task import Task + from tests.conftest import WorkerFixture + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "task,invalid_args,invalid_kwargs", + [ + pytest.param( + *(simple_app.add, tuple(), {"invalid_kwarg_1": 1, "invalid_kwarg_2": 2}), + id="Provide invalid keyword arguments", + ), + pytest.param( + *(simple_app.power, (1, 2, 3), {}), + id="Provide more positional arguments than allowed", + ), + pytest.param( + *(simple_app.echo, tuple(), {"y": 1}), + id="Provide positional arguments as keyword, but missing one", + ), + pytest.param( + *(simple_app.add, (1,), {}), + id="Provide positional argument, but missing one", + ), + ], +) +async def test_invalid_argument_provided_to_apply_async( + worker: "WorkerFixture", + task: "Task", + invalid_args: tuple, + invalid_kwargs: dict, +): + # Given a worker running in the background + await worker.start(simple_app.__name__) + + # When a task has been applied with invalid arguments + # Then an error should raised + error = None + try: + _ = await task.apply_async(*invalid_args, **invalid_kwargs) + except InvalidArgument as exc: + error = exc + finally: + assert str(error) == ( + f"These arguments are invalid: args={invalid_args}," + f" kwargs={invalid_kwargs}" + )