From 4d501c10c5579b3fd649bd2ddf1162007b10a3e5 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Sat, 19 Oct 2024 12:24:32 +0000 Subject: [PATCH 1/4] Remove asgiref dependency from non Django code --- docs/discussions.md | 6 +++--- docs/howto/advanced/sync_defer.md | 6 +++--- poetry.lock | 4 ++-- procrastinate/utils.py | 9 ++++++--- procrastinate_demos/demo_async/__main__.py | 6 +----- pyproject.toml | 4 ++-- tests/acceptance/test_sync.py | 3 +-- tests/integration/contrib/aiopg/test_aiopg_connector.py | 4 +--- tests/integration/test_psycopg_connector.py | 4 +--- 9 files changed, 20 insertions(+), 26 deletions(-) diff --git a/docs/discussions.md b/docs/discussions.md index b2202a96a..a3f008f73 100644 --- a/docs/discussions.md +++ b/docs/discussions.md @@ -193,11 +193,11 @@ When you define an asynchronous connector, Procrastinate will try to seamlessly give you the right connector for your context. When you call the synchronous API, it will either create a sync connector based on your async connector, or let you use the async connector directly with -`asgiref.sync.async_to_sync`. +`asyncio.run`. ::: For running jobs, support of synchronous task functions is through -`asgiref.sync.sync_to_async`. This means your synchronous function will be +`asyncio.to_thread`. This means your synchronous function will be executed by an asynchronous worker in a thread. Because of the [Global Interpreter Lock][global interpreter lock], you will not benefit from parallelism, but you will still be able to parallelize (thread-safe) I/Os. @@ -290,7 +290,7 @@ Procrastinate: driver. Under the hood, we have factored as much as possible the non-I/O parts of the code, so that the synchronous and asynchronous versions are only separate in the way they handle I/Os. -- For executing a synchronous task: we use `asgiref.sync.sync_to_async` to run the +- For executing a synchronous task: we use `asyncio.to_thread` to run the synchronous code in a thread. - There are a few case where we facilitate calling Procrastinate from synchronous codebases, by providing a synchronous API, where we'll create an diff --git a/docs/howto/advanced/sync_defer.md b/docs/howto/advanced/sync_defer.md index 629cfb4b9..a573dbb63 100644 --- a/docs/howto/advanced/sync_defer.md +++ b/docs/howto/advanced/sync_defer.md @@ -71,7 +71,7 @@ be using a synchronous connector. If you request the synchronous connector after opening the app, you will get the asynchronous connector, with a compatibility layer to make synchronous -operations. This will only work if you call it inside a function decorated -with `asgiref.sync.sync_to_async` (such as inside a sync job). Otherwise, -you will likely get a `RuntimeError`. +operations. This will only work if you call it inside a function that runs +in its own thread (such as inside a sync job). Otherwise, you will likely +get a `RuntimeError`. ::: diff --git a/poetry.lock b/poetry.lock index 4d8c28903..2877bfdb8 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1893,7 +1893,7 @@ type = ["pytest-mypy"] [extras] aiopg = ["aiopg", "psycopg2-binary"] -django = ["django"] +django = ["asgiref", "django"] psycopg2 = ["psycopg2-binary"] sphinx = ["sphinx"] sqlalchemy = ["sqlalchemy"] @@ -1901,4 +1901,4 @@ sqlalchemy = ["sqlalchemy"] [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "8d4350e830593b495e8caa03a3be60037cdcd221d240b26adf75199a2eb02ff1" +content-hash = "3624f62ad4cec7f623dcf8341125f75543d37981010008f738f2c4b437fc549d" diff --git a/procrastinate/utils.py b/procrastinate/utils.py index 37d100634..2908d472b 100644 --- a/procrastinate/utils.py +++ b/procrastinate/utils.py @@ -24,7 +24,6 @@ ) import dateutil.parser -from asgiref import sync from procrastinate import exceptions from procrastinate.types import TimeDeltaParams @@ -98,7 +97,11 @@ def async_to_sync(awaitable: Callable[..., Awaitable[T]], *args, **kwargs) -> T: Given a callable returning an awaitable, call the callable, await it synchronously. Returns the result after it's done. """ - return sync.async_to_sync(awaitable)(*args, **kwargs) + + async def wrapper() -> T: + return await awaitable(*args, **kwargs) + + return asyncio.run(wrapper()) async def sync_to_async(func: Callable[..., T], *args, **kwargs) -> T: @@ -106,7 +109,7 @@ async def sync_to_async(func: Callable[..., T], *args, **kwargs) -> T: Given a callable, return a callable that will call the original one in an async context. """ - return await sync.sync_to_async(func, thread_sensitive=False)(*args, **kwargs) + return await asyncio.to_thread(func, *args, **kwargs) def causes(exc: BaseException | None): diff --git a/procrastinate_demos/demo_async/__main__.py b/procrastinate_demos/demo_async/__main__.py index 71d1706e9..87f4fb71f 100644 --- a/procrastinate_demos/demo_async/__main__.py +++ b/procrastinate_demos/demo_async/__main__.py @@ -4,13 +4,9 @@ import json import logging -import asgiref.sync - from . import app as app_module from . import tasks -ainput = asgiref.sync.sync_to_async(input) - async def main(): logging.info("Running app in async context") @@ -22,7 +18,7 @@ async def main(): print("Enter an empty line to quit") print() while True: - response = (await ainput("Your input: ")).strip() + response = (await asyncio.to_thread(input, "Your input: ")).strip() if not response: break command, *args = (response).split(maxsplit=1) diff --git a/pyproject.toml b/pyproject.toml index 82641bbc0..4a70a781c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,7 @@ procrastinate = 'procrastinate.cli:main' python = "^3.9" aiopg = { version = "*", optional = true } anyio = "*" -asgiref = "*" +asgiref = { version = "*", optional = true } attrs = "*" contextlib2 = { version = "*", python = "<3.10" } croniter = "*" @@ -39,7 +39,7 @@ typing-extensions = "*" sphinx = { version = "*", optional = true } [tool.poetry.extras] -django = ["django"] +django = ["asgiref", "django"] sqlalchemy = ["sqlalchemy"] aiopg = ["aiopg", "psycopg2-binary"] psycopg2 = ["psycopg2-binary"] diff --git a/tests/acceptance/test_sync.py b/tests/acceptance/test_sync.py index f74607743..cece47372 100644 --- a/tests/acceptance/test_sync.py +++ b/tests/acceptance/test_sync.py @@ -4,7 +4,6 @@ import time import pytest -from asgiref.sync import sync_to_async import procrastinate from procrastinate.contrib import psycopg2 @@ -70,7 +69,7 @@ def _inner_sum_task_sync(a, b): sum_results.append(a + b) # Only works if the worker runs the sync task in a separate thread - await sync_to_async(_inner_sum_task_sync)(a, b) + await asyncio.to_thread(_inner_sum_task_sync, a, b) asyncio.run(_sum_task_async(a, b)) diff --git a/tests/integration/contrib/aiopg/test_aiopg_connector.py b/tests/integration/contrib/aiopg/test_aiopg_connector.py index b413bdb0b..7c23b5053 100644 --- a/tests/integration/contrib/aiopg/test_aiopg_connector.py +++ b/tests/integration/contrib/aiopg/test_aiopg_connector.py @@ -4,7 +4,6 @@ import functools import json -import asgiref.sync import attr import pytest @@ -101,7 +100,6 @@ async def test_get_sync_connector(aiopg_connector_factory): aiopg_connector = await aiopg_connector_factory(open=False) - @asgiref.sync.sync_to_async def f(): sync_conn = aiopg_connector.get_sync_connector() sync_conn.open() @@ -110,7 +108,7 @@ def f(): finally: sync_conn.close() - await f() + await asyncio.to_thread(f) assert list(result[0].values()) == [1] diff --git a/tests/integration/test_psycopg_connector.py b/tests/integration/test_psycopg_connector.py index 48fe3e0c3..a4d60d98f 100644 --- a/tests/integration/test_psycopg_connector.py +++ b/tests/integration/test_psycopg_connector.py @@ -4,7 +4,6 @@ import functools import json -import asgiref.sync import attr import pytest @@ -112,7 +111,6 @@ async def test_wrap_exceptions(psycopg_connector): async def test_execute_query_sync(psycopg_connector): - @asgiref.sync.sync_to_async() def sync(): assert ( psycopg_connector.execute_query( @@ -130,7 +128,7 @@ def sync(): ) assert result == [{"obj_description": "foo"}] - await sync() + await asyncio.to_thread(sync) async def test_execute_query_interpolate(psycopg_connector): From 23ae905b8cab29323a5ceced5c99938dc82fcbcc Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Mon, 21 Oct 2024 19:37:28 +0000 Subject: [PATCH 2/4] Get rid of utility functions sync_to_async and async_to_sync --- procrastinate/cli.py | 4 ++-- procrastinate/connector.py | 9 +++++---- .../contrib/aiopg/aiopg_connector.py | 2 +- procrastinate/psycopg_connector.py | 3 ++- procrastinate/shell.py | 3 ++- procrastinate/utils.py | 20 ------------------- procrastinate/worker.py | 2 +- tests/unit/test_utils.py | 11 ---------- 8 files changed, 13 insertions(+), 41 deletions(-) diff --git a/procrastinate/cli.py b/procrastinate/cli.py index 0ea894549..b4757d899 100644 --- a/procrastinate/cli.py +++ b/procrastinate/cli.py @@ -669,9 +669,9 @@ async def shell_(app: procrastinate.App, shell_command: list[str]): ) if shell_command: - await utils.sync_to_async(shell_obj.onecmd, line=shlex.join(shell_command)) + await asyncio.to_thread(shell_obj.onecmd, line=shlex.join(shell_command)) else: - await utils.sync_to_async(shell_obj.cmdloop) + await asyncio.to_thread(shell_obj.cmdloop) def main(): diff --git a/procrastinate/connector.py b/procrastinate/connector.py index 8ca8676f6..9d080bca3 100644 --- a/procrastinate/connector.py +++ b/procrastinate/connector.py @@ -1,11 +1,12 @@ from __future__ import annotations +import asyncio from collections.abc import Awaitable, Iterable from typing import Any, Callable, Protocol from typing_extensions import LiteralString -from procrastinate import exceptions, utils +from procrastinate import exceptions Pool = Any Engine = Any @@ -91,17 +92,17 @@ async def execute_query_all_async( raise NotImplementedError def execute_query(self, query: LiteralString, **arguments: Any) -> None: - return utils.async_to_sync(self.execute_query_async, query, **arguments) + return asyncio.run(self.execute_query_async(query, **arguments)) def execute_query_one( self, query: LiteralString, **arguments: Any ) -> dict[str, Any]: - return utils.async_to_sync(self.execute_query_one_async, query, **arguments) + return asyncio.run(self.execute_query_one_async(query, **arguments)) def execute_query_all( self, query: LiteralString, **arguments: Any ) -> list[dict[str, Any]]: - return utils.async_to_sync(self.execute_query_all_async, query, **arguments) + return asyncio.run(self.execute_query_all_async(query, **arguments)) async def listen_notify( self, on_notification: Notify, channels: Iterable[str] diff --git a/procrastinate/contrib/aiopg/aiopg_connector.py b/procrastinate/contrib/aiopg/aiopg_connector.py index 80fe31718..43796dee0 100644 --- a/procrastinate/contrib/aiopg/aiopg_connector.py +++ b/procrastinate/contrib/aiopg/aiopg_connector.py @@ -198,7 +198,7 @@ async def open_async(self, pool: aiopg.Pool | None = None) -> None: @wrap_exceptions() async def _create_pool(self, pool_args: dict[str, Any]) -> aiopg.Pool: if self._sync_connector is not None: - await utils.sync_to_async(self._sync_connector.close) + await asyncio.to_thread(self._sync_connector.close) self._sync_connector = None return await aiopg.create_pool(**pool_args) diff --git a/procrastinate/psycopg_connector.py b/procrastinate/psycopg_connector.py index 24e15728f..1e2e37439 100644 --- a/procrastinate/psycopg_connector.py +++ b/procrastinate/psycopg_connector.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import contextlib import logging from collections.abc import AsyncGenerator, AsyncIterator, Iterable @@ -133,7 +134,7 @@ async def open_async( if self._sync_connector is not None: logger.debug("Closing automatically created SyncPsycopgConnector.") - await utils.sync_to_async(self._sync_connector.close) + await asyncio.to_thread(self._sync_connector.close) self._sync_connector = None if pool: diff --git a/procrastinate/shell.py b/procrastinate/shell.py index 206c78026..0a6658b87 100644 --- a/procrastinate/shell.py +++ b/procrastinate/shell.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import cmd import traceback from typing import Any @@ -37,7 +38,7 @@ def __init__( self.job_manager = job_manager def async_to_sync(self, coro: Any, **kwargs) -> Any: - return utils.async_to_sync(coro, **kwargs) + return asyncio.run(coro(**kwargs)) def onecmd(self, line): try: diff --git a/procrastinate/utils.py b/procrastinate/utils.py index 2908d472b..e993756bd 100644 --- a/procrastinate/utils.py +++ b/procrastinate/utils.py @@ -92,26 +92,6 @@ def caller_module_name(prefix: str = "procrastinate") -> str: raise exceptions.CallerModuleUnknown from exc -def async_to_sync(awaitable: Callable[..., Awaitable[T]], *args, **kwargs) -> T: - """ - Given a callable returning an awaitable, call the callable, await it - synchronously. Returns the result after it's done. - """ - - async def wrapper() -> T: - return await awaitable(*args, **kwargs) - - return asyncio.run(wrapper()) - - -async def sync_to_async(func: Callable[..., T], *args, **kwargs) -> T: - """ - Given a callable, return a callable that will call the original one in an - async context. - """ - return await asyncio.to_thread(func, *args, **kwargs) - - def causes(exc: BaseException | None): """ From a single exception with a chain of causes and contexts, make an iterable diff --git a/procrastinate/worker.py b/procrastinate/worker.py index f4227d9d8..752632c8f 100644 --- a/procrastinate/worker.py +++ b/procrastinate/worker.py @@ -234,7 +234,7 @@ async def ensure_async() -> Callable[..., Awaitable]: if inspect.iscoroutinefunction(task.func): await_func = task else: - await_func = functools.partial(utils.sync_to_async, task) + await_func = functools.partial(asyncio.to_thread, task) job_args = [context] if task.pass_context else [] task_result = await await_func(*job_args, **job.task_kwargs) diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index c28b044f5..6d7298c41 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -40,17 +40,6 @@ def test_import_all(): assert module in sys.modules -def test_sync_await(): - result = [] - - async def coro(): - result.append(1) - - utils.async_to_sync(coro) - - assert result == [1] - - def test_causes(): e1, e2, e3 = AttributeError("foo"), KeyError("bar"), IndexError("baz") From 41517f78c1169ca7648fc14d9c2082d8dee79c2c Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Mon, 21 Oct 2024 20:07:51 +0000 Subject: [PATCH 3/4] Refactor shell functions --- procrastinate/shell.py | 105 ++++++++++++++++++++++++----------------- 1 file changed, 61 insertions(+), 44 deletions(-) diff --git a/procrastinate/shell.py b/procrastinate/shell.py index 0a6658b87..abb12dfd9 100644 --- a/procrastinate/shell.py +++ b/procrastinate/shell.py @@ -37,9 +37,6 @@ def __init__( super().__init__() self.job_manager = job_manager - def async_to_sync(self, coro: Any, **kwargs) -> Any: - return asyncio.run(coro(**kwargs)) - def onecmd(self, line): try: return super().onecmd(line) @@ -68,8 +65,12 @@ def do_list_jobs(self, arg: str) -> None: details = kwargs.pop("details", None) is not None if "id" in kwargs: kwargs["id"] = int(kwargs["id"]) - for job in self.async_to_sync(self.job_manager.list_jobs_async, **kwargs): - print_job(job, details=details) + + async def do_list_jobs_async(): + for job in await self.job_manager.list_jobs_async(**kwargs): + print_job(job, details=details) + + asyncio.run(do_list_jobs_async()) def do_list_queues(self, arg: str) -> None: """ @@ -82,16 +83,20 @@ def do_list_queues(self, arg: str) -> None: Example: list_queues task=sums status=failed """ kwargs = parse_argument(arg) - for queue in self.async_to_sync(self.job_manager.list_queues_async, **kwargs): - print( - f"{queue['name']}: {queue['jobs_count']} jobs (" - f"todo: {queue['todo']}, " - f"doing: {queue['doing']}, " - f"succeeded: {queue['succeeded']}, " - f"failed: {queue['failed']}, " - f"cancelled: {queue['cancelled']}, " - f"aborted: {queue['aborted']})" - ) + + async def do_list_queues_async(): + for queue in await self.job_manager.list_queues_async(**kwargs): + print( + f"{queue['name']}: {queue['jobs_count']} jobs (" + f"todo: {queue['todo']}, " + f"doing: {queue['doing']}, " + f"succeeded: {queue['succeeded']}, " + f"failed: {queue['failed']}, " + f"cancelled: {queue['cancelled']}, " + f"aborted: {queue['aborted']})" + ) + + asyncio.run(do_list_queues_async()) def do_list_tasks(self, arg: str) -> None: """ @@ -104,16 +109,20 @@ def do_list_tasks(self, arg: str) -> None: Example: list_tasks queue=default status=failed """ kwargs = parse_argument(arg) - for task in self.async_to_sync(self.job_manager.list_tasks_async, **kwargs): - print( - f"{task['name']}: {task['jobs_count']} jobs (" - f"todo: {task['todo']}, " - f"doing: {task['doing']}, " - f"succeeded: {task['succeeded']}, " - f"failed: {task['failed']}, " - f"cancelled: {task['cancelled']}, " - f"aborted: {task['aborted']})" - ) + + async def do_list_tasks_async(): + for task in await self.job_manager.list_tasks_async(**kwargs): + print( + f"{task['name']}: {task['jobs_count']} jobs (" + f"todo: {task['todo']}, " + f"doing: {task['doing']}, " + f"succeeded: {task['succeeded']}, " + f"failed: {task['failed']}, " + f"cancelled: {task['cancelled']}, " + f"aborted: {task['aborted']})" + ) + + asyncio.run(do_list_tasks_async()) def do_list_locks(self, arg: str) -> None: """ @@ -126,16 +135,20 @@ def do_list_locks(self, arg: str) -> None: Example: list_locks queue=default status=todo """ kwargs = parse_argument(arg) - for lock in self.async_to_sync(self.job_manager.list_locks_async, **kwargs): - print( - f"{lock['name']}: {lock['jobs_count']} jobs (" - f"todo: {lock['todo']}, " - f"doing: {lock['doing']}, " - f"succeeded: {lock['succeeded']}, " - f"failed: {lock['failed']}, " - f"cancelled: {lock['cancelled']}, " - f"aborted: {lock['aborted']})" - ) + + async def do_list_locks_async(): + for lock in await self.job_manager.list_locks_async(**kwargs): + print( + f"{lock['name']}: {lock['jobs_count']} jobs (" + f"todo: {lock['todo']}, " + f"doing: {lock['doing']}, " + f"succeeded: {lock['succeeded']}, " + f"failed: {lock['failed']}, " + f"cancelled: {lock['cancelled']}, " + f"aborted: {lock['aborted']})" + ) + + asyncio.run(do_list_locks_async()) def do_retry(self, arg: str) -> None: """ @@ -147,14 +160,15 @@ def do_retry(self, arg: str) -> None: Example: retry 2 """ job_id = int(arg) - self.async_to_sync( - self.job_manager.retry_job_by_id_async, - job_id=job_id, - retry_at=utils.utcnow().replace(microsecond=0), - ) - (job,) = self.async_to_sync(self.job_manager.list_jobs_async, id=job_id) - print_job(job) + async def do_retry_async(): + await self.job_manager.retry_job_by_id_async( + job_id=job_id, retry_at=utils.utcnow().replace(microsecond=0) + ) + (job,) = await self.job_manager.list_jobs_async(id=job_id) + print_job(job) + + asyncio.run(do_retry_async()) def do_cancel(self, arg: str) -> None: """ @@ -168,5 +182,8 @@ def do_cancel(self, arg: str) -> None: job_id = int(arg) self.job_manager.cancel_job_by_id(job_id=job_id) - (job,) = self.async_to_sync(self.job_manager.list_jobs_async, id=job_id) - print_job(job) + async def do_cancel_async(): + (job,) = await self.job_manager.list_jobs_async(id=job_id) + print_job(job) + + asyncio.run(do_cancel_async()) From 457a762b613279ccc3d32e51fa03581ac4efe9c8 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Wed, 23 Oct 2024 15:04:12 +0000 Subject: [PATCH 4/4] Refactor shell to use methods instead of nested functions --- procrastinate/shell.py | 121 ++++++++++++++++++++--------------------- 1 file changed, 58 insertions(+), 63 deletions(-) diff --git a/procrastinate/shell.py b/procrastinate/shell.py index abb12dfd9..c0228cbf2 100644 --- a/procrastinate/shell.py +++ b/procrastinate/shell.py @@ -61,16 +61,16 @@ def do_list_jobs(self, arg: str) -> None: Example: list_jobs queue=default task=sums status=failed details """ + asyncio.run(self.do_list_jobs_async(arg)) + + async def do_list_jobs_async(self, arg: str): kwargs: dict[str, Any] = parse_argument(arg) details = kwargs.pop("details", None) is not None if "id" in kwargs: kwargs["id"] = int(kwargs["id"]) - async def do_list_jobs_async(): - for job in await self.job_manager.list_jobs_async(**kwargs): - print_job(job, details=details) - - asyncio.run(do_list_jobs_async()) + for job in await self.job_manager.list_jobs_async(**kwargs): + print_job(job, details=details) def do_list_queues(self, arg: str) -> None: """ @@ -82,21 +82,20 @@ def do_list_queues(self, arg: str) -> None: Example: list_queues task=sums status=failed """ - kwargs = parse_argument(arg) + asyncio.run(self.do_list_queues_async(arg)) - async def do_list_queues_async(): - for queue in await self.job_manager.list_queues_async(**kwargs): - print( - f"{queue['name']}: {queue['jobs_count']} jobs (" - f"todo: {queue['todo']}, " - f"doing: {queue['doing']}, " - f"succeeded: {queue['succeeded']}, " - f"failed: {queue['failed']}, " - f"cancelled: {queue['cancelled']}, " - f"aborted: {queue['aborted']})" - ) - - asyncio.run(do_list_queues_async()) + async def do_list_queues_async(self, arg: str): + kwargs = parse_argument(arg) + for queue in await self.job_manager.list_queues_async(**kwargs): + print( + f"{queue['name']}: {queue['jobs_count']} jobs (" + f"todo: {queue['todo']}, " + f"doing: {queue['doing']}, " + f"succeeded: {queue['succeeded']}, " + f"failed: {queue['failed']}, " + f"cancelled: {queue['cancelled']}, " + f"aborted: {queue['aborted']})" + ) def do_list_tasks(self, arg: str) -> None: """ @@ -108,21 +107,20 @@ def do_list_tasks(self, arg: str) -> None: Example: list_tasks queue=default status=failed """ - kwargs = parse_argument(arg) - - async def do_list_tasks_async(): - for task in await self.job_manager.list_tasks_async(**kwargs): - print( - f"{task['name']}: {task['jobs_count']} jobs (" - f"todo: {task['todo']}, " - f"doing: {task['doing']}, " - f"succeeded: {task['succeeded']}, " - f"failed: {task['failed']}, " - f"cancelled: {task['cancelled']}, " - f"aborted: {task['aborted']})" - ) + asyncio.run(self.do_list_tasks_async(arg)) - asyncio.run(do_list_tasks_async()) + async def do_list_tasks_async(self, arg: str): + kwargs = parse_argument(arg) + for task in await self.job_manager.list_tasks_async(**kwargs): + print( + f"{task['name']}: {task['jobs_count']} jobs (" + f"todo: {task['todo']}, " + f"doing: {task['doing']}, " + f"succeeded: {task['succeeded']}, " + f"failed: {task['failed']}, " + f"cancelled: {task['cancelled']}, " + f"aborted: {task['aborted']})" + ) def do_list_locks(self, arg: str) -> None: """ @@ -134,21 +132,20 @@ def do_list_locks(self, arg: str) -> None: Example: list_locks queue=default status=todo """ - kwargs = parse_argument(arg) + asyncio.run(self.do_list_locks_async(arg)) - async def do_list_locks_async(): - for lock in await self.job_manager.list_locks_async(**kwargs): - print( - f"{lock['name']}: {lock['jobs_count']} jobs (" - f"todo: {lock['todo']}, " - f"doing: {lock['doing']}, " - f"succeeded: {lock['succeeded']}, " - f"failed: {lock['failed']}, " - f"cancelled: {lock['cancelled']}, " - f"aborted: {lock['aborted']})" - ) - - asyncio.run(do_list_locks_async()) + async def do_list_locks_async(self, arg: str): + kwargs = parse_argument(arg) + for lock in await self.job_manager.list_locks_async(**kwargs): + print( + f"{lock['name']}: {lock['jobs_count']} jobs (" + f"todo: {lock['todo']}, " + f"doing: {lock['doing']}, " + f"succeeded: {lock['succeeded']}, " + f"failed: {lock['failed']}, " + f"cancelled: {lock['cancelled']}, " + f"aborted: {lock['aborted']})" + ) def do_retry(self, arg: str) -> None: """ @@ -159,16 +156,15 @@ def do_retry(self, arg: str) -> None: Example: retry 2 """ - job_id = int(arg) - - async def do_retry_async(): - await self.job_manager.retry_job_by_id_async( - job_id=job_id, retry_at=utils.utcnow().replace(microsecond=0) - ) - (job,) = await self.job_manager.list_jobs_async(id=job_id) - print_job(job) + asyncio.run(self.do_retry_async(arg)) - asyncio.run(do_retry_async()) + async def do_retry_async(self, arg: str): + job_id = int(arg) + await self.job_manager.retry_job_by_id_async( + job_id=job_id, retry_at=utils.utcnow().replace(microsecond=0) + ) + (job,) = await self.job_manager.list_jobs_async(id=job_id) + print_job(job) def do_cancel(self, arg: str) -> None: """ @@ -179,11 +175,10 @@ def do_cancel(self, arg: str) -> None: Example: cancel 3 """ - job_id = int(arg) - self.job_manager.cancel_job_by_id(job_id=job_id) + asyncio.run(self.do_cancel_async(arg)) - async def do_cancel_async(): - (job,) = await self.job_manager.list_jobs_async(id=job_id) - print_job(job) - - asyncio.run(do_cancel_async()) + async def do_cancel_async(self, arg: str): + job_id = int(arg) + await self.job_manager.cancel_job_by_id_async(job_id=job_id) + (job,) = await self.job_manager.list_jobs_async(id=job_id) + print_job(job)