Skip to content

Commit

Permalink
Switch to Python 3.12-style wait_for
Browse files Browse the repository at this point in the history
`wait_for` has been a mess with respect to cancellations
consistently in `asyncio`.  Hopefully the approach taken in
Python 3.12 solves the issues, so adopt that instead of trying
to "fix" `wait_for` with wrappers on older Pythons.  Use `async_timeout`
as a polyfill on pre-3.11 Python.

Closes: #1056
Closes: #1052
Fixes: #955
  • Loading branch information
elprans committed Oct 9, 2023
1 parent 4ddb039 commit 9f44bed
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 18 deletions.
93 changes: 93 additions & 0 deletions asyncpg/_asyncio_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Backports from Python/Lib/asyncio for older Pythons
#
# Copyright (c) 2001-2023 Python Software Foundation; All Rights Reserved
#
# SPDX-License-Identifier: PSF-2.0


import asyncio
import functools
import sys

from . import events
from . import exceptions


if sys.version_info < (3, 11):
from async_timeout import timeout as timeout_ctx
else:
from asyncio import timeout as timeout_ctx

from async_timeout import timeout as timeout_ctx_2


async def wait_for(fut, timeout):
"""Wait for the single Future or coroutine to complete, with timeout.
Coroutine will be wrapped in Task.
Returns result of the Future or coroutine. When a timeout occurs,
it cancels the task and raises TimeoutError. To avoid the task
cancellation, wrap it in shield().
If the wait is cancelled, the task is also cancelled.
If the task supresses the cancellation and returns a value instead,
that value is returned.
This function is a coroutine.
"""
# The special case for timeout <= 0 is for the following case:
#
# async def test_waitfor():
# func_started = False
#
# async def func():
# nonlocal func_started
# func_started = True
#
# try:
# await asyncio.wait_for(func(), 0)
# except asyncio.TimeoutError:
# assert not func_started
# else:
# assert False
#
# asyncio.run(test_waitfor())

if timeout is not None and timeout <= 0:
fut = asyncio.ensure_future(fut)

if fut.done():
return fut.result()

await _cancel_and_wait(fut)
try:
return fut.result()
except exceptions.CancelledError as exc:
raise TimeoutError from exc

async with timeout_ctx(timeout):
return await fut


async def _cancel_and_wait(fut):
"""Cancel the *fut* future or task and wait until it completes."""

loop = events.get_running_loop()
waiter = loop.create_future()
cb = functools.partial(_release_waiter, waiter)
fut.add_done_callback(cb)

try:
fut.cancel()
# We cannot wait on *fut* directly to make
# sure _cancel_and_wait itself is reliably cancellable.
await waiter
finally:
fut.remove_done_callback(cb)


def _release_waiter(waiter, *args):
if not waiter.done():
waiter.set_result(None)
20 changes: 5 additions & 15 deletions asyncpg/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0


import asyncio
import pathlib
import platform
import typing
import sys


SYSTEM = platform.uname().system
Expand Down Expand Up @@ -49,17 +49,7 @@ async def wait_closed(stream):
pass


# Workaround for https://bugs.python.org/issue37658
async def wait_for(fut, timeout):
if timeout is None:
return await fut

fut = asyncio.ensure_future(fut)

try:
return await asyncio.wait_for(fut, timeout)
except asyncio.CancelledError:
if fut.done():
return fut.result()
else:
raise
if sys.version_info < (3, 12):
from ._asyncio_compat import wait_for as wait_for # noqa: F401
else:
from asyncio import wait_for as wait_for # noqa: F401
6 changes: 3 additions & 3 deletions asyncpg/protocol/protocol.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ cdef class BaseProtocol(CoreProtocol):

while more:
with timer:
await asyncio.wait_for(
await compat.wait_for(
self.writing_allowed.wait(),
timeout=timer.get_remaining_budget())
# On Windows the above event somehow won't allow context
Expand Down Expand Up @@ -383,7 +383,7 @@ cdef class BaseProtocol(CoreProtocol):
if buffer:
try:
with timer:
await asyncio.wait_for(
await compat.wait_for(
sink(buffer),
timeout=timer.get_remaining_budget())
except (Exception, asyncio.CancelledError) as ex:
Expand Down Expand Up @@ -511,7 +511,7 @@ cdef class BaseProtocol(CoreProtocol):
with timer:
await self.writing_allowed.wait()
with timer:
chunk = await asyncio.wait_for(
chunk = await compat.wait_for(
iterator.__anext__(),
timeout=timer.get_remaining_budget())
self._write_copy_data_msg(chunk)
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ classifiers = [
"Programming Language :: Python :: Implementation :: CPython",
"Topic :: Database :: Front-Ends",
]
dependencies = [
'async_timeout>=4.0.3; python_version < "3.12.0"'
]

[project.urls]
github = "https://github.com/MagicStack/asyncpg"
Expand Down

0 comments on commit 9f44bed

Please sign in to comment.