Skip to content

Commit

Permalink
(#64) Support tasks retry & propagate raised exception
Browse files Browse the repository at this point in the history
For documentation, see:
1. Docstring of `task.task`
2. Tests in `tests.test_task` e.g. `test_retry_as_per_task_definition`
3. Sample usages in `tests.apps.simple_app` e.g. `append_to_file`

Changelist:

* Formalize serialization and deserialization
* Serialize & deserialize exceptions correctly
* Encapsulate retry & retry_on in a new dict 'options'
* Implement serde for AsyncResult
* Ensure generated file deleted after test
* Add jsonpickle to toml file
* Exclude `if TYPE_CHECKING:` from coverage
* Add test for singleton
* Add logging for worker
* Wrap all constants inside `Config` class
* Handle case when `options.retry.on` is empty

Signed-off-by: Imran Ariffin <ariffin.imran@gmail.com>

Requested changes:

* Rename constants.py to config.py
* Split file into config.py & constants.py
* Avoid retrying forever
* Move logic from `AsyncResult.from_publisher` to
  `Task._get_result`. This way `AsyncResult` can be
  just a pure class with no side-effect

Fixups:

* Fix docker-compose cmd not found on GithubAction
  • Loading branch information
imranariffin committed Aug 12, 2024
1 parent f02f6ce commit ed6c94f
Show file tree
Hide file tree
Showing 27 changed files with 972 additions and 136 deletions.
6 changes: 6 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@ source = src/
parallel = True
concurrency = multiprocessing
sigterm = True

[report]
exclude_lines =
pragma: no cover
if TYPE_CHECKING:
if t.TYPE_CHECKING:
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- run: docker-compose -f ./docker-compose.yml up --detach redis
- run: docker compose -f ./docker-compose.yml up --detach redis
- uses: actions/setup-python@v3
with:
python-version: "3.10.x"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- run: docker-compose -f ./docker-compose.yml up --detach redis
- run: docker compose -f ./docker-compose.yml up --detach redis
- uses: actions/setup-python@v3
with:
python-version: "3.10.x"
Expand Down
12 changes: 8 additions & 4 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -257,16 +257,16 @@ ignored-parents=
max-args=10

# Maximum number of attributes for a class (see R0902).
max-attributes=7
max-attributes=10

# Maximum number of boolean expressions in an if statement (see R0916).
max-bool-expr=5

# Maximum number of branch for function / method body.
max-branches=12
max-branches=15

# Maximum number of locals for function / method body.
max-locals=15
max-locals=20

# Maximum number of parents for a class (see R0901).
max-parents=7
Expand Down Expand Up @@ -458,7 +458,11 @@ good-names=i,
a,
b,
c,
n
n,
id,
e,
s,
on

# Good variable names regexes, separated by a comma. If names match any regex,
# they will always be accepted
Expand Down
6 changes: 5 additions & 1 deletion .pylintrc.tests
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,12 @@ good-names=i,
a,
b,
c,
e,
n,
ls
t,
ls,
fo,
fi

# Good variable names regexes, separated by a comma. If names match any regex,
# they will always be accepted
Expand Down
13 changes: 12 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Attach",
"type": "python",
"request": "attach",
"connect": {
"host": "localhost",
"port": 5678
},
"justMyCode": false
},
{
"name": "Main",
"type": "python",
Expand Down Expand Up @@ -31,7 +41,8 @@
"-s",
],
"request": "launch",
"console": "integratedTerminal"
"console": "integratedTerminal",
"justMyCode": false
},
{
"name": "Sample Worker (Simple App)",
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import asyncio
import aiotaskq


@aiotaskq.task
@aiotaskq.task()
def some_task(b: int) -> int:
# Some task with high cpu usage
def _naive_fib(n: int) -> int:
Expand Down Expand Up @@ -132,22 +132,22 @@ import asyncio
from aiotaskq import task


@task
@task()
def task_1(*args, **kwargs):
pass


@task
@task()
def task_2(*args, **kwargs):
pass


@task
@task()
def task_3(*args, **kwargs):
pass


@task
@task()
def task_4(*args, **kwargs):
pass

Expand Down
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ build-backend = "setuptools.build_meta"
requires-python = ">=3.9"
dependencies = [
"aioredis >= 2.0.0, < 2.1.0",
"jsonpickle >= 3.0.0, < 3.1.0",
"tomlkit >= 0.11.0, < 0.12.0",
"typer >= 0.4.0, < 0.5.0",
]
name = "aiotaskq"
version = "0.0.12"
version = "0.0.13"
readme = "README.md"
description = "A simple asynchronous task queue"
authors = [
Expand All @@ -28,7 +29,7 @@ license = { file = "LICENSE" }

[project.optional-dependencies]
dev = [
"black >= 22.1.0, < 22.2.0",
"black >= 22.2.0, < 23.0.0",
"coverage >= 6.4.0, < 6.5.0",
"mypy >= 0.931, < 1.0",
"mypy-extensions >= 0.4.0, < 0.5.0",
Expand Down
2 changes: 1 addition & 1 deletion src/aiotaskq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import aiotaskq
@aiotaskq.task
@aiotaskq.task()
def some_task(b: int) -> int:
# Some task with high cpu usage
def _naive_fib(n: int) -> int:
Expand Down
5 changes: 4 additions & 1 deletion src/aiotaskq/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

#!/usr/bin/env python

import logging
import typing as t

import typer

from . import __version__
from .config import Config
from .interfaces import ConcurrencyType
from .worker import Defaults, run_worker_forever
from . import __version__

cli = typer.Typer()
logging.basicConfig(level=Config.log_level())


def _version_callback(value: bool):
Expand Down
47 changes: 47 additions & 0 deletions src/aiotaskq/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""
Module to define and store all configuration values used across the library.
The public object from this module is `Config`. This object wraps
all the configuration values, which include:
- Variables
- Environment variables
"""

import logging
from os import environ

from .interfaces import SerializationType

# Default broker URL used when env var BROKER_URL is not set.
_REDIS_URL = "redis://127.0.0.1:6379"


class Config:
    """
    Provide configuration values.

    These include:
    - Variables
    - Environment variables
    """

    @staticmethod
    def serialization_type() -> SerializationType:
        """Return the serialization type as provided via env var AIOTASKQ_SERIALIZATION."""
        # Falls back to SerializationType.DEFAULT; lookup is case-insensitive via .upper().
        s: str = environ.get("AIOTASKQ_SERIALIZATION", SerializationType.DEFAULT.value)
        return SerializationType[s.upper()]

    @staticmethod
    def log_level() -> int:
        """Return the log level as provided via env var AIOTASKQ_LOG_LEVEL.

        Defaults to logging.DEBUG when the env var is not set.
        """
        level: int = int(environ.get("AIOTASKQ_LOG_LEVEL", logging.DEBUG))
        return level

    @staticmethod
    def broker_url() -> str:
        """
        Return the broker url as provided via env var BROKER_URL.

        Defaults to "redis://127.0.0.1:6379".
        """
        broker_url: str = environ.get("BROKER_URL", _REDIS_URL)
        return broker_url
33 changes: 29 additions & 4 deletions src/aiotaskq/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,30 @@
"""Module to define and store all constants used across the library."""
"""
Module to define and store all constants used across the library.
REDIS_URL = "redis://127.0.0.1:6379"
TASKS_CHANNEL = "channel:tasks"
RESULTS_CHANNEL_TEMPLATE = "channel:results:{task_id}"
The public object from this module is `Constants`. This object wraps
all the constants, which include:
- Static methods that return constant values
"""


# Broker channel names; private so callers go through the Constants accessors.
_TASKS_CHANNEL = "channel:tasks"
_RESULTS_CHANNEL_TEMPLATE = "channel:results:{task_id}"


class Constants:
    """
    Provide all the constants.

    These include:
    - Static methods that return constant values
    """

    @staticmethod
    def tasks_channel() -> str:
        """Return the channel name used for transporting task requests on the broker."""
        return _TASKS_CHANNEL

    @staticmethod
    def results_channel_template() -> str:
        """Return the channel name template used for transporting task results on the broker.

        The template contains a ``{task_id}`` placeholder to be filled in per task.
        """
        return _RESULTS_CHANNEL_TEMPLATE
4 changes: 4 additions & 0 deletions src/aiotaskq/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ class ConcurrencyTypeNotSupported(Exception):

class InvalidArgument(Exception):
    """Raised when a task is applied with invalid arguments."""


class InvalidRetryOptions(Exception):
    """Raised when a task is defined with invalid retry options."""
58 changes: 58 additions & 0 deletions src/aiotaskq/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,61 @@ class IWorkerManager(IWorker):
"""

concurrency_manager: IConcurrencyManager


class SerializationType(str, enum.Enum):
    """Specify the types of serialization supported."""

    # str mixin lets members be used directly where a plain string is expected.
    JSON = "json"
    # Enum alias of JSON (same member object): the serialization used when none is configured.
    DEFAULT = JSON


# Generic type parameter for the object being (de)serialized.
T = t.TypeVar("T")


class ISerialization(t.Protocol, t.Generic[T]):
    """Define the interface required to serialize and deserialize a generic object.

    Structural (duck-typed) interface: implementers need not inherit from this class.
    The methods below are intentionally body-less protocol stubs.
    """

    @classmethod
    def serialize(cls, obj: T) -> bytes:
        """Serialize any object into bytes."""

    @classmethod
    def deserialize(cls, klass: type[T], s: bytes) -> T:
        """Deserialize bytes into an instance of `klass`."""


class RetryOptions(t.TypedDict):
    """
    Specify the available retry options.

    max_retries int | None: The number of times to keep retrying the execution of the task
                            until the task executes successfully. Counting starts from
                            0 so if max_retries = 2 for example, then the task will execute
                            1 + 2 times (1 time for first execution, 2 times for re-try) in the
                            worst case scenario.
    on tuple[type[Exception], ...]: The tuple of exception classes to retry on. The task
                            will only be retried if the exception that is raised
                            during task execution is an instance of one of the listed
                            exception classes.

                            Examples:
                            If `on=(Exception,)` then any kind of exception will trigger
                            a retry.
                            If `on=(ExceptionA, ExceptionB,)` and during task
                            execution ExceptionC was raised, then retry is not triggered.
                            If `on=tuple()` then during task definition aiotaskq will raise
                            `InvalidRetryOptions`
    """

    # NOTE(review): PEP 604 syntax (`int | None`) is evaluated at runtime on class creation
    # and needs Python >= 3.10 unless the module has `from __future__ import annotations`;
    # pyproject declares requires-python >= 3.9 — confirm one of the two holds.
    max_retries: int | None
    on: tuple[type[Exception], ...]


class TaskOptions(t.TypedDict):
    """Specify the options available for a task."""

    # Retry configuration for the task; presumably None means the task is not
    # retried — confirm against the `task.task` docstring.
    retry: RetryOptions | None
Loading

0 comments on commit ed6c94f

Please sign in to comment.