From 26f1aeae1e42cef682b829b98b36cb0fc5f52b8d Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Wed, 10 Jun 2020 08:03:20 +0000 Subject: [PATCH 01/18] Async generator hooks, simpler approach --- trio/_core/_entry_queue.py | 14 ++- trio/_core/_run.py | 193 +++++++++++++++++++++++++++--- trio/_core/tests/test_asyncgen.py | 115 ++++++++++++++++++ trio/_core/tests/test_run.py | 1 - trio/_util.py | 19 +++ 5 files changed, 319 insertions(+), 23 deletions(-) create mode 100644 trio/_core/tests/test_asyncgen.py diff --git a/trio/_core/_entry_queue.py b/trio/_core/_entry_queue.py index 791ab8ca6e..ac71405e4c 100644 --- a/trio/_core/_entry_queue.py +++ b/trio/_core/_entry_queue.py @@ -56,7 +56,15 @@ def run_cb(job): async def kill_everything(exc): raise exc - _core.spawn_system_task(kill_everything, exc) + try: + _core.spawn_system_task(kill_everything, exc) + except RuntimeError: + # We're quite late in the shutdown process and + # the system nursery is already closed. + _core.current_task().parent_nursery.start_soon( + kill_everything, exc + ) + return True # This has to be carefully written to be safe in the face of new items @@ -102,10 +110,6 @@ def close(self): def size(self): return len(self.queue) + len(self.idempotent_queue) - def spawn(self): - name = "" - _core.spawn_system_task(self.task, name=name) - def run_sync_soon(self, sync_fn, *args, idempotent=False): with self.lock: if self.done: diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 8b112051e5..24cdf9166d 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -9,9 +9,11 @@ import sys import threading from collections import deque +from functools import partial import collections.abc from contextlib import contextmanager, closing import warnings +import weakref import enum from contextvars import copy_context @@ -42,7 +44,7 @@ from ._thread_cache import start_thread_soon from .. import _core from .._deprecate import deprecated -from .._util import Final, NoPublicConstructor, coroutine_or_error +from .._util import Final, NoPublicConstructor, coroutine_or_error, name_asyncgen _NO_SEND = object() @@ -61,8 +63,9 @@ def _public(fn): _ALLOW_DETERMINISTIC_SCHEDULING = False _r = random.Random() -# Used to log exceptions in instruments +# Used to log exceptions in instruments and async generator finalizers INSTRUMENT_LOGGER = logging.getLogger("trio.abc.Instrument") +ASYNCGEN_LOGGER = logging.getLogger("trio.async_generator_errors") # On 3.7+, Context.run() is implemented in C and doesn't show up in @@ -958,7 +961,7 @@ async def async_fn(arg1, arg2, \*, task_status=trio.TASK_STATUS_IGNORED): self._pending_starts += 1 async with open_nursery() as old_nursery: task_status = _TaskStatus(old_nursery, self) - thunk = functools.partial(async_fn, task_status=task_status) + thunk = partial(async_fn, task_status=task_status) task = GLOBAL_RUN_CONTEXT.runner.spawn_impl( thunk, args, old_nursery, name ) @@ -1222,6 +1225,14 @@ class Runner: is_guest = attr.ib(default=False) guest_tick_scheduled = attr.ib(default=False) + # Async generators are added to this set when first iterated. Any + # left after the main task exits will be closed before trio.run() + # returns. During the execution of the main task, this is a + # WeakSet so GC works. During shutdown, it's a regular set so we + # don't have to deal with GC firing at unexpected times. + asyncgens = attr.ib(factory=weakref.WeakSet) + prev_asyncgen_hooks = attr.ib(default=None) + def force_guest_tick_asap(self): if self.guest_tick_scheduled: return @@ -1231,6 +1242,8 @@ def force_guest_tick_asap(self): def close(self): self.io_manager.close() self.entry_queue.close() + if self.prev_asyncgen_hooks is not None: + sys.set_asyncgen_hooks(*self.prev_asyncgen_hooks) if self.instruments: self.instrument("after_run") # This is where KI protection gets disabled, so we do it last @@ -1366,7 +1379,7 @@ def spawn_impl(self, async_fn, args, nursery, name, *, system_task=False): if name is None: name = async_fn - if isinstance(name, functools.partial): + if isinstance(name, partial): name = name.func if not isinstance(name, str): try: @@ -1432,11 +1445,7 @@ def task_exited(self, task, outcome): task._activate_cancel_status(None) self.tasks.remove(task) - if task is self.main_task: - self.main_task_outcome = outcome - self.system_nursery.cancel_scope.cancel() - self.system_nursery._child_finished(task, Value(None)) - elif task is self.init_task: + if task is self.init_task: # If the init task crashed, then something is very wrong and we # let the error propagate. (It'll eventually be wrapped in a # TrioInternalError.) @@ -1446,11 +1455,120 @@ def task_exited(self, task, outcome): if self.tasks: # pragma: no cover raise TrioInternalError else: + if task is self.main_task: + self.main_task_outcome = outcome + outcome = Value(None) task._parent_nursery._child_finished(task, outcome) if self.instruments: self.instrument("task_exited", task) + ################ + # Async generator finalization support + ################ + + async def finalize_asyncgen(self, agen, name, *, check_running): + if check_running and agen.ag_running: + # Another async generator is iterating this one, which is + # suspended at an event loop trap. Add it back to the + # asyncgens set and we'll get it on the next round. Note + # that this is only possible during end-of-run + # finalization; in GC-directed finalization, no one has a + # reference to agen anymore, so no one can be iterating it. + # + # This field is only reliable on 3.8+ due to + # ttps://bugs.python.org/issue32526. Pythons below + # 3.8 use a workaround in finalize_remaining_asyncgens. + self.asyncgens.add(agen) + return + + try: + # This shield ensures that finalize_asyncgen never exits + # with an exception, not even a Cancelled. The inside + # is cancelled so there's no deadlock risk. + with CancelScope(shield=True) as cancel_scope: + cancel_scope.cancel() + await agen.aclose() + except BaseException as exc: + ASYNCGEN_LOGGER.exception( + "Exception ignored during finalization of async generator %r -- " + "surround your use of the generator in 'async with aclosing(...):' " + "to raise exceptions like this in the context where they're generated", + name, + ) + + async def finalize_remaining_asyncgens(self): + # At the time this function is called, there are exactly two + # tasks running: init and the run_sync_soon task. (And we've + # shut down the system nursery, so no more can appear.) + # Neither one uses async generators, so every async generator + # must be suspended at a yield point -- there's no one to be + # doing the iteration. However, once we start aclose() of one + # async generator, it might start fetching the next value from + # another, thus preventing us from closing that other. + # + # On 3.8+, we can detect this condition by looking at + # ag_running. On earlier versions, ag_running doesn't provide + # useful information. We could look at ag_await, but that + # would fail in case of shenanigans like + # https://github.com/python-trio/async_generator/pull/16. + # It's easier to just not parallelize the shutdowns. + finalize_in_parallel = sys.version_info >= (3, 8) + + # It's possible that that cleanup code will itself create + # more async generators, so we iterate repeatedly until + # all are gone. + while self.asyncgens: + batch = self.asyncgens + self.asyncgens = set() + + if finalize_in_parallel: + async with open_nursery() as kill_them_all: + # This shield is needed to avoid the checkpoint + # in Nursery.__aexit__ raising Cancelled if we're + # in a cancelled scope. (Which can happen if + # a run_sync_soon callback raises an exception.) + kill_them_all.cancel_scope.shield = True + for agen in batch: + name = name_asyncgen(agen) + kill_them_all.start_soon( + partial(self.finalize_asyncgen, agen, name, check_running=True), + name="close asyncgen {} (outlived run)".format(name), + ) + + if self.asyncgens == batch: # pragma: no cover + # Something about the running-detection seems + # to have failed; fall back to one-at-a-time mode + # instead of looping forever + finalize_in_parallel = False + else: + for agen in batch: + await self.finalize_asyncgen(agen, name_asyncgen(agen), check_running=False) + + def setup_asyncgen_hooks(self): + def firstiter(agen): + self.asyncgens.add(agen) + + def finalizer(agen): + agen_name = name_asyncgen(agen) + warnings.warn( + f"Async generator {agen_name!r} was garbage collected before it had " + f"been exhausted. Surround its use in 'async with aclosing(...):' " + f"to ensure that it gets cleaned up as soon as you're done using it.", + ResourceWarning, + stacklevel=2, + ) + self.entry_queue.run_sync_soon( + partial( + self.spawn_system_task, + partial(self.finalize_asyncgen, agen, agen_name, check_running=False), + name=f"close asyncgen {agen_name} (abandoned)", + ), + ) + + self.prev_asyncgen_hooks = sys.get_asyncgen_hooks() + sys.set_asyncgen_hooks(firstiter=firstiter, finalizer=finalizer) + ################ # System tasks and init ################ @@ -1500,14 +1618,51 @@ def spawn_system_task(self, async_fn, *args, name=None): ) async def init(self, async_fn, args): - async with open_nursery() as system_nursery: - self.system_nursery = system_nursery - try: - self.main_task = self.spawn_impl(async_fn, args, system_nursery, None) - except BaseException as exc: - self.main_task_outcome = Error(exc) - system_nursery.cancel_scope.cancel() - self.entry_queue.spawn() + # run_sync_soon task runs here: + async with open_nursery() as run_sync_soon_nursery: + # All other system tasks run here: + async with open_nursery() as self.system_nursery: + # Only the main task runs here: + async with open_nursery() as main_task_nursery: + try: + self.main_task = self.spawn_impl( + async_fn, args, main_task_nursery, None + ) + except BaseException as exc: + self.main_task_outcome = Error(exc) + return + self.spawn_impl( + self.entry_queue.task, + (), + run_sync_soon_nursery, + "", + system_task=True, + ) + + # Main task is done. We should be exiting soon, so + # we're going to shut down GC-mediated async generator + # finalization by turning the asyncgens WeakSet into a + # regular set. We must do that before closing the system + # nursery, since finalization spawns a new system tasks. + self.asyncgens = set(self.asyncgens) + + # Process all pending run_sync_soon callbacks, in case one of + # them was an asyncgen finalizer. + self.entry_queue.run_sync_soon(self.reschedule, self.init_task) + await wait_task_rescheduled(lambda _: Abort.FAILED) + + # Now it's safe to proceed with shutting down system tasks + self.system_nursery.cancel_scope.cancel() + + # System tasks are gone and no more will be appearing. + # The only async-colored user code left to run is the + # finalizers for the async generators that remain alive. + await self.finalize_remaining_asyncgens() + + # There are no more asyncgens, which means no more user-provided + # code except possibly run_sync_soon callbacks. It's finally safe + # to stop the run_sync_soon task and exit run(). + run_sync_soon_nursery.cancel_scope.cancel() ################ # Outside context problems @@ -1989,6 +2144,10 @@ def unrolled_run(runner, async_fn, args, host_uses_signal_set_wakeup_fd=False): if not host_uses_signal_set_wakeup_fd: runner.entry_queue.wakeup.wakeup_on_signals() + # Do this before before_run in case before_run wants to override + # our hooks + runner.setup_asyncgen_hooks() + if runner.instruments: runner.instrument("before_run") runner.clock.start_clock() diff --git a/trio/_core/tests/test_asyncgen.py b/trio/_core/tests/test_asyncgen.py new file mode 100644 index 0000000000..ecc4c4ba54 --- /dev/null +++ b/trio/_core/tests/test_asyncgen.py @@ -0,0 +1,115 @@ +import sys +import pytest +from math import inf +from functools import partial +from async_generator import aclosing +from ... import _core +from .tutil import gc_collect_harder + + +def test_asyncgen_basics(): + collected = [] + + async def example(cause): + try: + try: + yield 42 + except GeneratorExit: + pass + await _core.checkpoint() + except _core.Cancelled: + assert "example" in _core.current_task().name + assert "exhausted" not in cause + task_name = _core.current_task().name + assert cause in task_name or task_name == "" + assert _core.current_effective_deadline() == -inf + with pytest.raises(_core.Cancelled): + await _core.checkpoint() + collected.append(cause) + else: + assert "async_main" in _core.current_task().name + assert "exhausted" in cause + assert _core.current_effective_deadline() == inf + await _core.checkpoint() + collected.append(cause) + + saved = [] + + async def async_main(): + # GC'ed before exhausted + with pytest.warns( + ResourceWarning, match="Async generator.*collected before.*exhausted", + ): + async for val in example("abandoned"): + assert val == 42 + break + gc_collect_harder() + await _core.wait_all_tasks_blocked() + assert collected.pop() == "abandoned" + + # aclosing() ensures it's cleaned up at point of use + async with aclosing(example("exhausted 1")) as aiter: + async for val in aiter: + assert val == 42 + break + assert collected.pop() == "exhausted 1" + + # Also fine if you exhaust it at point of use + async for val in example("exhausted 2"): + assert val == 42 + assert collected.pop() == "exhausted 2" + + gc_collect_harder() + + # No problems saving the geniter when using either of these patterns + async with aclosing(example("exhausted 3")) as aiter: + saved.append(aiter) + async for val in aiter: + assert val == 42 + break + assert collected.pop() == "exhausted 3" + + # Also fine if you exhaust it at point of use + saved.append(example("exhausted 4")) + async for val in saved[-1]: + assert val == 42 + assert collected.pop() == "exhausted 4" + + # Leave one referenced-but-unexhausted and make sure it gets cleaned up + saved.append(example("outlived run")) + async for val in saved[-1]: + assert val == 42 + break + assert collected == [] + + _core.run(async_main) + assert collected.pop() == "outlived run" + for agen in saved: + assert agen.ag_frame is None # all should now be exhausted + + +def test_firstiter_after_closing(): + saved = [] + record = [] + + async def funky_agen(): + try: + yield 1 + except GeneratorExit: + record.append("cleanup 1") + raise + try: + yield 2 + finally: + record.append("cleanup 2") + async for _ in funky_agen(): + break + + async def async_main(): + aiter = funky_agen() + saved.append(aiter) + assert 1 == await aiter.asend(None) + assert 2 == await aiter.asend(None) + + _core.run(async_main) + assert record == ["cleanup 2", "cleanup 1"] diff --git a/trio/_core/tests/test_run.py b/trio/_core/tests/test_run.py index 78b46b7adc..1ddab60cd2 100644 --- a/trio/_core/tests/test_run.py +++ b/trio/_core/tests/test_run.py @@ -403,7 +403,6 @@ async def main(): + [("before", task), ("schedule", task), ("after", task)] * 5 + [("before", task), ("after", task), ("after_run",)] ) - assert len(r1.record) > len(r2.record) > len(r3.record) assert r1.record == r2.record + r3.record assert list(r1.filter_tasks([task])) == expected diff --git a/trio/_util.py b/trio/_util.py index 03b79065e2..939eaad332 100644 --- a/trio/_util.py +++ b/trio/_util.py @@ -1,3 +1,5 @@ +# coding: utf-8 + # Little utilities we use internally from abc import ABCMeta @@ -349,3 +351,20 @@ def __call__(self, *args, **kwargs): def _create(self, *args, **kwargs): return super().__call__(*args, **kwargs) + + +def name_asyncgen(agen): + """Return the fully-qualified name of the async generator function + that produced the async generator iterator *agen*. + """ + if not hasattr(agen, "ag_code"): + return repr(agen) + try: + module = agen.ag_frame.f_globals["__name__"] + except (AttributeError, KeyError): + module = "<{}>".format(agen.ag_code.co_filename) + try: + qualname = agen.__qualname__ + except AttributeError: + qualname = agen.ag_code.co_name + return f"{module}.{qualname}" From ac3e46d113e7b0f9eb1ad0708bd955a1a7dacc77 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Wed, 10 Jun 2020 08:21:30 +0000 Subject: [PATCH 02/18] blacken --- trio/_core/_entry_queue.py | 4 +--- trio/_core/_run.py | 12 +++++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/trio/_core/_entry_queue.py b/trio/_core/_entry_queue.py index ac71405e4c..357ea2cca9 100644 --- a/trio/_core/_entry_queue.py +++ b/trio/_core/_entry_queue.py @@ -61,9 +61,7 @@ async def kill_everything(exc): except RuntimeError: # We're quite late in the shutdown process and # the system nursery is already closed. - _core.current_task().parent_nursery.start_soon( - kill_everything, exc - ) + _core.current_task().parent_nursery.start_soon(kill_everything, exc) return True diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 24cdf9166d..f891fda7b3 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1532,7 +1532,9 @@ async def finalize_remaining_asyncgens(self): for agen in batch: name = name_asyncgen(agen) kill_them_all.start_soon( - partial(self.finalize_asyncgen, agen, name, check_running=True), + partial( + self.finalize_asyncgen, agen, name, check_running=True + ), name="close asyncgen {} (outlived run)".format(name), ) @@ -1543,7 +1545,9 @@ async def finalize_remaining_asyncgens(self): finalize_in_parallel = False else: for agen in batch: - await self.finalize_asyncgen(agen, name_asyncgen(agen), check_running=False) + await self.finalize_asyncgen( + agen, name_asyncgen(agen), check_running=False + ) def setup_asyncgen_hooks(self): def firstiter(agen): @@ -1561,7 +1565,9 @@ def finalizer(agen): self.entry_queue.run_sync_soon( partial( self.spawn_system_task, - partial(self.finalize_asyncgen, agen, agen_name, check_running=False), + partial( + self.finalize_asyncgen, agen, agen_name, check_running=False + ), name=f"close asyncgen {agen_name} (abandoned)", ), ) From 2947de7eb248e2f33790fb00bc7465490b0e0922 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Thu, 25 Jun 2020 01:50:28 +0000 Subject: [PATCH 03/18] Respond to review comments + add more tests --- trio/_core/_entry_queue.py | 6 +- trio/_core/_run.py | 151 ++++++++++++++---------------- trio/_core/tests/test_asyncgen.py | 97 ++++++++++++++++++- 3 files changed, 170 insertions(+), 84 deletions(-) diff --git a/trio/_core/_entry_queue.py b/trio/_core/_entry_queue.py index 357ea2cca9..a1587a18cd 100644 --- a/trio/_core/_entry_queue.py +++ b/trio/_core/_entry_queue.py @@ -59,8 +59,10 @@ async def kill_everything(exc): try: _core.spawn_system_task(kill_everything, exc) except RuntimeError: - # We're quite late in the shutdown process and - # the system nursery is already closed. + # We're quite late in the shutdown process and the + # system nursery is already closed. + # TODO(2020-06): this is a gross hack and should + # be fixed soon when we address #1607. _core.current_task().parent_nursery.start_soon(kill_everything, exc) return True diff --git a/trio/_core/_run.py b/trio/_core/_run.py index f891fda7b3..9efc9644ac 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1227,10 +1227,18 @@ class Runner: # Async generators are added to this set when first iterated. Any # left after the main task exits will be closed before trio.run() - # returns. During the execution of the main task, this is a - # WeakSet so GC works. During shutdown, it's a regular set so we - # don't have to deal with GC firing at unexpected times. + # returns. During most of the run, this is a WeakSet so GC works. + # During shutdown, when we're finalizign all the remaining + # asyncgens after the system nursery has been closed, it's a + # regular set so we don't have to deal with GC firing at + # unexpected times. asyncgens = attr.ib(factory=weakref.WeakSet) + + # This collects async generators that get garbage collected during + # the one-tick window between the system nursery closing and the + # init task starting end-of-run asyncgen finalization. + trailing_finalizer_asyncgens = attr.ib(factory=set) + prev_asyncgen_hooks = attr.ib(default=None) def force_guest_tick_asap(self): @@ -1467,21 +1475,7 @@ def task_exited(self, task, outcome): # Async generator finalization support ################ - async def finalize_asyncgen(self, agen, name, *, check_running): - if check_running and agen.ag_running: - # Another async generator is iterating this one, which is - # suspended at an event loop trap. Add it back to the - # asyncgens set and we'll get it on the next round. Note - # that this is only possible during end-of-run - # finalization; in GC-directed finalization, no one has a - # reference to agen anymore, so no one can be iterating it. - # - # This field is only reliable on 3.8+ due to - # ttps://bugs.python.org/issue32526. Pythons below - # 3.8 use a workaround in finalize_remaining_asyncgens. - self.asyncgens.add(agen) - return - + async def finalize_asyncgen(self, agen, name): try: # This shield ensures that finalize_asyncgen never exits # with an exception, not even a Cancelled. The inside @@ -1503,17 +1497,32 @@ async def finalize_remaining_asyncgens(self): # shut down the system nursery, so no more can appear.) # Neither one uses async generators, so every async generator # must be suspended at a yield point -- there's no one to be - # doing the iteration. However, once we start aclose() of one - # async generator, it might start fetching the next value from - # another, thus preventing us from closing that other. + # doing the iteration. That's good, because aclose() only + # works on an asyncgen that's suspended at a yield point. + # (If it's suspended at an event loop trap, because someone + # is in the middle of iterating it, then you get a RuntimeError + # on 3.8+, and a nasty surprise on earlier versions due to + # https://bugs.python.org/issue32526.) # - # On 3.8+, we can detect this condition by looking at - # ag_running. On earlier versions, ag_running doesn't provide - # useful information. We could look at ag_await, but that - # would fail in case of shenanigans like - # https://github.com/python-trio/async_generator/pull/16. - # It's easier to just not parallelize the shutdowns. - finalize_in_parallel = sys.version_info >= (3, 8) + # However, once we start aclose() of one async generator, it + # might start fetching the next value from another, thus + # preventing us from closing that other (at least until + # aclose() of the first one is complete). This constraint + # effectively requires us to finalize the remaining asyncgens + # in arbitrary order, rather than doing all of them at the + # same time. On 3.8+ we could defer any generator with + # ag_running=True to a later batch, but that only catches + # the case where our aclose() starts after the user's + # asend()/etc. If our aclose() starts first, then the + # user's asend()/etc will raise RuntimeError, since they're + # probably not checking ag_running. + # + # It might be possible to allow some parallelized cleanup if + # we can determine that a certain set of asyncgens have no + # interdependencies, using gc.get_referents() and such. + # But just doing one at a time will typically work well enough + # (since each aclose() executes in a cancelled scope) and + # is much easier to reason about. # It's possible that that cleanup code will itself create # more async generators, so we iterate repeatedly until @@ -1521,38 +1530,26 @@ async def finalize_remaining_asyncgens(self): while self.asyncgens: batch = self.asyncgens self.asyncgens = set() - - if finalize_in_parallel: - async with open_nursery() as kill_them_all: - # This shield is needed to avoid the checkpoint - # in Nursery.__aexit__ raising Cancelled if we're - # in a cancelled scope. (Which can happen if - # a run_sync_soon callback raises an exception.) - kill_them_all.cancel_scope.shield = True - for agen in batch: - name = name_asyncgen(agen) - kill_them_all.start_soon( - partial( - self.finalize_asyncgen, agen, name, check_running=True - ), - name="close asyncgen {} (outlived run)".format(name), - ) - - if self.asyncgens == batch: # pragma: no cover - # Something about the running-detection seems - # to have failed; fall back to one-at-a-time mode - # instead of looping forever - finalize_in_parallel = False - else: - for agen in batch: - await self.finalize_asyncgen( - agen, name_asyncgen(agen), check_running=False - ) + for agen in batch: + await self.finalize_asyncgen(agen, name_asyncgen(agen)) def setup_asyncgen_hooks(self): def firstiter(agen): self.asyncgens.add(agen) + def finalize_in_trio_context(agen, agen_name): + try: + self.spawn_system_task( + self.finalize_asyncgen, agen, agen_name, + name=f"close asyncgen {agen_name} (abandoned)", + ) + except RuntimeError: + # There is a one-tick window where the system nursery + # is closed but the init task hasn't yet made + # self.asyncgens a strong set to disable GC. We seem to + # have hit it. + self.trailing_finalizer_asyncgens.add(agen) + def finalizer(agen): agen_name = name_asyncgen(agen) warnings.warn( @@ -1561,16 +1558,9 @@ def finalizer(agen): f"to ensure that it gets cleaned up as soon as you're done using it.", ResourceWarning, stacklevel=2, + source=agen, ) - self.entry_queue.run_sync_soon( - partial( - self.spawn_system_task, - partial( - self.finalize_asyncgen, agen, agen_name, check_running=False - ), - name=f"close asyncgen {agen_name} (abandoned)", - ), - ) + self.entry_queue.run_sync_soon(finalize_in_trio_context, agen, agen_name) self.prev_asyncgen_hooks = sys.get_asyncgen_hooks() sys.set_asyncgen_hooks(firstiter=firstiter, finalizer=finalizer) @@ -1645,22 +1635,23 @@ async def init(self, async_fn, args): system_task=True, ) - # Main task is done. We should be exiting soon, so - # we're going to shut down GC-mediated async generator - # finalization by turning the asyncgens WeakSet into a - # regular set. We must do that before closing the system - # nursery, since finalization spawns a new system tasks. - self.asyncgens = set(self.asyncgens) + # Main task is done; start shutting down system tasks + self.system_nursery.cancel_scope.cancel() - # Process all pending run_sync_soon callbacks, in case one of - # them was an asyncgen finalizer. - self.entry_queue.run_sync_soon(self.reschedule, self.init_task) - await wait_task_rescheduled(lambda _: Abort.FAILED) + # The only tasks running at this point are init (this one) + # and the run_sync_soon task. We still need to finalize + # remaining async generators. To make that easier to reason + # about, we'll shut down their garbage collection by turning + # the asyncgens WeakSet into a regular set. + self.asyncgens = set(self.asyncgens) - # Now it's safe to proceed with shutting down system tasks - self.system_nursery.cancel_scope.cancel() + # Process all pending run_sync_soon callbacks, in case one of + # them was an asyncgen finalizer that snuck in under the wire. + self.entry_queue.run_sync_soon(self.reschedule, self.init_task) + await wait_task_rescheduled(lambda _: Abort.FAILED) + self.asyncgens.update(self.trailing_finalizer_asyncgens) + self.trailing_finalizer_asyncgens.clear() - # System tasks are gone and no more will be appearing. # The only async-colored user code left to run is the # finalizers for the async generators that remain alive. await self.finalize_remaining_asyncgens() @@ -1948,6 +1939,8 @@ def setup_runner(clock, instruments, restrict_keyboard_interrupt_to_checkpoints) # particular before we start modifying global state like GLOBAL_RUN_CONTEXT ki_manager.install(runner.deliver_ki, restrict_keyboard_interrupt_to_checkpoints) + runner.setup_asyncgen_hooks() + GLOBAL_RUN_CONTEXT.runner = runner return runner @@ -2150,10 +2143,6 @@ def unrolled_run(runner, async_fn, args, host_uses_signal_set_wakeup_fd=False): if not host_uses_signal_set_wakeup_fd: runner.entry_queue.wakeup.wakeup_on_signals() - # Do this before before_run in case before_run wants to override - # our hooks - runner.setup_asyncgen_hooks() - if runner.instruments: runner.instrument("before_run") runner.clock.start_clock() diff --git a/trio/_core/tests/test_asyncgen.py b/trio/_core/tests/test_asyncgen.py index ecc4c4ba54..5750b09489 100644 --- a/trio/_core/tests/test_asyncgen.py +++ b/trio/_core/tests/test_asyncgen.py @@ -1,4 +1,5 @@ import sys +import weakref import pytest from math import inf from functools import partial @@ -18,7 +19,6 @@ async def example(cause): pass await _core.checkpoint() except _core.Cancelled: - assert "example" in _core.current_task().name assert "exhausted" not in cause task_name = _core.current_task().name assert cause in task_name or task_name == "" @@ -113,3 +113,98 @@ async def async_main(): _core.run(async_main) assert record == ["cleanup 2", "cleanup 1"] + + +def test_interdependent_asyncgen_cleanup_order(): + saved = [] + record = [] + + async def innermost(): + try: + yield 1 + finally: + await _core.cancel_shielded_checkpoint() + record.append("innermost") + + async def agen(label, inner): + try: + yield await inner.asend(None) + finally: + # Either `inner` has already been cleaned up, or + # we're about to exhaust it. Either way, we wind + # up with `record` containing the labels in + # innermost-to-outermost order. + with pytest.raises(StopAsyncIteration): + await inner.asend(None) + record.append(label) + + async def async_main(): + # This makes a chain of 101 interdependent asyncgens: + # agen(99)'s cleanup will iterate agen(98)'s will iterate + # ... agen(0)'s will iterate innermost()'s + ag_chain = innermost() + for idx in range(100): + ag_chain = agen(idx, ag_chain) + saved.append(ag_chain) + async for val in ag_chain: + assert val == 1 + break + assert record == [] + + _core.run(async_main) + assert record == ["innermost"] + list(range(100)) + + +def test_last_minute_gc_edge_case(): + saved = [] + record = [] + needs_retry = True + + async def agen(): + try: + yield 1 + finally: + record.append("cleaned up") + + def collect_at_opportune_moment(token): + runner = _core._run.GLOBAL_RUN_CONTEXT.runner + if runner.system_nursery._closed and isinstance(runner.asyncgens, weakref.WeakSet): + saved.clear() + record.append("final collection") + gc_collect_harder() + record.append("done") + else: + try: + token.run_sync_soon(collect_at_opportune_moment, token) + except _core.RunFinishedError: + nonlocal needs_retry + needs_retry = True + + async def async_main(): + token = _core.current_trio_token() + token.run_sync_soon(collect_at_opportune_moment, token) + saved.append(agen()) + async for _ in saved[-1]: + break + + # Actually running into the edge case requires that the run_sync_soon task + # execute in between the system nursery's closure and the strong-ification + # of runner.asyncgens. There's about a 25% chance that it doesn't + # (if the run_sync_soon task runs before init on one tick and after init + # on the next tick); if we try enough times, we can make the chance of + # failure as small as we want. + for attempt in range(50): + needs_retry = False + del record[:] + del saved[:] + _core.run(async_main) + if needs_retry: + assert record == ["cleaned up"] + else: + assert record == ["final collection", "done", "cleaned up"] + break + else: + pytest.fail( + f"Didn't manage to hit the trailing_finalizer_asyncgens case " + f"despite trying {attempt} times" + ) From d198ed20844783f2ae9a843ca3238c1eb580367e Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Thu, 25 Jun 2020 01:51:00 +0000 Subject: [PATCH 04/18] blacken --- trio/_core/_run.py | 4 +++- trio/_core/tests/test_asyncgen.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 9efc9644ac..0df293881f 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1540,7 +1540,9 @@ def firstiter(agen): def finalize_in_trio_context(agen, agen_name): try: self.spawn_system_task( - self.finalize_asyncgen, agen, agen_name, + self.finalize_asyncgen, + agen, + agen_name, name=f"close asyncgen {agen_name} (abandoned)", ) except RuntimeError: diff --git a/trio/_core/tests/test_asyncgen.py b/trio/_core/tests/test_asyncgen.py index 5750b09489..4d939a9f72 100644 --- a/trio/_core/tests/test_asyncgen.py +++ b/trio/_core/tests/test_asyncgen.py @@ -168,7 +168,9 @@ async def agen(): def collect_at_opportune_moment(token): runner = _core._run.GLOBAL_RUN_CONTEXT.runner - if runner.system_nursery._closed and isinstance(runner.asyncgens, weakref.WeakSet): + if runner.system_nursery._closed and isinstance( + runner.asyncgens, weakref.WeakSet + ): saved.clear() record.append("final collection") gc_collect_harder() From 45c2eeea66207bb92831fcb941472c7c42f73dc4 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Thu, 25 Jun 2020 01:55:52 +0000 Subject: [PATCH 05/18] flake8 --- trio/_core/_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 4a94ecdaad..f36f4616a2 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1544,7 +1544,7 @@ async def finalize_asyncgen(self, agen, name): with CancelScope(shield=True) as cancel_scope: cancel_scope.cancel() await agen.aclose() - except BaseException as exc: + except BaseException: ASYNCGEN_LOGGER.exception( "Exception ignored during finalization of async generator %r -- " "surround your use of the generator in 'async with aclosing(...):' " From 2d81bb00f5fd8fd5f5954be1997ee8a51907a77b Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Thu, 25 Jun 2020 01:57:49 +0000 Subject: [PATCH 06/18] Fix mismerge with master --- trio/_core/_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index f36f4616a2..e74c73a884 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1448,7 +1448,7 @@ def spawn_impl(self, async_fn, args, nursery, name, *, system_task=False): if name is None: name = async_fn - if isinstance(name, partial): + if isinstance(name, functools.partial): name = name.func if not isinstance(name, str): try: From 5a37f78697c4194e8ac2c5fc65f611e5cf0770f0 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Thu, 25 Jun 2020 02:06:08 +0000 Subject: [PATCH 07/18] Work correctly in -Werror mode too --- trio/_core/_run.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index e74c73a884..b7e8763b8f 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1615,6 +1615,11 @@ def finalize_in_trio_context(agen, agen_name): def finalizer(agen): agen_name = name_asyncgen(agen) + self.entry_queue.run_sync_soon(finalize_in_trio_context, agen, agen_name) + + # Do this last, because it might raise an exception depending on the + # user's warnings filter. (That exception will be printed to the + # terminal and ignored, since we're running in GC context.) warnings.warn( f"Async generator {agen_name!r} was garbage collected before it had " f"been exhausted. Surround its use in 'async with aclosing(...):' " @@ -1623,7 +1628,6 @@ def finalizer(agen): stacklevel=2, source=agen, ) - self.entry_queue.run_sync_soon(finalize_in_trio_context, agen, agen_name) self.prev_asyncgen_hooks = sys.get_asyncgen_hooks() sys.set_asyncgen_hooks(firstiter=firstiter, finalizer=finalizer) From 3acf34be7ecf2699d14741aa3fd95fabd1357650 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Thu, 25 Jun 2020 04:27:26 +0000 Subject: [PATCH 08/18] Handle asyncgens correctly when Trio is the guest of an asyncio host --- trio/_core/_run.py | 73 +++++++++++++++++++----- trio/_core/tests/test_asyncgen.py | 87 +++++++++++++++++++++++++++++ trio/_core/tests/test_guest_mode.py | 39 +++++++++++++ 3 files changed, 185 insertions(+), 14 deletions(-) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index b7e8763b8f..dc9526fc2e 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1596,7 +1596,18 @@ async def finalize_remaining_asyncgens(self): def setup_asyncgen_hooks(self): def firstiter(agen): - self.asyncgens.add(agen) + if hasattr(GLOBAL_RUN_CONTEXT, "task"): + self.asyncgens.add(agen) + else: + # An async generator first iterated outside of a Trio + # task doesn't belong to Trio. Probably we're in guest + # mode and the async generator belongs to our host. + # The locals dictionary is the only good place to + # remember this fact, at least until + # https://bugs.python.org/issue40916 is implemented. + agen.ag_frame.f_locals["@trio_foreign_asyncgen"] = True + if self.prev_asyncgen_hooks.firstiter is not None: + self.prev_asyncgen_hooks.firstiter(agen) def finalize_in_trio_context(agen, agen_name): try: @@ -1615,19 +1626,53 @@ def finalize_in_trio_context(agen, agen_name): def finalizer(agen): agen_name = name_asyncgen(agen) - self.entry_queue.run_sync_soon(finalize_in_trio_context, agen, agen_name) - - # Do this last, because it might raise an exception depending on the - # user's warnings filter. (That exception will be printed to the - # terminal and ignored, since we're running in GC context.) - warnings.warn( - f"Async generator {agen_name!r} was garbage collected before it had " - f"been exhausted. Surround its use in 'async with aclosing(...):' " - f"to ensure that it gets cleaned up as soon as you're done using it.", - ResourceWarning, - stacklevel=2, - source=agen, - ) + try: + is_ours = not agen.ag_frame.f_locals.get("@trio_foreign_asyncgen") + except AttributeError: + is_ours = True + + if is_ours: + self.entry_queue.run_sync_soon( + finalize_in_trio_context, agen, agen_name + ) + + # Do this last, because it might raise an exception + # depending on the user's warnings filter. (That + # exception will be printed to the terminal and + # ignored, since we're running in GC context.) + warnings.warn( + f"Async generator {agen_name!r} was garbage collected before it " + f"had been exhausted. Surround its use in 'async with " + f"aclosing(...):' to ensure that it gets cleaned up as soon as " + f"you're done using it.", + ResourceWarning, + stacklevel=2, + source=agen, + ) + else: + # Not ours -> forward to the host loop's async generator finalizer + if self.prev_asyncgen_hooks.finalizer is not None: + self.prev_asyncgen_hooks.finalizer(agen) + else: + # Host has no finalizer. Reimplement the default + # Python behavior with no hooks installed: throw in + # GeneratorExit, step once, raise RuntimeError if + # it doesn't exit. + closer = agen.aclose() + try: + # If the next thing is a yield, this will raise RuntimeError + # which we allow to propagate + closer.send(None) + except StopIteration: + pass + else: + # If the next thing is an await, we get here. Give a nicer + # error than the default "async generator ignored GeneratorExit" + raise RuntimeError( + f"async generator {agen_name!r} awaited during " + f"finalization; install a finalization hook to support " + f"this, or wrap it in 'async with aclosing(...):'" + ) self.prev_asyncgen_hooks = sys.get_asyncgen_hooks() sys.set_asyncgen_hooks(firstiter=firstiter, finalizer=finalizer) diff --git a/trio/_core/tests/test_asyncgen.py b/trio/_core/tests/test_asyncgen.py index 4d939a9f72..ff3e42266d 100644 --- a/trio/_core/tests/test_asyncgen.py +++ b/trio/_core/tests/test_asyncgen.py @@ -210,3 +210,90 @@ async def async_main(): f"Didn't manage to hit the trailing_finalizer_asyncgens case " f"despite trying {attempt} times" ) + + +async def step_outside_async_context(aiter): + # abort_fns run outside of task context, at least if they're + # triggered by a deadline expiry rather than a direct + # cancellation. Thus, an asyncgen first iterated inside one + # will appear non-Trio, and since no other hooks were installed, + # will use the last-ditch fallback handling (that tries to mimic + # CPython's behavior with no hooks). + # + # NB: the strangeness with aiter being an attribute of abort_fn is + # to make it as easy as possible to ensure we don't hang onto a + # reference to aiter inside the guts of the run loop. + def abort_fn(_): + with pytest.raises(StopIteration, match="42"): + abort_fn.aiter.asend(None).send(None) + del abort_fn.aiter + return _core.Abort.SUCCEEDED + + abort_fn.aiter = aiter + + async with _core.open_nursery() as nursery: + nursery.start_soon(_core.wait_task_rescheduled, abort_fn) + await _core.wait_all_tasks_blocked() + nursery.cancel_scope.deadline = _core.current_time() + + +async def test_fallback_when_no_hook_claims_it(capsys): + async def well_behaved(): + yield 42 + + async def yields_after_yield(): + with pytest.raises(GeneratorExit): + yield 42 + yield 100 + + async def awaits_after_yield(): + with pytest.raises(GeneratorExit): + yield 42 + await _core.cancel_shielded_checkpoint() + + await step_outside_async_context(well_behaved()) + gc_collect_harder() + assert capsys.readouterr().err == "" + + await step_outside_async_context(yields_after_yield()) + gc_collect_harder() + assert "ignored GeneratorExit" in capsys.readouterr().err + + await step_outside_async_context(awaits_after_yield()) + gc_collect_harder() + assert "awaited during finalization" in capsys.readouterr().err + + +def test_delegation_to_existing_hooks(): + record = [] + + def my_firstiter(agen): + record.append("firstiter " + agen.ag_frame.f_locals["arg"]) + + def my_finalizer(agen): + record.append("finalizer " + agen.ag_frame.f_locals["arg"]) + + async def example(arg): + try: + yield 42 + finally: + with pytest.raises(_core.Cancelled): + await _core.checkpoint() + record.append("trio collected " + arg) + + async def async_main(): + await step_outside_async_context(example("theirs")) + assert 42 == await example("ours").asend(None) + gc_collect_harder() + assert record == ["firstiter theirs", "finalizer theirs"] + record[:] = [] + await _core.wait_all_tasks_blocked() + assert record == ["trio collected ours"] + + old_hooks = sys.get_asyncgen_hooks() + sys.set_asyncgen_hooks(my_firstiter, my_finalizer) + try: + _core.run(async_main) + finally: + assert sys.get_asyncgen_hooks() == (my_firstiter, my_finalizer) + sys.set_asyncgen_hooks(*old_hooks) diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index e58ec3ebc2..3412cb0d6b 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -497,3 +497,42 @@ async def trio_main(in_host): # Should be basically instantaneous, but we'll leave a generous buffer to # account for any CI weirdness assert end - start < DURATION / 2 + + +def test_guest_mode_asyncgens(): + import sniffio, sys + + record = set() + + async def agen(label): + assert sniffio.current_async_library() == label + try: + yield 1 + finally: + library = sniffio.current_async_library() + try: + await sys.modules[library].sleep(0) + except trio.Cancelled: + pass + record.add((label, library)) + + async def iterate_in_aio(): + # "trio" gets inherited from our Trio caller if we don't set this + sniffio.current_async_library_cvar.set("asyncio") + async for _ in agen("asyncio"): + break + + async def trio_main(): + task = asyncio.ensure_future(iterate_in_aio()) + done_evt = trio.Event() + task.add_done_callback(lambda _: done_evt.set()) + with trio.fail_after(1): + await done_evt.wait() + + async for _ in agen("trio"): + break + + gc_collect_harder() + + aiotrio_run(trio_main, host_uses_signal_set_wakeup_fd=True) + assert record == {("asyncio", "asyncio"), ("trio", "trio")} From 80e8ab3db35451502d11294cffd8421a0985b84a Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Thu, 25 Jun 2020 04:55:59 +0000 Subject: [PATCH 09/18] Fix 3.6 --- trio/_core/tests/test_guest_mode.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index 3412cb0d6b..fdc304c3ae 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -1,5 +1,6 @@ import pytest import asyncio +import contextvars import traceback import queue from functools import partial @@ -534,5 +535,9 @@ async def trio_main(): gc_collect_harder() - aiotrio_run(trio_main, host_uses_signal_set_wakeup_fd=True) + # Ensure we don't pollute the thread-level context if run under + # an asyncio without contextvars support (3.6) + context = contextvars.copy_context() + context.run(aiotrio_run, trio_main, host_uses_signal_set_wakeup_fd=True) + assert record == {("asyncio", "asyncio"), ("trio", "trio")} From cfdb8501836eb10370bd69b04fbbf004eae638b7 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Thu, 25 Jun 2020 05:22:58 +0000 Subject: [PATCH 10/18] Make tests pass on pypy 7.2 which doesn't run firstiter hooks --- trio/_core/tests/test_asyncgen.py | 28 ++++++++++++++++++++++------ trio/_core/tests/test_guest_mode.py | 5 +++++ 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/trio/_core/tests/test_asyncgen.py b/trio/_core/tests/test_asyncgen.py index ff3e42266d..efc3d562fe 100644 --- a/trio/_core/tests/test_asyncgen.py +++ b/trio/_core/tests/test_asyncgen.py @@ -8,6 +8,14 @@ from .tutil import gc_collect_harder +# PyPy 7.2 was released with a bug that just never called firstiter at +# all. This impacts tests of end-of-run finalization (nothing gets +# added to runner.asyncgens) and tests of "foreign" async generator +# behavior (since the firstiter hook is what marks the asyncgen as +# foreign), but most tests of GC-mediated finalization still work. +bad_pypy = sys.implementation.name == "pypy" and sys.pypy_version_info < (7, 3) + + def test_asyncgen_basics(): collected = [] @@ -76,11 +84,14 @@ async def async_main(): assert collected.pop() == "exhausted 4" # Leave one referenced-but-unexhausted and make sure it gets cleaned up - saved.append(example("outlived run")) - async for val in saved[-1]: - assert val == 42 - break - assert collected == [] + if bad_pypy: + collected.append("outlived run") + else: + saved.append(example("outlived run")) + async for val in saved[-1]: + assert val == 42 + break + assert collected == [] _core.run(async_main) assert collected.pop() == "outlived run" @@ -88,6 +99,7 @@ async def async_main(): assert agen.ag_frame is None # all should now be exhausted +@pytest.mark.skipif(bad_pypy, reason="pypy 7.2.0 is buggy") def test_firstiter_after_closing(): saved = [] record = [] @@ -115,6 +127,7 @@ async def async_main(): assert record == ["cleanup 2", "cleanup 1"] +@pytest.mark.skipif(bad_pypy, reason="pypy 7.2.0 is buggy") def test_interdependent_asyncgen_cleanup_order(): saved = [] record = [] @@ -201,7 +214,8 @@ async def async_main(): del saved[:] _core.run(async_main) if needs_retry: - assert record == ["cleaned up"] + if not bad_pypy: + assert record == ["cleaned up"] else: assert record == ["final collection", "done", "cleaned up"] break @@ -237,6 +251,7 @@ def abort_fn(_): nursery.cancel_scope.deadline = _core.current_time() +@pytest.mark.skipif(bad_pypy, reason="pypy 7.2.0 is buggy") async def test_fallback_when_no_hook_claims_it(capsys): async def well_behaved(): yield 42 @@ -264,6 +279,7 @@ async def awaits_after_yield(): assert "awaited during finalization" in capsys.readouterr().err +@pytest.mark.skipif(bad_pypy, reason="pypy 7.2.0 is buggy") def test_delegation_to_existing_hooks(): record = [] diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index fdc304c3ae..caef096ffa 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -1,6 +1,7 @@ import pytest import asyncio import contextvars +import sys import traceback import queue from functools import partial @@ -500,6 +501,10 @@ async def trio_main(in_host): assert end - start < DURATION / 2 +@pytest.mark.skipif( + sys.implementation.name == "pypy" and sys.pypy_version_info < (7, 3), + reason="PyPy 7.2 has a buggy implementation of async generator hooks", +) def test_guest_mode_asyncgens(): import sniffio, sys From 153ce13991e01fe32684b7c707f6b7b433e6b46a Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Thu, 25 Jun 2020 06:23:54 +0000 Subject: [PATCH 11/18] Hopefully resolve coverage issues --- trio/_core/_run.py | 14 +++--- trio/_core/tests/test_asyncgen.py | 75 +++++++++++++++-------------- trio/_core/tests/test_guest_mode.py | 15 ++---- trio/_core/tests/test_run.py | 29 ++++++++++- trio/_core/tests/tutil.py | 11 +++++ trio/_util.py | 2 +- 6 files changed, 90 insertions(+), 56 deletions(-) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index dc9526fc2e..ea1424605a 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1308,7 +1308,10 @@ class Runner: # init task starting end-of-run asyncgen finalization. trailing_finalizer_asyncgens = attr.ib(factory=set) - prev_asyncgen_hooks = attr.ib(default=None) + prev_asyncgen_hooks = attr.ib(init=False) + + def __attrs_post_init__(self): + self.setup_asyncgen_hooks() def force_guest_tick_asap(self): if self.guest_tick_scheduled: @@ -1319,8 +1322,7 @@ def force_guest_tick_asap(self): def close(self): self.io_manager.close() self.entry_queue.close() - if self.prev_asyncgen_hooks is not None: - sys.set_asyncgen_hooks(*self.prev_asyncgen_hooks) + sys.set_asyncgen_hooks(*self.prev_asyncgen_hooks) if self.instruments: self.instrument("after_run") # This is where KI protection gets disabled, so we do it last @@ -1628,7 +1630,7 @@ def finalizer(agen): agen_name = name_asyncgen(agen) try: is_ours = not agen.ag_frame.f_locals.get("@trio_foreign_asyncgen") - except AttributeError: + except AttributeError: # pragma: no cover is_ours = True if is_ours: @@ -1760,7 +1762,7 @@ async def init(self, async_fn, args): # Process all pending run_sync_soon callbacks, in case one of # them was an asyncgen finalizer that snuck in under the wire. self.entry_queue.run_sync_soon(self.reschedule, self.init_task) - await wait_task_rescheduled(lambda _: Abort.FAILED) + await wait_task_rescheduled(lambda _: Abort.FAILED) # pragma: no cover self.asyncgens.update(self.trailing_finalizer_asyncgens) self.trailing_finalizer_asyncgens.clear() @@ -2059,8 +2061,6 @@ def setup_runner(clock, instruments, restrict_keyboard_interrupt_to_checkpoints) # particular before we start modifying global state like GLOBAL_RUN_CONTEXT ki_manager.install(runner.deliver_ki, restrict_keyboard_interrupt_to_checkpoints) - runner.setup_asyncgen_hooks() - GLOBAL_RUN_CONTEXT.runner = runner return runner diff --git a/trio/_core/tests/test_asyncgen.py b/trio/_core/tests/test_asyncgen.py index efc3d562fe..829eb23db9 100644 --- a/trio/_core/tests/test_asyncgen.py +++ b/trio/_core/tests/test_asyncgen.py @@ -5,15 +5,7 @@ from functools import partial from async_generator import aclosing from ... import _core -from .tutil import gc_collect_harder - - -# PyPy 7.2 was released with a bug that just never called firstiter at -# all. This impacts tests of end-of-run finalization (nothing gets -# added to runner.asyncgens) and tests of "foreign" async generator -# behavior (since the firstiter hook is what marks the asyncgen as -# foreign), but most tests of GC-mediated finalization still work. -bad_pypy = sys.implementation.name == "pypy" and sys.pypy_version_info < (7, 3) +from .tutil import gc_collect_harder, buggy_pypy_asyncgens def test_asyncgen_basics(): @@ -48,18 +40,14 @@ async def async_main(): with pytest.warns( ResourceWarning, match="Async generator.*collected before.*exhausted", ): - async for val in example("abandoned"): - assert val == 42 - break + assert 42 == await example("abandoned").asend(None) gc_collect_harder() await _core.wait_all_tasks_blocked() assert collected.pop() == "abandoned" # aclosing() ensures it's cleaned up at point of use async with aclosing(example("exhausted 1")) as aiter: - async for val in aiter: - assert val == 42 - break + assert 42 == await aiter.asend(None) assert collected.pop() == "exhausted 1" # Also fine if you exhaust it at point of use @@ -72,9 +60,7 @@ async def async_main(): # No problems saving the geniter when using either of these patterns async with aclosing(example("exhausted 3")) as aiter: saved.append(aiter) - async for val in aiter: - assert val == 42 - break + assert 42 == await aiter.asend(None) assert collected.pop() == "exhausted 3" # Also fine if you exhaust it at point of use @@ -84,13 +70,11 @@ async def async_main(): assert collected.pop() == "exhausted 4" # Leave one referenced-but-unexhausted and make sure it gets cleaned up - if bad_pypy: + if buggy_pypy_asyncgens: collected.append("outlived run") else: saved.append(example("outlived run")) - async for val in saved[-1]: - assert val == 42 - break + assert 42 == await saved[-1].asend(None) assert collected == [] _core.run(async_main) @@ -99,7 +83,28 @@ async def async_main(): assert agen.ag_frame is None # all should now be exhausted -@pytest.mark.skipif(bad_pypy, reason="pypy 7.2.0 is buggy") +async def test_asyncgen_throws_during_finalization(caplog): + record = [] + + async def agen(): + try: + yield 1 + finally: + await _core.cancel_shielded_checkpoint() + record.append("crashing") + raise ValueError("oops") + + await agen().asend(None) + gc_collect_harder() + await _core.wait_all_tasks_blocked() + assert record == ["crashing"] + exc_type, exc_value, exc_traceback = caplog.records[0].exc_info + assert exc_type is ValueError + assert str(exc_value) == "oops" + assert "during finalization of async generator" in caplog.records[0].message + + +@pytest.mark.skipif(buggy_pypy_asyncgens, reason="pypy 7.2.0 is buggy") def test_firstiter_after_closing(): saved = [] record = [] @@ -114,8 +119,7 @@ async def funky_agen(): yield 2 finally: record.append("cleanup 2") - async for _ in funky_agen(): - break + await funky_agen().asend(None) async def async_main(): aiter = funky_agen() @@ -127,7 +131,7 @@ async def async_main(): assert record == ["cleanup 2", "cleanup 1"] -@pytest.mark.skipif(bad_pypy, reason="pypy 7.2.0 is buggy") +@pytest.mark.skipif(buggy_pypy_asyncgens, reason="pypy 7.2.0 is buggy") def test_interdependent_asyncgen_cleanup_order(): saved = [] record = [] @@ -159,9 +163,7 @@ async def async_main(): for idx in range(100): ag_chain = agen(idx, ag_chain) saved.append(ag_chain) - async for val in ag_chain: - assert val == 1 - break + assert 1 == await ag_chain.asend(None) assert record == [] _core.run(async_main) @@ -191,7 +193,7 @@ def collect_at_opportune_moment(token): else: try: token.run_sync_soon(collect_at_opportune_moment, token) - except _core.RunFinishedError: + except _core.RunFinishedError: # pragma: no cover nonlocal needs_retry needs_retry = True @@ -199,8 +201,7 @@ async def async_main(): token = _core.current_trio_token() token.run_sync_soon(collect_at_opportune_moment, token) saved.append(agen()) - async for _ in saved[-1]: - break + await saved[-1].asend(None) # Actually running into the edge case requires that the run_sync_soon task # execute in between the system nursery's closure and the strong-ification @@ -213,13 +214,13 @@ async def async_main(): del record[:] del saved[:] _core.run(async_main) - if needs_retry: - if not bad_pypy: + if needs_retry: # pragma: no cover + if not buggy_pypy_asyncgens: assert record == ["cleaned up"] else: assert record == ["final collection", "done", "cleaned up"] break - else: + else: # pragma: no cover pytest.fail( f"Didn't manage to hit the trailing_finalizer_asyncgens case " f"despite trying {attempt} times" @@ -251,7 +252,7 @@ def abort_fn(_): nursery.cancel_scope.deadline = _core.current_time() -@pytest.mark.skipif(bad_pypy, reason="pypy 7.2.0 is buggy") +@pytest.mark.skipif(buggy_pypy_asyncgens, reason="pypy 7.2.0 is buggy") async def test_fallback_when_no_hook_claims_it(capsys): async def well_behaved(): yield 42 @@ -279,7 +280,7 @@ async def awaits_after_yield(): assert "awaited during finalization" in capsys.readouterr().err -@pytest.mark.skipif(bad_pypy, reason="pypy 7.2.0 is buggy") +@pytest.mark.skipif(buggy_pypy_asyncgens, reason="pypy 7.2.0 is buggy") def test_delegation_to_existing_hooks(): record = [] diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index caef096ffa..381d966f81 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -13,7 +13,7 @@ import trio import trio.testing -from .tutil import gc_collect_harder +from .tutil import gc_collect_harder, buggy_pypy_asyncgens from ..._util import signal_raise # The simplest possible "host" loop. @@ -501,12 +501,9 @@ async def trio_main(in_host): assert end - start < DURATION / 2 -@pytest.mark.skipif( - sys.implementation.name == "pypy" and sys.pypy_version_info < (7, 3), - reason="PyPy 7.2 has a buggy implementation of async generator hooks", -) +@pytest.mark.skipif(buggy_pypy_asyncgens, reason="PyPy 7.2 is buggy") def test_guest_mode_asyncgens(): - import sniffio, sys + import sniffio record = set() @@ -525,8 +522,7 @@ async def agen(label): async def iterate_in_aio(): # "trio" gets inherited from our Trio caller if we don't set this sniffio.current_async_library_cvar.set("asyncio") - async for _ in agen("asyncio"): - break + await agen("asyncio").asend(None) async def trio_main(): task = asyncio.ensure_future(iterate_in_aio()) @@ -535,8 +531,7 @@ async def trio_main(): with trio.fail_after(1): await done_evt.wait() - async for _ in agen("trio"): - break + await agen("trio").asend(None) gc_collect_harder() diff --git a/trio/_core/tests/test_run.py b/trio/_core/tests/test_run.py index e6f6280080..0bdd46ae44 100644 --- a/trio/_core/tests/test_run.py +++ b/trio/_core/tests/test_run.py @@ -20,6 +20,7 @@ check_sequence_matches, gc_collect_harder, ignore_coroutine_never_awaited_warnings, + buggy_pypy_asyncgens, ) from ... import _core @@ -529,7 +530,6 @@ async def main(): assert str(exc_value) == "oops" assert "Instrument has been disabled" in caplog.records[0].message - async def test_cancel_scope_repr(mock_clock): scope = _core.CancelScope() assert "unbound" in repr(scope) @@ -1526,6 +1526,33 @@ def cb(i): assert counter[0] == COUNT +@pytest.mark.skipif(buggy_pypy_asyncgens, reason="PyPy 7.2 is buggy") +def test_TrioToken_run_sync_soon_late_crash(): + # Crash after system nursery is closed -- easiest way to do that is + # from an async generator finalizer. + record = [] + saved = [] + + async def agen(): + token = _core.current_trio_token() + try: + yield 1 + finally: + token.run_sync_soon(lambda: {}["nope"]) + token.run_sync_soon(lambda: record.append("2nd ran")) + + async def main(): + saved.append(agen()) + await saved[-1].asend(None) + record.append("main exiting") + + with pytest.raises(_core.TrioInternalError) as excinfo: + _core.run(main) + + assert type(excinfo.value.__cause__) is KeyError + assert record == ["main exiting", "2nd ran"] + + async def test_slow_abort_basic(): with _core.CancelScope() as scope: scope.cancel() diff --git a/trio/_core/tests/tutil.py b/trio/_core/tests/tutil.py index b569371482..84feabf6d1 100644 --- a/trio/_core/tests/tutil.py +++ b/trio/_core/tests/tutil.py @@ -7,12 +7,23 @@ from contextlib import contextmanager import gc +import sys # See trio/tests/conftest.py for the other half of this from trio.tests.conftest import RUN_SLOW slow = pytest.mark.skipif(not RUN_SLOW, reason="use --run-slow to run slow tests",) +# PyPy 7.2 was released with a bug that just never called the async +# generator 'firstiter' hook at all. This impacts tests of end-of-run +# finalization (nothing gets added to runner.asyncgens) and tests of +# "foreign" async generator behavior (since the firstiter hook is what +# marks the asyncgen as foreign), but most tests of GC-mediated +# finalization still work. +buggy_pypy_asyncgens = ( + sys.implementation.name == "pypy" and sys.pypy_version_info < (7, 3) +) + try: s = stdlib_socket.socket(stdlib_socket.AF_INET6, stdlib_socket.SOCK_STREAM, 0) except OSError: # pragma: no cover diff --git a/trio/_util.py b/trio/_util.py index 939eaad332..1ebd0081f9 100644 --- a/trio/_util.py +++ b/trio/_util.py @@ -357,7 +357,7 @@ def name_asyncgen(agen): """Return the fully-qualified name of the async generator function that produced the async generator iterator *agen*. """ - if not hasattr(agen, "ag_code"): + if not hasattr(agen, "ag_code"): # pragma: no cover return repr(agen) try: module = agen.ag_frame.f_globals["__name__"] From ebaf69cb1a7e1196439ed78003c94f18c0bf4017 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Thu, 25 Jun 2020 06:29:49 +0000 Subject: [PATCH 12/18] blacken --- trio/_core/tests/test_run.py | 1 + trio/_core/tests/tutil.py | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/trio/_core/tests/test_run.py b/trio/_core/tests/test_run.py index 0bdd46ae44..82e251b652 100644 --- a/trio/_core/tests/test_run.py +++ b/trio/_core/tests/test_run.py @@ -530,6 +530,7 @@ async def main(): assert str(exc_value) == "oops" assert "Instrument has been disabled" in caplog.records[0].message + async def test_cancel_scope_repr(mock_clock): scope = _core.CancelScope() assert "unbound" in repr(scope) diff --git a/trio/_core/tests/tutil.py b/trio/_core/tests/tutil.py index 84feabf6d1..92c5ef2c66 100644 --- a/trio/_core/tests/tutil.py +++ b/trio/_core/tests/tutil.py @@ -20,8 +20,9 @@ # "foreign" async generator behavior (since the firstiter hook is what # marks the asyncgen as foreign), but most tests of GC-mediated # finalization still work. -buggy_pypy_asyncgens = ( - sys.implementation.name == "pypy" and sys.pypy_version_info < (7, 3) +buggy_pypy_asyncgens = sys.implementation.name == "pypy" and sys.pypy_version_info < ( + 7, + 3, ) try: From 936ccdb549b6381f46d8ba96fa5f314d27c6ddbb Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Thu, 25 Jun 2020 08:52:16 +0000 Subject: [PATCH 13/18] Add docs and newsfragment --- docs/source/conf.py | 0 docs/source/reference-core.rst | 174 +++++++++++++++++++++++++++++++++ newsfragments/265.headline.rst | 6 ++ trio/_core/tests/tutil.py | 8 +- 4 files changed, 185 insertions(+), 3 deletions(-) mode change 100644 => 100755 docs/source/conf.py create mode 100644 newsfragments/265.headline.rst diff --git a/docs/source/conf.py b/docs/source/conf.py old mode 100644 new mode 100755 diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index b5f6d7c4d8..477a393f75 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -1460,6 +1460,180 @@ don't have any special access to Trio's internals.) :members: +.. _async-generators: + +Notes on async generators +------------------------- + +Python 3.6 added support for *async generators*, which can use +``await``, ``async for``, and ``async with`` in between their ``yield`` +statements. As you might expect, you use ``async for`` to iterate +over them. :pep:`525` has many more details if you want them. + +For example, the following is a roundabout way to print +the numbers 0 through 9 with a 1-second delay before each one:: + + async def range_slowly(*args): + """Like range(), but adds a 1-second sleep before each value.""" + for value in range(*args): + await trio.sleep(1) + yield value + + async def use_it(): + async for value in range_slowly(10): + print(value) + + trio.run(use_it) + +Trio supports async generators, with some caveats described in this section. + +Finalization +~~~~~~~~~~~~ + +If you iterate over an async generator in its entirety, like the +example above does, then the execution of the async generator will +occur completely in the context of the code that's iterating over it, +and there aren't too many surprises. + +If you abandon a partially-completed async generator, though, such as +by ``break``\ing out of the iteration, things aren't so simple. The +async generator iterator object is still alive, waiting for you to +resume iterating it so it can produce more values. At some point, +Python will realize that you've dropped all references to the +iterator, and will call on Trio to help with executing any remaining +cleanup code inside the generator: ``finally`` blocks, ``__aexit__`` +handlers, and so on. + +So far, so good. Unfortunately, Python provides no guarantees about +*when* this happens. It could be as soon as you break out of the +``async for`` loop, or an arbitrary amount of time later. It could +even be after the entire Trio run has finished! Just about the only +guarantee is that it *won't* happen in the task that was using the +generator. That task will continue on with whatever else it's doing, +and the async generator cleanup will happen "sometime later, +somewhere else". + +If you don't like that ambiguity, and you want to ensure that a +generator's ``finally`` blocks and ``__aexit__`` handlers execute as +soon as you're done using it, then you'll need to wrap your use of the +generator in something like `async_generator.aclosing() +`__:: + + # Instead of this: + async for value in my_generator(): + if value == 42: + break + + # Do this: + async with aclosing(my_generator()) as aiter: + async for value in aiter: + if value == 42: + break + +This is cumbersome, but Python unfortunately doesn't provide any other +reliable options. If you use ``aclosing()``, then +your generator's cleanup code executes in the same context as the +rest of its iterations, so timeouts, exceptions, and context +variables work like you'd expect. + +If you don't use ``aclosing()``, then Trio will do +its best anyway, but you'll have to contend with the following semantics: + +* The cleanup of the generator occurs in a cancelled context, i.e., + all blocking calls executed during cleanup will raise `Cancelled`. + This is to compensate for the fact that any timeouts surrounding + the original use of the generator have been long since forgotten. + +* The cleanup runs without access to any :ref:`context variables + ` that may have been present when the generator + was originally being used. + +* If the generator raises an exception during cleanup, then it's + printed to the ``trio.async_generator_errors`` logger and otherwise + ignored. + +* If an async generator is still alive at the end of the whole + call to :func:`trio.run`, then it will be cleaned up after all + tasks have exited and before :func:`trio.run` returns. + Since the "system nursery" has already been closed at this point, + Trio isn't able to support any new calls to + :func:`trio.lowlevel.spawn_system_task`. + +If you plan to run your code on PyPy to take advantage of its better +performance, you should be aware that PyPy is *far more likely* than +CPython to perform async generator cleanup at a time well after the +last use of the generator. (This is a consequence of the fact that +PyPy does not use reference counting to manage memory.) To help catch +issues like this, Trio will issue a `ResourceWarning` (ignored by +default, but enabled when running under ``python -X dev`` for example) +for each async generator that needs to be handled through the fallback +finalization path. + +Cancel scopes and nurseries +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**You may not write a ``yield`` statement that suspends an async generator +inside a `CancelScope` or `Nursery` that was entered within the generator.** + +That is, this is OK:: + + async def some_agen(): + with trio.move_on_after(1): + await long_operation() + yield "first" + async with trio.open_nursery() as nursery: + nursery.start_soon(task1) + nursery.start_soon(task2) + yield "second" + ... + +But this is not:: + + async def some_agen(): + with trio.move_on_after(1): + yield "first" + async with trio.open_nursery() as nursery: + yield "second" + ... + +Async generators decorated with ``@asynccontextmanager`` to serve as +the template for an async context manager are *not* subject to this +constraint, because ``@asynccontextmanager`` uses them in a limited +way that doesn't create problems. + +Violating the rule described in this section will sometimes get you a +useful error message, but Trio is not able to detect all such cases, +so sometimes you'll get an unhelpful `TrioInternalError`. (And +sometimes it will seem to work, which is probably the worst outcome of +all, since then you might not notice the issue until you perform some +minor refactoring of the generator or the code that's iterating it, or +just get unlucky. There is a `proposed Python enhancement +`__ +that would at least make it fail consistently.) + +The reason for the restriction on cancel scopes has to do with the +difficulty of noticing when a generator gets suspended and +resumed. The cancel scopes inside the generator shouldn't affect code +running outside the generator, but Trio isn't involved in the process +of exiting and reentering the generator, so it would be hard pressed +to keep its cancellation plumbing in the correct state. Nurseries +use a cancel scope internally, so they have all the problems of cancel +scopes plus a number of problems of their own: for example, when +the generator is suspended, what should the background tasks do? +There's no good way to suspend them, but if they keep running and throw +an exception, where can that exception be reraised? + +If you have an async generator that wants to ``yield`` from within a nursery +or cancel scope, your best bet is to refactor it to be a separate task +that communicates over memory channels. + +For more discussion and some experimental partial workarounds, see +Trio issues `264 `__ +(especially `this comment +`__) +and `638 `__. + + .. _threads: Threads (if you must) diff --git a/newsfragments/265.headline.rst b/newsfragments/265.headline.rst new file mode 100644 index 0000000000..57a1ae30b4 --- /dev/null +++ b/newsfragments/265.headline.rst @@ -0,0 +1,6 @@ +Trio now supports automatic :ref:`async generator finalization +`, so more async generators will work even if you +don't wrap them in ``async with async_generator.aclosing():`` +blocks. Please see the documentation for important caveats; in +particular, yielding within a nursery or cancel scope remains +unsupported. diff --git a/trio/_core/tests/tutil.py b/trio/_core/tests/tutil.py index 92c5ef2c66..ee870958db 100644 --- a/trio/_core/tests/tutil.py +++ b/trio/_core/tests/tutil.py @@ -1,6 +1,7 @@ # Utilities for testing import socket as stdlib_socket import os +from typing import TYPE_CHECKING import pytest import warnings @@ -20,9 +21,10 @@ # "foreign" async generator behavior (since the firstiter hook is what # marks the asyncgen as foreign), but most tests of GC-mediated # finalization still work. -buggy_pypy_asyncgens = sys.implementation.name == "pypy" and sys.pypy_version_info < ( - 7, - 3, +buggy_pypy_asyncgens = ( + not TYPE_CHECKING + and sys.implementation.name == "pypy" + and sys.pypy_version_info < (7, 3) ) try: From 7d0fcd9a53c44983c7dd353f726e708782bcf605 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Thu, 25 Jun 2020 22:19:55 +0000 Subject: [PATCH 14/18] Fix formatting --- docs/source/reference-core.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index 477a393f75..ffd546eaaf 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -1572,8 +1572,8 @@ finalization path. Cancel scopes and nurseries ~~~~~~~~~~~~~~~~~~~~~~~~~~~ -**You may not write a ``yield`` statement that suspends an async generator -inside a `CancelScope` or `Nursery` that was entered within the generator.** +.. warning:: You may not write a ``yield`` statement that suspends an async generator + inside a `CancelScope` or `Nursery` that was entered within the generator. That is, this is OK:: From 477672897aa4d5e72b9df678ea92153537d33d5d Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Tue, 7 Jul 2020 21:39:10 +0000 Subject: [PATCH 15/18] Fix flake8 --- trio/_core/tests/tutil.py | 1 - 1 file changed, 1 deletion(-) diff --git a/trio/_core/tests/tutil.py b/trio/_core/tests/tutil.py index d6d768d0d4..4367338661 100644 --- a/trio/_core/tests/tutil.py +++ b/trio/_core/tests/tutil.py @@ -9,7 +9,6 @@ from contextlib import contextmanager import gc -import sys # See trio/tests/conftest.py for the other half of this from trio.tests.conftest import RUN_SLOW From 9868f25f746702f885eaf5388f05ddcd6a9ce115 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Fri, 24 Jul 2020 07:46:43 +0000 Subject: [PATCH 16/18] Respond to review comments; split asyncgens logic into a separate file --- docs/source/reference-core.rst | 10 +- trio/_core/_asyncgens.py | 191 ++++++++++++++++++++++++++++++ trio/_core/_run.py | 187 ++--------------------------- trio/_core/tests/test_asyncgen.py | 4 +- 4 files changed, 207 insertions(+), 185 deletions(-) create mode 100644 trio/_core/_asyncgens.py diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index e795088fbe..664a8d96c2 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -1505,9 +1505,9 @@ by ``break``\ing out of the iteration, things aren't so simple. The async generator iterator object is still alive, waiting for you to resume iterating it so it can produce more values. At some point, Python will realize that you've dropped all references to the -iterator, and will call on Trio to help with executing any remaining -cleanup code inside the generator: ``finally`` blocks, ``__aexit__`` -handlers, and so on. +iterator, and will call on Trio to throw in a `GeneratorExit` exception +so that any remaining cleanup code inside the generator has a chance +to run: ``finally`` blocks, ``__aexit__`` handlers, and so on. So far, so good. Unfortunately, Python provides no guarantees about *when* this happens. It could be as soon as you break out of the @@ -1516,7 +1516,9 @@ even be after the entire Trio run has finished! Just about the only guarantee is that it *won't* happen in the task that was using the generator. That task will continue on with whatever else it's doing, and the async generator cleanup will happen "sometime later, -somewhere else". +somewhere else": potentially with different context variables, +not subject to timeouts, and/or after any nurseries you're using have +been closed. If you don't like that ambiguity, and you want to ensure that a generator's ``finally`` blocks and ``__aexit__`` handlers execute as diff --git a/trio/_core/_asyncgens.py b/trio/_core/_asyncgens.py new file mode 100644 index 0000000000..87f9d76904 --- /dev/null +++ b/trio/_core/_asyncgens.py @@ -0,0 +1,191 @@ +import attr +import logging +import sys +import warnings +import weakref + +from .._util import name_asyncgen +from . import _run +from .. import _core + +# Used to log exceptions in async generator finalizers +ASYNCGEN_LOGGER = logging.getLogger("trio.async_generator_errors") + + +@attr.s(eq=False, slots=True) +class AsyncGenerators: + # Async generators are added to this set when first iterated. Any + # left after the main task exits will be closed before trio.run() + # returns. During most of the run, this is a WeakSet so GC works. + # During shutdown, when we're finalizing all the remaining + # asyncgens after the system nursery has been closed, it's a + # regular set so we don't have to deal with GC firing at + # unexpected times. + alive = attr.ib(factory=weakref.WeakSet) + + # This collects async generators that get garbage collected during + # the one-tick window between the system nursery closing and the + # init task starting end-of-run asyncgen finalization. + trailing_needs_finalize = attr.ib(factory=set) + + prev_hooks = attr.ib(init=False) + + def install_hooks(self, runner): + def firstiter(agen): + if hasattr(_run.GLOBAL_RUN_CONTEXT, "task"): + self.alive.add(agen) + else: + # An async generator first iterated outside of a Trio + # task doesn't belong to Trio. Probably we're in guest + # mode and the async generator belongs to our host. + # The locals dictionary is the only good place to + # remember this fact, at least until + # https://bugs.python.org/issue40916 is implemented. + agen.ag_frame.f_locals["@trio_foreign_asyncgen"] = True + if self.prev_hooks.firstiter is not None: + self.prev_hooks.firstiter(agen) + + def finalize_in_trio_context(agen, agen_name): + try: + runner.spawn_system_task( + self._finalize_one, + agen, + agen_name, + name=f"close asyncgen {agen_name} (abandoned)", + ) + except RuntimeError: + # There is a one-tick window where the system nursery + # is closed but the init task hasn't yet made + # self.asyncgens a strong set to disable GC. We seem to + # have hit it. + self.trailing_needs_finalize.add(agen) + + def finalizer(agen): + agen_name = name_asyncgen(agen) + try: + is_ours = not agen.ag_frame.f_locals.get("@trio_foreign_asyncgen") + except AttributeError: # pragma: no cover + is_ours = True + + if is_ours: + runner.entry_queue.run_sync_soon( + finalize_in_trio_context, agen, agen_name + ) + + # Do this last, because it might raise an exception + # depending on the user's warnings filter. (That + # exception will be printed to the terminal and + # ignored, since we're running in GC context.) + warnings.warn( + f"Async generator {agen_name!r} was garbage collected before it " + f"had been exhausted. Surround its use in 'async with " + f"aclosing(...):' to ensure that it gets cleaned up as soon as " + f"you're done using it.", + ResourceWarning, + stacklevel=2, + source=agen, + ) + else: + # Not ours -> forward to the host loop's async generator finalizer + if self.prev_hooks.finalizer is not None: + self.prev_hooks.finalizer(agen) + else: + # Host has no finalizer. Reimplement the default + # Python behavior with no hooks installed: throw in + # GeneratorExit, step once, raise RuntimeError if + # it doesn't exit. + closer = agen.aclose() + try: + # If the next thing is a yield, this will raise RuntimeError + # which we allow to propagate + closer.send(None) + except StopIteration: + pass + else: + # If the next thing is an await, we get here. Give a nicer + # error than the default "async generator ignored GeneratorExit" + raise RuntimeError( + f"Non-Trio async generator {agen_name!r} awaited something " + f"during finalization; install a finalization hook to " + f"support this, or wrap it in 'async with aclosing(...):'" + ) + + self.prev_hooks = sys.get_asyncgen_hooks() + sys.set_asyncgen_hooks(firstiter=firstiter, finalizer=finalizer) + + async def finalize_remaining(self, runner): + # This is called from init after shutting down the system nursery. + # The only tasks running at this point are init and + # the run_sync_soon task, and since the system nursery is closed, + # there's no way for user code to spawn more. + assert _core.current_task() is runner.init_task + assert len(runner.tasks) == 2 + + # To make async generator finalization easier to reason + # about, we'll shut down asyncgen garbage collection by turning + # the alive WeakSet into a regular set. + self.alive = set(self.alive) + + # Process all pending run_sync_soon callbacks, in case one of + # them was an asyncgen finalizer that snuck in under the wire. + runner.entry_queue.run_sync_soon(runner.reschedule, runner.init_task) + await _core.wait_task_rescheduled(lambda _: Abort.FAILED) # pragma: no cover + self.alive.update(self.trailing_needs_finalize) + self.trailing_needs_finalize.clear() + + # None of the still-living tasks use async generators, so + # every async generator must be suspended at a yield point -- + # there's no one to be doing the iteration. That's good, + # because aclose() only works on an asyncgen that's suspended + # at a yield point. (If it's suspended at an event loop trap, + # because someone is in the middle of iterating it, then you + # get a RuntimeError on 3.8+, and a nasty surprise on earlier + # versions due to https://bugs.python.org/issue32526.) + # + # However, once we start aclose() of one async generator, it + # might start fetching the next value from another, thus + # preventing us from closing that other (at least until + # aclose() of the first one is complete). This constraint + # effectively requires us to finalize the remaining asyncgens + # in arbitrary order, rather than doing all of them at the + # same time. On 3.8+ we could defer any generator with + # ag_running=True to a later batch, but that only catches + # the case where our aclose() starts after the user's + # asend()/etc. If our aclose() starts first, then the + # user's asend()/etc will raise RuntimeError, since they're + # probably not checking ag_running. + # + # It might be possible to allow some parallelized cleanup if + # we can determine that a certain set of asyncgens have no + # interdependencies, using gc.get_referents() and such. + # But just doing one at a time will typically work well enough + # (since each aclose() executes in a cancelled scope) and + # is much easier to reason about. + + # It's possible that that cleanup code will itself create + # more async generators, so we iterate repeatedly until + # all are gone. + while self.alive: + batch = self.alive + self.alive = set() + for agen in batch: + await self._finalize_one(agen, name_asyncgen(agen)) + + def close(self): + sys.set_asyncgen_hooks(*self.prev_hooks) + + async def _finalize_one(self, agen, name): + try: + # This shield ensures that finalize_asyncgen never exits + # with an exception, not even a Cancelled. The inside + # is cancelled so there's no deadlock risk. + with _core.CancelScope(shield=True) as cancel_scope: + cancel_scope.cancel() + await agen.aclose() + except BaseException: + ASYNCGEN_LOGGER.exception( + "Exception ignored during finalization of async generator %r -- " + "surround your use of the generator in 'async with aclosing(...):' " + "to raise exceptions like this in the context where they're generated", + name, + ) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 0c995788b2..d127dd54e6 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -43,10 +43,11 @@ PermanentlyDetachCoroutineObject, WaitTaskRescheduled, ) +from ._asyncgens import AsyncGenerators from ._thread_cache import start_thread_soon from .. import _core from .._deprecate import warn_deprecated -from .._util import Final, NoPublicConstructor, coroutine_or_error, name_asyncgen +from .._util import Final, NoPublicConstructor, coroutine_or_error DEADLINE_HEAP_MIN_PRUNE_THRESHOLD = 1000 @@ -67,9 +68,8 @@ def _public(fn): _ALLOW_DETERMINISTIC_SCHEDULING = False _r = random.Random() -# Used to log exceptions in instruments and async generator finalizers +# Used to log exceptions in instruments INSTRUMENT_LOGGER = logging.getLogger("trio.abc.Instrument") -ASYNCGEN_LOGGER = logging.getLogger("trio.async_generator_errors") # On 3.7+, Context.run() is implemented in C and doesn't show up in @@ -1288,6 +1288,7 @@ class Runner: entry_queue = attr.ib(factory=EntryQueue) trio_token = attr.ib(default=None) + asyncgens = attr.ib(factory=AsyncGenerators) # If everything goes idle for this long, we call clock._autojump() clock_autojump_threshold = attr.ib(default=inf) @@ -1296,24 +1297,8 @@ class Runner: is_guest = attr.ib(default=False) guest_tick_scheduled = attr.ib(default=False) - # Async generators are added to this set when first iterated. Any - # left after the main task exits will be closed before trio.run() - # returns. During most of the run, this is a WeakSet so GC works. - # During shutdown, when we're finalizign all the remaining - # asyncgens after the system nursery has been closed, it's a - # regular set so we don't have to deal with GC firing at - # unexpected times. - asyncgens = attr.ib(factory=weakref.WeakSet) - - # This collects async generators that get garbage collected during - # the one-tick window between the system nursery closing and the - # init task starting end-of-run asyncgen finalization. - trailing_finalizer_asyncgens = attr.ib(factory=set) - - prev_asyncgen_hooks = attr.ib(init=False) - def __attrs_post_init__(self): - self.setup_asyncgen_hooks() + self.asyncgens.install_hooks(self) def force_guest_tick_asap(self): if self.guest_tick_scheduled: @@ -1324,7 +1309,7 @@ def force_guest_tick_asap(self): def close(self): self.io_manager.close() self.entry_queue.close() - sys.set_asyncgen_hooks(*self.prev_asyncgen_hooks) + self.asyncgens.close() if self.instruments: self.instrument("after_run") # This is where KI protection gets disabled, so we do it last @@ -1540,147 +1525,6 @@ def task_exited(self, task, outcome): # Async generator finalization support ################ - async def finalize_asyncgen(self, agen, name): - try: - # This shield ensures that finalize_asyncgen never exits - # with an exception, not even a Cancelled. The inside - # is cancelled so there's no deadlock risk. - with CancelScope(shield=True) as cancel_scope: - cancel_scope.cancel() - await agen.aclose() - except BaseException: - ASYNCGEN_LOGGER.exception( - "Exception ignored during finalization of async generator %r -- " - "surround your use of the generator in 'async with aclosing(...):' " - "to raise exceptions like this in the context where they're generated", - name, - ) - - async def finalize_remaining_asyncgens(self): - # At the time this function is called, there are exactly two - # tasks running: init and the run_sync_soon task. (And we've - # shut down the system nursery, so no more can appear.) - # Neither one uses async generators, so every async generator - # must be suspended at a yield point -- there's no one to be - # doing the iteration. That's good, because aclose() only - # works on an asyncgen that's suspended at a yield point. - # (If it's suspended at an event loop trap, because someone - # is in the middle of iterating it, then you get a RuntimeError - # on 3.8+, and a nasty surprise on earlier versions due to - # https://bugs.python.org/issue32526.) - # - # However, once we start aclose() of one async generator, it - # might start fetching the next value from another, thus - # preventing us from closing that other (at least until - # aclose() of the first one is complete). This constraint - # effectively requires us to finalize the remaining asyncgens - # in arbitrary order, rather than doing all of them at the - # same time. On 3.8+ we could defer any generator with - # ag_running=True to a later batch, but that only catches - # the case where our aclose() starts after the user's - # asend()/etc. If our aclose() starts first, then the - # user's asend()/etc will raise RuntimeError, since they're - # probably not checking ag_running. - # - # It might be possible to allow some parallelized cleanup if - # we can determine that a certain set of asyncgens have no - # interdependencies, using gc.get_referents() and such. - # But just doing one at a time will typically work well enough - # (since each aclose() executes in a cancelled scope) and - # is much easier to reason about. - - # It's possible that that cleanup code will itself create - # more async generators, so we iterate repeatedly until - # all are gone. - while self.asyncgens: - batch = self.asyncgens - self.asyncgens = set() - for agen in batch: - await self.finalize_asyncgen(agen, name_asyncgen(agen)) - - def setup_asyncgen_hooks(self): - def firstiter(agen): - if hasattr(GLOBAL_RUN_CONTEXT, "task"): - self.asyncgens.add(agen) - else: - # An async generator first iterated outside of a Trio - # task doesn't belong to Trio. Probably we're in guest - # mode and the async generator belongs to our host. - # The locals dictionary is the only good place to - # remember this fact, at least until - # https://bugs.python.org/issue40916 is implemented. - agen.ag_frame.f_locals["@trio_foreign_asyncgen"] = True - if self.prev_asyncgen_hooks.firstiter is not None: - self.prev_asyncgen_hooks.firstiter(agen) - - def finalize_in_trio_context(agen, agen_name): - try: - self.spawn_system_task( - self.finalize_asyncgen, - agen, - agen_name, - name=f"close asyncgen {agen_name} (abandoned)", - ) - except RuntimeError: - # There is a one-tick window where the system nursery - # is closed but the init task hasn't yet made - # self.asyncgens a strong set to disable GC. We seem to - # have hit it. - self.trailing_finalizer_asyncgens.add(agen) - - def finalizer(agen): - agen_name = name_asyncgen(agen) - try: - is_ours = not agen.ag_frame.f_locals.get("@trio_foreign_asyncgen") - except AttributeError: # pragma: no cover - is_ours = True - - if is_ours: - self.entry_queue.run_sync_soon( - finalize_in_trio_context, agen, agen_name - ) - - # Do this last, because it might raise an exception - # depending on the user's warnings filter. (That - # exception will be printed to the terminal and - # ignored, since we're running in GC context.) - warnings.warn( - f"Async generator {agen_name!r} was garbage collected before it " - f"had been exhausted. Surround its use in 'async with " - f"aclosing(...):' to ensure that it gets cleaned up as soon as " - f"you're done using it.", - ResourceWarning, - stacklevel=2, - source=agen, - ) - else: - # Not ours -> forward to the host loop's async generator finalizer - if self.prev_asyncgen_hooks.finalizer is not None: - self.prev_asyncgen_hooks.finalizer(agen) - else: - # Host has no finalizer. Reimplement the default - # Python behavior with no hooks installed: throw in - # GeneratorExit, step once, raise RuntimeError if - # it doesn't exit. - closer = agen.aclose() - try: - # If the next thing is a yield, this will raise RuntimeError - # which we allow to propagate - closer.send(None) - except StopIteration: - pass - else: - # If the next thing is an await, we get here. Give a nicer - # error than the default "async generator ignored GeneratorExit" - raise RuntimeError( - f"async generator {agen_name!r} awaited during " - f"finalization; install a finalization hook to support " - f"this, or wrap it in 'async with aclosing(...):'" - ) - - self.prev_asyncgen_hooks = sys.get_asyncgen_hooks() - sys.set_asyncgen_hooks(firstiter=firstiter, finalizer=finalizer) - ################ # System tasks and init ################ @@ -1754,23 +1598,8 @@ async def init(self, async_fn, args): # Main task is done; start shutting down system tasks self.system_nursery.cancel_scope.cancel() - # The only tasks running at this point are init (this one) - # and the run_sync_soon task. We still need to finalize - # remaining async generators. To make that easier to reason - # about, we'll shut down their garbage collection by turning - # the asyncgens WeakSet into a regular set. - self.asyncgens = set(self.asyncgens) - - # Process all pending run_sync_soon callbacks, in case one of - # them was an asyncgen finalizer that snuck in under the wire. - self.entry_queue.run_sync_soon(self.reschedule, self.init_task) - await wait_task_rescheduled(lambda _: Abort.FAILED) # pragma: no cover - self.asyncgens.update(self.trailing_finalizer_asyncgens) - self.trailing_finalizer_asyncgens.clear() - - # The only async-colored user code left to run is the - # finalizers for the async generators that remain alive. - await self.finalize_remaining_asyncgens() + # System nursery is closed; finalize remaining async generators + await self.asyncgens.finalize_remaining(self) # There are no more asyncgens, which means no more user-provided # code except possibly run_sync_soon callbacks. It's finally safe diff --git a/trio/_core/tests/test_asyncgen.py b/trio/_core/tests/test_asyncgen.py index 829eb23db9..dd15da9bfd 100644 --- a/trio/_core/tests/test_asyncgen.py +++ b/trio/_core/tests/test_asyncgen.py @@ -184,7 +184,7 @@ async def agen(): def collect_at_opportune_moment(token): runner = _core._run.GLOBAL_RUN_CONTEXT.runner if runner.system_nursery._closed and isinstance( - runner.asyncgens, weakref.WeakSet + runner.asyncgens.alive, weakref.WeakSet ): saved.clear() record.append("final collection") @@ -277,7 +277,7 @@ async def awaits_after_yield(): await step_outside_async_context(awaits_after_yield()) gc_collect_harder() - assert "awaited during finalization" in capsys.readouterr().err + assert "awaited something during finalization" in capsys.readouterr().err @pytest.mark.skipif(buggy_pypy_asyncgens, reason="pypy 7.2.0 is buggy") From 2af69745fcf015098fa14069719b336c9318b11a Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Fri, 24 Jul 2020 07:49:39 +0000 Subject: [PATCH 17/18] Fix mypy --- trio/_core/_asyncgens.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/trio/_core/_asyncgens.py b/trio/_core/_asyncgens.py index 87f9d76904..cb88a1d57b 100644 --- a/trio/_core/_asyncgens.py +++ b/trio/_core/_asyncgens.py @@ -129,7 +129,9 @@ async def finalize_remaining(self, runner): # Process all pending run_sync_soon callbacks, in case one of # them was an asyncgen finalizer that snuck in under the wire. runner.entry_queue.run_sync_soon(runner.reschedule, runner.init_task) - await _core.wait_task_rescheduled(lambda _: Abort.FAILED) # pragma: no cover + await _core.wait_task_rescheduled( + lambda _: _core.Abort.FAILED # pragma: no cover + ) self.alive.update(self.trailing_needs_finalize) self.trailing_needs_finalize.clear() From 04419520bedc430d13f8421343c5d32d64455190 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Wed, 5 Aug 2020 02:09:27 +0000 Subject: [PATCH 18/18] Review responses --- trio/_core/_run.py | 8 +------- trio/_core/tests/test_instrumentation.py | 1 - 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index f004f4cf39..55cc4a159d 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1301,9 +1301,6 @@ class Runner: is_guest = attr.ib(default=False) guest_tick_scheduled = attr.ib(default=False) - def __attrs_post_init__(self): - self.asyncgens.install_hooks(self) - def force_guest_tick_asap(self): if self.guest_tick_scheduled: return @@ -1525,10 +1522,6 @@ def task_exited(self, task, outcome): if "task_exited" in self.instruments: self.instruments.call("task_exited", task) - ################ - # Async generator finalization support - ################ - ################ # System tasks and init ################ @@ -1833,6 +1826,7 @@ def setup_runner(clock, instruments, restrict_keyboard_interrupt_to_checkpoints) system_context=system_context, ki_manager=ki_manager, ) + runner.asyncgens.install_hooks(runner) # This is where KI protection gets enabled, so we want to do it early - in # particular before we start modifying global state like GLOBAL_RUN_CONTEXT diff --git a/trio/_core/tests/test_instrumentation.py b/trio/_core/tests/test_instrumentation.py index 946caf7965..57d3461d3b 100644 --- a/trio/_core/tests/test_instrumentation.py +++ b/trio/_core/tests/test_instrumentation.py @@ -73,7 +73,6 @@ async def main(): + [("before", task), ("schedule", task), ("after", task)] * 5 + [("before", task), ("after", task), ("after_run",)] ) - assert len(r1.record) > len(r2.record) > len(r3.record) assert r1.record == r2.record + r3.record assert list(r1.filter_tasks([task])) == expected