Skip to content

Commit

Permalink
use c.f.Future to wait across threads
Browse files Browse the repository at this point in the history
Future allows us to propagate errors across threads,
instead of threading.Event, which only waits forever and can result in uninformative hangs if there are errors.
  • Loading branch information
minrk committed Mar 29, 2023
1 parent 2f6357f commit 77b96ad
Showing 1 changed file with 54 additions and 22 deletions.
76 changes: 54 additions & 22 deletions jupyter_client/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import atexit
import time
from concurrent.futures import Future
from threading import Event, Thread
from functools import partial
from threading import Thread
from typing import Any, Dict, List, Optional

import zmq
Expand Down Expand Up @@ -54,17 +55,22 @@ def __init__(
self.socket = socket
self.session = session
self.ioloop = loop
evt = Event()
f: Future = Future()

def setup_stream():
assert self.socket is not None
self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
self.stream.on_recv(self._handle_recv)
evt.set()
try:
assert self.socket is not None
self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
self.stream.on_recv(self._handle_recv)
except Exception as e:
f.set_exception(e)
else:
f.set_result(None)

assert self.ioloop is not None
self.ioloop.add_callback(setup_stream)
evt.wait()
# don't wait forever, raise any errors
f.result(timeout=10)

_is_alive = False

Expand Down Expand Up @@ -179,13 +185,31 @@ def flush(self, timeout: float = 1.0) -> None:
"""
# We do the IOLoop callback process twice to ensure that the IOLoop
# gets to perform at least one full poll.
stop_time = time.time() + timeout
stop_time = time.monotonic() + timeout
assert self.ioloop is not None
if self.stream is None or self.stream.closed():
# don't bother scheduling flush on a thread if we're closed
_msg = "Attempt to flush closed stream"
raise OSError(_msg)

def flush(f):
try:
self._flush()
except Exception as e:
f.set_exception(e)
else:
f.set_result(None)

for _ in range(2):
self._flushed = False
self.ioloop.add_callback(self._flush)
while not self._flushed and time.time() < stop_time:
time.sleep(0.01)
f: Future = Future()
self.ioloop.add_callback(partial(flush, f))
# wait for async flush, re-raise any errors
timeout = max(stop_time - time.monotonic(), 0)
try:
f.result(max(stop_time - time.monotonic(), 0))
except TimeoutError:
# flush with a timeout means stop waiting, not raise
return

def _flush(self) -> None:
"""Callback for :method:`self.flush`."""
Expand Down Expand Up @@ -219,24 +243,32 @@ def start(self) -> None:
Don't return until self.ioloop is defined,
which is created in the thread
"""
self._start_event = Event()
self._start_future: Future = Future()
Thread.start(self)
self._start_event.wait()
# wait for start, re-raise any errors
self._start_future.result(timeout=10)

def run(self) -> None:
"""Run my loop, ignoring EINTR events in the poller"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

async def assign_ioloop():
self.ioloop = IOLoop.current()

loop.run_until_complete(assign_ioloop())
except Exception as e:
self._start_future.set_exception(e)
else:
self._start_future.set_result(None)

loop.run_until_complete(self._async_run())

async def _async_run(self):
self.ioloop = IOLoop.current()
# signal that self.ioloop is defined
self._start_event.set()
while True:
"""Run forever (until self._exiting is set)"""
while not self._exiting:
await asyncio.sleep(1)
if self._exiting:
break

def stop(self) -> None:
"""Stop the channel's event loop and join its thread.
Expand Down

0 comments on commit 77b96ad

Please sign in to comment.