Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import ensure_async and run_sync from jupyter_core #889

Merged
merged 3 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 3 additions & 77 deletions jupyter_client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,84 +3,10 @@
- provides utility wrappers to run asynchronous functions in a blocking environment.
- vendor functions from ipython_genutils that should be retired at some point.
"""
import asyncio
import atexit
import inspect
import os
import threading
from typing import Optional


class _TaskRunner:
"""A task runner that runs an asyncio event loop on a background thread."""

def __init__(self):
self.__io_loop: Optional[asyncio.AbstractEventLoop] = None
self.__runner_thread: Optional[threading.Thread] = None
self.__lock = threading.Lock()
atexit.register(self._close)

def _close(self):
if self.__io_loop:
self.__io_loop.stop()

def _runner(self):
loop = self.__io_loop
assert loop is not None
try:
loop.run_forever()
finally:
loop.close()

def run(self, coro):
"""Synchronously run a coroutine on a background thread."""
with self.__lock:
name = f"{threading.current_thread().name} - runner"
if self.__io_loop is None:
self.__io_loop = asyncio.new_event_loop()
self.__runner_thread = threading.Thread(target=self._runner, daemon=True, name=name)
self.__runner_thread.start()
fut = asyncio.run_coroutine_threadsafe(coro, self.__io_loop)
return fut.result(None)


_runner_map = {}
_loop_map = {}


def run_sync(coro):
def wrapped(*args, **kwargs):
name = threading.current_thread().name
inner = coro(*args, **kwargs)
try:
# If a loop is currently running in this thread,
# use a task runner.
asyncio.get_running_loop()
if name not in _runner_map:
_runner_map[name] = _TaskRunner()
return _runner_map[name].run(inner)
except RuntimeError:
pass

# Run the loop for this thread.
if name not in _loop_map:
_loop_map[name] = asyncio.new_event_loop()
loop = _loop_map[name]
return loop.run_until_complete(inner)

wrapped.__doc__ = coro.__doc__
return wrapped


async def ensure_async(obj):
"""Ensure a returned object is asynchronous.L

NOTE: This should only be used on methods of external classes,
not on a `self` method.
"""
if inspect.isawaitable(obj):
return await obj
return obj

from jupyter_core.utils import ensure_async # noqa: F401
from jupyter_core.utils import run_sync # noqa: F401


def _filefind(filename, path_dirs=None):
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ classifiers = [
requires-python = ">=3.8"
dependencies = [
"importlib_metadata>=4.8.3;python_version<\"3.10\"",
"jupyter_core>=4.9.2",
"jupyter_core>=4.12,!=~5.0",
"python-dateutil>=2.8.2",
"pyzmq>=23.0",
"tornado>=6.2",
Expand Down