From 77b96ad3e495a00cd4dacf30f772299cfc385958 Mon Sep 17 00:00:00 2001 From: Min RK Date: Wed, 29 Mar 2023 14:12:28 +0200 Subject: [PATCH] use c.f.Future to wait across threads Future allows us to propagate errors across threads, instead of threading.Event, which only waits forever and can result in uninformative hangs if there are errors. --- jupyter_client/threaded.py | 76 +++++++++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 22 deletions(-) diff --git a/jupyter_client/threaded.py b/jupyter_client/threaded.py index 1cd65163..b0f28d92 100644 --- a/jupyter_client/threaded.py +++ b/jupyter_client/threaded.py @@ -5,7 +5,8 @@ import atexit import time from concurrent.futures import Future -from threading import Event, Thread +from functools import partial +from threading import Thread from typing import Any, Dict, List, Optional import zmq @@ -54,17 +55,22 @@ def __init__( self.socket = socket self.session = session self.ioloop = loop - evt = Event() + f: Future = Future() def setup_stream(): - assert self.socket is not None - self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) - self.stream.on_recv(self._handle_recv) - evt.set() + try: + assert self.socket is not None + self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) + self.stream.on_recv(self._handle_recv) + except Exception as e: + f.set_exception(e) + else: + f.set_result(None) assert self.ioloop is not None self.ioloop.add_callback(setup_stream) - evt.wait() + # don't wait forever, raise any errors + f.result(timeout=10) _is_alive = False @@ -179,13 +185,31 @@ def flush(self, timeout: float = 1.0) -> None: """ # We do the IOLoop callback process twice to ensure that the IOLoop # gets to perform at least one full poll. - stop_time = time.time() + timeout + stop_time = time.monotonic() + timeout assert self.ioloop is not None + if self.stream is None or self.stream.closed(): + # don't bother scheduling flush on a thread if we're closed + _msg = "Attempt to flush closed stream" + raise OSError(_msg) + + def flush(f): + try: + self._flush() + except Exception as e: + f.set_exception(e) + else: + f.set_result(None) + for _ in range(2): - self._flushed = False - self.ioloop.add_callback(self._flush) - while not self._flushed and time.time() < stop_time: - time.sleep(0.01) + f: Future = Future() + self.ioloop.add_callback(partial(flush, f)) + # wait for async flush, re-raise any errors + timeout = max(stop_time - time.monotonic(), 0) + try: + f.result(max(stop_time - time.monotonic(), 0)) + except TimeoutError: + # flush with a timeout means stop waiting, not raise + return def _flush(self) -> None: """Callback for :method:`self.flush`.""" @@ -219,24 +243,32 @@ def start(self) -> None: Don't return until self.ioloop is defined, which is created in the thread """ - self._start_event = Event() + self._start_future: Future = Future() Thread.start(self) - self._start_event.wait() + # wait for start, re-raise any errors + self._start_future.result(timeout=10) def run(self) -> None: """Run my loop, ignoring EINTR events in the poller""" - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + async def assign_ioloop(): + self.ioloop = IOLoop.current() + + loop.run_until_complete(assign_ioloop()) + except Exception as e: + self._start_future.set_exception(e) + else: + self._start_future.set_result(None) + loop.run_until_complete(self._async_run()) async def _async_run(self): - self.ioloop = IOLoop.current() - # signal that self.ioloop is defined - self._start_event.set() - while True: + """Run forever (until self._exiting is set)""" + while not self._exiting: await asyncio.sleep(1) - if self._exiting: - break def stop(self) -> None: """Stop the channel's event loop and join its thread.