Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-124958: fix asyncio.TaskGroup and _PyFuture refcycles #124959

Merged
merged 4 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions Lib/asyncio/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,7 @@ def result(self):
the future is done and has an exception set, this exception is raised.
"""
if self._state == _CANCELLED:
exc = self._make_cancelled_error()
raise exc
raise self._make_cancelled_error()
if self._state != _FINISHED:
raise exceptions.InvalidStateError('Result is not ready.')
self.__log_traceback = False
Expand All @@ -208,8 +207,7 @@ def exception(self):
InvalidStateError.
"""
if self._state == _CANCELLED:
exc = self._make_cancelled_error()
raise exc
raise self._make_cancelled_error()
if self._state != _FINISHED:
raise exceptions.InvalidStateError('Exception is not set.')
self.__log_traceback = False
Expand Down
161 changes: 85 additions & 76 deletions Lib/asyncio/taskgroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,94 +66,103 @@ async def __aenter__(self):
return self

async def __aexit__(self, et, exc, tb):
self._exiting = True
try:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this huge diff. Can you instead factor out the code of __aexit__ in a helper function (so that code keeps all of the existing indentation) and nest a call to it in try..finally in __aexit__?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to delete locals as well as instance attributes so it's not possible

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is. Instance attributes are not a problem at all. Locals are solvable too:

try:
    raise propagate_cancellation_error
finally:
    del propagate_cancellation_error

Copy link
Contributor Author

@graingert graingert Oct 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed the changes but it introduces a bunch of extra places to delete things, so I prefer the one big try/finally. You can also just view the diff with whitespace hidden: 1a4f2b5?w=1 which should look like this:
image

self._exiting = True

if (exc is not None and
self._is_base_error(exc) and
self._base_error is None):
self._base_error = exc
if (exc is not None and
self._is_base_error(exc) and
self._base_error is None):
self._base_error = exc

if et is not None and issubclass(et, exceptions.CancelledError):
propagate_cancellation_error = exc
else:
propagate_cancellation_error = None
if et is not None and issubclass(et, exceptions.CancelledError):
propagate_cancellation_error = exc
else:
propagate_cancellation_error = None

if et is not None:
if not self._aborting:
# Our parent task is being cancelled:
#
# async with TaskGroup() as g:
# g.create_task(...)
# await ... # <- CancelledError
#
# or there's an exception in "async with":
#
# async with TaskGroup() as g:
# g.create_task(...)
# 1 / 0
#
self._abort()

# We use while-loop here because "self._on_completed_fut"
# can be cancelled multiple times if our parent task
# is being cancelled repeatedly (or even once, when
# our own cancellation is already in progress)
while self._tasks:
if self._on_completed_fut is None:
self._on_completed_fut = self._loop.create_future()

try:
await self._on_completed_fut
except exceptions.CancelledError as ex:
if et is not None:
if not self._aborting:
# Our parent task is being cancelled:
#
# async def wrapper():
# async with TaskGroup() as g:
# g.create_task(foo)
# async with TaskGroup() as g:
# g.create_task(...)
# await ... # <- CancelledError
#
# or there's an exception in "async with":
#
# async with TaskGroup() as g:
# g.create_task(...)
# 1 / 0
#
# "wrapper" is being cancelled while "foo" is
# still running.
propagate_cancellation_error = ex
self._abort()

self._on_completed_fut = None

assert not self._tasks

if self._base_error is not None:
raise self._base_error

if self._parent_cancel_requested:
# If this flag is set we *must* call uncancel().
if self._parent_task.uncancel() == 0:
# If there are no pending cancellations left,
# don't propagate CancelledError.
propagate_cancellation_error = None

# Propagate CancelledError if there is one, except if there
# are other errors -- those have priority.
if propagate_cancellation_error is not None and not self._errors:
raise propagate_cancellation_error

if et is not None and not issubclass(et, exceptions.CancelledError):
self._errors.append(exc)

if self._errors:
# If the parent task is being cancelled from the outside
# of the taskgroup, un-cancel and re-cancel the parent task,
# which will keep the cancel count stable.
if self._parent_task.cancelling():
self._parent_task.uncancel()
self._parent_task.cancel()
# We use while-loop here because "self._on_completed_fut"
# can be cancelled multiple times if our parent task
# is being cancelled repeatedly (or even once, when
# our own cancellation is already in progress)
while self._tasks:
if self._on_completed_fut is None:
self._on_completed_fut = self._loop.create_future()

try:
await self._on_completed_fut
except exceptions.CancelledError as ex:
if not self._aborting:
# Our parent task is being cancelled:
#
# async def wrapper():
# async with TaskGroup() as g:
# g.create_task(foo)
#
# "wrapper" is being cancelled while "foo" is
# still running.
propagate_cancellation_error = ex
self._abort()

self._on_completed_fut = None

assert not self._tasks

if self._base_error is not None:
raise self._base_error

if self._parent_cancel_requested:
# If this flag is set we *must* call uncancel().
if self._parent_task.uncancel() == 0:
# If there are no pending cancellations left,
# don't propagate CancelledError.
propagate_cancellation_error = None

# Propagate CancelledError if there is one, except if there
# are other errors -- those have priority.
if propagate_cancellation_error is not None and not self._errors:
raise propagate_cancellation_error

if et is not None and not issubclass(et, exceptions.CancelledError):
self._errors.append(exc)

if self._errors:
# If the parent task is being cancelled from the outside
# of the taskgroup, un-cancel and re-cancel the parent task,
# which will keep the cancel count stable.
if self._parent_task.cancelling():
self._parent_task.uncancel()
self._parent_task.cancel()
raise BaseExceptionGroup(
'unhandled errors in a TaskGroup',
self._errors,
) from None
finally:
# Exceptions are heavy objects that can have object
# cycles (bad for GC); let's not keep a reference to
# a bunch of them.
try:
me = BaseExceptionGroup('unhandled errors in a TaskGroup', self._errors)
raise me from None
finally:
self._errors = None
propagate_cancellation_error = None
self._parent_task = None
self._errors = None
self._base_error = None
et = None
exc = None
tb = None


def create_task(self, coro, *, name=None, context=None):
"""Create a new task in this group and return it.
Expand Down
22 changes: 22 additions & 0 deletions Lib/test/test_asyncio/test_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,28 @@ def __del__(self):
fut = self._new_future(loop=self.loop)
fut.set_result(Evil())

def test_future_cancelled_result_refcycles(self):
f = self._new_future(loop=self.loop)
f.cancel()
exc = None
try:
f.result()
except asyncio.CancelledError as e:
exc = e
self.assertIsNotNone(exc)
self.assertListEqual(gc.get_referrers(exc), [])

def test_future_cancelled_exception_refcycles(self):
f = self._new_future(loop=self.loop)
f.cancel()
exc = None
try:
f.exception()
except asyncio.CancelledError as e:
exc = e
self.assertIsNotNone(exc)
self.assertListEqual(gc.get_referrers(exc), [])


@unittest.skipUnless(hasattr(futures, '_CFuture'),
'requires the C _asyncio module')
Expand Down
92 changes: 90 additions & 2 deletions Lib/test/test_asyncio/test_taskgroups.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Adapted with permission from the EdgeDB project;
# license: PSFL.


import gc
import asyncio
import contextvars
import contextlib
Expand All @@ -11,7 +11,6 @@

from test.test_asyncio.utils import await_without_task


# To prevent a warning "test altered the execution environment"
def tearDownModule():
asyncio.set_event_loop_policy(None)
Expand Down Expand Up @@ -899,6 +898,95 @@ async def outer():

await outer()

async def test_exception_refcycles_direct(self):
"""Test that TaskGroup doesn't keep a reference to the raised ExceptionGroup"""
tg = asyncio.TaskGroup()
exc = None

class _Done(Exception):
pass

try:
async with tg:
raise _Done
except ExceptionGroup as e:
exc = e

self.assertIsNotNone(exc)
self.assertListEqual(gc.get_referrers(exc), [])


async def test_exception_refcycles_errors(self):
"""Test that TaskGroup deletes self._errors, and __aexit__ args"""
tg = asyncio.TaskGroup()
exc = None

class _Done(Exception):
pass

try:
async with tg:
raise _Done
except* _Done as excs:
exc = excs.exceptions[0]

self.assertIsInstance(exc, _Done)
self.assertListEqual(gc.get_referrers(exc), [])


async def test_exception_refcycles_parent_task(self):
"""Test that TaskGroup deletes self._parent_task"""
tg = asyncio.TaskGroup()
exc = None

class _Done(Exception):
pass

async def coro_fn():
async with tg:
raise _Done

try:
async with asyncio.TaskGroup() as tg2:
tg2.create_task(coro_fn())
except* _Done as excs:
exc = excs.exceptions[0].exceptions[0]

self.assertIsInstance(exc, _Done)
self.assertListEqual(gc.get_referrers(exc), [])

async def test_exception_refcycles_propagate_cancellation_error(self):
"""Test that TaskGroup deletes propagate_cancellation_error"""
tg = asyncio.TaskGroup()
exc = None

try:
async with asyncio.timeout(-1):
async with tg:
await asyncio.sleep(0)
except TimeoutError as e:
exc = e.__cause__

self.assertIsInstance(exc, asyncio.CancelledError)
self.assertListEqual(gc.get_referrers(exc), [])

async def test_exception_refcycles_base_error(self):
"""Test that TaskGroup deletes self._base_error"""
class MyKeyboardInterrupt(KeyboardInterrupt):
pass

tg = asyncio.TaskGroup()
exc = None

try:
async with tg:
raise MyKeyboardInterrupt
except MyKeyboardInterrupt as e:
exc = e

self.assertIsNotNone(exc)
self.assertListEqual(gc.get_referrers(exc), [])


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix refcycles in exceptions raised from :class:`asyncio.TaskGroup` and the python implementation of :class:`asyncio.Future`
Loading