From 7a3ae6e4a4bfbef9e722f2bad4c9532f96010cf4 Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Sat, 12 Dec 2020 10:46:56 +0100 Subject: [PATCH 1/7] Nudge kernel with info request until we receive IOPub messages --- notebook/services/kernels/handlers.py | 87 ++++++++++++++++++++++++--- 1 file changed, 79 insertions(+), 8 deletions(-) diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index 73da737b15..9ee62f7a39 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -128,6 +128,65 @@ def create_stream(self): self.channels[channel] = stream = meth(self.kernel_id, identity=identity) stream.channel = channel + def nudge(self): + shell_channel = self.channels['shell'] + iopub_channel = self.channels['iopub'] + + future = Future() + info_future = Future() + iopub_future = Future() + + def finish(): + """Common cleanup""" + loop.remove_timeout(timeout) + loop.remove_timeout(nudge_handle) + iopub_channel.stop_on_recv() + shell_channel.stop_on_recv() + + def on_shell_reply(msg): + if not info_future.done(): + self.log.debug("Nudge: shell info reply received: %s", self.kernel_id) + shell_channel.stop_on_recv() + self.log.debug("Nudge: resolving shell future") + info_future.set_result(msg) + if iopub_future.done(): + finish() + self.log.debug("Nudge: resolving main future in shell handler") + future.set_result(info_future.result()) + + def on_iopub(msg): + if not iopub_future.done(): + self.log.debug("Nudge: first IOPub received: %s", self.kernel_id) + iopub_channel.stop_on_recv() + self.log.debug("Nudge: resolving iopub future") + iopub_future.set_result(None) + if info_future.done(): + finish() + self.log.debug("Nudge: resolving main future in iopub handler") + future.set_result(info_future.result()) + + def on_timeout(): + self.log.warning("Nudge: Timeout waiting for kernel_info_reply: %s", self.kernel_id) + finish() + if not future.done(): + future.set_exception(TimeoutError("Timeout waiting for nudge")) + + iopub_channel.on_recv(on_iopub) + shell_channel.on_recv(on_shell_reply) + loop = IOLoop.current() + + # Nudge the kernel with kernel info requests until we get an IOPub message + def nudge(): + self.log.debug("Nudge") + if not future.done(): + self.log.debug("nudging") + self.session.send(shell_channel, "kernel_info_request") + nudge_handle = loop.call_later(0.5, nudge) + nudge_handle = loop.call_later(0, nudge) + + timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) + return future + def request_kernel_info(self): """send a request for kernel_info""" km = self.kernel_manager @@ -253,6 +312,7 @@ def _register_session(self): yield stale_handler.close() self._open_sessions[self.session_key] = self + @gen.coroutine def open(self, kernel_id): super().open() km = self.kernel_manager @@ -263,15 +323,21 @@ def open(self, kernel_id): if buffer_info and buffer_info['session_key'] == self.session_key: self.log.info("Restoring connection for %s", self.session_key) self.channels = buffer_info['channels'] - replay_buffer = buffer_info['buffer'] - if replay_buffer: - self.log.info("Replaying %s buffered messages", len(replay_buffer)) - for channel, msg_list in replay_buffer: - stream = self.channels[channel] - self._on_zmq_reply(stream, msg_list) + connected = self.nudge() + + def replay(value): + replay_buffer = buffer_info['buffer'] + if replay_buffer: + self.log.info("Replaying %s buffered messages", len(replay_buffer)) + for channel, msg_list in replay_buffer: + stream = self.channels[channel] + self._on_zmq_reply(stream, msg_list) + + connected.add_done_callback(replay) else: try: self.create_stream() + connected = self.nudge() except web.HTTPError as e: self.log.error("Error opening stream: %s", e) # WebSockets don't response to traditional error codes so we @@ -285,8 +351,13 @@ def open(self, kernel_id): km.add_restart_callback(self.kernel_id, self.on_kernel_restarted) km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead') - for channel, stream in self.channels.items(): - stream.on_recv_stream(self._on_zmq_reply) + def subscribe(value): + for channel, stream in self.channels.items(): + stream.on_recv_stream(self._on_zmq_reply) + + connected.add_done_callback(subscribe) + + return connected def on_message(self, msg): if not self.channels: From 9770ef77606694332d067d988b47635461008af2 Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Tue, 15 Dec 2020 17:33:38 +0100 Subject: [PATCH 2/7] Review: open is not a coroutine --- notebook/services/kernels/handlers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index 9ee62f7a39..2f3f6b8d86 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -312,7 +312,6 @@ def _register_session(self): yield stale_handler.close() self._open_sessions[self.session_key] = self - @gen.coroutine def open(self, kernel_id): super().open() km = self.kernel_manager From 2c87813ef40d48ac47205142b0652e5bd1b7e28c Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Tue, 15 Dec 2020 18:32:35 +0100 Subject: [PATCH 3/7] Add counter for nudge attempts --- notebook/services/kernels/handlers.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index 2f3f6b8d86..40f5c7cac7 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -176,13 +176,15 @@ def on_timeout(): loop = IOLoop.current() # Nudge the kernel with kernel info requests until we get an IOPub message - def nudge(): - self.log.debug("Nudge") + def nudge(count): + count += 1 if not future.done(): - self.log.debug("nudging") + self.log.debug("Nudging attempt %s or kernel %s" % (count, self.kernel_id)) self.session.send(shell_channel, "kernel_info_request") - nudge_handle = loop.call_later(0.5, nudge) - nudge_handle = loop.call_later(0, nudge) + nudge_handle = loop.call_later(0.5, nudge, count) + + nudge_count = 0 + nudge_handle = loop.call_later(0, nudge, nudge_count) timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) return future From b4f5f6b4d4da0cbb5a97967deb889207bbe6281f Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Wed, 16 Dec 2020 20:39:46 +0100 Subject: [PATCH 4/7] Log nudge attempt count at warning-level if it keeps going up Co-authored-by: Min RK --- notebook/services/kernels/handlers.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index 40f5c7cac7..10d8389b94 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -178,17 +178,18 @@ def on_timeout(): # Nudge the kernel with kernel info requests until we get an IOPub message def nudge(count): count += 1 + nonlocal nudge_handle if not future.done(): - self.log.debug("Nudging attempt %s or kernel %s" % (count, self.kernel_id)) + log = self.log.warning if count % 10 == 0 else self.log.debug + log("Nudging attempt %s on kernel %s" % (count, self.kernel_id)) self.session.send(shell_channel, "kernel_info_request") nudge_handle = loop.call_later(0.5, nudge, count) - nudge_count = 0 - nudge_handle = loop.call_later(0, nudge, nudge_count) + nudge_handle = loop.call_later(0, nudge, count=0) timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) return future - + def request_kernel_info(self): """send a request for kernel_info""" km = self.kernel_manager From f716328d90a09039742172ff22144b67ef256f09 Mon Sep 17 00:00:00 2001 From: Min RK Date: Thu, 17 Dec 2020 13:27:49 +0100 Subject: [PATCH 5/7] relax busy/idle ordering on reconnect nudge can leak idle messages, so it could be busy->idle, or idle->busy->idle. The important thing is that we get at least one idle message --- notebook/tests/services/kernel.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/notebook/tests/services/kernel.js b/notebook/tests/services/kernel.js index df69fdf02c..4f22b802e9 100644 --- a/notebook/tests/services/kernel.js +++ b/notebook/tests/services/kernel.js @@ -242,8 +242,9 @@ casper.notebook_test(function () { 'kernel_disconnected.Kernel', 'kernel_reconnecting.Kernel', 'kernel_connected.Kernel', - 'kernel_busy.Kernel', - 'kernel_idle.Kernel' + // note: there will be a 'busy' in here, too, + // but we can't guarantee which will come first + 'kernel_idle.Kernel', ], function () { this.thenEvaluate(function () { @@ -262,8 +263,7 @@ casper.notebook_test(function () { 'kernel_connection_failed.Kernel', 'kernel_reconnecting.Kernel', 'kernel_connected.Kernel', - 'kernel_busy.Kernel', - 'kernel_idle.Kernel' + 'kernel_idle.Kernel', ], function () { this.thenEvaluate(function () { From 94678545db723a5d66a2dd30a085d3a4695875cf Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Thu, 17 Dec 2020 14:28:48 +0100 Subject: [PATCH 6/7] Use transient shell channel, and do not nudge busy kernels --- notebook/services/kernels/handlers.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index 10d8389b94..2fd17be5f6 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -129,7 +129,12 @@ def create_stream(self): stream.channel = channel def nudge(self): - shell_channel = self.channels['shell'] + # Use a transient shell channel to prevent leaking + # shell responses to the front-end. + kernel = self.kernel_manager.get_kernel(self.kernel_id) + shell_channel = kernel.connect_shell() + + # The IOPub used by the client. iopub_channel = self.channels['iopub'] future = Future() @@ -141,18 +146,20 @@ def finish(): loop.remove_timeout(timeout) loop.remove_timeout(nudge_handle) iopub_channel.stop_on_recv() - shell_channel.stop_on_recv() + if not shell_channel.closed(): + shell_channel.close() def on_shell_reply(msg): if not info_future.done(): self.log.debug("Nudge: shell info reply received: %s", self.kernel_id) - shell_channel.stop_on_recv() + if not shell_channel.closed(): + shell_channel.close() self.log.debug("Nudge: resolving shell future") - info_future.set_result(msg) + info_future.set_result(None) if iopub_future.done(): finish() self.log.debug("Nudge: resolving main future in shell handler") - future.set_result(info_future.result()) + future.set_result(None) def on_iopub(msg): if not iopub_future.done(): @@ -163,7 +170,7 @@ def on_iopub(msg): if info_future.done(): finish() self.log.debug("Nudge: resolving main future in iopub handler") - future.set_result(info_future.result()) + future.set_result(None) def on_timeout(): self.log.warning("Nudge: Timeout waiting for kernel_info_reply: %s", self.kernel_id) @@ -177,16 +184,18 @@ def on_timeout(): # Nudge the kernel with kernel info requests until we get an IOPub message def nudge(count): - count += 1 - nonlocal nudge_handle + # Do not nudge busy kernels as kernel info requests sent to shell are + # queued behind execution requests. + if kernel.execution_state == 'busy': + future.set_result(None) if not future.done(): log = self.log.warning if count % 10 == 0 else self.log.debug log("Nudging attempt %s on kernel %s" % (count, self.kernel_id)) self.session.send(shell_channel, "kernel_info_request") + nonlocal nudge_handle nudge_handle = loop.call_later(0.5, nudge, count) nudge_handle = loop.call_later(0, nudge, count=0) - timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) return future From 672f0a6e203e17d706a65c016f58b25425a101b0 Mon Sep 17 00:00:00 2001 From: Min RK Date: Fri, 18 Dec 2020 11:21:48 +0100 Subject: [PATCH 7/7] nudge: handle failure cases - connect iopub first (tiny effect on the race!) - docstrings, log details - resolve immediately if kernel is busy, rather than setting up timeouts, futures - use gen.with_timeout instead of separately managed timeout - use gen.multi to wait for both futures instead of duplicated check in each handler, third Future - add various cancel conditions (sockets closed, kernel stopped, etc.) --- notebook/services/kernels/handlers.py | 119 ++++++++++++++++++-------- 1 file changed, 81 insertions(+), 38 deletions(-) diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index 2fd17be5f6..5b624c25d0 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -123,60 +123,76 @@ def __repr__(self): def create_stream(self): km = self.kernel_manager identity = self.session.bsession - for channel in ('shell', 'control', 'iopub', 'stdin'): - meth = getattr(km, 'connect_' + channel) + for channel in ("iopub", "shell", "control", "stdin"): + meth = getattr(km, "connect_" + channel) self.channels[channel] = stream = meth(self.kernel_id, identity=identity) stream.channel = channel - def nudge(self): + def nudge(self): + """Nudge the zmq connections with kernel_info_requests + + Returns a Future that will resolve when we have received + a shell reply and at least one iopub message, + ensuring that zmq subscriptions are established, + sockets are fully connected, and kernel is responsive. + + Keeps retrying kernel_info_request until these are both received. + """ + kernel = self.kernel_manager.get_kernel(self.kernel_id) + + # Do not nudge busy kernels as kernel info requests sent to shell are + # queued behind execution requests. + # nudging in this case would cause a potentially very long wait + # before connections are opened, + # plus it is *very* unlikely that a busy kernel will not finish + # establishing its zmq subscriptions before processing the next request. + if getattr(kernel, "execution_state") == "busy": + self.log.debug("Nudge: not nudging busy kernel %s", self.kernel_id) + f = Future() + f.set_result(None) + return f + # Use a transient shell channel to prevent leaking # shell responses to the front-end. - kernel = self.kernel_manager.get_kernel(self.kernel_id) shell_channel = kernel.connect_shell() + # The IOPub used by the client, whose subscriptions we are verifying. + iopub_channel = self.channels["iopub"] - # The IOPub used by the client. - iopub_channel = self.channels['iopub'] - - future = Future() info_future = Future() iopub_future = Future() + both_done = gen.multi([info_future, iopub_future]) + + def finish(f=None): + """Ensure all futures are resolved - def finish(): + which in turn triggers cleanup + """ + for f in (info_future, iopub_future): + if not f.done(): + f.set_result(None) + + def cleanup(f=None): """Common cleanup""" - loop.remove_timeout(timeout) loop.remove_timeout(nudge_handle) iopub_channel.stop_on_recv() if not shell_channel.closed(): shell_channel.close() + # trigger cleanup when both message futures are resolved + both_done.add_done_callback(cleanup) + def on_shell_reply(msg): + self.log.debug("Nudge: shell info reply received: %s", self.kernel_id) if not info_future.done(): - self.log.debug("Nudge: shell info reply received: %s", self.kernel_id) - if not shell_channel.closed(): - shell_channel.close() - self.log.debug("Nudge: resolving shell future") + self.log.debug("Nudge: resolving shell future: %s", self.kernel_id) info_future.set_result(None) - if iopub_future.done(): - finish() - self.log.debug("Nudge: resolving main future in shell handler") - future.set_result(None) def on_iopub(msg): + self.log.debug("Nudge: IOPub received: %s", self.kernel_id) if not iopub_future.done(): - self.log.debug("Nudge: first IOPub received: %s", self.kernel_id) iopub_channel.stop_on_recv() - self.log.debug("Nudge: resolving iopub future") + self.log.debug("Nudge: resolving iopub future: %s", self.kernel_id) iopub_future.set_result(None) - if info_future.done(): - finish() - self.log.debug("Nudge: resolving main future in iopub handler") - future.set_result(None) - - def on_timeout(): - self.log.warning("Nudge: Timeout waiting for kernel_info_reply: %s", self.kernel_id) - finish() - if not future.done(): - future.set_exception(TimeoutError("Timeout waiting for nudge")) iopub_channel.on_recv(on_iopub) shell_channel.on_recv(on_shell_reply) @@ -184,19 +200,46 @@ def on_timeout(): # Nudge the kernel with kernel info requests until we get an IOPub message def nudge(count): - # Do not nudge busy kernels as kernel info requests sent to shell are - # queued behind execution requests. - if kernel.execution_state == 'busy': - future.set_result(None) - if not future.done(): + count += 1 + + # NOTE: this close check appears to never be True during on_open, + # even when the peer has closed the connection + if self.ws_connection is None or self.ws_connection.is_closing(): + self.log.debug( + "Nudge: cancelling on closed websocket: %s", self.kernel_id + ) + finish() + return + + # check for stopped kernel + if self.kernel_id not in self.kernel_manager: + self.log.debug( + "Nudge: cancelling on stopped kernel: %s", self.kernel_id + ) + finish() + return + + # check for closed zmq socket + if shell_channel.closed(): + self.log.debug( + "Nudge: cancelling on closed zmq socket: %s", self.kernel_id + ) + finish() + return + + if not both_done.done(): log = self.log.warning if count % 10 == 0 else self.log.debug - log("Nudging attempt %s on kernel %s" % (count, self.kernel_id)) + log("Nudge: attempt %s on kernel %s" % (count, self.kernel_id)) self.session.send(shell_channel, "kernel_info_request") nonlocal nudge_handle nudge_handle = loop.call_later(0.5, nudge, count) nudge_handle = loop.call_later(0, nudge, count=0) - timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) + + # resolve with a timeout if we get no response + future = gen.with_timeout(loop.time() + self.kernel_info_timeout, both_done) + # ensure we have no dangling resources or unresolved Futures in case of timeout + future.add_done_callback(finish) return future def request_kernel_info(self): @@ -221,7 +264,7 @@ def request_kernel_info(self): self.log.debug("Waiting for pending kernel_info request") future.add_done_callback(lambda f: self._finish_kernel_info(f.result())) return self._kernel_info_future - + def _handle_kernel_info_reply(self, msg): """process the kernel_info_reply