diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 4c3540ec8..157da7506 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -130,7 +130,7 @@ jobs: pip check - name: Run the tests run: | - pytest -vv jupyter_client || pytest -vv jupyter_client --lf + pytest -vv -W default jupyter_client || pytest -vv -W default jupyter_client --lf make_sdist: name: Make SDist diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index 19afd0db7..426acf69d 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -55,34 +55,37 @@ class _ShutdownStatus(Enum): F = t.TypeVar('F', bound=t.Callable[..., t.Any]) -def in_pending_state(method: F) -> F: - """Sets the kernel to a pending state by - creating a fresh Future for the KernelManager's `ready` - attribute. Once the method is finished, set the Future's results. - """ +def in_pending_state(prefix: str = '') -> t.Callable[[F], F]: + def decorator(method: F) -> F: + """Sets the kernel to a pending state by + creating a fresh Future for the KernelManager's `ready` + attribute. Once the method is finished, set the Future's results. + """ - @t.no_type_check - @functools.wraps(method) - async def wrapper(self, *args, **kwargs): - # Create a future for the decorated method - try: - self._ready = Future() - except RuntimeError: - # No event loop running, use concurrent future - self._ready = CFuture() - try: - # call wrapped method, await, and set the result or exception. - out = await method(self, *args, **kwargs) - # Add a small sleep to ensure tests can capture the state before done - await asyncio.sleep(0.01) - self._ready.set_result(None) - return out - except Exception as e: - self._ready.set_exception(e) - self.log.exception(self._ready.exception()) - raise e + @t.no_type_check + @functools.wraps(method) + async def wrapper(self, *args, **kwargs): + # Create a future for the decorated method + name = f"{prefix}_ready" + future = getattr(self, name) + if not future or future.done(): + future = self._future_factory() + setattr(self, name, future) + try: + # call wrapped method, await, and set the result or exception. + out = await method(self, *args, **kwargs) + # Add a small sleep to ensure tests can capture the state before done + await asyncio.sleep(0.01) + future.set_result(None) + return out + except Exception as e: + future.set_exception(e) + self.log.exception(future.exception()) + raise e - return t.cast(F, wrapper) + return t.cast(F, wrapper) + + return decorator class KernelManager(ConnectionFileMixin): @@ -91,18 +94,14 @@ class KernelManager(ConnectionFileMixin): This version starts kernels with Popen. """ - _ready: t.Union[Future, CFuture] + _ready: t.Optional[t.Union[Future, CFuture]] + _shutdown_ready: t.Optional[CFuture] def __init__(self, *args, **kwargs): super().__init__(**kwargs) self._shutdown_status = _ShutdownStatus.Unset - # Create a place holder future. - try: - asyncio.get_running_loop() - self._ready = Future() - except RuntimeError: - # No event loop running, use concurrent future - self._ready = CFuture() + self._ready = None + self._shutdown_ready = None _created_context: Bool = Bool(False) @@ -120,6 +119,8 @@ def _context_default(self) -> zmq.Context: ) client_factory: Type = Type(klass="jupyter_client.KernelClient") + _future_factory: t.Type[CFuture] = CFuture + @default("client_factory") # type:ignore[misc] def _client_factory_default(self) -> Type: return import_item(self.client_class) @@ -185,9 +186,20 @@ def _default_cache_ports(self) -> bool: return self.transport == "tcp" @property - def ready(self) -> t.Union[CFuture, Future]: - """A future that resolves when the kernel process has started for the first time""" - return self._ready + def ready(self) -> CFuture: + """A future that resolves when the kernel process has started.""" + if not self._ready: + self._ready = self._future_factory() + assert self._ready is not None + return self._ready # type:ignore[return-value] + + @property + def shutdown_ready(self) -> CFuture: + """A future that resolves when the kernel process has shut down.""" + if not self._shutdown_ready: + self._shutdown_ready = self._future_factory() + assert self._shutdown_ready is not None + return self._shutdown_ready @property def ipykernel(self) -> bool: @@ -369,7 +381,7 @@ async def _async_post_start_kernel(self, **kw: t.Any) -> None: post_start_kernel = run_sync(_async_post_start_kernel) - @in_pending_state + @in_pending_state() async def _async_start_kernel(self, **kw: t.Any) -> None: """Starts a kernel on this host in a separate process. @@ -462,7 +474,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None: cleanup_resources = run_sync(_async_cleanup_resources) - @in_pending_state + @in_pending_state('_shutdown') async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) -> None: """Attempts to stop the kernel process cleanly. @@ -481,6 +493,8 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) Will this kernel be restarted after it is shutdown. When this is True, connection files will not be cleaned up. """ + # Reset the start ready future. + self._ready = self._future_factory() self.shutting_down = True # Used by restarter to prevent race condition # Stop monitoring for restarting while we shutdown. self.stop_restarter() @@ -643,6 +657,7 @@ class AsyncKernelManager(KernelManager): ) client_factory: Type = Type(klass="jupyter_client.asynchronous.AsyncKernelClient") + _future_factory: t.Type[Future] = Future # type:ignore[assignment] _launch_kernel = KernelManager._async_launch_kernel start_kernel = KernelManager._async_start_kernel pre_start_kernel = KernelManager._async_pre_start_kernel diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index 7dceb9448..682b5e097 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -221,9 +221,7 @@ async def _async_start_kernel( self._kernels[kernel_id] = km else: await task - # raise an exception if one occurred during kernel startup. - if km.ready.exception(): - raise km.ready.exception() # type: ignore + await asyncio.wrap_future(km.ready) return kernel_id @@ -253,7 +251,7 @@ async def _async_shutdown_kernel( try: await task km = self.get_kernel(kernel_id) - await t.cast(asyncio.Future, km.ready) + await asyncio.wrap_future(km.ready) except asyncio.CancelledError: pass except Exception: @@ -261,7 +259,9 @@ async def _async_shutdown_kernel( return km = self.get_kernel(kernel_id) # If a pending kernel raised an exception, remove it. - if not km.ready.cancelled() and km.ready.exception(): + try: + await asyncio.wrap_future(km.ready) + except Exception: self.remove_kernel(kernel_id) return stopper = ensure_async(km.shutdown_kernel(now, restart)) @@ -270,9 +270,7 @@ async def _async_shutdown_kernel( # Await the kernel if not using pending kernels. if not self._using_pending_kernels(): await fut - # raise an exception if one occurred during kernel shutdown. - if km.ready.exception(): - raise km.ready.exception() # type: ignore + await asyncio.wrap_future(km.shutdown_ready) shutdown_kernel = run_sync(_async_shutdown_kernel) @@ -315,7 +313,7 @@ async def _async_shutdown_all(self, now: bool = False) -> None: if self._using_pending_kernels(): for km in kms: try: - await km.ready + await km.shutdown_ready except asyncio.CancelledError: self._pending_kernels[km.kernel_id].cancel() except Exception: diff --git a/jupyter_client/tests/test_kernelmanager.py b/jupyter_client/tests/test_kernelmanager.py index e1198597b..6bac48459 100644 --- a/jupyter_client/tests/test_kernelmanager.py +++ b/jupyter_client/tests/test_kernelmanager.py @@ -112,13 +112,13 @@ def zmq_context(): @pytest.fixture(params=[AsyncKernelManager, AsyncKMSubclass]) -def async_km(request, config): +async def async_km(request, config): km = request.param(config=config) return km @pytest.fixture -def async_km_subclass(config): +async def async_km_subclass(config): km = AsyncKMSubclass(config=config) return km @@ -451,11 +451,11 @@ async def test_lifecycle(self, async_km): await async_km.start_kernel(stdout=PIPE, stderr=PIPE) is_alive = await async_km.is_alive() assert is_alive - is_ready = async_km.ready.done() - assert is_ready + await async_km.ready await async_km.restart_kernel(now=True) is_alive = await async_km.is_alive() assert is_alive + await async_km.ready await async_km.interrupt_kernel() assert isinstance(async_km, AsyncKernelManager) await async_km.shutdown_kernel(now=True) diff --git a/jupyter_client/tests/test_multikernelmanager.py b/jupyter_client/tests/test_multikernelmanager.py index 0437e5349..d61c2830d 100644 --- a/jupyter_client/tests/test_multikernelmanager.py +++ b/jupyter_client/tests/test_multikernelmanager.py @@ -395,7 +395,7 @@ async def test_use_pending_kernels(self): assert isinstance(k, AsyncKernelManager) await ensure_future(km.shutdown_kernel(kid, now=True)) # Wait for the kernel to shutdown - await kernel.ready + await kernel.shutdown_ready assert kid not in km, f"{kid} not in {km}" @gen_test @@ -409,7 +409,7 @@ async def test_use_pending_kernels_early_restart(self): await kernel.ready await ensure_future(km.shutdown_kernel(kid, now=True)) # Wait for the kernel to shutdown - await kernel.ready + await kernel.shutdown_ready assert kid not in km, f"{kid} not in {km}" @gen_test @@ -421,7 +421,7 @@ async def test_use_pending_kernels_early_shutdown(self): # Try shutting down while the kernel is pending await ensure_future(km.shutdown_kernel(kid, now=True)) # Wait for the kernel to shutdown - await kernel.ready + await kernel.shutdown_ready assert kid not in km, f"{kid} not in {km}" @gen_test @@ -436,7 +436,7 @@ async def test_use_pending_kernels_early_interrupt(self): await kernel.ready await ensure_future(km.shutdown_kernel(kid, now=True)) # Wait for the kernel to shutdown - await kernel.ready + await kernel.shutdown_ready assert kid not in km, f"{kid} not in {km}" @gen_test