Skip to content

Commit

Permalink
Add a workaround for bpo-37658
Browse files Browse the repository at this point in the history
`asyncio.wait_for()` currently has a bug where it raises a
`CancelledError` even when the wrapped awaitable has completed.
The upstream fix is in python/cpython#37658.  This adds a workaround
until the aforementioned PR is merged, backported and released.

Fixes: #467
Fixes: #547
Related: #468
Supersedes: #548
  • Loading branch information
elprans committed Aug 16, 2020
1 parent db4f1a6 commit cd8feab
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 18 deletions.
16 changes: 16 additions & 0 deletions asyncpg/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,19 @@ async def wait_closed(stream):
# On Windows wait_closed() sometimes propagates
# ConnectionResetError which is totally unnecessary.
pass


# Workaround for https://bugs.python.org/issue37658
async def wait_for(fut, timeout):
if timeout is None:
return await fut

fut = asyncio.ensure_future(fut)

try:
return await asyncio.wait_for(fut, timeout)
except asyncio.CancelledError:
if fut.done():
return fut.result()
else:
raise
18 changes: 2 additions & 16 deletions asyncpg/connect_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,18 +636,13 @@ async def _connect_addr(

connector = asyncio.ensure_future(connector)
before = time.monotonic()
try:
tr, pr = await asyncio.wait_for(
connector, timeout=timeout)
except asyncio.CancelledError:
connector.add_done_callback(_close_leaked_connection)
raise
tr, pr = await compat.wait_for(connector, timeout=timeout)
timeout -= time.monotonic() - before

try:
if timeout <= 0:
raise asyncio.TimeoutError
await asyncio.wait_for(connected, timeout=timeout)
await compat.wait_for(connected, timeout=timeout)
except (Exception, asyncio.CancelledError):
tr.close()
raise
Expand Down Expand Up @@ -745,12 +740,3 @@ def _create_future(loop):
return asyncio.Future(loop=loop)
else:
return create_future()


def _close_leaked_connection(fut):
try:
tr, pr = fut.result()
if tr:
tr.close()
except asyncio.CancelledError:
pass # hide the exception
5 changes: 3 additions & 2 deletions asyncpg/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import time
import warnings

from . import compat
from . import connection
from . import connect_utils
from . import exceptions
Expand Down Expand Up @@ -198,7 +199,7 @@ async def release(self, timeout):
# If the connection is in cancellation state,
# wait for the cancellation
started = time.monotonic()
await asyncio.wait_for(
await compat.wait_for(
self._con._protocol._wait_for_cancellation(),
budget)
if budget is not None:
Expand Down Expand Up @@ -623,7 +624,7 @@ async def _acquire_impl():
if timeout is None:
return await _acquire_impl()
else:
return await asyncio.wait_for(
return await compat.wait_for(
_acquire_impl(), timeout=timeout)

async def release(self, connection, *, timeout=None):
Expand Down

0 comments on commit cd8feab

Please sign in to comment.