diff --git a/docs/source/history.rst b/docs/source/history.rst index 3c460e5763..09c6fae46a 100644 --- a/docs/source/history.rst +++ b/docs/source/history.rst @@ -136,6 +136,9 @@ will work but complain loudly, and won't work in 0.3.0): ``run_in_worker_thread`` → ``run_sync_in_worker_thread`` ``nursery.spawn`` → ``nursery.start_soon`` +``current_call_soon_thread_and_signal_safe`` → :class:`trio.hazmat.TrioToken` +``run_in_trio_thread``, ``await_in_trio_thread`` → :class:`trio.BlockingTrioPortal` + deprecated big chunks of nursery and Task API diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index 56ea434743..fbfaf8bddc 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -1314,9 +1314,8 @@ for working with real, operating-system level, :mod:`threading`\-module-style threads. First, if you're in Trio but need to push some blocking I/O into a thread, there's :func:`run_sync_in_worker_thread`. And if you're in a thread and need -to communicate back with trio, there's the closely related -:func:`current_run_in_trio_thread` and -:func:`current_await_in_trio_thread`. +to communicate back with trio, you can use a +:class:`BlockingTrioPortal`. Trio's philosophy about managing worker threads @@ -1452,40 +1451,8 @@ Putting blocking I/O into worker threads Getting back into the trio thread from another thread ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -.. function:: current_run_in_trio_thread - current_await_in_trio_thread - - Call these from inside a trio run to get a reference to the current - run's :func:`run_in_trio_thread` or :func:`await_in_trio_thread`: - - .. function:: run_in_trio_thread(sync_fn, *args) - :module: - .. function:: await_in_trio_thread(async_fn, *args) - :module: - - These functions schedule a call to ``sync_fn(*args)`` or ``await - async_fn(*args)`` to happen in the main trio thread, wait for it to - complete, and then return the result or raise whatever exception it - raised. - - These are the *only* non-hazmat functions that interact with the - trio run loop and that can safely be called from a different thread - than the one that called :func:`trio.run`. These two functions - *must* be called from a different thread than the one that called - :func:`trio.run`. (After all, they're blocking functions!) - - .. warning:: - - If the relevant call to :func:`trio.run` finishes while a call - to ``await_in_trio_thread`` is in progress, then the call to - ``async_fn`` will be :ref:`cancelled ` and the - resulting :exc:`~trio.Cancelled` exception may propagate out of - ``await_in_trio_thread`` and into the calling thread. You should - be prepared for this. - - :raises RunFinishedError: If the corresponding call to - :func:`trio.run` has already completed. - +.. autoclass:: BlockingTrioPortal + :members: This will probably be clearer with an example. Here we demonstrate how to spawn a child thread, and then use a :class:`trio.Queue` to send @@ -1494,28 +1461,27 @@ messages between the thread and a trio task:: import trio import threading - def thread_fn(await_in_trio_thread, request_queue, response_queue): + def thread_fn(portal, request_queue, response_queue): while True: # Since we're in a thread, we can't call trio.Queue methods - # directly -- so we use await_in_trio_thread to call them. - request = await_in_trio_thread(request_queue.get) + # directly -- so we use our portal to call them. 
+ request = portal.run(request_queue.get) # We use 'None' as a request to quit if request is not None: response = request + 1 - await_in_trio_thread(response_queue.put, response) + portal.run(response_queue.put, response) else: # acknowledge that we're shutting down, and then do it - await_in_trio_thread(response_queue.put, None) + portal.run(response_queue.put, None) return async def main(): - # Get a reference to the await_in_trio_thread function - await_in_trio_thread = trio.current_await_in_trio_thread() + portal = trio.BlockingTrioPortal() request_queue = trio.Queue(1) response_queue = trio.Queue(1) thread = threading.Thread( target=thread_fn, - args=(await_in_trio_thread, request_queue, response_queue)) + args=(portal, request_queue, response_queue)) thread.start() # prints "1" diff --git a/docs/source/reference-hazmat.rst b/docs/source/reference-hazmat.rst index 565e2eacd9..2dbbce55c1 100644 --- a/docs/source/reference-hazmat.rst +++ b/docs/source/reference-hazmat.rst @@ -197,10 +197,13 @@ Global state: system tasks and run-local storage .. autofunction:: spawn_system_task -Entering trio from external threads or signal handlers -====================================================== +Trio tokens +=========== -.. autofunction:: current_call_soon_thread_and_signal_safe +.. autoclass:: TrioToken() + :members: + +.. autofunction:: current_trio_token Safer KeyboardInterrupt handling diff --git a/trio/__init__.py b/trio/__init__.py index b2e4fc2632..db6f84f491 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -111,6 +111,7 @@ # Having the public path in .__module__ attributes is important for: # - exception names in printed tracebacks # - sphinx :show-inheritance: +# - deprecation warnings # - pickle # - probably other stuff from ._util import fixup_module_metadata diff --git a/trio/_core/__init__.py b/trio/_core/__init__.py index b058d108e1..4406f526ff 100644 --- a/trio/_core/__init__.py +++ b/trio/_core/__init__.py @@ -27,6 +27,9 @@ def _public(fn): from ._run import * __all__ += _run.__all__ +from ._entry_queue import * +__all__ += _entry_queue.__all__ + from ._parking_lot import * __all__ += _parking_lot.__all__ diff --git a/trio/_core/_entry_queue.py b/trio/_core/_entry_queue.py new file mode 100644 index 0000000000..42996b6130 --- /dev/null +++ b/trio/_core/_entry_queue.py @@ -0,0 +1,196 @@ +from collections import deque +import threading + +import attr + +from .. import _core +from ._wakeup_socketpair import WakeupSocketpair + +__all__ = ["TrioToken"] + + +@attr.s +class EntryQueue: + # This used to use a queue.Queue. but that was broken, because Queues are + # implemented in Python, and not reentrant -- so it was thread-safe, but + # not signal-safe. deque is implemented in C, so each operation is atomic + # WRT threads (and this is guaranteed in the docs), AND each operation is + # atomic WRT signal delivery (signal handlers can run on either side, but + # not *during* a deque operation). dict makes similar guarantees - and on + # CPython 3.6 and PyPy, it's even ordered! + queue = attr.ib(default=attr.Factory(deque)) + idempotent_queue = attr.ib(default=attr.Factory(dict)) + + wakeup = attr.ib(default=attr.Factory(WakeupSocketpair)) + done = attr.ib(default=False) + # Must be a reentrant lock, because it's acquired from signal handlers. + # RLock is signal-safe as of cpython 3.2. NB that this does mean that the + # lock is effectively *disabled* when we enter from signal context. 
The + # way we use the lock this is OK though, because when + # run_sync_soon is called from a signal it's atomic WRT the + # main thread -- it just might happen at some inconvenient place. But if + # you look at the one place where the main thread holds the lock, it's + # just to make 1 assignment, so that's atomic WRT a signal anyway. + lock = attr.ib(default=attr.Factory(threading.RLock)) + + async def task(self): + assert _core.currently_ki_protected() + # RLock has two implementations: a signal-safe version in _thread, and + # and signal-UNsafe version in threading. We need the signal safe + # version. Python 3.2 and later should always use this anyway, but, + # since the symptoms if this goes wrong are just "weird rare + # deadlocks", then let's make a little check. + # See: + # https://bugs.python.org/issue13697#msg237140 + assert self.lock.__class__.__module__ == "_thread" + + def run_cb(job): + # We run this with KI protection enabled; it's the callback's + # job to disable it if it wants it disabled. Exceptions are + # treated like system task exceptions (i.e., converted into + # TrioInternalError and cause everything to shut down). + sync_fn, args = job + try: + sync_fn(*args) + except BaseException as exc: + + async def kill_everything(exc): + raise exc + + _core.spawn_system_task(kill_everything, exc) + return True + + # This has to be carefully written to be safe in the face of new items + # being queued while we iterate, and to do a bounded amount of work on + # each pass: + def run_all_bounded(): + for _ in range(len(self.queue)): + run_cb(self.queue.popleft()) + for job in list(self.idempotent_queue): + del self.idempotent_queue[job] + run_cb(job) + + try: + while True: + run_all_bounded() + if not self.queue and not self.idempotent_queue: + await self.wakeup.wait_woken() + else: + await _core.checkpoint() + except _core.Cancelled: + # Keep the work done with this lock held as minimal as possible, + # because it doesn't protect us against concurrent signal delivery + # (see the comment above). Notice that this code would still be + # correct if written like: + # self.done = True + # with self.lock: + # pass + # because all we want is to force run_sync_soon + # to either be completely before or completely after the write to + # done. That's why we don't need the lock to protect + # against signal handlers. + with self.lock: + self.done = True + # No more jobs will be submitted, so just clear out any residual + # ones: + run_all_bounded() + assert not self.queue + assert not self.idempotent_queue + + def close(self): + self.wakeup.close() + + def size(self): + return len(self.queue) + len(self.idempotent_queue) + + def spawn(self): + name = "" + _core.spawn_system_task(self.task, name=name) + + def run_sync_soon(self, sync_fn, *args, idempotent=False): + with self.lock: + if self.done: + raise _core.RunFinishedError("run() has exited") + # We have to hold the lock all the way through here, because + # otherwise the main thread might exit *while* we're doing these + # calls, and then our queue item might not be processed, or the + # wakeup call might trigger an OSError b/c the IO manager has + # already been shut down. + if idempotent: + self.idempotent_queue[(sync_fn, args)] = None + else: + self.queue.append((sync_fn, args)) + self.wakeup.wakeup_thread_and_signal_safe() + + +class TrioToken: + """An opaque object representing a single call to :func:`trio.run`. + + It has no public constructor; instead, see :func:`current_trio_token`. + + This object has two uses: + + 1. 
It lets you re-enter the Trio run loop from external threads or signal + handlers. This is the low-level primitive that + :func:`trio.run_sync_in_worker_thread` uses to receive results from + worker threads, that :func:`trio.catch_signals` uses to receive + notifications about signals, and so forth. + + 2. Each call to :func:`trio.run` has exactly one associated + :class:`TrioToken` object, so you can use it to identify a particular + call. + + """ + + def __init__(self, reentry_queue): + self._reentry_queue = reentry_queue + + def run_sync_soon(self, sync_fn, *args, idempotent=False): + """Schedule a call to ``sync_fn(*args)`` to occur in the context of a + trio task. + + This is safe to call from the main thread, from other threads, and + from signal handlers. This is the fundamental primitive used to + re-enter the Trio run loop from outside of it. + + The call will happen "soon", but there's no guarantee about exactly + when, and no mechanism provided for finding out when it's happened. + If you need this, you'll have to build your own. + + The call is effectively run as part of a system task (see + :func:`~trio.hazmat.spawn_system_task`). In particular this means + that: + + * :exc:`KeyboardInterrupt` protection is *enabled* by default; if + you want ``sync_fn`` to be interruptible by control-C, then you + need to use :func:`~trio.hazmat.disable_ki_protection` + explicitly. + + * If ``sync_fn`` raises an exception, then it's converted into a + :exc:`~trio.TrioInternalError` and *all* tasks are cancelled. You + should be careful that ``sync_fn`` doesn't crash. + + All calls with ``idempotent=False`` are processed in strict + first-in first-out order. + + If ``idempotent=True``, then ``sync_fn`` and ``args`` must be + hashable, and trio will make a best-effort attempt to discard any + call submission which is equal to an already-pending call. Trio + will make an attempt to process these in first-in first-out order, + but no guarantees. (Currently processing is FIFO on CPython 3.6 and + PyPy, but not CPython 3.5.) + + Any ordering guarantees apply separately to ``idempotent=False`` + and ``idempotent=True`` calls; there's no rule for how calls in the + different categories are ordered with respect to each other. + + :raises trio.RunFinishedError: + if the associated call to :func:`trio.run` + has already exited. (Any call that *doesn't* raise this error + is guaranteed to be fully processed before :func:`trio.run` + exits.) + + """ + self._reentry_queue.run_sync_soon( + sync_fn, *args, idempotent=idempotent + ) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 03c705a8f9..c0db198c2f 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -31,11 +31,11 @@ CancelShieldedCheckpoint, WaitTaskRescheduled, ) +from ._entry_queue import EntryQueue, TrioToken from ._ki import ( LOCALS_KEY_KI_PROTECTION_ENABLED, currently_ki_protected, ki_manager, enable_ki_protection ) -from ._wakeup_socketpair import WakeupSocketpair from . import _public # At the bottom of this file there's also some "clever" code that generates @@ -44,8 +44,8 @@ # namespaces. 
__all__ = [ "Task", "run", "open_nursery", "open_cancel_scope", "checkpoint", - "current_task", "current_effective_deadline", "checkpoint_if_cancelled", - "STATUS_IGNORED" + "current_call_soon_thread_and_signal_safe", "current_task", + "current_effective_deadline", "checkpoint_if_cancelled", "STATUS_IGNORED" ] GLOBAL_RUN_CONTEXT = threading.local() @@ -726,7 +726,12 @@ class _RunStatistics: tasks_runnable = attr.ib() seconds_to_next_deadline = attr.ib() io_statistics = attr.ib() - call_soon_queue_size = attr.ib() + run_sync_soon_queue_size = attr.ib() + + @property + @deprecated("0.2.0", issue=68, instead="run_sync_soon_queue_size") + def call_soon_queue_size(self): + return self.run_sync_soon_queue_size @attr.s(cmp=False, hash=False) @@ -751,9 +756,12 @@ class Runner: main_task = attr.ib(default=None) system_nursery = attr.ib(default=None) + entry_queue = attr.ib(default=attr.Factory(EntryQueue)) + trio_token = attr.ib(default=None) + def close(self): self.io_manager.close() - self.call_soon_wakeup.close() + self.entry_queue.close() self.instrument("after_run") # Methods marked with @_public get converted into functions exported by @@ -773,9 +781,9 @@ def current_statistics(self): pending cancel scope deadline. May be negative if the deadline has expired but we haven't yet processed cancellations. May be :data:`~math.inf` if there are no pending deadlines. - * ``call_soon_queue_size`` (int): The number of unprocessed callbacks - queued via - :func:`trio.hazmat.current_call_soon_thread_and_signal_safe`. + * ``run_sync_soon_queue_size`` (int): The number of + unprocessed callbacks queued via + :meth:`trio.hazmat.TrioToken.run_sync_soon`. * ``io_statistics`` (object): Some statistics from trio's I/O backend. This always has an attribute ``backend`` which is a string naming which operating-system-specific I/O backend is in use; the @@ -792,10 +800,7 @@ def current_statistics(self): tasks_runnable=len(self.runq), seconds_to_next_deadline=seconds_to_next_deadline, io_statistics=self.io_manager.statistics(), - call_soon_queue_size=( - len(self.call_soon_queue) + - len(self.call_soon_idempotent_queue) - ), + run_sync_soon_queue_size=self.entry_queue.size(), ) @_public @@ -1058,13 +1063,13 @@ def excfilter(exc): async def init(self, async_fn, args): async with open_nursery() as system_nursery: self.system_nursery = system_nursery - self.spawn_system_task( - self.call_soon_task, name="" - ) + + self.entry_queue.spawn() self.main_task = self.spawn_impl( async_fn, args, self.system_nursery, name=None ) + async for task_batch in system_nursery._monitor: for task in task_batch: if task is self.main_task: @@ -1074,165 +1079,18 @@ async def init(self, async_fn, args): system_nursery._reap_and_unwrap(task) ################ - # Outside Context Problems + # Outside context problems ################ - # XX factor this chunk into another file - - # This used to use a queue.Queue. but that was broken, because Queues are - # implemented in Python, and not reentrant -- so it was thread-safe, but - # not signal-safe. deque is implemented in C, so each operation is atomic - # WRT threads (and this is guaranteed in the docs), AND each operation is - # atomic WRT signal delivery (signal handlers can run on either side, but - # not *during* a deque operation). dict makes similar guarantees - and on - # CPython 3.6 and PyPy, it's even ordered! 
- call_soon_wakeup = attr.ib(default=attr.Factory(WakeupSocketpair)) - call_soon_queue = attr.ib(default=attr.Factory(deque)) - call_soon_idempotent_queue = attr.ib(default=attr.Factory(dict)) - call_soon_done = attr.ib(default=False) - # Must be a reentrant lock, because it's acquired from signal - # handlers. RLock is signal-safe as of cpython 3.2. - # NB that this does mean that the lock is effectively *disabled* when we - # enter from signal context. The way we use the lock this is OK though, - # because when call_soon_thread_and_signal_safe is called from a signal - # it's atomic WRT the main thread -- it just might happen at some - # inconvenient place. But if you look at the one place where the main - # thread holds the lock, it's just to make 1 assignment, so that's atomic - # WRT a signal anyway. - call_soon_lock = attr.ib(default=attr.Factory(threading.RLock)) - - def call_soon_thread_and_signal_safe( - self, sync_fn, *args, idempotent=False - ): - with self.call_soon_lock: - if self.call_soon_done: - raise RunFinishedError("run() has exited") - # We have to hold the lock all the way through here, because - # otherwise the main thread might exit *while* we're doing these - # calls, and then our queue item might not be processed, or the - # wakeup call might trigger an OSError b/c the IO manager has - # already been shut down. - if idempotent: - self.call_soon_idempotent_queue[(sync_fn, args)] = None - else: - self.call_soon_queue.append((sync_fn, args)) - self.call_soon_wakeup.wakeup_thread_and_signal_safe() - @_public - def current_call_soon_thread_and_signal_safe(self): - """Returns a reference to the ``call_soon_thread_and_signal_safe`` - function for the current trio run: - - .. currentmodule:: None - - .. function:: call_soon_thread_and_signal_safe(sync_fn, *args, idempotent=False) - - Schedule a call to ``sync_fn(*args)`` to occur in the context of a - trio task. This is safe to call from the main thread, from other - threads, and from signal handlers. - - The call is effectively run as part of a system task (see - :func:`~trio.hazmat.spawn_system_task`). In particular this means - that: - - * :exc:`KeyboardInterrupt` protection is *enabled* by default; if - you want ``sync_fn`` to be interruptible by control-C, then you - need to use :func:`~trio.hazmat.disable_ki_protection` - explicitly. - - * If ``sync_fn`` raises an exception, then it's converted into a - :exc:`~trio.TrioInternalError` and *all* tasks are cancelled. You - should be careful that ``sync_fn`` doesn't crash. - - All calls with ``idempotent=False`` are processed in strict - first-in first-out order. - - If ``idempotent=True``, then ``sync_fn`` and ``args`` must be - hashable, and trio will make a best-effort attempt to discard any - call submission which is equal to an already-pending call. Trio - will make an attempt to process these in first-in first-out order, - but no guarantees. (Currently processing is FIFO on CPython 3.6 and - PyPy, but not CPython 3.5.) - - Any ordering guarantees apply separately to ``idempotent=False`` - and ``idempotent=True`` calls; there's no rule for how calls in the - different categories are ordered with respect to each other. - - :raises trio.RunFinishedError: - if the associated call to :func:`trio.run` - has already exited. (Any call that *doesn't* raise this error - is guaranteed to be fully processed before :func:`trio.run` - exits.) - - .. 
currentmodule:: trio.hazmat + def current_trio_token(self): + """Retrieve the :class:`TrioToken` for the current call to + :func:`trio.run`. """ - return self.call_soon_thread_and_signal_safe - - async def call_soon_task(self): - assert currently_ki_protected() - # RLock has two implementations: a signal-safe version in _thread, and - # and signal-UNsafe version in threading. We need the signal safe - # version. Python 3.2 and later should always use this anyway, but, - # since the symptoms if this goes wrong are just "weird rare - # deadlocks", then let's make a little check. - # See: - # https://bugs.python.org/issue13697#msg237140 - assert self.call_soon_lock.__class__.__module__ == "_thread" - - def run_cb(job): - # We run this with KI protection enabled; it's the callbacks - # job to disable it if it wants it disabled. Exceptions are - # treated like system task exceptions (i.e., converted into - # TrioInternalError and cause everything to shut down). - sync_fn, args = job - try: - sync_fn(*args) - except BaseException as exc: - - async def kill_everything(exc): - raise exc - - self.spawn_system_task(kill_everything, exc) - return True - - # This has to be carefully written to be safe in the face of new items - # being queued while we iterate, and to do a bounded amount of work on - # each pass: - def run_all_bounded(): - for _ in range(len(self.call_soon_queue)): - run_cb(self.call_soon_queue.popleft()) - for job in list(self.call_soon_idempotent_queue): - del self.call_soon_idempotent_queue[job] - run_cb(job) - - try: - while True: - run_all_bounded() - if (not self.call_soon_queue - and not self.call_soon_idempotent_queue): - await self.call_soon_wakeup.wait_woken() - else: - await checkpoint() - except Cancelled: - # Keep the work done with this lock held as minimal as possible, - # because it doesn't protect us against concurrent signal delivery - # (see the comment above). Notice that this could would still be - # correct if written like: - # self.call_soon_done = True - # with self.call_soon_lock: - # pass - # because all we want is to force call_soon_thread_and_signal_safe - # to either be completely before or completely after the write to - # call_soon_done. That's why we don't need the lock to protect - # against signal handlers. - with self.call_soon_lock: - self.call_soon_done = True - # No more jobs will be submitted, so just clear out any residual - # ones: - run_all_bounded() - assert not self.call_soon_queue - assert not self.call_soon_idempotent_queue + if self.trio_token is None: + self.trio_token = TrioToken(self.entry_queue) + return self.trio_token ################ # KI handling @@ -1240,20 +1098,26 @@ def run_all_bounded(): ki_pending = attr.ib(default=False) + # deliver_ki is broke. Maybe move all the actual logic and state into + # RunToken, and we'll only have one instance per runner? But then we can't + # have a public constructor. Eh, but current_run_token() returning a + # unique object per run feels pretty nice. Maybe let's just go for it. And + # keep the class public so people can isinstance() it if they want. 
+ # This gets called from signal context def deliver_ki(self): self.ki_pending = True try: - self.call_soon_thread_and_signal_safe(self._deliver_ki_cb) + self.entry_queue.run_sync_soon(self._deliver_ki_cb) except RunFinishedError: pass def _deliver_ki_cb(self): if not self.ki_pending: return - # Can't happen because main_task and call_soon_task are created at the - # same time -- so even if KI arrives before main_task is created, we - # won't get here until afterwards. + # Can't happen because main_task and run_sync_soon_task are created at + # the same time -- so even if KI arrives before main_task is created, + # we won't get here until afterwards. assert self.main_task is not None if self.main_task._result is not None: # We're already in the process of exiting -- leave ki_pending set @@ -1816,3 +1680,8 @@ def _generate_method_wrappers(cls, path_to_instance): _generate_method_wrappers(Runner, "runner") _generate_method_wrappers(TheIOManager, "runner.io_manager") + + +@deprecated("0.2.0", issue=68, instead=TrioToken) +def current_call_soon_thread_and_signal_safe(): + return current_trio_token().run_sync_soon diff --git a/trio/_core/tests/test_ki.py b/trio/_core/tests/test_ki.py index 4e9486d80d..41cda41bbf 100644 --- a/trio/_core/tests/test_ki.py +++ b/trio/_core/tests/test_ki.py @@ -29,13 +29,13 @@ async def test_ki_enabled(): assert not _core.currently_ki_protected() # Low-level call-soon callbacks are KI-protected - call_soon = _core.current_call_soon_thread_and_signal_safe() + token = _core.current_trio_token() record = [] def check(): record.append(_core.currently_ki_protected()) - call_soon(check) + token.run_sync_soon(check) await wait_all_tasks_blocked() assert record == [True] @@ -263,23 +263,23 @@ async def check_protected_kill(): _core.run(check_protected_kill) assert record == {"s1 ok", "s2 ok", "r1 cancel ok"} - # kill at last moment still raises (call_soon until it raises an error, - # then kill) + # kill at last moment still raises (run_sync_soon until it raises an + # error, then kill) print("check 3") async def check_kill_during_shutdown(): - call_soon = _core.current_call_soon_thread_and_signal_safe() + token = _core.current_trio_token() def kill_during_shutdown(): assert _core.currently_ki_protected() try: - call_soon(kill_during_shutdown) + token.run_sync_soon(kill_during_shutdown) except _core.RunFinishedError: # it's too late for regular handling! handle this! print("kill! kill!") ki_self() - call_soon(kill_during_shutdown) + token.run_sync_soon(kill_during_shutdown) with pytest.raises(KeyboardInterrupt): _core.run(check_kill_during_shutdown) @@ -363,9 +363,9 @@ def abort(raise_cancel): _core.run(main) - # KI arrives just before main task exits, so the call_soon machinery is - # still functioning and will accept the callback to deliver the KI, but by - # the time the callback is actually run, main has exited and can't be + # KI arrives just before main task exits, so the run_sync_soon machinery + # is still functioning and will accept the callback to deliver the KI, but + # by the time the callback is actually run, main has exited and can't be # aborted. 
print("check 9") diff --git a/trio/_core/tests/test_run.py b/trio/_core/tests/test_run.py index 32f973e1dd..87df8a6213 100644 --- a/trio/_core/tests/test_run.py +++ b/trio/_core/tests/test_run.py @@ -384,14 +384,14 @@ async def child(): print(stats) # 2 system tasks + us assert stats.tasks_living == 3 - assert stats.call_soon_queue_size == 0 + assert stats.run_sync_soon_queue_size == 0 async with _core.open_nursery() as nursery: nursery.start_soon(child) await wait_all_tasks_blocked() - call_soon = _core.current_call_soon_thread_and_signal_safe() - call_soon(lambda: None) - call_soon(lambda: None, idempotent=True) + token = _core.current_trio_token() + token.run_sync_soon(lambda: None) + token.run_sync_soon(lambda: None, idempotent=True) stats = _core.current_statistics() print(stats) # 2 system tasks + us + child @@ -401,14 +401,14 @@ async def child(): # runnable on the next pass), but still useful to at least test the # difference between now and after we wake up the child: assert stats.tasks_runnable == 0 - assert stats.call_soon_queue_size == 2 + assert stats.run_sync_soon_queue_size == 2 nursery.cancel_scope.cancel() stats = _core.current_statistics() print(stats) assert stats.tasks_runnable == 1 - # Give the child a chance to die and the call_soon a chance to clear + # Give the child a chance to die and the run_sync_soon a chance to clear await _core.checkpoint() await _core.checkpoint() @@ -974,11 +974,11 @@ def test_broken_abort(): async def main(): # These yields are here to work around an annoying warning -- we're # going to crash the main loop, and if we (by chance) do this before - # the call_soon_task runs for the first time, then Python gives us a - # spurious warning about it not being awaited. (I mean, the warning is - # correct, but here we're testing our ability to deliver a + # the run_sync_soon task runs for the first time, then Python gives us + # a spurious warning about it not being awaited. (I mean, the warning + # is correct, but here we're testing our ability to deliver a # semi-meaningful error after things have gone totally pear-shaped, so - # it's not relevant.) By letting the call_soon_task run first, we + # it's not relevant.) By letting the run_sync_soon_task run first, we # avoid the warning. 
await _core.checkpoint() await _core.checkpoint() @@ -1021,8 +1021,7 @@ async def system_task(x): assert record == [("x", 1), ("ki", True)] -# intentionally make a system task crash (simulates a bug in call_soon_task or -# similar) +# intentionally make a system task crash def test_system_task_crash(): async def crasher(): raise KeyError @@ -1226,45 +1225,59 @@ async def child(): assert isinstance(excinfo.value.__context__, KeyError) -async def test_call_soon_basic(): +def test_TrioToken_identity(): + async def get_and_check_token(): + token = _core.current_trio_token() + # Two calls in the same run give the same object + assert token is _core.current_trio_token() + return token + + t1 = _core.run(get_and_check_token) + t2 = _core.run(get_and_check_token) + assert t1 is not t2 + assert t1 != t2 + assert hash(t1) != hash(t2) + + +async def test_TrioToken_run_sync_soon_basic(): record = [] def cb(x): record.append(("cb", x)) - call_soon = _core.current_call_soon_thread_and_signal_safe() - call_soon(cb, 1) + token = _core.current_trio_token() + token.run_sync_soon(cb, 1) assert not record await wait_all_tasks_blocked() assert record == [("cb", 1)] -def test_call_soon_too_late(): - call_soon = None +def test_TrioToken_run_sync_soon_too_late(): + token = None async def main(): - nonlocal call_soon - call_soon = _core.current_call_soon_thread_and_signal_safe() + nonlocal token + token = _core.current_trio_token() _core.run(main) - assert call_soon is not None + assert token is not None with pytest.raises(_core.RunFinishedError): - call_soon(lambda: None) # pragma: no branch + token.run_sync_soon(lambda: None) # pragma: no branch -async def test_call_soon_idempotent(): +async def test_TrioToken_run_sync_soon_idempotent(): record = [] def cb(x): record.append(x) - call_soon = _core.current_call_soon_thread_and_signal_safe() - call_soon(cb, 1) - call_soon(cb, 1, idempotent=True) - call_soon(cb, 1, idempotent=True) - call_soon(cb, 1, idempotent=True) - call_soon(cb, 2, idempotent=True) - call_soon(cb, 2, idempotent=True) + token = _core.current_trio_token() + token.run_sync_soon(cb, 1) + token.run_sync_soon(cb, 1, idempotent=True) + token.run_sync_soon(cb, 1, idempotent=True) + token.run_sync_soon(cb, 1, idempotent=True) + token.run_sync_soon(cb, 2, idempotent=True) + token.run_sync_soon(cb, 2, idempotent=True) await wait_all_tasks_blocked() assert len(record) == 3 assert sorted(record) == [1, 1, 2] @@ -1273,7 +1286,7 @@ def cb(x): record = [] for _ in range(3): for i in range(100): - call_soon(cb, i, idempotent=True) + token.run_sync_soon(cb, i, idempotent=True) await wait_all_tasks_blocked() if (sys.version_info < (3, 6) and platform.python_implementation() == "CPython"): @@ -1283,23 +1296,23 @@ def cb(x): assert record == list(range(100)) -def test_call_soon_idempotent_requeue(): +def test_TrioToken_run_sync_soon_idempotent_requeue(): # We guarantee that if a call has finished, queueing it again will call it # again. Due to the lack of synchronization, this effectively means that # we have to guarantee that once a call has *started*, queueing it again # will call it again. 
Also this is much easier to test :-) record = [] - def redo(call_soon): + def redo(token): record.append(None) try: - call_soon(redo, call_soon, idempotent=True) + token.run_sync_soon(redo, token, idempotent=True) except _core.RunFinishedError: pass async def main(): - call_soon = _core.current_call_soon_thread_and_signal_safe() - call_soon(redo, call_soon, idempotent=True) + token = _core.current_trio_token() + token.run_sync_soon(redo, token, idempotent=True) await _core.checkpoint() await _core.checkpoint() await _core.checkpoint() @@ -1309,14 +1322,14 @@ async def main(): assert len(record) >= 2 -def test_call_soon_after_main_crash(): +def test_TrioToken_run_sync_soon_after_main_crash(): record = [] async def main(): - call_soon = _core.current_call_soon_thread_and_signal_safe() + token = _core.current_trio_token() # After main exits but before finally cleaning up, callback processed # normally - call_soon(lambda: record.append("sync-cb")) + token.run_sync_soon(lambda: record.append("sync-cb")) raise ValueError with pytest.raises(ValueError): @@ -1325,15 +1338,15 @@ async def main(): assert record == ["sync-cb"] -def test_call_soon_crashes(): +def test_TrioToken_run_sync_soon_crashes(): record = set() async def main(): - call_soon = _core.current_call_soon_thread_and_signal_safe() - call_soon(lambda: dict()["nope"]) - # check that a crashing call_soon callback doesn't stop further calls - # to call_soon - call_soon(lambda: record.add("2nd call_soon ran")) + token = _core.current_trio_token() + token.run_sync_soon(lambda: dict()["nope"]) + # check that a crashing run_sync_soon callback doesn't stop further + # calls to run_sync_soon + token.run_sync_soon(lambda: record.add("2nd run_sync_soon ran")) try: await sleep_forever() except _core.Cancelled: @@ -1343,37 +1356,37 @@ async def main(): _core.run(main) assert type(excinfo.value.__cause__) is KeyError - assert record == {"2nd call_soon ran", "cancelled!"} + assert record == {"2nd run_sync_soon ran", "cancelled!"} -async def test_call_soon_FIFO(): +async def test_TrioToken_run_sync_soon_FIFO(): N = 100 record = [] - call_soon = _core.current_call_soon_thread_and_signal_safe() + token = _core.current_trio_token() for i in range(N): - call_soon(lambda j: record.append(j), i) + token.run_sync_soon(lambda j: record.append(j), i) await wait_all_tasks_blocked() assert record == list(range(N)) -def test_call_soon_starvation_resistance(): +def test_TrioToken_run_sync_soon_starvation_resistance(): # Even if we push callbacks in from callbacks, so that the callback queue # never empties out, then we still can't starve out other tasks from # running. 
- call_soon = None + token = None record = [] def naughty_cb(i): - nonlocal call_soon + nonlocal token try: - call_soon(naughty_cb, i + 1) + token.run_sync_soon(naughty_cb, i + 1) except _core.RunFinishedError: record.append(("run finished", i)) async def main(): - nonlocal call_soon - call_soon = _core.current_call_soon_thread_and_signal_safe() - call_soon(naughty_cb, 0) + nonlocal token + token = _core.current_trio_token() + token.run_sync_soon(naughty_cb, 0) record.append("starting") for _ in range(20): await _core.checkpoint() @@ -1385,24 +1398,24 @@ async def main(): assert record[1][1] >= 20 -def test_call_soon_threaded_stress_test(): +def test_TrioToken_run_sync_soon_threaded_stress_test(): cb_counter = 0 def cb(): nonlocal cb_counter cb_counter += 1 - def stress_thread(call_soon): + def stress_thread(token): try: while True: - call_soon(cb) + token.run_sync_soon(cb) time.sleep(0) except _core.RunFinishedError: pass async def main(): - call_soon = _core.current_call_soon_thread_and_signal_safe() - thread = threading.Thread(target=stress_thread, args=(call_soon,)) + token = _core.current_trio_token() + thread = threading.Thread(target=stress_thread, args=(token,)) thread.start() for _ in range(10): start_value = cb_counter @@ -1413,7 +1426,7 @@ async def main(): print(cb_counter) -async def test_call_soon_massive_queue(): +async def test_TrioToken_run_sync_soon_massive_queue(): # There are edge cases in the wakeup fd code when the wakeup fd overflows, # so let's try to make that happen. This is also just a good stress test # in general. (With the current-as-of-2017-02-14 code using a socketpair @@ -1421,7 +1434,7 @@ async def test_call_soon_massive_queue(): # takes 1 wakeup. So 1000 is overkill if anything. Windows OTOH takes # ~600,000 wakeups, but has the same code paths...) 
COUNT = 1000 - call_soon = _core.current_call_soon_thread_and_signal_safe() + token = _core.current_trio_token() counter = [0] def cb(i): @@ -1430,7 +1443,7 @@ def cb(i): counter[0] += 1 for i in range(COUNT): - call_soon(cb, i) + token.run_sync_soon(cb, i) await wait_all_tasks_blocked() assert counter[0] == COUNT @@ -1440,11 +1453,11 @@ async def test_slow_abort_basic(): scope.cancel() with pytest.raises(_core.Cancelled): task = _core.current_task() - call_soon = _core.current_call_soon_thread_and_signal_safe() + token = _core.current_trio_token() def slow_abort(raise_cancel): result = _core.Result.capture(raise_cancel) - call_soon(_core.reschedule, task, result) + token.run_sync_soon(_core.reschedule, task, result) return _core.Abort.FAILED await _core.wait_task_rescheduled(slow_abort) @@ -1455,12 +1468,12 @@ async def test_slow_abort_edge_cases(): async def slow_aborter(): task = _core.current_task() - call_soon = _core.current_call_soon_thread_and_signal_safe() + token = _core.current_trio_token() def slow_abort(raise_cancel): record.append("abort-called") result = _core.Result.capture(raise_cancel) - call_soon(_core.reschedule, task, result) + token.run_sync_soon(_core.reschedule, task, result) return _core.Abort.FAILED with pytest.raises(_core.Cancelled): @@ -1921,3 +1934,11 @@ async def noop(): assert nursery.reap_and_unwrap(task) == 33 assert not nursery.zombies + + record = [] + assert _core.current_statistics().call_soon_queue_size == 0 + call_soon = _core.current_call_soon_thread_and_signal_safe() + call_soon(record.append, 1) + assert _core.current_statistics().call_soon_queue_size == 1 + await wait_all_tasks_blocked() + assert record == [1] diff --git a/trio/_signals.py b/trio/_signals.py index 817d8b6cbf..d4e8a72e94 100644 --- a/trio/_signals.py +++ b/trio/_signals.py @@ -149,11 +149,11 @@ def catch_signals(signals): "Sorry, catch_signals is only possible when running in the " "Python interpreter's main thread" ) - call_soon = _core.current_call_soon_thread_and_signal_safe() + token = _core.current_trio_token() queue = SignalQueue() def handler(signum, _): - call_soon(queue._add, signum, idempotent=True) + token.run_sync_soon(queue._add, signum, idempotent=True) try: with _signal_handler(signals, handler): diff --git a/trio/_threads.py b/trio/_threads.py index 752ffaf8cd..53b6194d65 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -6,61 +6,131 @@ from . import _core from ._sync import CapacityLimiter -from ._deprecate import deprecated_alias +from ._deprecate import deprecated __all__ = [ "current_await_in_trio_thread", "current_run_in_trio_thread", "run_sync_in_worker_thread", "current_default_worker_thread_limiter", + "BlockingTrioPortal", ] -def _await_in_trio_thread_cb(q, afn, args): - @_core.disable_ki_protection - async def unprotected_afn(): - return await afn(*args) +class BlockingTrioPortal: + """A portal that synchronous threads can reach through to run code in the + Trio thread. - async def await_in_trio_thread_task(): - q.put_nowait(await _core.Result.acapture(unprotected_afn)) + Most Trio functions can only be called from the Trio thread, which is + sometimes annoying. What if you really need to call a Trio function from a + worker thread? That's where :class:`BlockingTrioPortal` comes in: it's the + rare Trio object whose methods can – in fact, must! – be called from + another thread, and it allows you to call all those other functions. 
- _core.spawn_system_task(await_in_trio_thread_task, name=afn) + There is one complication: it's possible for a single Python program to + contain multiple calls to :func:`trio.run`, either in sequence – like in a + test suite that calls :func:`trio.run` for each test – or simultaneously + in different threads. So how do you control which :func:`trio.run` your + portal opens into? + The answer is that each :class:`BlockingTrioPortal` object is associated + with one *specific* call to :func:`trio.run`. -def _run_in_trio_thread_cb(q, fn, args): - @_core.disable_ki_protection - def unprotected_fn(): - return fn(*args) + The simplest way to set this up is to instantiate the class with no + arguments inside Trio; this automatically binds it to the context where + you instantiate it:: - res = _core.Result.capture(unprotected_fn) - q.put_nowait(res) + async def some_function(): + portal = trio.BlockingTrioPortal() + await trio.run_sync_in_worker_thread(sync_fn, portal) + Alternatively, you can pass an explicit :class:`trio.hazmat.TrioToken` to + specify the :func:`trio.run` that you want your portal to connect to. -def _current_do_in_trio_thread(name, cb): - call_soon = _core.current_call_soon_thread_and_signal_safe() - trio_thread = threading.current_thread() + """ + + def __init__(self, trio_token=None): + if trio_token is None: + trio_token = _core.current_trio_token() + self._trio_token = trio_token + + # This is the part that runs in the trio thread + def _run_cb(self, q, afn, args): + @_core.disable_ki_protection + async def unprotected_afn(): + return await afn(*args) + + async def await_in_trio_thread_task(): + q.put_nowait(await _core.Result.acapture(unprotected_afn)) + + _core.spawn_system_task(await_in_trio_thread_task, name=afn) - def do_in_trio_thread(fn, *args): - if threading.current_thread() == trio_thread: - raise RuntimeError("must be called from a thread") + # This is the part that runs in the trio thread + def _run_sync_cb(self, q, fn, args): + @_core.disable_ki_protection + def unprotected_fn(): + return fn(*args) + + res = _core.Result.capture(unprotected_fn) + q.put_nowait(res) + + def _do_it(self, cb, fn, *args): + try: + _core.current_task() + except RuntimeError: + pass + else: + raise RuntimeError( + "this is a blocking function; call it from a thread" + ) q = stdlib_queue.Queue() - call_soon(cb, q, fn, args) + self._trio_token.run_sync_soon(cb, q, fn, args) return q.get().unwrap() - do_in_trio_thread.__name__ = name - return do_in_trio_thread + def run(self, afn, *args): + """Run the given async function in the trio thread, blocking until it + is complete. + + Returns or raises whatever the given function returns or raises. It + can also raise exceptions of its own: + + Raises: + RunFinishedError: if the corresponding call to :func:`trio.run` has + already completed. + Cancelled: if the corresponding call to :func:`trio.run` completes + while ``afn(*args)`` is running, then ``afn`` is likely to raise + :class:`Cancelled`, and this will propagate out into the + calling thread. + RuntimeError: if you try calling this from inside the Trio thread, + which would otherwise cause a deadlock. + + """ + return self._do_it(self._run_cb, afn, *args) + + def run_sync(self, fn, *args): + """Run the given synchronous function in the trio thread, blocking + until it is complete. + + Returns or raises whatever the given function returns or raises. It + can also raise exceptions of its own: + + Raises: + RunFinishedError: if the corresponding call to :func:`trio.run` has + already completed. 
+ RuntimeError: if you try calling this from inside the Trio thread, + which would otherwise cause a deadlock. + + """ + return self._do_it(self._run_sync_cb, fn, *args) +@deprecated("0.2.0", issue=68, instead=BlockingTrioPortal.run_sync) def current_run_in_trio_thread(): - return _current_do_in_trio_thread( - "run_in_trio_thread", _run_in_trio_thread_cb - ) + return BlockingTrioPortal().run_sync +@deprecated("0.2.0", issue=68, instead=BlockingTrioPortal.run) def current_await_in_trio_thread(): - return _current_do_in_trio_thread( - "await_in_trio_thread", _await_in_trio_thread_cb - ) + return BlockingTrioPortal().run ################################################################ @@ -280,7 +350,7 @@ async def run_sync_in_worker_thread( """ await _core.checkpoint_if_cancelled() - call_soon = _core.current_call_soon_thread_and_signal_safe() + token = _core.current_trio_token() if limiter is None: limiter = current_default_worker_thread_limiter() @@ -313,7 +383,7 @@ def do_release_then_return_result(): def worker_thread_fn(): result = _core.Result.capture(sync_fn, *args) try: - call_soon(report_back_in_trio_thread_fn, result) + token.run_sync_soon(report_back_in_trio_thread_fn, result) except _core.RunFinishedError: # The entire run finished, so our particular task is certainly # long gone -- it must have cancelled. diff --git a/trio/_util.py b/trio/_util.py index 6cee5f0eef..f08b403b5f 100644 --- a/trio/_util.py +++ b/trio/_util.py @@ -242,7 +242,14 @@ def decorator(func): def fixup_module_metadata(module_name, namespace): + def fix_one(obj): + mod = getattr(obj, "__module__", None) + if mod is not None and mod.startswith("trio."): + obj.__module__ = module_name + if isinstance(obj, type): + for attr_value in obj.__dict__.values(): + fix_one(attr_value) + for objname in namespace["__all__"]: obj = namespace[objname] - if hasattr(obj, "__module__") and obj.__module__.startswith("trio."): - obj.__module__ = module_name + fix_one(obj) diff --git a/trio/hazmat.py b/trio/hazmat.py index 29ae1092ba..66caa76de0 100644 --- a/trio/hazmat.py +++ b/trio/hazmat.py @@ -26,6 +26,8 @@ "RunLocal", "wait_socket_readable", "wait_socket_writable", + "TrioToken", + "current_trio_token", # kqueue symbols "current_kqueue", "monitor_kevent", diff --git a/trio/testing/_mock_clock.py b/trio/testing/_mock_clock.py index 0908f49bb3..f3a6d7decc 100644 --- a/trio/testing/_mock_clock.py +++ b/trio/testing/_mock_clock.py @@ -176,8 +176,8 @@ def _real_to_virtual(self, real): return self._virtual_base + virtual_offset def start_clock(self): - call_soon = _core.current_call_soon_thread_and_signal_safe() - call_soon(self._maybe_spawn_autojump_task) + token = _core.current_trio_token() + token.run_sync_soon(self._maybe_spawn_autojump_task) def current_time(self): return self._real_to_virtual(self._real_clock()) diff --git a/trio/tests/test_signals.py b/trio/tests/test_signals.py index c1076db082..d9b1db2d56 100644 --- a/trio/tests/test_signals.py +++ b/trio/tests/test_signals.py @@ -62,8 +62,8 @@ def direct_handler(signo, frame): async def wait_call_soon_idempotent_queue_barrier(): ev = Event() - call_soon = _core.current_call_soon_thread_and_signal_safe() - call_soon(ev.set, idempotent=True) + token = _core.current_trio_token() + token.run_sync_soon(ev.set, idempotent=True) await ev.wait() print(1) diff --git a/trio/tests/test_threads.py b/trio/tests/test_threads.py index 47e84847d4..586e05e69d 100644 --- a/trio/tests/test_threads.py +++ b/trio/tests/test_threads.py @@ -41,23 +41,21 @@ def threadfn(): ("f", trio_thread), 
expected ] - run_in_trio_thread = current_run_in_trio_thread() + portal = BlockingTrioPortal() def f(record): assert not _core.currently_ki_protected() record.append(("f", threading.current_thread())) return 2 - await check_case(run_in_trio_thread, f, ("got", 2)) + await check_case(portal.run_sync, f, ("got", 2)) def f(record): assert not _core.currently_ki_protected() record.append(("f", threading.current_thread())) raise ValueError - await check_case(run_in_trio_thread, f, ("error", ValueError)) - - await_in_trio_thread = current_await_in_trio_thread() + await check_case(portal.run_sync, f, ("error", ValueError)) async def f(record): assert not _core.currently_ki_protected() @@ -65,7 +63,7 @@ async def f(record): record.append(("f", threading.current_thread())) return 3 - await check_case(await_in_trio_thread, f, ("got", 3)) + await check_case(portal.run, f, ("got", 3)) async def f(record): assert not _core.currently_ki_protected() @@ -73,21 +71,33 @@ async def f(record): record.append(("f", threading.current_thread())) raise KeyError - await check_case(await_in_trio_thread, f, ("error", KeyError)) + await check_case(portal.run, f, ("error", KeyError)) async def test_do_in_trio_thread_from_trio_thread(): - run_in_trio_thread = current_run_in_trio_thread() - await_in_trio_thread = current_await_in_trio_thread() + portal = BlockingTrioPortal() with pytest.raises(RuntimeError): - run_in_trio_thread(lambda: None) # pragma: no branch + portal.run_sync(lambda: None) # pragma: no branch async def foo(): # pragma: no cover pass with pytest.raises(RuntimeError): - await_in_trio_thread(foo) + portal.run(foo) + + +async def test_BlockingTrioPortal_with_explicit_TrioToken(): + token = _core.current_trio_token() + + def worker_thread(token): + with pytest.raises(RuntimeError): + BlockingTrioPortal() + portal = BlockingTrioPortal(token) + return portal.run_sync(threading.current_thread) + + t = await run_sync_in_worker_thread(worker_thread, token) + assert t == threading.current_thread() def test_run_in_trio_thread_ki(): @@ -96,8 +106,7 @@ def test_run_in_trio_thread_ki(): record = set() async def check_run_in_trio_thread(): - run_in_trio_thread = current_run_in_trio_thread() - await_in_trio_thread = current_await_in_trio_thread() + portal = BlockingTrioPortal() def trio_thread_fn(): print("in trio thread") @@ -115,12 +124,12 @@ async def trio_thread_afn(): def external_thread_fn(): try: print("running") - run_in_trio_thread(trio_thread_fn) + portal.run_sync(trio_thread_fn) except KeyboardInterrupt: print("ok1") record.add("ok1") try: - await_in_trio_thread(trio_thread_afn) + portal.run(trio_thread_afn) except KeyboardInterrupt: print("ok2") record.add("ok2") @@ -147,15 +156,15 @@ async def trio_fn(): ev.set() await _core.wait_task_rescheduled(lambda _: _core.Abort.SUCCEEDED) - def thread_fn(await_in_trio_thread): + def thread_fn(portal): try: - await_in_trio_thread(trio_fn) + portal.run(trio_fn) except _core.Cancelled: record.append("cancelled") async def main(): - aitt = current_await_in_trio_thread() - thread = threading.Thread(target=thread_fn, args=(aitt,)) + portal = BlockingTrioPortal() + thread = threading.Thread(target=thread_fn, args=(portal,)) thread.start() await ev.wait() assert record == ["sleeping"] @@ -319,11 +328,11 @@ class state: state.running = 0 state.parked = 0 - run_in_trio_thread = current_run_in_trio_thread() + portal = BlockingTrioPortal() def thread_fn(cancel_scope): print("thread_fn start") - run_in_trio_thread(cancel_scope.cancel) + portal.run_sync(cancel_scope.cancel) 
with lock: state.ran += 1 state.running += 1 @@ -454,3 +463,21 @@ def bad_start(self): assert "engines" in str(excinfo.value) assert limiter.borrowed_tokens == 0 + + +# can remove after deleting 0.2.0 deprecations +async def test_deprecated_portal_API(recwarn): + trio_thread = threading.current_thread() + + async def async_current_thread(): + return threading.current_thread() + + def worker_thread(run_in_trio_thread, await_in_trio_thread): + assert run_in_trio_thread(threading.current_thread) == trio_thread + assert await_in_trio_thread(async_current_thread) == trio_thread + + run_in_trio_thread = current_run_in_trio_thread() + await_in_trio_thread = current_await_in_trio_thread() + await run_sync_in_worker_thread( + worker_thread, run_in_trio_thread, await_in_trio_thread + ) diff --git a/trio/tests/test_util.py b/trio/tests/test_util.py index b0c273c873..813140bcfd 100644 --- a/trio/tests/test_util.py +++ b/trio/tests/test_util.py @@ -172,3 +172,7 @@ def test_module_metadata_is_fixed_up(): assert trio.hazmat.wait_task_rescheduled.__module__ == "trio.hazmat" import trio.testing assert trio.testing.trio_test.__module__ == "trio.testing" + + # Also check methods + assert trio.ssl.SSLStream.__init__.__module__ == "trio.ssl" + assert trio.abc.Stream.send_all.__module__ == "trio.abc"
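
A minimal usage sketch of the two entry points this patch adds, for trying the branch out. It assumes the API exactly as introduced above; the ``blocking_worker`` and ``main`` names are invented for illustration and are not part of the patch::

    import trio

    def blocking_worker(portal, token):
        # Runs in an ordinary thread. BlockingTrioPortal methods block until
        # the Trio side has finished running the given function, and return
        # its result (or re-raise its exception):
        start = portal.run_sync(trio.current_time)
        portal.run(trio.sleep, 0.1)
        # TrioToken.run_sync_soon is the lower-level, fire-and-forget
        # primitive: signal-safe, never blocks, and no result comes back.
        token.run_sync_soon(print, "slept, started at", start)

    async def main():
        # Both objects are bound to this particular call to trio.run:
        portal = trio.BlockingTrioPortal()
        token = trio.hazmat.current_trio_token()
        await trio.run_sync_in_worker_thread(blocking_worker, portal, token)

    trio.run(main)

The split mirrors the patch itself: ``BlockingTrioPortal`` is the friendly blocking wrapper exported from ``trio``, while ``TrioToken.run_sync_soon`` stays in ``trio.hazmat`` as the primitive that the portal (and ``run_sync_in_worker_thread``, ``catch_signals``, and so on) are built on.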