diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 157da7506..4c3540ec8 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -130,7 +130,7 @@ jobs: pip check - name: Run the tests run: | - pytest -vv -W default jupyter_client || pytest -vv -W default jupyter_client --lf + pytest -vv jupyter_client || pytest -vv jupyter_client --lf make_sdist: name: Make SDist diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index 426acf69d..19afd0db7 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -55,37 +55,34 @@ class _ShutdownStatus(Enum): F = t.TypeVar('F', bound=t.Callable[..., t.Any]) -def in_pending_state(prefix: str = '') -> t.Callable[[F], F]: - def decorator(method: F) -> F: - """Sets the kernel to a pending state by - creating a fresh Future for the KernelManager's `ready` - attribute. Once the method is finished, set the Future's results. - """ - - @t.no_type_check - @functools.wraps(method) - async def wrapper(self, *args, **kwargs): - # Create a future for the decorated method - name = f"{prefix}_ready" - future = getattr(self, name) - if not future or future.done(): - future = self._future_factory() - setattr(self, name, future) - try: - # call wrapped method, await, and set the result or exception. - out = await method(self, *args, **kwargs) - # Add a small sleep to ensure tests can capture the state before done - await asyncio.sleep(0.01) - future.set_result(None) - return out - except Exception as e: - future.set_exception(e) - self.log.exception(future.exception()) - raise e +def in_pending_state(method: F) -> F: + """Sets the kernel to a pending state by + creating a fresh Future for the KernelManager's `ready` + attribute. Once the method is finished, set the Future's results. + """ - return t.cast(F, wrapper) + @t.no_type_check + @functools.wraps(method) + async def wrapper(self, *args, **kwargs): + # Create a future for the decorated method + try: + self._ready = Future() + except RuntimeError: + # No event loop running, use concurrent future + self._ready = CFuture() + try: + # call wrapped method, await, and set the result or exception. + out = await method(self, *args, **kwargs) + # Add a small sleep to ensure tests can capture the state before done + await asyncio.sleep(0.01) + self._ready.set_result(None) + return out + except Exception as e: + self._ready.set_exception(e) + self.log.exception(self._ready.exception()) + raise e - return decorator + return t.cast(F, wrapper) class KernelManager(ConnectionFileMixin): @@ -94,14 +91,18 @@ class KernelManager(ConnectionFileMixin): This version starts kernels with Popen. """ - _ready: t.Optional[t.Union[Future, CFuture]] - _shutdown_ready: t.Optional[CFuture] + _ready: t.Union[Future, CFuture] def __init__(self, *args, **kwargs): super().__init__(**kwargs) self._shutdown_status = _ShutdownStatus.Unset - self._ready = None - self._shutdown_ready = None + # Create a place holder future. + try: + asyncio.get_running_loop() + self._ready = Future() + except RuntimeError: + # No event loop running, use concurrent future + self._ready = CFuture() _created_context: Bool = Bool(False) @@ -119,8 +120,6 @@ def _context_default(self) -> zmq.Context: ) client_factory: Type = Type(klass="jupyter_client.KernelClient") - _future_factory: t.Type[CFuture] = CFuture - @default("client_factory") # type:ignore[misc] def _client_factory_default(self) -> Type: return import_item(self.client_class) @@ -186,20 +185,9 @@ def _default_cache_ports(self) -> bool: return self.transport == "tcp" @property - def ready(self) -> CFuture: - """A future that resolves when the kernel process has started.""" - if not self._ready: - self._ready = self._future_factory() - assert self._ready is not None - return self._ready # type:ignore[return-value] - - @property - def shutdown_ready(self) -> CFuture: - """A future that resolves when the kernel process has shut down.""" - if not self._shutdown_ready: - self._shutdown_ready = self._future_factory() - assert self._shutdown_ready is not None - return self._shutdown_ready + def ready(self) -> t.Union[CFuture, Future]: + """A future that resolves when the kernel process has started for the first time""" + return self._ready @property def ipykernel(self) -> bool: @@ -381,7 +369,7 @@ async def _async_post_start_kernel(self, **kw: t.Any) -> None: post_start_kernel = run_sync(_async_post_start_kernel) - @in_pending_state() + @in_pending_state async def _async_start_kernel(self, **kw: t.Any) -> None: """Starts a kernel on this host in a separate process. @@ -474,7 +462,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None: cleanup_resources = run_sync(_async_cleanup_resources) - @in_pending_state('_shutdown') + @in_pending_state async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) -> None: """Attempts to stop the kernel process cleanly. @@ -493,8 +481,6 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) Will this kernel be restarted after it is shutdown. When this is True, connection files will not be cleaned up. """ - # Reset the start ready future. - self._ready = self._future_factory() self.shutting_down = True # Used by restarter to prevent race condition # Stop monitoring for restarting while we shutdown. self.stop_restarter() @@ -657,7 +643,6 @@ class AsyncKernelManager(KernelManager): ) client_factory: Type = Type(klass="jupyter_client.asynchronous.AsyncKernelClient") - _future_factory: t.Type[Future] = Future # type:ignore[assignment] _launch_kernel = KernelManager._async_launch_kernel start_kernel = KernelManager._async_start_kernel pre_start_kernel = KernelManager._async_pre_start_kernel diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index 682b5e097..7dceb9448 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -221,7 +221,9 @@ async def _async_start_kernel( self._kernels[kernel_id] = km else: await task - await asyncio.wrap_future(km.ready) + # raise an exception if one occurred during kernel startup. + if km.ready.exception(): + raise km.ready.exception() # type: ignore return kernel_id @@ -251,7 +253,7 @@ async def _async_shutdown_kernel( try: await task km = self.get_kernel(kernel_id) - await asyncio.wrap_future(km.ready) + await t.cast(asyncio.Future, km.ready) except asyncio.CancelledError: pass except Exception: @@ -259,9 +261,7 @@ async def _async_shutdown_kernel( return km = self.get_kernel(kernel_id) # If a pending kernel raised an exception, remove it. - try: - await asyncio.wrap_future(km.ready) - except Exception: + if not km.ready.cancelled() and km.ready.exception(): self.remove_kernel(kernel_id) return stopper = ensure_async(km.shutdown_kernel(now, restart)) @@ -270,7 +270,9 @@ async def _async_shutdown_kernel( # Await the kernel if not using pending kernels. if not self._using_pending_kernels(): await fut - await asyncio.wrap_future(km.shutdown_ready) + # raise an exception if one occurred during kernel shutdown. + if km.ready.exception(): + raise km.ready.exception() # type: ignore shutdown_kernel = run_sync(_async_shutdown_kernel) @@ -313,7 +315,7 @@ async def _async_shutdown_all(self, now: bool = False) -> None: if self._using_pending_kernels(): for km in kms: try: - await km.shutdown_ready + await km.ready except asyncio.CancelledError: self._pending_kernels[km.kernel_id].cancel() except Exception: diff --git a/jupyter_client/tests/test_kernelmanager.py b/jupyter_client/tests/test_kernelmanager.py index 6bac48459..e1198597b 100644 --- a/jupyter_client/tests/test_kernelmanager.py +++ b/jupyter_client/tests/test_kernelmanager.py @@ -112,13 +112,13 @@ def zmq_context(): @pytest.fixture(params=[AsyncKernelManager, AsyncKMSubclass]) -async def async_km(request, config): +def async_km(request, config): km = request.param(config=config) return km @pytest.fixture -async def async_km_subclass(config): +def async_km_subclass(config): km = AsyncKMSubclass(config=config) return km @@ -451,11 +451,11 @@ async def test_lifecycle(self, async_km): await async_km.start_kernel(stdout=PIPE, stderr=PIPE) is_alive = await async_km.is_alive() assert is_alive - await async_km.ready + is_ready = async_km.ready.done() + assert is_ready await async_km.restart_kernel(now=True) is_alive = await async_km.is_alive() assert is_alive - await async_km.ready await async_km.interrupt_kernel() assert isinstance(async_km, AsyncKernelManager) await async_km.shutdown_kernel(now=True) diff --git a/jupyter_client/tests/test_multikernelmanager.py b/jupyter_client/tests/test_multikernelmanager.py index d61c2830d..0437e5349 100644 --- a/jupyter_client/tests/test_multikernelmanager.py +++ b/jupyter_client/tests/test_multikernelmanager.py @@ -395,7 +395,7 @@ async def test_use_pending_kernels(self): assert isinstance(k, AsyncKernelManager) await ensure_future(km.shutdown_kernel(kid, now=True)) # Wait for the kernel to shutdown - await kernel.shutdown_ready + await kernel.ready assert kid not in km, f"{kid} not in {km}" @gen_test @@ -409,7 +409,7 @@ async def test_use_pending_kernels_early_restart(self): await kernel.ready await ensure_future(km.shutdown_kernel(kid, now=True)) # Wait for the kernel to shutdown - await kernel.shutdown_ready + await kernel.ready assert kid not in km, f"{kid} not in {km}" @gen_test @@ -421,7 +421,7 @@ async def test_use_pending_kernels_early_shutdown(self): # Try shutting down while the kernel is pending await ensure_future(km.shutdown_kernel(kid, now=True)) # Wait for the kernel to shutdown - await kernel.shutdown_ready + await kernel.ready assert kid not in km, f"{kid} not in {km}" @gen_test @@ -436,7 +436,7 @@ async def test_use_pending_kernels_early_interrupt(self): await kernel.ready await ensure_future(km.shutdown_kernel(kid, now=True)) # Wait for the kernel to shutdown - await kernel.shutdown_ready + await kernel.ready assert kid not in km, f"{kid} not in {km}" @gen_test