From e85724dd041f3f5ccd83c4067aa25087ad3c1c5a Mon Sep 17 00:00:00 2001 From: Zach Sailer Date: Thu, 9 Dec 2021 09:56:59 -0800 Subject: [PATCH] nudge both the shell and control channels --- jupyter_server/services/kernels/handlers.py | 34 +++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/jupyter_server/services/kernels/handlers.py b/jupyter_server/services/kernels/handlers.py index c17a8c72d2..46b54b9372 100644 --- a/jupyter_server/services/kernels/handlers.py +++ b/jupyter_server/services/kernels/handlers.py @@ -131,13 +131,27 @@ def create_stream(self): def nudge(self): """Nudge the zmq connections with kernel_info_requests Returns a Future that will resolve when we have received - a control reply and at least one iopub message, + a shell or control reply and at least one iopub message, ensuring that zmq subscriptions are established, sockets are fully connected, and kernel is responsive. Keeps retrying kernel_info_request until these are both received. """ kernel = self.kernel_manager.get_kernel(self.kernel_id) + # Do not nudge busy kernels as kernel info requests sent to shell are + # queued behind execution requests. + # nudging in this case would cause a potentially very long wait + # before connections are opened, + # plus it is *very* unlikely that a busy kernel will not finish + # establishing its zmq subscriptions before processing the next request. + if getattr(kernel, "execution_state", None) == "busy": + self.log.debug("Nudge: not nudging busy kernel %s", self.kernel_id) + f = Future() + f.set_result(None) + return f + # Use a transient shell channel to prevent leaking + # shell responses to the front-end. + shell_channel = kernel.connect_shell() # Use a transient control channel to prevent leaking # control responses to the front-end. 
control_channel = kernel.connect_control() @@ -160,18 +174,26 @@ def cleanup(_=None): """Common cleanup""" loop.remove_timeout(nudge_handle) iopub_channel.stop_on_recv() + if not shell_channel.closed(): + shell_channel.close() if not control_channel.closed(): control_channel.close() # trigger cleanup when both message futures are resolved both_done.add_done_callback(cleanup) - def on_control_reply(msg): + def on_shell_reply(msg): self.log.debug("Nudge: shell info reply received: %s", self.kernel_id) if not info_future.done(): self.log.debug("Nudge: resolving shell future: %s", self.kernel_id) info_future.set_result(None) + def on_control_reply(msg): + self.log.debug("Nudge: control info reply received: %s", self.kernel_id) + if not info_future.done(): + self.log.debug("Nudge: resolving control future: %s", self.kernel_id) + info_future.set_result(None) + def on_iopub(msg): self.log.debug("Nudge: IOPub received: %s", self.kernel_id) if not iopub_future.done(): @@ -180,6 +202,7 @@ def on_iopub(msg): iopub_future.set_result(None) iopub_channel.on_recv(on_iopub) + shell_channel.on_recv(on_shell_reply) control_channel.on_recv(on_control_reply) loop = IOLoop.current() @@ -200,6 +223,12 @@ def nudge(count): finish() return + # check for closed zmq socket + if shell_channel.closed(): + self.log.debug("Nudge: cancelling on closed zmq socket: %s", self.kernel_id) + finish() + return + # check for closed zmq socket if control_channel.closed(): self.log.debug("Nudge: cancelling on closed zmq socket: %s", self.kernel_id) @@ -209,6 +238,7 @@ def nudge(count): if not both_done.done(): log = self.log.warning if count % 10 == 0 else self.log.debug log("Nudge: attempt %s on kernel %s" % (count, self.kernel_id)) + self.session.send(shell_channel, "kernel_info_request") self.session.send(control_channel, "kernel_info_request") nonlocal nudge_handle nudge_handle = loop.call_later(0.5, nudge, count)