Remove asgiref dependency from non Django code #1226

Open · wants to merge 4 commits into base: v3
6 changes: 3 additions & 3 deletions docs/discussions.md
@@ -193,11 +193,11 @@ When you define an asynchronous connector, Procrastinate will try to
 seamlessly give you the right connector for your context. When you call
 the synchronous API, it will either create a sync connector based on your
 async connector, or let you use the async connector directly with
-`asgiref.sync.async_to_sync`.
+`asyncio.run`.
 :::

 For running jobs, support of synchronous task functions is through
-`asgiref.sync.sync_to_async`. This means your synchronous function will be
+`asyncio.to_thread`. This means your synchronous function will be
 executed by an asynchronous worker in a thread. Because of the [Global
 Interpreter Lock][global interpreter lock], you will not benefit from parallelism, but you will still be able
 to parallelize (thread-safe) I/Os.
@@ -290,7 +290,7 @@ Procrastinate:
   driver. Under the hood, we have factored as much as possible the non-I/O
   parts of the code, so that the synchronous and asynchronous versions are
   only separate in the way they handle I/Os.
-- For executing a synchronous task: we use `asgiref.sync.sync_to_async` to run the
+- For executing a synchronous task: we use `asyncio.to_thread` to run the
   synchronous code in a thread.
 - There are a few case where we facilitate calling Procrastinate from
   synchronous codebases, by providing a synchronous API, where we'll create an
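To make the substitution concrete, here is a minimal, self-contained sketch (not Procrastinate code; `blocking_io` is a made-up stand-in) of how `asyncio.to_thread` lets an async worker run a synchronous function in a worker thread, which is the behaviour the updated documentation describes:

```python
import asyncio
import time


def blocking_io(path: str) -> int:
    # Stand-in for a synchronous task: sleeps to simulate blocking I/O.
    time.sleep(0.1)
    return len(path)


async def main() -> None:
    # The event loop stays responsive while blocking_io runs in a thread
    # from the default executor, mirroring how sync tasks are executed.
    result = await asyncio.to_thread(blocking_io, "/tmp/example")
    print(result)


asyncio.run(main())
```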
6 changes: 3 additions & 3 deletions docs/howto/advanced/sync_defer.md
@@ -71,7 +71,7 @@ be using a synchronous connector.

 If you request the synchronous connector after opening the app, you will get
 the asynchronous connector, with a compatibility layer to make synchronous
-operations. This will only work if you call it inside a function decorated
-with `asgiref.sync.sync_to_async` (such as inside a sync job). Otherwise,
-you will likely get a `RuntimeError`.
+operations. This will only work if you call it inside a function that runs
+in its own thread (such as inside a sync job). Otherwise, you will likely
+get a `RuntimeError`.

Member Author:

I am not sure if this statement is still valid.

:::
4 changes: 2 additions & 2 deletions poetry.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions procrastinate/cli.py
@@ -669,9 +669,9 @@ async def shell_(app: procrastinate.App, shell_command: list[str]):
     )

     if shell_command:
-        await utils.sync_to_async(shell_obj.onecmd, line=shlex.join(shell_command))
+        await asyncio.to_thread(shell_obj.onecmd, line=shlex.join(shell_command))
     else:
-        await utils.sync_to_async(shell_obj.cmdloop)
+        await asyncio.to_thread(shell_obj.cmdloop)


 def main():
9 changes: 5 additions & 4 deletions procrastinate/connector.py
@@ -1,11 +1,12 @@
 from __future__ import annotations

+import asyncio
 from collections.abc import Awaitable, Iterable
 from typing import Any, Callable, Protocol

 from typing_extensions import LiteralString

-from procrastinate import exceptions, utils
+from procrastinate import exceptions

 Pool = Any
 Engine = Any
@@ -91,17 +92,17 @@ async def execute_query_all_async(
         raise NotImplementedError

     def execute_query(self, query: LiteralString, **arguments: Any) -> None:
-        return utils.async_to_sync(self.execute_query_async, query, **arguments)
+        return asyncio.run(self.execute_query_async(query, **arguments))

     def execute_query_one(
         self, query: LiteralString, **arguments: Any
     ) -> dict[str, Any]:
-        return utils.async_to_sync(self.execute_query_one_async, query, **arguments)
+        return asyncio.run(self.execute_query_one_async(query, **arguments))

     def execute_query_all(
         self, query: LiteralString, **arguments: Any
     ) -> list[dict[str, Any]]:
-        return utils.async_to_sync(self.execute_query_all_async, query, **arguments)
+        return asyncio.run(self.execute_query_all_async(query, **arguments))

     async def listen_notify(
         self, on_notification: Notify, channels: Iterable[str]
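The key behavioural difference from `asgiref.sync.async_to_sync` is that `asyncio.run` starts (and tears down) a fresh event loop on every call, and raises `RuntimeError` when invoked from a thread that already has a running loop. The sketch below is standalone, not the actual connector (`execute_query_async` here is a stand-in), and only illustrates the pattern and its constraint:

```python
import asyncio


async def execute_query_async(query: str) -> None:
    # Stand-in for the connector's async implementation.
    await asyncio.sleep(0)


def execute_query(query: str) -> None:
    # Sync facade: drives the async implementation with a fresh event loop.
    # Calling this from a coroutine would raise RuntimeError, because the
    # calling thread already runs an event loop.
    return asyncio.run(execute_query_async(query))


execute_query("SELECT 1")  # fine from plain synchronous code


async def async_caller() -> None:
    # From async code, await the async method directly instead.
    await execute_query_async("SELECT 1")


asyncio.run(async_caller())
```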
2 changes: 1 addition & 1 deletion procrastinate/contrib/aiopg/aiopg_connector.py
@@ -198,7 +198,7 @@ async def open_async(self, pool: aiopg.Pool | None = None) -> None:
     @wrap_exceptions()
     async def _create_pool(self, pool_args: dict[str, Any]) -> aiopg.Pool:
         if self._sync_connector is not None:
-            await utils.sync_to_async(self._sync_connector.close)
+            await asyncio.to_thread(self._sync_connector.close)
             self._sync_connector = None

         return await aiopg.create_pool(**pool_args)
3 changes: 2 additions & 1 deletion procrastinate/psycopg_connector.py
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import asyncio
 import contextlib
 import logging
 from collections.abc import AsyncGenerator, AsyncIterator, Iterable
@@ -133,7 +134,7 @@

         if self._sync_connector is not None:
             logger.debug("Closing automatically created SyncPsycopgConnector.")
-            await utils.sync_to_async(self._sync_connector.close)
+            await asyncio.to_thread(self._sync_connector.close)
             self._sync_connector = None

         if pool:
45 changes: 29 additions & 16 deletions procrastinate/shell.py
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import asyncio
 import cmd
 import traceback
 from typing import Any
@@ -36,9 +37,6 @@ def __init__(
         super().__init__()
         self.job_manager = job_manager

-    def async_to_sync(self, coro: Any, **kwargs) -> Any:
-        return utils.async_to_sync(coro, **kwargs)
-
     def onecmd(self, line):
         try:
             return super().onecmd(line)
@@ -63,11 +61,15 @@ def do_list_jobs(self, arg: str) -> None:

         Example: list_jobs queue=default task=sums status=failed details
         """
+        asyncio.run(self.do_list_jobs_async(arg))
+
+    async def do_list_jobs_async(self, arg: str):
         kwargs: dict[str, Any] = parse_argument(arg)
         details = kwargs.pop("details", None) is not None
         if "id" in kwargs:
             kwargs["id"] = int(kwargs["id"])
-        for job in self.async_to_sync(self.job_manager.list_jobs_async, **kwargs):
+
+        for job in await self.job_manager.list_jobs_async(**kwargs):
             print_job(job, details=details)

     def do_list_queues(self, arg: str) -> None:
@@ -80,8 +82,11 @@ def do_list_queues(self, arg: str) -> None:

         Example: list_queues task=sums status=failed
         """
+        asyncio.run(self.do_list_queues_async(arg))
+
+    async def do_list_queues_async(self, arg: str):
         kwargs = parse_argument(arg)
-        for queue in self.async_to_sync(self.job_manager.list_queues_async, **kwargs):
+        for queue in await self.job_manager.list_queues_async(**kwargs):
             print(
                 f"{queue['name']}: {queue['jobs_count']} jobs ("
                 f"todo: {queue['todo']}, "
@@ -102,8 +107,11 @@ def do_list_tasks(self, arg: str) -> None:

         Example: list_tasks queue=default status=failed
         """
+        asyncio.run(self.do_list_tasks_async(arg))
+
+    async def do_list_tasks_async(self, arg: str):
         kwargs = parse_argument(arg)
-        for task in self.async_to_sync(self.job_manager.list_tasks_async, **kwargs):
+        for task in await self.job_manager.list_tasks_async(**kwargs):
             print(
                 f"{task['name']}: {task['jobs_count']} jobs ("
                 f"todo: {task['todo']}, "
@@ -124,8 +132,11 @@ def do_list_locks(self, arg: str) -> None:

         Example: list_locks queue=default status=todo
         """
+        asyncio.run(self.do_list_locks_async(arg))
+
+    async def do_list_locks_async(self, arg: str):
         kwargs = parse_argument(arg)
-        for lock in self.async_to_sync(self.job_manager.list_locks_async, **kwargs):
+        for lock in await self.job_manager.list_locks_async(**kwargs):
             print(
                 f"{lock['name']}: {lock['jobs_count']} jobs ("
                 f"todo: {lock['todo']}, "
@@ -145,14 +156,14 @@ def do_retry(self, arg: str) -> None:

         Example: retry 2
         """
+        asyncio.run(self.do_retry_async(arg))
+
+    async def do_retry_async(self, arg: str):
         job_id = int(arg)
-        self.async_to_sync(
-            self.job_manager.retry_job_by_id_async,
-            job_id=job_id,
-            retry_at=utils.utcnow().replace(microsecond=0),
+        await self.job_manager.retry_job_by_id_async(
+            job_id=job_id, retry_at=utils.utcnow().replace(microsecond=0)
         )
-
-        (job,) = self.async_to_sync(self.job_manager.list_jobs_async, id=job_id)
+        (job,) = await self.job_manager.list_jobs_async(id=job_id)
         print_job(job)

     def do_cancel(self, arg: str) -> None:
@@ -164,8 +175,10 @@ def do_cancel(self, arg: str) -> None:

         Example: cancel 3
         """
-        job_id = int(arg)
-        self.job_manager.cancel_job_by_id(job_id=job_id)
+        asyncio.run(self.do_cancel_async(arg))

-        (job,) = self.async_to_sync(self.job_manager.list_jobs_async, id=job_id)
+    async def do_cancel_async(self, arg: str):
+        job_id = int(arg)
+        await self.job_manager.cancel_job_by_id_async(job_id=job_id)
+        (job,) = await self.job_manager.list_jobs_async(id=job_id)
         print_job(job)
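The resulting pattern pairs each synchronous `do_*` command (which `cmd.Cmd` expects) with a `do_*_async` coroutine that does the actual work. Here is a minimal, self-contained sketch of that split; `DemoShell` and `do_hello` are illustrative names, not part of Procrastinate:

```python
import asyncio
import cmd


class DemoShell(cmd.Cmd):
    prompt = "demo> "

    def do_hello(self, arg: str) -> None:
        # cmd.Cmd calls the sync method; it only drives the coroutine.
        asyncio.run(self.do_hello_async(arg))

    async def do_hello_async(self, arg: str) -> None:
        await asyncio.sleep(0)  # stand-in for awaiting the job manager
        print(f"hello {arg or 'world'}")

    def do_EOF(self, arg: str) -> bool:
        return True


if __name__ == "__main__":
    # When the whole cmdloop itself runs in a thread (as in the CLI change,
    # asyncio.to_thread(shell_obj.cmdloop)), each command's asyncio.run()
    # still works because that worker thread has no running event loop.
    DemoShell().cmdloop()
```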
17 changes: 0 additions & 17 deletions procrastinate/utils.py
@@ -24,7 +24,6 @@
 )

 import dateutil.parser
-from asgiref import sync

 from procrastinate import exceptions
 from procrastinate.types import TimeDeltaParams
@@ -93,22 +92,6 @@ def caller_module_name(prefix: str = "procrastinate") -> str:
     raise exceptions.CallerModuleUnknown from exc


-def async_to_sync(awaitable: Callable[..., Awaitable[T]], *args, **kwargs) -> T:
-    """
-    Given a callable returning an awaitable, call the callable, await it
-    synchronously. Returns the result after it's done.
-    """
-    return sync.async_to_sync(awaitable)(*args, **kwargs)
-
-
-async def sync_to_async(func: Callable[..., T], *args, **kwargs) -> T:
-    """
-    Given a callable, return a callable that will call the original one in an
-    async context.
-    """
-    return await sync.sync_to_async(func, thread_sensitive=False)(*args, **kwargs)
-
-
 def causes(exc: BaseException | None):
     """
     From a single exception with a chain of causes and contexts, make an iterable
2 changes: 1 addition & 1 deletion procrastinate/worker.py
@@ -234,7 +234,7 @@ async def ensure_async() -> Callable[..., Awaitable]:
         if inspect.iscoroutinefunction(task.func):
             await_func = task
         else:
-            await_func = functools.partial(utils.sync_to_async, task)
+            await_func = functools.partial(asyncio.to_thread, task)

Contributor:

It will be interesting to discover if doing that causes issues with Django applications (https://docs.djangoproject.com/en/5.1/topics/async/#async-safety).

Also, according to https://docs.djangoproject.com/en/5.1/topics/async/#sync-to-async, the use of asgiref.sync_to_async with thread_sensitive=True (the default) needs to be done in an async_to_sync wrapper to work correctly.


Member Author:

> It will be interesting to discover if doing that causes issues with Django applications (https://docs.djangoproject.com/en/5.1/topics/async/#async-safety).

I don't think so, as the worker manages its own connections (independent of Django), and only the user code would use Django connections.


Contributor (@onlyann, Oct 23, 2024):

I was referring to the user code (the task implementation) that is being wrapped here.

What happens then when the user code uses Django connections?
How can they make it work when the thread the code runs in is not the Django main thread?


Member Author (@medihack, Oct 24, 2024):

From my understanding, it shouldn't matter whether the Django connection was initially created in the main thread or directly in the user thread. There is also no request/response cycle like in a Django view that messes around with those connections. And as you already recognized, our previous sync_to_async (with thread_sensitive=True back then) didn't use the main thread either, as the Django command never started the worker with async_to_sync. All sync tasks (back when thread_sensitive was True) used a single thread, but it wasn't the main thread.
Since we switched to thread_sensitive=False, all sync tasks use their own separate thread. This should also work, as Django connections are thread-local. It works as long as the user doesn't start threads of their own and reuse the connections there (the same constraint as in Django itself).


Contributor:

Ah yes, as long as we don't try to support thread_sensitive=true, that should be fine.


Member Author:

Yes, and I also think that using asyncio.to_thread is, in our scenario, the same as using sync_to_async with thread_sensitive=False, as we always started our worker in the Django command with asyncio.run and never used async_to_sync there.


         job_args = [context] if task.pass_context else []
         task_result = await await_func(*job_args, **job.task_kwargs)
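To summarize the thread discussion above: `asyncio.to_thread` dispatches each call to the default executor, so sync tasks may run on different worker threads and there is no single dedicated thread, which matches `sync_to_async(..., thread_sensitive=False)` rather than asgiref's default. A standalone sketch of that property (`which_thread` is an illustrative name, not Procrastinate code):

```python
import asyncio
import threading


def which_thread(label: str) -> str:
    # Reports the executor thread a given call landed on.
    return f"{label}: {threading.current_thread().name}"


async def main() -> None:
    # Successive calls may or may not reuse the same executor thread;
    # nothing guarantees a single dedicated thread, so thread-local
    # resources such as per-thread DB connections must not assume one.
    print(await asyncio.to_thread(which_thread, "task 1"))
    print(await asyncio.to_thread(which_thread, "task 2"))


asyncio.run(main())
```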
6 changes: 1 addition & 5 deletions procrastinate_demos/demo_async/__main__.py
@@ -4,13 +4,9 @@
 import json
 import logging

-import asgiref.sync
-
 from . import app as app_module
 from . import tasks

-ainput = asgiref.sync.sync_to_async(input)
-

 async def main():
     logging.info("Running app in async context")
@@ -22,7 +18,7 @@ async def main():
     print("Enter an empty line to quit")
     print()
     while True:
-        response = (await ainput("Your input: ")).strip()
+        response = (await asyncio.to_thread(input, "Your input: ")).strip()
         if not response:
             break
         command, *args = (response).split(maxsplit=1)
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -26,7 +26,7 @@ procrastinate = 'procrastinate.cli:main'
 python = "^3.9"
 aiopg = { version = "*", optional = true }
 anyio = "*"
-asgiref = "*"
+asgiref = { version = "*", optional = true }
 attrs = "*"
 contextlib2 = { version = "*", python = "<3.10" }
 croniter = "*"
@@ -39,7 +39,7 @@ typing-extensions = "*"
 sphinx = { version = "*", optional = true }

 [tool.poetry.extras]
-django = ["django"]
+django = ["asgiref", "django"]
 sqlalchemy = ["sqlalchemy"]
 aiopg = ["aiopg", "psycopg2-binary"]
 psycopg2 = ["psycopg2-binary"]
3 changes: 1 addition & 2 deletions tests/acceptance/test_sync.py
@@ -4,7 +4,6 @@
 import time

 import pytest
-from asgiref.sync import sync_to_async

 import procrastinate
 from procrastinate.contrib import psycopg2
@@ -70,7 +69,7 @@ def _inner_sum_task_sync(a, b):
             sum_results.append(a + b)

         # Only works if the worker runs the sync task in a separate thread
-        await sync_to_async(_inner_sum_task_sync)(a, b)
+        await asyncio.to_thread(_inner_sum_task_sync, a, b)

     asyncio.run(_sum_task_async(a, b))
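This acceptance test relies on a detail worth spelling out: a synchronous task dispatched with `asyncio.to_thread` runs in a thread that has no running event loop, so it is allowed to start its own loop with `asyncio.run`. A standalone sketch of that nesting (names are illustrative, not the test itself):

```python
import asyncio


def sync_task() -> str:
    # Runs in a worker thread where no event loop is active, so starting a
    # fresh, independent loop here is legal.
    async def helper() -> str:
        await asyncio.sleep(0)
        return "done"

    return asyncio.run(helper())


async def worker() -> None:
    # Mirrors the test: dispatch the sync task to a thread, and let it use
    # asyncio.run internally for its own async helpers.
    assert await asyncio.to_thread(sync_task) == "done"


asyncio.run(worker())
```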
4 changes: 1 addition & 3 deletions tests/integration/contrib/aiopg/test_aiopg_connector.py
@@ -4,7 +4,6 @@
 import functools
 import json

-import asgiref.sync
 import attr
 import pytest

@@ -101,7 +100,6 @@ async def test_get_sync_connector(aiopg_connector_factory):

     aiopg_connector = await aiopg_connector_factory(open=False)

-    @asgiref.sync.sync_to_async
     def f():
         sync_conn = aiopg_connector.get_sync_connector()
         sync_conn.open()
@@ -110,7 +108,7 @@ def f():
         finally:
             sync_conn.close()

-    await f()
+    await asyncio.to_thread(f)
     assert list(result[0].values()) == [1]
4 changes: 1 addition & 3 deletions tests/integration/test_psycopg_connector.py
@@ -4,7 +4,6 @@
 import functools
 import json

-import asgiref.sync
 import attr
 import pytest

@@ -112,7 +111,6 @@ async def test_wrap_exceptions(psycopg_connector):


 async def test_execute_query_sync(psycopg_connector):
-    @asgiref.sync.sync_to_async()
     def sync():
         assert (
             psycopg_connector.execute_query(
@@ -130,7 +128,7 @@ def sync():
             )
         assert result == [{"obj_description": "foo"}]

-    await sync()
+    await asyncio.to_thread(sync)


 async def test_execute_query_interpolate(psycopg_connector):
11 changes: 0 additions & 11 deletions tests/unit/test_utils.py
@@ -40,17 +40,6 @@ def test_import_all():
     assert module in sys.modules


-def test_sync_await():
-    result = []
-
-    async def coro():
-        result.append(1)
-
-    utils.async_to_sync(coro)
-
-    assert result == [1]
-
-
 def test_causes():
     e1, e2, e3 = AttributeError("foo"), KeyError("bar"), IndexError("baz")