From d63960f3093a067387c0284d95bfcbe678270051 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 21 Dec 2022 09:06:08 +0100 Subject: [PATCH] Implement sub-shells --- ipykernel/control.py | 10 +- ipykernel/debugger.py | 30 +- ipykernel/iostream.py | 68 ++-- ipykernel/ipkernel.py | 40 ++- ipykernel/kernelapp.py | 69 ++-- ipykernel/kernelbase.py | 528 +++++++++++++------------------ ipykernel/shell.py | 24 ++ ipykernel/tests/test_subshell.py | 57 ++++ pyproject.toml | 5 +- 9 files changed, 433 insertions(+), 398 deletions(-) create mode 100644 ipykernel/shell.py create mode 100644 ipykernel/tests/test_subshell.py diff --git a/ipykernel/control.py b/ipykernel/control.py index 1aaf9a7e8..9c6a73deb 100644 --- a/ipykernel/control.py +++ b/ipykernel/control.py @@ -1,19 +1,19 @@ +import asyncio from threading import Thread -from tornado.ioloop import IOLoop - class ControlThread(Thread): def __init__(self, **kwargs): Thread.__init__(self, name="Control", **kwargs) - self.io_loop = IOLoop(make_current=False) + self.io_loop = asyncio.new_event_loop() self.pydev_do_not_trace = True self.is_pydev_daemon_thread = True def run(self): self.name = "Control" + asyncio.set_event_loop(self.io_loop) try: - self.io_loop.start() + self.io_loop.run_forever() finally: self.io_loop.close() @@ -22,4 +22,4 @@ def stop(self): This method is threadsafe. """ - self.io_loop.add_callback(self.io_loop.stop) + self.io_loop.call_soon_threadsafe(self.io_loop.stop) diff --git a/ipykernel/debugger.py b/ipykernel/debugger.py index 224678232..8df100403 100644 --- a/ipykernel/debugger.py +++ b/ipykernel/debugger.py @@ -170,9 +170,9 @@ async def get_message(self): class DebugpyClient: - def __init__(self, log, debugpy_stream, event_callback): + def __init__(self, log, debugpy_socket, event_callback): self.log = log - self.debugpy_stream = debugpy_stream + self.debugpy_socket = debugpy_socket self.event_callback = event_callback self.message_queue = DebugpyMessageQueue(self._forward_event, self.log) self.debugpy_host = "127.0.0.1" @@ -192,9 +192,9 @@ def _forward_event(self, msg): self.init_event_seq = msg["seq"] self.event_callback(msg) - def _send_request(self, msg): + async def _send_request(self, msg): if self.routing_id is None: - self.routing_id = self.debugpy_stream.socket.getsockopt(ROUTING_ID) + self.routing_id = self.debugpy_socket.getsockopt(ROUTING_ID) content = jsonapi.dumps( msg, default=json_default, @@ -209,7 +209,7 @@ def _send_request(self, msg): self.log.debug("DEBUGPYCLIENT:") self.log.debug(self.routing_id) self.log.debug(buf) - self.debugpy_stream.send_multipart((self.routing_id, buf)) + await self.debugpy_socket.send_multipart((self.routing_id, buf)) async def _wait_for_response(self): # Since events are never pushed to the message_queue @@ -227,7 +227,7 @@ async def _handle_init_sequence(self): "seq": int(self.init_event_seq) + 1, "command": "configurationDone", } - self._send_request(configurationDone) + await self._send_request(configurationDone) # 3] Waits for configurationDone response await self._wait_for_response() @@ -238,7 +238,7 @@ async def _handle_init_sequence(self): def get_host_port(self): if self.debugpy_port == -1: - socket = self.debugpy_stream.socket + socket = self.debugpy_socket socket.bind_to_random_port("tcp://" + self.debugpy_host) self.endpoint = socket.getsockopt(zmq.LAST_ENDPOINT).decode("utf-8") socket.unbind(self.endpoint) @@ -247,11 +247,11 @@ def get_host_port(self): return self.debugpy_host, self.debugpy_port def connect_tcp_socket(self): - self.debugpy_stream.socket.connect(self._get_endpoint()) - self.routing_id = self.debugpy_stream.socket.getsockopt(ROUTING_ID) + self.debugpy_socket.connect(self._get_endpoint()) + self.routing_id = self.debugpy_socket.getsockopt(ROUTING_ID) def disconnect_tcp_socket(self): - self.debugpy_stream.socket.disconnect(self._get_endpoint()) + self.debugpy_socket.disconnect(self._get_endpoint()) self.routing_id = None self.init_event = Event() self.init_event_seq = -1 @@ -261,7 +261,7 @@ def receive_dap_frame(self, frame): self.message_queue.put_tcp_frame(frame) async def send_dap_request(self, msg): - self._send_request(msg) + await self._send_request(msg) if self.wait_for_attach and msg["command"] == "attach": rep = await self._handle_init_sequence() self.wait_for_attach = False @@ -290,10 +290,10 @@ class Debugger: static_debug_msg_types = ["debugInfo", "inspectVariables", "richInspectVariables", "modules"] def __init__( - self, log, debugpy_stream, event_callback, shell_socket, session, just_my_code=True + self, log, debugpy_socket, event_callback, shell_socket, session, just_my_code=True ): self.log = log - self.debugpy_client = DebugpyClient(log, debugpy_stream, self._handle_event) + self.debugpy_client = DebugpyClient(log, debugpy_socket, self._handle_event) self.shell_socket = shell_socket self.session = session self.is_started = False @@ -375,7 +375,7 @@ async def handle_stopped_event(self): def tcp_client(self): return self.debugpy_client - def start(self): + async def start(self): if not self.debugpy_initialized: tmp_dir = get_tmp_directory() if not os.path.exists(tmp_dir): @@ -639,7 +639,7 @@ async def process_request(self, message): if self.is_started: self.log.info("The debugger has already started") else: - self.is_started = self.start() + self.is_started = await self.start() if self.is_started: self.log.info("The debugger has started") else: diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 5ea77452e..306905050 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -3,6 +3,7 @@ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. +import asyncio import atexit import io import os @@ -18,8 +19,6 @@ import zmq from jupyter_client.session import extract_header -from tornado.ioloop import IOLoop -from zmq.eventloop.zmqstream import ZMQStream # ----------------------------------------------------------------------------- # Globals @@ -57,7 +56,7 @@ def __init__(self, socket, pipe=False): self.background_socket = BackgroundSocket(self) self._master_pid = os.getpid() self._pipe_flag = pipe - self.io_loop = IOLoop(make_current=False) + self.io_loop = asyncio.new_event_loop() if pipe: self._setup_pipe_in() self._local = threading.local() @@ -72,20 +71,27 @@ def __init__(self, socket, pipe=False): def _thread_main(self): """The inner loop that's actually run in a thread""" - self.io_loop.start() - self.io_loop.close(all_fds=True) + asyncio.set_event_loop(self.io_loop) + try: + self.io_loop.run_until_complete(self._process_messages()) + finally: + self.io_loop.close() + + async def _process_messages(self): + asyncio.create_task(self._handle_event()) + if self._pipe_flag: + asyncio.create_task(self._handle_pipe_msg()) + await asyncio.Future() # wait forever def _setup_event_pipe(self): """Create the PULL socket listening for events that should fire in this thread.""" ctx = self.socket.context - pipe_in = ctx.socket(zmq.PULL) - pipe_in.linger = 0 + self._pipe_in0 = ctx.socket(zmq.PULL) + self._pipe_in0.linger = 0 _uuid = b2a_hex(os.urandom(16)).decode("ascii") iface = self._event_interface = "inproc://%s" % _uuid - pipe_in.bind(iface) - self._event_puller = ZMQStream(pipe_in, self.io_loop) - self._event_puller.on_recv(self._handle_event) + self._pipe_in0.bind(iface) @property def _event_pipe(self): @@ -104,7 +110,7 @@ def _event_pipe(self): self._event_pipes.add(event_pipe) return event_pipe - def _handle_event(self, msg): + async def _handle_event(self): """Handle an event on the event pipe Content of the message is ignored. @@ -112,12 +118,14 @@ def _handle_event(self, msg): Whenever *an* event arrives on the event stream, *all* waiting events are processed in order. """ - # freeze event count so new writes don't extend the queue - # while we are processing - n_events = len(self._events) - for _ in range(n_events): - event_f = self._events.popleft() - event_f() + while True: + await self._pipe_in0.recv_multipart() + # freeze event count so new writes don't extend the queue + # while we are processing + n_events = len(self._events) + for _ in range(n_events): + event_f = self._events.popleft() + event_f() def _setup_pipe_in(self): """setup listening pipe for IOPub from forked subprocesses""" @@ -126,30 +134,30 @@ def _setup_pipe_in(self): # use UUID to authenticate pipe messages self._pipe_uuid = os.urandom(16) - pipe_in = ctx.socket(zmq.PULL) - pipe_in.linger = 0 + self._pipe_in1 = ctx.socket(zmq.PULL) + self._pipe_in1.linger = 0 try: - self._pipe_port = pipe_in.bind_to_random_port("tcp://127.0.0.1") + self._pipe_port = self._pipe_in1.bind_to_random_port("tcp://127.0.0.1") except zmq.ZMQError as e: warnings.warn( "Couldn't bind IOPub Pipe to 127.0.0.1: %s" % e + "\nsubprocess output will be unavailable." ) self._pipe_flag = False - pipe_in.close() + self._pipe_in1.close() return - self._pipe_in = ZMQStream(pipe_in, self.io_loop) - self._pipe_in.on_recv(self._handle_pipe_msg) - def _handle_pipe_msg(self, msg): + async def _handle_pipe_msg(self): """handle a pipe message from a subprocess""" - if not self._pipe_flag or not self._is_master_process(): - return - if msg[0] != self._pipe_uuid: - print("Bad pipe message: %s", msg, file=sys.__stderr__) - return - self.send_multipart(msg[1:]) + while True: + msg = await self._pipe_in1.recv_multipart() + if not self._pipe_flag or not self._is_master_process(): + return + if msg[0] != self._pipe_uuid: + print("Bad pipe message: %s", msg, file=sys.__stderr__) + return + self.send_multipart(msg[1:]) def _setup_pipe_out(self): # must be new context after fork diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py index 4dfe59c46..4bbbab8db 100644 --- a/ipykernel/ipkernel.py +++ b/ipykernel/ipkernel.py @@ -3,7 +3,6 @@ import asyncio import builtins import getpass -import signal import sys import threading import typing as t @@ -11,10 +10,10 @@ from functools import partial import comm +import zmq.asyncio from IPython.core import release from IPython.utils.tokenutil import line_at_cursor, token_at_cursor from traitlets import Any, Bool, HasTraits, Instance, List, Type, observe, observe_compat -from zmq.eventloop.zmqstream import ZMQStream from .comm.comm import BaseComm from .comm.manager import CommManager @@ -39,6 +38,11 @@ _use_experimental_60_completion = False +def DEBUG(msg): + with open("debug.log", "a") as f: + f.write(f"{msg}\n") + + _EXPERIMENTAL_KEY_NAME = "_jupyter_types_experimental" @@ -75,7 +79,9 @@ class IPythonKernel(KernelBase): help="Set this flag to False to deactivate the use of experimental IPython completion APIs.", ).tag(config=True) - debugpy_stream = Instance(ZMQStream, allow_none=True) if _is_debugpy_available else None + debugpy_socket = ( + Instance(zmq.asyncio.Socket, allow_none=True) if _is_debugpy_available else None + ) user_module = Any() @@ -106,7 +112,7 @@ def __init__(self, **kwargs): if _is_debugpy_available: self.debugger = Debugger( self.log, - self.debugpy_stream, + self.debugpy_socket, self._publish_debug_event, self.debug_shell_socket, self.session, @@ -188,8 +194,9 @@ def __init__(self, **kwargs): "file_extension": ".py", } - def dispatch_debugpy(self, msg): - if _is_debugpy_available: + async def dispatch_debugpy(self): + while True: + msg = await self.debugpy_socket.recv_multipart() # The first frame is the socket id, we can drop it frame = msg[1].bytes.decode("utf-8") self.log.debug("Debugpy received: %s", frame) @@ -205,15 +212,13 @@ async def poll_stopped_queue(self): def start(self): self.shell.exit_now = False - if self.debugpy_stream is None: - self.log.warning("debugpy_stream undefined, debugging will not be enabled") - else: - self.debugpy_stream.on_recv(self.dispatch_debugpy, copy=False) + if self.debugpy_socket is None: + self.log.warning("debugpy_socket undefined, debugging will not be enabled") + elif _is_debugpy_available: + asyncio.create_task(self.dispatch_debugpy()) super().start() - if self.debugpy_stream: - asyncio.run_coroutine_threadsafe( - self.poll_stopped_queue(), self.control_thread.io_loop.asyncio_loop - ) + if self.debugpy_socket: + asyncio.run_coroutine_threadsafe(self.poll_stopped_queue(), self.control_thread.io_loop) def set_parent(self, ident, parent, channel="shell"): """Overridden from parent to tell the display hook and output streams @@ -309,6 +314,8 @@ def cancel_unless_done(f, _ignored): future.add_done_callback(partial(cancel_unless_done, sigint_future)) def handle_sigint(*args): + DEBUG("handle_sigint") + def set_sigint_result(): if sigint_future.cancelled() or sigint_future.done(): return @@ -318,12 +325,13 @@ def set_sigint_result(): self.io_loop.add_callback(set_sigint_result) # set the custom sigint hander during this context - save_sigint = signal.signal(signal.SIGINT, handle_sigint) + # save_sigint = signal.signal(signal.SIGINT, handle_sigint) try: yield finally: # restore the previous sigint handler - signal.signal(signal.SIGINT, save_sigint) + # signal.signal(signal.SIGINT, save_sigint) + pass async def do_execute( self, diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index 3f4aa708a..df02ed07b 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -3,11 +3,11 @@ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. +import asyncio import atexit import errno import logging import os -import signal import sys import traceback from functools import partial @@ -15,6 +15,7 @@ from logging import StreamHandler import zmq +import zmq.asyncio from IPython.core.application import ( BaseIPythonApplication, base_aliases, @@ -27,7 +28,6 @@ from jupyter_client.connect import ConnectionFileMixin from jupyter_client.session import Session, session_aliases, session_flags from jupyter_core.paths import jupyter_runtime_dir -from tornado import ioloop from traitlets.traitlets import ( Any, Bool, @@ -41,7 +41,6 @@ ) from traitlets.utils import filefind from traitlets.utils.importstring import import_item -from zmq.eventloop.zmqstream import ZMQStream from .control import ControlThread from .heartbeat import Heartbeat @@ -50,8 +49,15 @@ from .iostream import IOPubThread from .ipkernel import IPythonKernel from .parentpoller import ParentPollerUnix, ParentPollerWindows +from .shell import ShellThread from .zmqshell import ZMQInteractiveShell + +def DEBUG(msg): + with open("debug.log", "a") as f: + f.write(f"{msg}\n") + + # ----------------------------------------------------------------------------- # Flags and Aliases # ----------------------------------------------------------------------------- @@ -130,6 +136,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp, ConnectionFileMix heartbeat = Instance(Heartbeat, allow_none=True) context = Any() + acontext = Any() shell_socket = Any() control_socket = Any() debugpy_socket = Any() @@ -137,6 +144,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp, ConnectionFileMix stdin_socket = Any() iopub_socket = Any() iopub_thread = Any() + shell_thread = Any() control_thread = Any() _ports = Dict() @@ -301,10 +309,12 @@ def init_sockets(self): # Create a context, a session, and the kernel sockets. self.log.info("Starting the kernel at pid: %i", os.getpid()) assert self.context is None, "init_sockets cannot be called twice!" + assert self.acontext is None, "init_sockets cannot be called twice!" self.context = context = zmq.Context() + self.acontext = acontext = zmq.asyncio.Context() atexit.register(self.close) - self.shell_socket = context.socket(zmq.ROUTER) + self.shell_socket = acontext.socket(zmq.ROUTER) self.shell_socket.linger = 1000 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port) self.log.debug("shell ROUTER Channel on port: %i" % self.shell_port) @@ -320,8 +330,8 @@ def init_sockets(self): # see ipython/ipykernel#270 and zeromq/libzmq#2892 self.shell_socket.router_handover = self.stdin_socket.router_handover = 1 - self.init_control(context) - self.init_iopub(context) + self.init_control(acontext) + self.init_iopub(acontext) def init_control(self, context): self.control_socket = context.socket(zmq.ROUTER) @@ -343,6 +353,7 @@ def init_control(self, context): # see ipython/ipykernel#270 and zeromq/libzmq#2892 self.control_socket.router_handover = 1 + self.shell_thread = ShellThread(daemon=True) self.control_thread = ControlThread(daemon=True) def init_iopub(self, context): @@ -369,6 +380,7 @@ def init_heartbeat(self): def close(self): """Close zmq sockets in an orderly fashion""" # un-capture IO before we start closing channels + DEBUG("close") self.reset_io() self.log.info("Cleaning up sockets") if self.heartbeat: @@ -378,6 +390,10 @@ def close(self): self.log.debug("Closing iopub channel") self.iopub_thread.stop() self.iopub_thread.close() + if self.shell_thread and self.shell_thread.is_alive(): + self.log.debug("Closing shell thread") + self.shell_thread.stop() + self.shell_thread.join() if self.control_thread and self.control_thread.is_alive(): self.log.debug("Closing control thread") self.control_thread.stop() @@ -395,6 +411,7 @@ def close(self): socket.close() self.log.debug("Terminating zmq context") self.context.term() + self.acontext.term() self.log.debug("Terminated zmq context") def log_connection_info(self): @@ -517,23 +534,23 @@ def register(signum, file=sys.__stderr__, all_threads=True, chain=False, **kwarg faulthandler.register = register def init_signal(self): - signal.signal(signal.SIGINT, signal.SIG_IGN) + # signal.signal(signal.SIGINT, signal.SIG_IGN) + pass def init_kernel(self): """Create the Kernel object itself""" - shell_stream = ZMQStream(self.shell_socket) - control_stream = ZMQStream(self.control_socket, self.control_thread.io_loop) - debugpy_stream = ZMQStream(self.debugpy_socket, self.control_thread.io_loop) + self.shell_thread.start() self.control_thread.start() kernel_factory = self.kernel_class.instance kernel = kernel_factory( parent=self, session=self.session, - control_stream=control_stream, - debugpy_stream=debugpy_stream, + control_socket=self.control_socket, + debugpy_socket=self.debugpy_socket, debug_shell_socket=self.debug_shell_socket, - shell_stream=shell_stream, + shell_socket=self.shell_socket, + shell_thread=self.shell_thread, control_thread=self.control_thread, iopub_thread=self.iopub_thread, iopub_socket=self.iopub_socket, @@ -691,26 +708,38 @@ def initialize(self, argv=None): sys.stderr.flush() def start(self): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + while True: + try: + loop.run_until_complete(self.async_start()) + except KeyboardInterrupt: + DEBUG("KeyboardInterrupt") + continue + else: + DEBUG("break") + break + finally: + DEBUG("finally") + DEBUG("async_start end") + + async def async_start(self): if self.subapp is not None: return self.subapp.start() if self.poller is not None: self.poller.start() self.kernel.start() - self.io_loop = ioloop.IOLoop.current() + DEBUG("after kernel start") if self.trio_loop: from ipykernel.trio_runner import TrioRunner tr = TrioRunner() - tr.initialize(self.kernel, self.io_loop) + tr.initialize(self.kernel, self.io_loop) # FIXME try: tr.run() except KeyboardInterrupt: pass - else: - try: - self.io_loop.start() - except KeyboardInterrupt: - pass + await asyncio.Future() launch_new_instance = IPKernelApp.launch_instance diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index e5032c738..71ada7157 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -4,20 +4,17 @@ # Distributed under the terms of the Modified BSD License. import asyncio -import concurrent.futures import inspect import itertools import logging import os -import socket import sys import time import typing as t import uuid import warnings from datetime import datetime -from functools import partial -from signal import SIGINT, SIGTERM, Signals, default_int_handler, signal +from signal import SIGINT, SIGTERM, Signals if sys.platform != "win32": from signal import SIGKILL @@ -32,12 +29,11 @@ # jupyter_client < 5, use local now() now = datetime.now +import janus import psutil import zmq from IPython.core.error import StdinNotImplementedError from jupyter_client.session import Session -from tornado import ioloop -from tornado.queues import Queue, QueueEmpty from traitlets.config.configurable import SingletonConfigurable from traitlets.traitlets import ( Any, @@ -52,13 +48,17 @@ default, observe, ) -from zmq.eventloop.zmqstream import ZMQStream from ipykernel.jsonutil import json_clean from ._version import kernel_protocol_version +def DEBUG(msg): + with open("debug.log", "a") as f: + f.write(f"{msg}\n") + + def _accepts_cell_id(meth): parameters = inspect.signature(meth).parameters cid_param = parameters.get("cell_id") @@ -81,58 +81,23 @@ class Kernel(SingletonConfigurable): @observe("eventloop") def _update_eventloop(self, change): """schedule call to eventloop from IOLoop""" - loop = ioloop.IOLoop.current() + loop = asyncio.get_running_loop() if change.new is not None: - loop.add_callback(self.enter_eventloop) + loop.call_soon(self.enter_eventloop) session = Instance(Session, allow_none=True) profile_dir = Instance("IPython.core.profiledir.ProfileDir", allow_none=True) - shell_stream = Instance(ZMQStream, allow_none=True) - - shell_streams = List( - help="""Deprecated shell_streams alias. Use shell_stream - - .. versionchanged:: 6.0 - shell_streams is deprecated. Use shell_stream. - """ - ) + shell_socket = Instance(zmq.asyncio.Socket, allow_none=True) implementation: str implementation_version: str banner: str - @default("shell_streams") - def _shell_streams_default(self): # pragma: no cover - warnings.warn( - "Kernel.shell_streams is deprecated in ipykernel 6.0. Use Kernel.shell_stream", - DeprecationWarning, - stacklevel=2, - ) - if self.shell_stream is not None: - return [self.shell_stream] - else: - return [] - - @observe("shell_streams") - def _shell_streams_changed(self, change): # pragma: no cover - warnings.warn( - "Kernel.shell_streams is deprecated in ipykernel 6.0. Use Kernel.shell_stream", - DeprecationWarning, - stacklevel=2, - ) - if len(change.new) > 1: - warnings.warn( - "Kernel only supports one shell stream. Additional streams will be ignored.", - RuntimeWarning, - stacklevel=2, - ) - if change.new: - self.shell_stream = change.new[0] - - control_stream = Instance(ZMQStream, allow_none=True) + control_socket = Instance(zmq.asyncio.Socket, allow_none=True) debug_shell_socket = Any() + shell_thread = Any() control_thread = Any() iopub_socket = Any() iopub_thread = Any() @@ -256,10 +221,12 @@ def _parent_header(self): "abort_request", "debug_request", "usage_request", + "create_subshell_request", ] def __init__(self, **kwargs): super().__init__(**kwargs) + self.shell_msg_queues = {} # Build dict of handlers for message types self.shell_handlers = {} for msg_type in self.msg_types: @@ -269,79 +236,7 @@ def __init__(self, **kwargs): for msg_type in self.control_msg_types: self.control_handlers[msg_type] = getattr(self, msg_type) - self.control_queue: Queue[t.Any] = Queue() - - def dispatch_control(self, msg): - self.control_queue.put_nowait(msg) - - async def poll_control_queue(self): - while True: - msg = await self.control_queue.get() - # handle tracers from _flush_control_queue - if isinstance(msg, (concurrent.futures.Future, asyncio.Future)): - msg.set_result(None) - continue - await self.process_control(msg) - - async def _flush_control_queue(self): - """Flush the control queue, wait for processing of any pending messages""" - tracer_future: t.Union[concurrent.futures.Future[object], asyncio.Future[object]] - if self.control_thread: - control_loop = self.control_thread.io_loop - # concurrent.futures.Futures are threadsafe - # and can be used to await across threads - tracer_future = concurrent.futures.Future() - awaitable_future = asyncio.wrap_future(tracer_future) - else: - control_loop = self.io_loop - tracer_future = awaitable_future = asyncio.Future() - - def _flush(): - # control_stream.flush puts messages on the queue - self.control_stream.flush() - # put Future on the queue after all of those, - # so we can wait for all queued messages to be processed - self.control_queue.put(tracer_future) - - control_loop.add_callback(_flush) - return awaitable_future - - async def process_control(self, msg): - """dispatch control requests""" - idents, msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.deserialize(msg, content=True, copy=False) - except Exception: - self.log.error("Invalid Control Message", exc_info=True) - return - - self.log.debug("Control received: %s", msg) - - # Set the parent message for side effects. - self.set_parent(idents, msg, channel="control") - self._publish_status("busy", "control") - - header = msg["header"] - msg_type = header["msg_type"] - - handler = self.control_handlers.get(msg_type, None) - if handler is None: - self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type) - else: - try: - result = handler(self.control_stream, idents, msg) - if inspect.isawaitable(result): - await result - except Exception: - self.log.error("Exception in control handler:", exc_info=True) - - sys.stdout.flush() - sys.stderr.flush() - self._publish_status("idle", "control") - # flush to ensure reply is sent - self.control_stream.flush(zmq.POLLOUT) - - def should_handle(self, stream, msg, idents): + async def should_handle(self, socket, msg, idents): """Check whether a shell-channel message should be handled Allows subclasses to prevent handling of certain messages (e.g. aborted requests). @@ -350,86 +245,20 @@ def should_handle(self, stream, msg, idents): if msg_id in self.aborted: # is it safe to assume a msg_id will not be resubmitted? self.aborted.remove(msg_id) - self._send_abort_reply(stream, msg, idents) + await self._send_abort_reply(socket, msg, idents) return False return True - async def dispatch_shell(self, msg): - """dispatch shell requests""" - - # flush control queue before handling shell requests - await self._flush_control_queue() - - idents, msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.deserialize(msg, content=True, copy=False) - except Exception: - self.log.error("Invalid Message", exc_info=True) - return - - # Set the parent message for side effects. - self.set_parent(idents, msg, channel="shell") - self._publish_status("busy", "shell") - - msg_type = msg["header"]["msg_type"] - - # Only abort execute requests - if self._aborting and msg_type == "execute_request": - self._send_abort_reply(self.shell_stream, msg, idents) - self._publish_status("idle", "shell") - # flush to ensure reply is sent before - # handling the next request - self.shell_stream.flush(zmq.POLLOUT) - return - - # Print some info about this message and leave a '--->' marker, so it's - # easier to trace visually the message chain when debugging. Each - # handler prints its message at the end. - self.log.debug("\n*** MESSAGE TYPE:%s***", msg_type) - self.log.debug(" Content: %s\n --->\n ", msg["content"]) - - if not self.should_handle(self.shell_stream, msg, idents): - return - - handler = self.shell_handlers.get(msg_type, None) - if handler is None: - self.log.warning("Unknown message type: %r", msg_type) - else: - self.log.debug("%s: %s", msg_type, msg) - try: - self.pre_handler_hook() - except Exception: - self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True) - try: - result = handler(self.shell_stream, idents, msg) - if inspect.isawaitable(result): - await result - except Exception: - self.log.error("Exception in message handler:", exc_info=True) - except KeyboardInterrupt: - # Ctrl-c shouldn't crash the kernel here. - self.log.error("KeyboardInterrupt caught in kernel.") - finally: - try: - self.post_handler_hook() - except Exception: - self.log.debug("Unable to signal in post_handler_hook:", exc_info=True) - - sys.stdout.flush() - sys.stderr.flush() - self._publish_status("idle", "shell") - # flush to ensure reply is sent before - # handling the next request - self.shell_stream.flush(zmq.POLLOUT) - def pre_handler_hook(self): """Hook to execute before calling message handler""" # ensure default_int_handler during handler call - self.saved_sigint_handler = signal(SIGINT, default_int_handler) + # self.saved_sigint_handler = signal(SIGINT, default_int_handler) + pass def post_handler_hook(self): """Hook to execute after calling message handler""" - signal(SIGINT, self.saved_sigint_handler) + # signal(SIGINT, self.saved_sigint_handler) + pass def enter_eventloop(self): """enter eventloop""" @@ -445,11 +274,6 @@ def advance_eventloop(): if self.eventloop is not eventloop: self.log.info("exiting eventloop %s", eventloop) return - if self.msg_queue.qsize(): - self.log.debug("Delaying eventloop due to waiting messages") - # still messages to process, make the eventloop wait - schedule_next() - return self.log.debug("Advancing eventloop %s", eventloop) try: eventloop(self) @@ -466,99 +290,163 @@ def schedule_next(): # flush the eventloop every so often, # giving us a chance to handle messages in the meantime self.log.debug("Scheduling eventloop advance") - self.io_loop.call_later(0.001, advance_eventloop) + asyncio.get_running_loop().call_later(0.001, advance_eventloop) # begin polling the eventloop schedule_next() - async def do_one_iteration(self): - """Process a single shell message + _message_counter = Any( + help="""Monotonic counter of messages + """, + ) - Any pending control messages will be flushed as well + @default("_message_counter") + def _message_counter_default(self): + return itertools.count() - .. versionchanged:: 5 - This is now a coroutine - """ - # flush messages off of shell stream into the message queue - self.shell_stream.flush() - # process at most one shell message per iteration - await self.process_one(wait=False) + async def get_shell_messages(self): + while True: + msg = await self.shell_socket.recv_multipart() + idents, msg = self.session.feed_identities(msg, copy=True) + try: + msg = self.session.deserialize(msg, content=True, copy=True) + except BaseException: + self.log.error("Invalid Message", exc_info=True) + continue - async def process_one(self, wait=True): - """Process one request + shell_id = msg["header"].get("shell_id") + if self.shell_thread: + self.shell_msg_queues[shell_id].sync_q.put((idents, msg)) + else: + self.shell_msg_queues[shell_id].put_nowait((idents, msg)) - Returns None if no message was handled. - """ - if wait: - t, dispatch, args = await self.msg_queue.get() - else: + async def process_shell_messages(self, shell_id=None): + while True: try: - t, dispatch, args = self.msg_queue.get_nowait() - except (asyncio.QueueEmpty, QueueEmpty): - return None - await dispatch(*args) + if self.shell_thread: + # not sure if we need async queue here, so maybe not janus.Queue and just queue.Queue + idents, msg = await self.shell_msg_queues[shell_id].async_q.get() + else: + idents, msg = await self.shell_msg_queues[shell_id].get() - async def dispatch_queue(self): - """Coroutine to preserve order of message handling + DEBUG(f"{shell_id} {msg}") + # Set the parent message for side effects. + self.set_parent(idents, msg, channel="shell") + self._publish_status("busy", "shell") - Ensures that only one message is processing at a time, - even when the handler is async - """ + msg_type = msg["header"]["msg_type"] + + # Only abort execute requests + if self._aborting and msg_type == "execute_request": + await self._send_abort_reply(self.shell_socket, msg, idents) + self._publish_status("idle", "shell") + # flush to ensure reply is sent before + # handling the next request + return + # Print some info about this message and leave a '--->' marker, so it's + # easier to trace visually the message chain when debugging. Each + # handler prints its message at the end. + self.log.debug("\n*** MESSAGE TYPE:%s***", msg_type) + self.log.debug(" Content: %s\n --->\n ", msg["content"]) + + if not await self.should_handle(self.shell_socket, msg, idents): + return + + handler = self.shell_handlers.get(msg_type, None) + if handler is None: + self.log.warning("Unknown message type: %r", msg_type) + else: + self.log.debug("%s: %s", msg_type, msg) + try: + self.pre_handler_hook() + except Exception: + self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True) + try: + result = handler(self.shell_socket, idents, msg) + if inspect.isawaitable(result): + await result + except Exception: + self.log.error("Exception in message handler:", exc_info=True) + except KeyboardInterrupt: + # Ctrl-c shouldn't crash the kernel here. + self.log.error("KeyboardInterrupt caught in kernel.") + finally: + try: + self.post_handler_hook() + except Exception: + self.log.debug("Unable to signal in post_handler_hook:", exc_info=True) + + sys.stdout.flush() + sys.stderr.flush() + self._publish_status("idle", "shell") + # flush to ensure reply is sent before + # handling the next request + + except BaseException as e: + self.log.warning(f"{e=}") + + async def process_control_messages(self): while True: + msg = await self.control_socket.recv_multipart() + idents, msg = self.session.feed_identities(msg, copy=True) try: - await self.process_one() + msg = self.session.deserialize(msg, content=True, copy=True) except Exception: - self.log.exception("Error in message handler") + self.log.error("Invalid Control Message", exc_info=True) + return - _message_counter = Any( - help="""Monotonic counter of messages - """, - ) + DEBUG(f"{msg=}") + self.log.debug("Control received: %s", msg) - @default("_message_counter") - def _message_counter_default(self): - return itertools.count() + # Set the parent message for side effects. + self.set_parent(idents, msg, channel="control") + self._publish_status("busy", "control") - def schedule_dispatch(self, dispatch, *args): - """schedule a message for dispatch""" - idx = next(self._message_counter) + header = msg["header"] + msg_type = header["msg_type"] - self.msg_queue.put_nowait( - ( - idx, - dispatch, - args, - ) - ) - # ensure the eventloop wakes up - self.io_loop.add_callback(lambda: None) + handler = self.control_handlers.get(msg_type, None) + if handler is None: + self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type) + else: + try: + result = handler(self.control_socket, idents, msg) + if inspect.isawaitable(result): + await result + except Exception: + self.log.error("Exception in control handler:", exc_info=True) - def start(self): - """register dispatchers for streams""" - self.io_loop = ioloop.IOLoop.current() - self.msg_queue: Queue[t.Any] = Queue() - self.io_loop.add_callback(self.dispatch_queue) + sys.stdout.flush() + sys.stderr.flush() + self._publish_status("idle", "control") + # flush to ensure reply is sent - self.control_stream.on_recv(self.dispatch_control, copy=False) + def start(self): + """Process messages on shell and control channels""" + self.io_loop = asyncio.get_running_loop() if self.control_thread: - control_loop = self.control_thread.io_loop + asyncio.run_coroutine_threadsafe( + self.process_control_messages(), self.control_thread.io_loop + ) else: - control_loop = self.io_loop + asyncio.create_task(self.process_control_messages()) - asyncio.run_coroutine_threadsafe(self.poll_control_queue(), control_loop.asyncio_loop) + if self.shell_thread: + self.shell_msg_queues[None] = janus.Queue() + asyncio.run_coroutine_threadsafe(self.get_shell_messages(), self.shell_thread.loop) + else: + # if sub-shells are not supported, getting messages and processing them is done in the main thread + # so use an asyncio.Queue directly, no need to handle multi-threading + self.shell_msg_queues[None] = asyncio.Queue() + asyncio.create_task(self.get_shell_messages()) - self.shell_stream.on_recv( - partial( - self.schedule_dispatch, - self.dispatch_shell, - ), - copy=False, - ) + asyncio.create_task(self.process_shell_messages()) # main shell # publish idle status self._publish_status("starting", "shell") + DEBUG("start end") def record_ports(self, ports): """Record the ports that this kernel is using. @@ -681,7 +569,7 @@ def finish_metadata(self, parent, metadata, reply_content): """ return metadata - async def execute_request(self, stream, ident, parent): + async def execute_request(self, socket, ident, parent): """handle an execute_request""" try: content = parent["content"] @@ -707,23 +595,26 @@ async def execute_request(self, stream, ident, parent): cell_id = (parent.get("metadata") or {}).get("cellId") - if _accepts_cell_id(self.do_execute): - reply_content = self.do_execute( - code, - silent, - store_history, - user_expressions, - allow_stdin, - cell_id=cell_id, - ) - else: - reply_content = self.do_execute( - code, - silent, - store_history, - user_expressions, - allow_stdin, - ) + try: + if _accepts_cell_id(self.do_execute): + reply_content = self.do_execute( + code, + silent, + store_history, + user_expressions, + allow_stdin, + cell_id=cell_id, + ) + else: + reply_content = self.do_execute( + code, + silent, + store_history, + user_expressions, + allow_stdin, + ) + except KeyboardInterrupt: + DEBUG("KeyboardInterrupt") if inspect.isawaitable(reply_content): reply_content = await reply_content @@ -742,7 +633,7 @@ async def execute_request(self, stream, ident, parent): metadata = self.finish_metadata(parent, metadata, reply_content) reply_msg = self.session.send( - stream, + socket, "execute_reply", reply_content, parent, @@ -753,7 +644,7 @@ async def execute_request(self, stream, ident, parent): self.log.debug("%s", reply_msg) if not silent and reply_msg["content"]["status"] == "error" and stop_on_error: - self._abort_queues() + await self._abort_queues() def do_execute( self, @@ -857,10 +748,20 @@ def kernel_info(self): "help_links": self.help_links, } - async def kernel_info_request(self, stream, ident, parent): + async def create_subshell_request(self, socket, ident, parent): + shell_id = str(uuid.uuid4()) + self.shell_msg_queues[shell_id] = janus.Queue() + asyncio.create_task(self.process_shell_messages(shell_id)) + content = { + "status": "ok", + "shell_id": shell_id, + } + self.session.send(socket, "create_subshell_reply", content, parent, ident) + + async def kernel_info_request(self, socket, ident, parent): content = {"status": "ok"} content.update(self.kernel_info) - msg = self.session.send(stream, "kernel_info_reply", content, parent, ident) + msg = self.session.send(socket, "kernel_info_reply", content, parent, ident) self.log.debug("%s", msg) async def comm_info_request(self, stream, ident, parent): @@ -899,29 +800,28 @@ def _send_interupt_children(self): except OSError: pass - async def interrupt_request(self, stream, ident, parent): + async def interrupt_request(self, socket, ident, parent): self._send_interupt_children() content = parent["content"] - self.session.send(stream, "interrupt_reply", content, parent, ident=ident) + self.session.send(socket, "interrupt_reply", content, parent, ident=ident) return - async def shutdown_request(self, stream, ident, parent): + async def shutdown_request(self, socket, ident, parent): content = self.do_shutdown(parent["content"]["restart"]) if inspect.isawaitable(content): content = await content - self.session.send(stream, "shutdown_reply", content, parent, ident=ident) + self.session.send(socket, "shutdown_reply", content, parent, ident=ident) # same content, but different msg_id for broadcasting on IOPub self._shutdown_message = self.session.msg("shutdown_reply", content, parent) await self._at_shutdown() self.log.debug("Stopping control ioloop") - control_io_loop = self.control_stream.io_loop - control_io_loop.add_callback(control_io_loop.stop) + if self.control_thread: + self.control_thread.stop() self.log.debug("Stopping shell ioloop") - shell_io_loop = self.shell_stream.io_loop - shell_io_loop.add_callback(shell_io_loop.stop) + asyncio.get_running_loop().stop() def do_shutdown(self, restart): """Override in subclasses to do things when the frontend shuts down the @@ -929,7 +829,7 @@ def do_shutdown(self, restart): """ return {"status": "ok", "restart": restart} - async def is_complete_request(self, stream, ident, parent): + async def is_complete_request(self, socket, ident, parent): content = parent["content"] code = content["code"] @@ -937,20 +837,24 @@ async def is_complete_request(self, stream, ident, parent): if inspect.isawaitable(reply_content): reply_content = await reply_content reply_content = json_clean(reply_content) - reply_msg = self.session.send(stream, "is_complete_reply", reply_content, parent, ident) + reply_msg = self.session.send( + socket, "is_complete_reply", reply_content, parent, ident + ) self.log.debug("%s", reply_msg) def do_is_complete(self, code): """Override in subclasses to find completions.""" return {"status": "unknown"} - async def debug_request(self, stream, ident, parent): + async def debug_request(self, socket, ident, parent): content = parent["content"] reply_content = self.do_debug_request(content) if inspect.isawaitable(reply_content): reply_content = await reply_content reply_content = json_clean(reply_content) - reply_msg = self.session.send(stream, "debug_reply", reply_content, parent, ident) + reply_msg = self.session.send( + socket, "debug_reply", reply_content, parent, ident + ) self.log.debug("%s", reply_msg) def get_process_metric_value(self, process, name, attribute=None): @@ -965,7 +869,7 @@ def get_process_metric_value(self, process, name, attribute=None): except BaseException: return None - async def usage_request(self, stream, ident, parent): + async def usage_request(self, socket, ident, parent): reply_content = {"hostname": socket.gethostname(), "pid": os.getpid()} current_process = psutil.Process() all_processes = [current_process] + current_process.children(recursive=True) @@ -994,7 +898,9 @@ async def usage_request(self, stream, ident, parent): reply_content["host_cpu_percent"] = cpu_percent reply_content["cpu_count"] = psutil.cpu_count(logical=True) reply_content["host_virtual_memory"] = dict(psutil.virtual_memory()._asdict()) - reply_msg = self.session.send(stream, "usage_reply", reply_content, parent, ident) + reply_msg = self.session.send( + socket, "usage_reply", reply_content, parent, ident + ) self.log.debug("%s", reply_msg) async def do_debug_request(self, msg): @@ -1004,7 +910,7 @@ async def do_debug_request(self, msg): # Engine methods (DEPRECATED) # --------------------------------------------------------------------------- - async def apply_request(self, stream, ident, parent): # pragma: no cover + async def apply_request(self, socket, ident, parent): # pragma: no cover self.log.warning("apply_request is deprecated in kernel_base, moving to ipyparallel.") try: content = parent["content"] @@ -1025,7 +931,7 @@ async def apply_request(self, stream, ident, parent): # pragma: no cover md = self.finish_metadata(parent, md, reply_content) self.session.send( - stream, + socket, "apply_reply", reply_content, parent=parent, @@ -1042,7 +948,7 @@ def do_apply(self, content, bufs, msg_id, reply_metadata): # Control messages (DEPRECATED) # --------------------------------------------------------------------------- - async def abort_request(self, stream, ident, parent): # pragma: no cover + async def abort_request(self, socket, ident, parent): # pragma: no cover """abort a specific msg by id""" self.log.warning( "abort_request is deprecated in kernel_base. It is only part of IPython parallel" @@ -1051,23 +957,25 @@ async def abort_request(self, stream, ident, parent): # pragma: no cover if isinstance(msg_ids, str): msg_ids = [msg_ids] if not msg_ids: - self._abort_queues() + await self._abort_queues() for mid in msg_ids: self.aborted.add(str(mid)) content = dict(status="ok") reply_msg = self.session.send( - stream, "abort_reply", content=content, parent=parent, ident=ident + socket, "abort_reply", content=content, parent=parent, ident=ident ) self.log.debug("%s", reply_msg) - async def clear_request(self, stream, idents, parent): # pragma: no cover + async def clear_request(self, socket, idents, parent): # pragma: no cover """Clear our namespace.""" self.log.warning( "clear_request is deprecated in kernel_base. It is only part of IPython parallel" ) content = self.do_clear() - self.session.send(stream, "clear_reply", ident=idents, parent=parent, content=content) + self.session.send( + socket, "clear_reply", ident=idents, parent=parent, content=content + ) def do_clear(self): """DEPRECATED since 4.0.3""" @@ -1085,7 +993,7 @@ def _topic(self, topic): _aborting = Bool(False) - def _abort_queues(self): + async def _abort_queues(self): # while this flag is true, # execute requests will be aborted self._aborting = True @@ -1093,24 +1001,25 @@ def _abort_queues(self): # flush streams, so all currently waiting messages # are added to the queue - self.shell_stream.flush() # Callback to signal that we are done aborting # dispatch functions _must_ be async async def stop_aborting(): + await asyncio.sleep(self.stop_on_error_timeout) self.log.info("Finishing abort") self._aborting = False # put the stop-aborting event on the message queue # so that all messages already waiting in the queue are aborted # before we reset the flag - schedule_stop_aborting = partial(self.schedule_dispatch, stop_aborting) + # schedule_stop_aborting = partial(self.schedule_dispatch, stop_aborting) # if we have a delay, give messages this long to arrive on the queue # before we stop aborting requests - asyncio.get_event_loop().call_later(self.stop_on_error_timeout, schedule_stop_aborting) + # asyncio.get_event_loop().call_later(self.stop_on_error_timeout, schedule_stop_aborting) + asyncio.create_task(stop_aborting()) - def _send_abort_reply(self, stream, msg, idents): + async def _send_abort_reply(self, socket, msg, idents): """Send a reply to an aborted request""" self.log.info(f"Aborting {msg['header']['msg_id']}: {msg['header']['msg_type']}") reply_type = msg["header"]["msg_type"].rsplit("_", 1)[0] + "_reply" @@ -1120,7 +1029,7 @@ def _send_abort_reply(self, stream, msg, idents): md.update(status) self.session.send( - stream, + socket, reply_type, metadata=md, content=status, @@ -1304,4 +1213,3 @@ async def _at_shutdown(self): ident=self._topic("shutdown"), ) self.log.debug("%s", self._shutdown_message) - self.control_stream.flush(zmq.POLLOUT) diff --git a/ipykernel/shell.py b/ipykernel/shell.py new file mode 100644 index 000000000..f264031c2 --- /dev/null +++ b/ipykernel/shell.py @@ -0,0 +1,24 @@ +import asyncio +from threading import Thread + + +def DEBUG(msg): + with open("debug.log", "a") as f: + f.write(f"{msg}\n") + + +class ShellThread(Thread): + def __init__(self, **kwargs): + super().__init__(name="Shell", **kwargs) + self.loop = asyncio.new_event_loop() + + def run(self): + DEBUG("running shell in thread") + asyncio.set_event_loop(self.loop) + try: + self.loop.run_forever() + finally: + self.loop.close() + + def stop(self): + self.loop.call_soon_threadsafe(self.loop.stop) diff --git a/ipykernel/tests/test_subshell.py b/ipykernel/tests/test_subshell.py new file mode 100644 index 000000000..d68df1505 --- /dev/null +++ b/ipykernel/tests/test_subshell.py @@ -0,0 +1,57 @@ +"""Test subshell""" + +import time + +from .utils import flush_channels, get_reply, start_new_kernel + +KC = KM = None + + +def setup_function(): + """start the global kernel (if it isn't running) and return its client""" + global KM, KC + KM, KC = start_new_kernel() + flush_channels(KC) + + +def teardown_function(): + assert KC is not None + assert KM is not None + KC.stop_channels() + KM.shutdown_kernel(now=True) + + +def test_subshell(): + flush_channels(KC) + + # create new sub-shell + msg = KC.session.msg("create_subshell_request") + KC.control_channel.send(msg) + reply = get_reply(KC, msg["header"]["msg_id"], channel="control") + shell_id = reply["content"]["shell_id"] + + # try running (blocking) code in parallel + seconds = 1 + # will wait some time in main shell + msg1 = KC.session.msg( + "execute_request", {"code": f"import time; time.sleep({seconds})", "silent": False} + ) + # will wait more time in sub-shell + msg2 = KC.session.msg( + "execute_request", {"code": f"import time; time.sleep({seconds * 1.5})", "silent": False} + ) + msg2["header"]["shell_id"] = shell_id + # run and measure time + t = time.time() + KC.shell_channel.send(msg1) + KC.shell_channel.send(msg2) + # in any case, main shell should finish first + get_reply(KC, msg1["header"]["msg_id"]) + dt1 = time.time() - t + # main shell should not take much more that the duration of first cell + assert seconds < dt1 < seconds * 1.5 + # in any case, sub-shell should finish second + get_reply(KC, msg2["header"]["msg_id"]) + dt2 = time.time() - t - dt1 + # remaining time should be less than the duration of second cell + assert seconds * 0.5 < dt2 < seconds * 1.5 diff --git a/pyproject.toml b/pyproject.toml index 9904b54b9..e55a89203 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,13 +30,13 @@ dependencies = [ "comm>=0.1.1", "traitlets>=5.4.0", "jupyter_client>=6.1.12", - "tornado>=6.1", "matplotlib-inline>=0.1", 'appnope;platform_system=="Darwin"', "pyzmq>=17", "psutil", "nest_asyncio", "packaging", + "janus>=1.0.0", ] [project.optional-dependencies] @@ -53,7 +53,8 @@ test = [ "ipyparallel", "pre-commit", "pytest-asyncio", - "pytest-timeout" + "pytest-timeout", + "tornado>=6.1", ] cov = [ "coverage[toml]",