Skip to content

Commit

Permalink
Fixed error in thread pool shutdown callback on Python 3.6
Browse files Browse the repository at this point in the history
Fixes #286.
  • Loading branch information
agronholm committed May 10, 2021
1 parent 4c87f07 commit 7c6063d
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 5 deletions.
3 changes: 3 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
- Changed asyncio task groups so that if the host and child tasks have only raised
``CancelledErrors``, just one ``CancelledError`` will now be raised instead of an
``ExceptionGroup``, allowing asyncio to ignore it when it propagates out of the task
- Fixed ``sniffio._impl.AsyncLibraryNotFoundError: unknown async library, or not in async context``
on asyncio and Python 3.6 when ``to_thread.run_sync()`` is used from
``loop.run_until_complete()``
- Fixed odd ``ExceptionGroup: 0 exceptions were raised in the task group`` appearing under certain
circumstances on asyncio
- Fixed ``wait_all_tasks_blocked()`` returning prematurely on asyncio when a previously blocked
Expand Down
13 changes: 8 additions & 5 deletions src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,13 +684,16 @@ async def start(self, func: Callable[..., Coroutine], *args, name=None) -> None:


class WorkerThread(Thread):
__slots__ = 'root_task', 'loop', 'queue', 'idle_since'
__slots__ = 'root_task', 'workers', 'idle_workers', 'loop', 'queue', 'idle_since'

MAX_IDLE_TIME = 10 # seconds

def __init__(self, root_task: asyncio.Task):
def __init__(self, root_task: asyncio.Task, workers: Set['WorkerThread'],
idle_workers: Deque['WorkerThread'],):
super().__init__(name='AnyIO worker thread')
self.root_task = root_task
self.workers = workers
self.idle_workers = idle_workers
self.loop = root_task._loop
self.queue: Queue[Union[Tuple[Callable, tuple, asyncio.Future], None]] = Queue(2)
self.idle_since = current_time()
Expand Down Expand Up @@ -719,9 +722,9 @@ def run(self) -> None:

def stop(self, f: Optional[asyncio.Task] = None) -> None:
self.queue.put_nowait(None)
_threadpool_workers.get().discard(self)
self.workers.discard(self)
try:
_threadpool_idle_workers.get().remove(self)
self.idle_workers.remove(self)
except ValueError:
pass

Expand Down Expand Up @@ -750,7 +753,7 @@ async def run_sync_in_worker_thread(
future: asyncio.Future = asyncio.Future()
root_task = find_root_task()
if not idle_workers:
worker = WorkerThread(root_task)
worker = WorkerThread(root_task, workers, idle_workers)
worker.start()
workers.add(worker)
root_task.add_done_callback(worker.stop)
Expand Down
11 changes: 11 additions & 0 deletions tests/test_to_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,14 @@ async def sleep_sync():
task = asyncio_event_loop.create_task(sleep_sync())
task.add_done_callback(partial(func))
asyncio_event_loop.run_until_complete(task)


def test_asyncio_run_sync_no_asyncio_run(asyncio_event_loop):
"""Test that the thread pool shutdown callback does not raise an exception."""
def exception_handler(loop, context=None):
exceptions.append(context['exception'])

exceptions = []
asyncio_event_loop.set_exception_handler(exception_handler)
asyncio_event_loop.run_until_complete(to_thread.run_sync(time.sleep, 0))
assert not exceptions

0 comments on commit 7c6063d

Please sign in to comment.