diff --git a/jupyter_client/utils.py b/jupyter_client/utils.py index 84a19616c..f9448841d 100644 --- a/jupyter_client/utils.py +++ b/jupyter_client/utils.py @@ -3,84 +3,10 @@ - provides utility wrappers to run asynchronous functions in a blocking environment. - vendor functions from ipython_genutils that should be retired at some point. """ -import asyncio -import atexit -import inspect import os -import threading -from typing import Optional - - -class _TaskRunner: - """A task runner that runs an asyncio event loop on a background thread.""" - - def __init__(self): - self.__io_loop: Optional[asyncio.AbstractEventLoop] = None - self.__runner_thread: Optional[threading.Thread] = None - self.__lock = threading.Lock() - atexit.register(self._close) - - def _close(self): - if self.__io_loop: - self.__io_loop.stop() - - def _runner(self): - loop = self.__io_loop - assert loop is not None - try: - loop.run_forever() - finally: - loop.close() - - def run(self, coro): - """Synchronously run a coroutine on a background thread.""" - with self.__lock: - name = f"{threading.current_thread().name} - runner" - if self.__io_loop is None: - self.__io_loop = asyncio.new_event_loop() - self.__runner_thread = threading.Thread(target=self._runner, daemon=True, name=name) - self.__runner_thread.start() - fut = asyncio.run_coroutine_threadsafe(coro, self.__io_loop) - return fut.result(None) - - -_runner_map = {} -_loop_map = {} - - -def run_sync(coro): - def wrapped(*args, **kwargs): - name = threading.current_thread().name - inner = coro(*args, **kwargs) - try: - # If a loop is currently running in this thread, - # use a task runner. - asyncio.get_running_loop() - if name not in _runner_map: - _runner_map[name] = _TaskRunner() - return _runner_map[name].run(inner) - except RuntimeError: - pass - - # Run the loop for this thread. - if name not in _loop_map: - _loop_map[name] = asyncio.new_event_loop() - loop = _loop_map[name] - return loop.run_until_complete(inner) - - wrapped.__doc__ = coro.__doc__ - return wrapped - - -async def ensure_async(obj): - """Ensure a returned object is asynchronous.L - - NOTE: This should only be used on methods of external classes, - not on a `self` method. - """ - if inspect.isawaitable(obj): - return await obj - return obj + +from jupyter_core.utils import ensure_async # noqa: F401 +from jupyter_core.utils import run_sync # noqa: F401 def _filefind(filename, path_dirs=None):