Add support for async generator finalization #1564

Merged on Aug 5, 2020 (23 commits)
Commits
26f1aea
Async generator hooks, simpler approach
oremanj Jun 10, 2020
ac3e46d
blacken
oremanj Jun 10, 2020
2947de7
Respond to review comments + add more tests
oremanj Jun 25, 2020
d198ed2
blacken
oremanj Jun 25, 2020
b720303
Merge remote-tracking branch 'origin/master' into asyncgenhooks-basic
oremanj Jun 25, 2020
45c2eee
flake8
oremanj Jun 25, 2020
2d81bb0
Fix mismerge with master
oremanj Jun 25, 2020
5a37f78
Work correctly in -Werror mode too
oremanj Jun 25, 2020
3acf34b
Handle asyncgens correctly when Trio is the guest of an asyncio host
oremanj Jun 25, 2020
80e8ab3
Fix 3.6
oremanj Jun 25, 2020
cfdb850
Make tests pass on pypy 7.2 which doesn't run firstiter hooks
oremanj Jun 25, 2020
153ce13
Hopefully resolve coverage issues
oremanj Jun 25, 2020
ebaf69c
blacken
oremanj Jun 25, 2020
5b2c544
Merge remote-tracking branch 'origin/master' into asyncgenhooks-basic
oremanj Jun 25, 2020
936ccdb
Add docs and newsfragment
oremanj Jun 25, 2020
7d0fcd9
Fix formatting
oremanj Jun 25, 2020
86a8b7d
Merge remote-tracking branch 'origin/master' into asyncgenhooks-basic
oremanj Jul 7, 2020
4776728
Fix flake8
oremanj Jul 7, 2020
5574b9e
Merge remote-tracking branch 'origin/master' into asyncgenhooks-basic
oremanj Jul 24, 2020
9868f25
Respond to review comments; split asyncgens logic into a separate file
oremanj Jul 24, 2020
2af6974
Fix mypy
oremanj Jul 24, 2020
33d168e
Merge remote-tracking branch 'origin/master' into asyncgenhooks-basic
oremanj Aug 5, 2020
0441952
Review responses
oremanj Aug 5, 2020
12 changes: 7 additions & 5 deletions trio/_core/_entry_queue.py
@@ -56,7 +56,13 @@ def run_cb(job):
async def kill_everything(exc):
raise exc

_core.spawn_system_task(kill_everything, exc)
try:
_core.spawn_system_task(kill_everything, exc)
except RuntimeError:
# We're quite late in the shutdown process and
# the system nursery is already closed.
_core.current_task().parent_nursery.start_soon(kill_everything, exc)
Member

This is a gross hack and I don't like it. (And I don't like the subtle coupling with the finalize_remaining_asyncgens code, where it has to use a shield because of this.)

But maybe there's no better solution for right now, and we should merge this as is, and then address #1607 in a separate PR.

Member Author

It is very much a gross hack, but I think it's the best solution for right now. I'll leave a comment.


return True

# This has to be carefully written to be safe in the face of new items
@@ -102,10 +108,6 @@ def close(self):
def size(self):
return len(self.queue) + len(self.idempotent_queue)

def spawn(self):
name = "<TrioToken.run_sync_soon task>"
_core.spawn_system_task(self.task, name=name)

def run_sync_soon(self, sync_fn, *args, idempotent=False):
with self.lock:
if self.done:
199 changes: 182 additions & 17 deletions trio/_core/_run.py
@@ -9,9 +9,11 @@
import sys
import threading
from collections import deque
from functools import partial
import collections.abc
from contextlib import contextmanager, closing
import warnings
import weakref
import enum

from contextvars import copy_context
@@ -42,7 +44,7 @@
from ._thread_cache import start_thread_soon
from .. import _core
from .._deprecate import deprecated
from .._util import Final, NoPublicConstructor, coroutine_or_error
from .._util import Final, NoPublicConstructor, coroutine_or_error, name_asyncgen

_NO_SEND = object()

@@ -61,8 +63,9 @@ def _public(fn):
_ALLOW_DETERMINISTIC_SCHEDULING = False
_r = random.Random()

# Used to log exceptions in instruments
# Used to log exceptions in instruments and async generator finalizers
INSTRUMENT_LOGGER = logging.getLogger("trio.abc.Instrument")
ASYNCGEN_LOGGER = logging.getLogger("trio.async_generator_errors")


# On 3.7+, Context.run() is implemented in C and doesn't show up in
@@ -958,7 +961,7 @@ async def async_fn(arg1, arg2, \*, task_status=trio.TASK_STATUS_IGNORED):
self._pending_starts += 1
async with open_nursery() as old_nursery:
task_status = _TaskStatus(old_nursery, self)
thunk = functools.partial(async_fn, task_status=task_status)
thunk = partial(async_fn, task_status=task_status)
task = GLOBAL_RUN_CONTEXT.runner.spawn_impl(
thunk, args, old_nursery, name
)
@@ -1222,6 +1225,14 @@ class Runner:
is_guest = attr.ib(default=False)
guest_tick_scheduled = attr.ib(default=False)

# Async generators are added to this set when first iterated. Any
# left after the main task exits will be closed before trio.run()
# returns. During the execution of the main task, this is a
# WeakSet so GC works. During shutdown, it's a regular set so we
# don't have to deal with GC firing at unexpected times.
asyncgens = attr.ib(factory=weakref.WeakSet)
prev_asyncgen_hooks = attr.ib(default=None)

def force_guest_tick_asap(self):
if self.guest_tick_scheduled:
return
@@ -1231,6 +1242,8 @@ def force_guest_tick_asap(self):
def close(self):
self.io_manager.close()
self.entry_queue.close()
if self.prev_asyncgen_hooks is not None:
sys.set_asyncgen_hooks(*self.prev_asyncgen_hooks)
if self.instruments:
self.instrument("after_run")
# This is where KI protection gets disabled, so we do it last
@@ -1366,7 +1379,7 @@ def spawn_impl(self, async_fn, args, nursery, name, *, system_task=False):

if name is None:
name = async_fn
if isinstance(name, functools.partial):
if isinstance(name, partial):
name = name.func
if not isinstance(name, str):
try:
@@ -1432,11 +1445,7 @@ def task_exited(self, task, outcome):

task._activate_cancel_status(None)
self.tasks.remove(task)
if task is self.main_task:
self.main_task_outcome = outcome
self.system_nursery.cancel_scope.cancel()
self.system_nursery._child_finished(task, Value(None))
elif task is self.init_task:
if task is self.init_task:
# If the init task crashed, then something is very wrong and we
# let the error propagate. (It'll eventually be wrapped in a
# TrioInternalError.)
@@ -1446,11 +1455,126 @@ def task_exited(self, task, outcome):
if self.tasks: # pragma: no cover
raise TrioInternalError
else:
if task is self.main_task:
self.main_task_outcome = outcome
outcome = Value(None)
task._parent_nursery._child_finished(task, outcome)

if self.instruments:
self.instrument("task_exited", task)

################
# Async generator finalization support
################

async def finalize_asyncgen(self, agen, name, *, check_running):
if check_running and agen.ag_running:
# Another async generator is iterating this one, which is
# suspended at an event loop trap. Add it back to the
# asyncgens set and we'll get it on the next round. Note
# that this is only possible during end-of-run
# finalization; in GC-directed finalization, no one has a
# reference to agen anymore, so no one can be iterating it.
#
# This field is only reliable on 3.8+ due to
# https://bugs.python.org/issue32526. Pythons below
# 3.8 use a workaround in finalize_remaining_asyncgens.
self.asyncgens.add(agen)
return

try:
# This shield ensures that finalize_asyncgen never exits
# with an exception, not even a Cancelled. The inside
# is cancelled so there's no deadlock risk.
with CancelScope(shield=True) as cancel_scope:
cancel_scope.cancel()
await agen.aclose()
except BaseException as exc:
ASYNCGEN_LOGGER.exception(
"Exception ignored during finalization of async generator %r -- "
"surround your use of the generator in 'async with aclosing(...):' "
"to raise exceptions like this in the context where they're generated",
name,
)

async def finalize_remaining_asyncgens(self):
# At the time this function is called, there are exactly two
# tasks running: init and the run_sync_soon task. (And we've
# shut down the system nursery, so no more can appear.)
# Neither one uses async generators, so every async generator
# must be suspended at a yield point -- there's no one to be
# doing the iteration. However, once we start aclose() of one
# async generator, it might start fetching the next value from
# another, thus preventing us from closing that other.
#
# On 3.8+, we can detect this condition by looking at
# ag_running. On earlier versions, ag_running doesn't provide
# useful information. We could look at ag_await, but that
# would fail in case of shenanigans like
# https://github.com/python-trio/async_generator/pull/16.
# It's easier to just not parallelize the shutdowns.
finalize_in_parallel = sys.version_info >= (3, 8)

# It's possible that the cleanup code will itself create
# more async generators, so we iterate repeatedly until
# all are gone.
while self.asyncgens:
batch = self.asyncgens
self.asyncgens = set()

if finalize_in_parallel:
async with open_nursery() as kill_them_all:
# This shield is needed to avoid the checkpoint
# in Nursery.__aexit__ raising Cancelled if we're
# in a cancelled scope. (Which can happen if
# a run_sync_soon callback raises an exception.)
kill_them_all.cancel_scope.shield = True
for agen in batch:
name = name_asyncgen(agen)
kill_them_all.start_soon(
partial(
self.finalize_asyncgen, agen, name, check_running=True
),
name="close asyncgen {} (outlived run)".format(name),
)

if self.asyncgens == batch: # pragma: no cover
# Something about the running-detection seems
# to have failed; fall back to one-at-a-time mode
# instead of looping forever
finalize_in_parallel = False
else:
for agen in batch:
await self.finalize_asyncgen(
agen, name_asyncgen(agen), check_running=False
)
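
Editorial sketch of the "swap out the batch, then loop until empty" pattern used by finalize_remaining_asyncgens above. It is plain synchronous Python, not Trio code: `drain_until_empty`, `pending`, and `close` are hypothetical names chosen for illustration. The point it shows is that closing one item may register new items, so a single pass over the set is not enough, and iterating the live set while it is mutated would raise a RuntimeError.

```python
def drain_until_empty(pending, close):
    """Close every item in `pending`, including items added while closing.

    `pending` is a mutable set; `close` is a callable that may add new
    items to `pending` as a side effect. Returns the number of rounds.
    """
    rounds = 0
    while pending:
        # Take a snapshot and empty the live set, so that anything
        # registered during this round lands in the next round.
        batch = set(pending)
        pending.clear()
        for item in batch:
            close(item)
        rounds += 1
    return rounds


if __name__ == "__main__":
    closed = []
    pending = {"outer"}

    def close(item):
        closed.append(item)
        if item == "outer":
            # Cleanup code that itself creates more work.
            pending.add("inner")

    print(drain_until_empty(pending, close), closed)
```

The real method differs in that each close is an awaited `aclose()` and, on 3.8+, a whole batch is closed concurrently in a nursery.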

def setup_asyncgen_hooks(self):
def firstiter(agen):
self.asyncgens.add(agen)

def finalizer(agen):
agen_name = name_asyncgen(agen)
warnings.warn(
f"Async generator {agen_name!r} was garbage collected before it had "
f"been exhausted. Surround its use in 'async with aclosing(...):' "
f"to ensure that it gets cleaned up as soon as you're done using it.",
ResourceWarning,
stacklevel=2,
)
self.entry_queue.run_sync_soon(
partial(
self.spawn_system_task,
partial(
self.finalize_asyncgen, agen, agen_name, check_running=False
),
name=f"close asyncgen {agen_name} (abandoned)",
),
)

self.prev_asyncgen_hooks = sys.get_asyncgen_hooks()
sys.set_asyncgen_hooks(firstiter=firstiter, finalizer=finalizer)

################
# System tasks and init
################
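
Editorial sketch of the hook mechanism that setup_asyncgen_hooks and Runner.close rely on, using only the standard library. `AsyncgenTracker`, `ticker`, and `step` are hypothetical names, not part of Trio. The sketch assumes CPython, where dropping the last reference to a started, unfinished async generator triggers the finalizer hook right away; the finalizer here only records the generator's name, whereas the code in the diff schedules `aclose()` in a system task.

```python
import sys
import weakref


class AsyncgenTracker:
    """Hypothetical helper mirroring the hook setup shown in the diff."""

    def __init__(self):
        self.alive = weakref.WeakSet()  # generators that have started iterating
        self.abandoned = []  # names of generators handed to the finalizer
        self._prev_hooks = None

    def install(self):
        def firstiter(agen):
            # Called the first time the generator is iterated.
            self.alive.add(agen)

        def finalizer(agen):
            # Called when a started, unfinished generator is being collected.
            # This sketch only records the name instead of closing it.
            self.abandoned.append(agen.__name__)

        # Remember whatever hooks were installed before, so they can be restored.
        self._prev_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(firstiter=firstiter, finalizer=finalizer)

    def uninstall(self):
        if self._prev_hooks is not None:
            sys.set_asyncgen_hooks(*self._prev_hooks)
            self._prev_hooks = None


async def ticker():
    yield 1
    yield 2


def step(agen):
    """Drive one __anext__() by hand; valid only because ticker() never awaits."""
    try:
        agen.__anext__().send(None)
    except StopIteration as stop:
        return stop.value
```

Saving and restoring the previous hooks matters when the runner is a guest inside another event loop, which is the case the `prev_asyncgen_hooks` attribute covers.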
@@ -1500,14 +1624,51 @@ def spawn_system_task(self, async_fn, *args, name=None):
)

async def init(self, async_fn, args):
async with open_nursery() as system_nursery:
self.system_nursery = system_nursery
try:
self.main_task = self.spawn_impl(async_fn, args, system_nursery, None)
except BaseException as exc:
self.main_task_outcome = Error(exc)
system_nursery.cancel_scope.cancel()
self.entry_queue.spawn()
# run_sync_soon task runs here:
async with open_nursery() as run_sync_soon_nursery:
# All other system tasks run here:
async with open_nursery() as self.system_nursery:
# Only the main task runs here:
async with open_nursery() as main_task_nursery:
try:
self.main_task = self.spawn_impl(
async_fn, args, main_task_nursery, None
)
except BaseException as exc:
self.main_task_outcome = Error(exc)
return
self.spawn_impl(
self.entry_queue.task,
(),
run_sync_soon_nursery,
"<TrioToken.run_sync_soon task>",
system_task=True,
)

# Main task is done. We should be exiting soon, so
# we're going to shut down GC-mediated async generator
# finalization by turning the asyncgens WeakSet into a
# regular set. We must do that before closing the system
# nursery, since finalization spawns new system tasks.
self.asyncgens = set(self.asyncgens)

# Process all pending run_sync_soon callbacks, in case one of
# them was an asyncgen finalizer.
self.entry_queue.run_sync_soon(self.reschedule, self.init_task)
await wait_task_rescheduled(lambda _: Abort.FAILED)

# Now it's safe to proceed with shutting down system tasks
self.system_nursery.cancel_scope.cancel()
Member

Hmm, so this is an interesting change compared to the sketch in my comment. In my sketch, I did the self.asyncgens = set(self.asyncgens) and run_sync_soon flush after shutting down the system nursery, and you're doing it before.

I guess the tradeoff is: in my sketch, the finalizer's run_sync_soon payload has to be prepared to run after the system nursery is shut down. (I handled that by having it detect this case, and instead place the object into self.asyncgens.) So that's a bit more complex. In this version, it's a bit simpler, but it means that during the time period where system tasks are shutting down, async generator garbage collection is disabled. That makes me a bit nervous, because arbitrary user code is within its rights to create unbounded amounts of garbage and expect the GC to clean it all up.

What do you think?

Member Author

Good point. I figured we could trust system tasks to not run for an unbounded amount of time after they're cancelled, but you're right that that's not a very robust assumption to make. I'll change it to use your original approach. I think it will be quite difficult to test that reliably though, since we only have a one-tick window in between the system nursery becoming closed and the init task continuing on. I'll see what I can do...
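
Editorial sketch of the tradeoff discussed in this thread, using only the standard library. `Resource` and `weak_vs_strong` are hypothetical stand-ins, not Trio code. It illustrates why converting the WeakSet into a regular set "disables" garbage collection of the tracked objects: the regular set holds strong references, so anything in it stays alive until the set itself lets go.

```python
import gc
import weakref


class Resource:
    """Hypothetical stand-in for an async generator object."""


def weak_vs_strong():
    tracked = weakref.WeakSet()
    a, b = Resource(), Resource()
    tracked.add(a)
    tracked.add(b)

    # While the collection is weak, dropping the last reference lets
    # the object go away and it disappears from the set.
    del a
    gc.collect()
    weak_count = len(tracked)

    # Snapshot into a regular set, as the init task does at shutdown.
    pinned = set(tracked)

    # Now dropping our own reference does not free the object,
    # because `pinned` still refers to it.
    del b
    gc.collect()
    return weak_count, len(pinned), len(tracked)


if __name__ == "__main__":
    print(weak_vs_strong())
```

The longer the window between the snapshot and the final cleanup, the more garbage can pile up in the strong set, which is the concern raised above about doing the conversion before the system nursery has shut down.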


# System tasks are gone and no more will be appearing.
# The only async-colored user code left to run is the
# finalizers for the async generators that remain alive.
await self.finalize_remaining_asyncgens()

# There are no more asyncgens, which means no more user-provided
# code except possibly run_sync_soon callbacks. It's finally safe
# to stop the run_sync_soon task and exit run().
run_sync_soon_nursery.cancel_scope.cancel()

################
# Outside context problems
@@ -1989,6 +2150,10 @@ def unrolled_run(runner, async_fn, args, host_uses_signal_set_wakeup_fd=False):
if not host_uses_signal_set_wakeup_fd:
runner.entry_queue.wakeup.wakeup_on_signals()

# Do this before before_run in case before_run wants to override
# our hooks
runner.setup_asyncgen_hooks()

if runner.instruments:
runner.instrument("before_run")
runner.clock.start_clock()
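
Editorial sketch of the 'async with aclosing(...)' advice that the new warning and log messages give. To stay runnable with the standard library alone, it is driven by asyncio rather than Trio, and it defines its own small `aclosing` helper (an assumption for illustration; equivalents exist as `contextlib.aclosing` on Python 3.10+ and in the async_generator package). `numbers` and `main` are hypothetical names.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def aclosing(agen):
    """Minimal stand-in: always close the generator on exit."""
    try:
        yield agen
    finally:
        await agen.aclose()


async def numbers(log):
    try:
        yield 1
        yield 2
    finally:
        # Cleanup runs as soon as the generator is closed.
        log.append("cleanup")


async def main():
    log = []
    async with aclosing(numbers(log)) as agen:
        async for value in agen:
            log.append(value)
            break  # abandon the generator early
    # By this point the generator's finally block has already run,
    # in the task that was using it, not in a GC-driven finalizer.
    log.append("after block")
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With this pattern, any exception raised during the generator's cleanup propagates to the code that used it, instead of being logged and dropped by the finalization path added in this PR.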