Skip to content

Commit

Permalink
Import ensure_async and run_sync from jupyter_core
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbrochart committed Nov 24, 2022
1 parent 1aa276b commit 0cb36ef
Showing 1 changed file with 3 additions and 77 deletions.
80 changes: 3 additions & 77 deletions jupyter_client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,84 +3,10 @@
- provides utility wrappers to run asynchronous functions in a blocking environment.
- vendor functions from ipython_genutils that should be retired at some point.
"""
import asyncio
import atexit
import inspect
import os
import threading
from typing import Optional


class _TaskRunner:
"""A task runner that runs an asyncio event loop on a background thread."""

def __init__(self):
self.__io_loop: Optional[asyncio.AbstractEventLoop] = None
self.__runner_thread: Optional[threading.Thread] = None
self.__lock = threading.Lock()
atexit.register(self._close)

def _close(self):
if self.__io_loop:
self.__io_loop.stop()

def _runner(self):
loop = self.__io_loop
assert loop is not None
try:
loop.run_forever()
finally:
loop.close()

def run(self, coro):
"""Synchronously run a coroutine on a background thread."""
with self.__lock:
name = f"{threading.current_thread().name} - runner"
if self.__io_loop is None:
self.__io_loop = asyncio.new_event_loop()
self.__runner_thread = threading.Thread(target=self._runner, daemon=True, name=name)
self.__runner_thread.start()
fut = asyncio.run_coroutine_threadsafe(coro, self.__io_loop)
return fut.result(None)


_runner_map = {}
_loop_map = {}


def run_sync(coro):
def wrapped(*args, **kwargs):
name = threading.current_thread().name
inner = coro(*args, **kwargs)
try:
# If a loop is currently running in this thread,
# use a task runner.
asyncio.get_running_loop()
if name not in _runner_map:
_runner_map[name] = _TaskRunner()
return _runner_map[name].run(inner)
except RuntimeError:
pass

# Run the loop for this thread.
if name not in _loop_map:
_loop_map[name] = asyncio.new_event_loop()
loop = _loop_map[name]
return loop.run_until_complete(inner)

wrapped.__doc__ = coro.__doc__
return wrapped


async def ensure_async(obj):
"""Ensure a returned object is asynchronous.L
NOTE: This should only be used on methods of external classes,
not on a `self` method.
"""
if inspect.isawaitable(obj):
return await obj
return obj

from jupyter_core.utils import ensure_async # noqa: F401
from jupyter_core.utils import run_sync # noqa: F401


def _filefind(filename, path_dirs=None):
Expand Down

0 comments on commit 0cb36ef

Please sign in to comment.