Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close #9: Support multiple concurrent worker processes #20

Merged
merged 4 commits into from
Aug 29, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[run]
source = src/
32 changes: 0 additions & 32 deletions .github/workflows/run-checks.yaml

This file was deleted.

28 changes: 22 additions & 6 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,38 @@
"program": "./src/aiotaskq/main.py",
"console": "integratedTerminal"
},
{
"name": "Sample App",
"type": "python",
"request": "launch",
"module": "aiotaskq.tests.apps.simple_app",
"args": [],
"console": "integratedTerminal"
},
{
"name": "Test",
"type": "python",
"module": "coverage",
"args": [
"run",
"-m",
"pytest",
"-vvv",
"-s",
],
"request": "launch",
"program": "./src/aiotaskq/tests/integration_test.py",
"console": "integratedTerminal"
},
{
"name": "Worker",
"name": "Sample Worker (Simple App)",
"type": "python",
"request": "launch",
"program": "./src/aiotaskq/worker.py",
"console": "integratedTerminal",
"module": "aiotaskq",
"args": [
"aiotaskq.main"
]
"worker",
"aiotaskq.tests.apps.simple_app"
],
"console": "integratedTerminal",
}
]
}
11 changes: 8 additions & 3 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
"**/.git": true,
"**/.venv": true,
"**/.hg": true,
"**/.DS_Store": true
}
}
"**/.DS_Store": true,
"**/*.egg-info": true,
"**/__pycache__": true,
"**/.mypy_cache": true
},
"python.testing.pytestEnabled": true,
"python.testing.unittestEnabled": false
}
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import asyncio
import aiotaskq


@aiotaskq.register_task
@aiotaskq.task
def some_task(b: int) -> int:
# Some task with high cpu usage
def _naive_fib(n: int) -> int:
Expand Down Expand Up @@ -130,25 +130,25 @@ Using `aiotaskq` we may end up with the following:
```python
import asyncio

from aiotaskq import register_task
from aiotaskq import task


@register_task
@task
def task_1(*args, **kwargs):
pass


@register_task
@task
def task_2(*args, **kwargs):
pass


@register_task
@task
def task_3(*args, **kwargs):
pass


@register_task
@task
def task_4(*args, **kwargs):
pass

Expand Down
8 changes: 6 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ dependencies = [
"click==8.0.4"
]
name = "aiotaskq"
version = "0.0.4"
version = "0.0.5"
[project.optional-dependencies]
tests = [
"mypy==0.931",
"mypy-extensions==0.4.3",
"typing_extensions==4.1.1",
"black==22.1.0"
"black==22.1.0",
"pytest==7.1.2",
"pytest-asyncio==0.19.0"
]
[tool.black]
line-length = 100
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = aiotaskq
version = 0.0.4
version = 0.0.5
author = Imran Ariffin
author_email = ariffin.imran@gmail.com
description = A simple asynchronous task queue
Expand Down
6 changes: 3 additions & 3 deletions src/aiotaskq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import aiotaskq


@aiotaskq.register_task
@aiotaskq.task
def some_task(b: int) -> int:
# Some task with high cpu usage
def _naive_fib(n: int) -> int:
Expand All @@ -29,7 +29,7 @@ async def main():

"""

from .main import register_task
from .main import task


__all__ = ["register_task"]
__all__ = ["task"]
13 changes: 9 additions & 4 deletions src/aiotaskq/__main__.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
"""Module to define the main commands for the cli."""

#!/usr/bin/env python

import asyncio
import typing as t

import typer

from aiotaskq.worker import worker
from aiotaskq.worker import Defaults, worker

cli = typer.Typer()


@cli.command(name="worker")
def _worker_command(app: str):
def worker_command(app: str, concurrency: t.Optional[int] = Defaults.concurrency):
"""Command to start workers."""
loop = asyncio.get_event_loop()
loop.run_until_complete(worker(app_import_path=app))
loop.run_until_complete(worker(app_import_path=app, concurrency=concurrency))


@cli.command(name="metric")
def _metric_server(app: str):
def metric_server(app: str):
"""Command to start server to collect and report tasks metrics (TODO)."""
print(f"TODO: Running metrics server for app={app}")


Expand Down
6 changes: 2 additions & 4 deletions src/aiotaskq/constants.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
"""Module to define and store all constants used across the library."""

REDIS_URL = "redis://127.0.0.1:6379"
TASKS_CHANNEL = "channel:tasks"
RESULTS_CHANNEL_TEMPLATE = "channel:results:{task_id}"

# REDIS_URL = "redis://127.0.0.1:6379"
# TASKS_CHANNEL = "channel:tasks"
# RESULTS_CHANNEL_TEMPLATE = "channel:results:{task_id}"
11 changes: 9 additions & 2 deletions src/aiotaskq/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,9 @@
class WorkerNotReady(Exception):
"""Attempt to send task to worker but no worker is subscribing to tasks channel."""
"""
Define all exceptions that are possibly raised by the package.

Any raised thrown must be defined here.
"""


class ModuleInvalidForTask(Exception):
"""Attempt to convert to task a function in an invalid module."""
Loading