Skip to content

Commit

Permalink
(#54) Raise error early on invalid arguments to apply_async (#62)
Browse files Browse the repository at this point in the history
* Run validation on the arguments using the `inspect.Signature.bind`
  built-in utility
* If invalid, raise error. Otherwise, proceed to sending task to queue
* This aims to help user identity bug early on
* We might consider turning this feature on only when AIOTASKQ_DEBUG=1 or
  something like that in the future

Bump version to 0.0.12

Signed-off-by: Imran Ariffin <ariffin.imran@gmail.com>
  • Loading branch information
imranariffin authored Jul 22, 2023
1 parent 10f264e commit f02f6ce
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ dependencies = [
"typer >= 0.4.0, < 0.5.0",
]
name = "aiotaskq"
version = "0.0.11"
version = "0.0.12"
readme = "README.md"
description = "A simple asynchronous task queue"
authors = [
Expand Down
4 changes: 4 additions & 0 deletions src/aiotaskq/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ class UrlNotSupported(Exception):

class ConcurrencyTypeNotSupported(Exception):
"""This concurrency type is currently not supported."""


class InvalidArgument(Exception):
"""A task is applied with invalid arguments."""
14 changes: 13 additions & 1 deletion src/aiotaskq/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import uuid

from .constants import REDIS_URL, RESULTS_CHANNEL_TEMPLATE, TASKS_CHANNEL
from .exceptions import ModuleInvalidForTask
from .exceptions import InvalidArgument, ModuleInvalidForTask
from .interfaces import IPubSub, PollResponse
from .pubsub import PubSub

Expand Down Expand Up @@ -104,6 +104,9 @@ async def apply_async(self, *args: P.args, **kwargs: P.kwargs) -> RT:
5. The worker process will publish the result of the task to Results Channel
6. The main process (the caller) will pick up the result and return the result. DONE
"""
# Raise error if arguments provided are invalid, before enything
self._validate_arguments(task_args=args, task_kwargs=kwargs)

task_id: str = self.generate_task_id()
message: str = json.dumps(
{
Expand All @@ -125,6 +128,15 @@ async def apply_async(self, *args: P.args, **kwargs: P.kwargs) -> RT:

return result

def _validate_arguments(self, task_args: tuple, task_kwargs: dict):
try:
func_sig: "inspect.Signature" = inspect.signature(self.func)
func_sig.bind(*task_args, **task_kwargs)
except TypeError as exc:
raise InvalidArgument(
f"These arguments are invalid: args={task_args}, kwargs={task_kwargs}"
) from exc


def task(func: t.Callable[P, RT]) -> Task[P, RT]:
"""Decorator to convert a callable into an aiotaskq Task instance."""
Expand Down
6 changes: 3 additions & 3 deletions src/aiotaskq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,13 @@ def _handle_murder_signals(self):
self.concurrency_manager.terminate()

async def _main_loop(self):
self._logger.info("Started main loop")
self._logger.info("[%s] Started main loop", self._pid)

async with self.pubsub as pubsub: # pylint: disable=not-async-context-manager
counter = -1
await pubsub.subscribe(TASKS_CHANNEL)
while True:
self._logger.debug("Polling for a new task until it's available")
self._logger.debug("[%s] Polling for a new task until it's available", self._pid)
message = await pubsub.poll()

# A new task is now available
Expand Down Expand Up @@ -200,7 +200,7 @@ async def _pre_run(self):
pass

async def _main_loop(self):
self._logger.debug("Started main loop")
self._logger.debug("[%s] Started main loop", self._pid)
channel: str = self._get_child_worker_tasks_channel(pid=self._pid)
batch_size = self._worker_rate_limit if self._worker_rate_limit != -1 else 99

Expand Down
55 changes: 55 additions & 0 deletions src/tests/test_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from typing import TYPE_CHECKING

import pytest

from aiotaskq.exceptions import InvalidArgument
from tests.apps import simple_app

if TYPE_CHECKING: # pragma: no cover
from aiotaskq.task import Task
from tests.conftest import WorkerFixture


@pytest.mark.asyncio
@pytest.mark.parametrize(
"task,invalid_args,invalid_kwargs",
[
pytest.param(
*(simple_app.add, tuple(), {"invalid_kwarg_1": 1, "invalid_kwarg_2": 2}),
id="Provide invalid keyword arguments",
),
pytest.param(
*(simple_app.power, (1, 2, 3), {}),
id="Provide more positional arguments than allowed",
),
pytest.param(
*(simple_app.echo, tuple(), {"y": 1}),
id="Provide positional arguments as keyword, but missing one",
),
pytest.param(
*(simple_app.add, (1,), {}),
id="Provide positional argument, but missing one",
),
],
)
async def test_invalid_argument_provided_to_apply_async(
worker: "WorkerFixture",
task: "Task",
invalid_args: tuple,
invalid_kwargs: dict,
):
# Given a worker running in the background
await worker.start(simple_app.__name__)

# When a task has been applied with invalid arguments
# Then an error should raised
error = None
try:
_ = await task.apply_async(*invalid_args, **invalid_kwargs)
except InvalidArgument as exc:
error = exc
finally:
assert str(error) == (
f"These arguments are invalid: args={invalid_args},"
f" kwargs={invalid_kwargs}"
)

0 comments on commit f02f6ce

Please sign in to comment.