From 079763b32fc7e5037e8cbea2e51d4e9753b1602e Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 11 Oct 2022 08:22:35 -0500 Subject: [PATCH 1/4] Backport PR #845: Fix pending kernels again --- jupyter_client/manager.py | 112 ++++++++++++------ jupyter_client/multikernelmanager.py | 16 ++- jupyter_client/tests/test_kernelmanager.py | 8 +- .../tests/test_multikernelmanager.py | 8 +- 4 files changed, 90 insertions(+), 54 deletions(-) diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index 19afd0db7..0a5886f49 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -55,34 +55,37 @@ class _ShutdownStatus(Enum): F = t.TypeVar('F', bound=t.Callable[..., t.Any]) -def in_pending_state(method: F) -> F: - """Sets the kernel to a pending state by - creating a fresh Future for the KernelManager's `ready` - attribute. Once the method is finished, set the Future's results. - """ +def in_pending_state(prefix: str = '') -> t.Callable[[F], F]: + def decorator(method: F) -> F: + """Sets the kernel to a pending state by + creating a fresh Future for the KernelManager's `ready` + attribute. Once the method is finished, set the Future's results. + """ - @t.no_type_check - @functools.wraps(method) - async def wrapper(self, *args, **kwargs): - # Create a future for the decorated method - try: - self._ready = Future() - except RuntimeError: - # No event loop running, use concurrent future - self._ready = CFuture() - try: - # call wrapped method, await, and set the result or exception. - out = await method(self, *args, **kwargs) - # Add a small sleep to ensure tests can capture the state before done - await asyncio.sleep(0.01) - self._ready.set_result(None) - return out - except Exception as e: - self._ready.set_exception(e) - self.log.exception(self._ready.exception()) - raise e + @t.no_type_check + @functools.wraps(method) + async def wrapper(self, *args, **kwargs): + # Create a future for the decorated method + name = f"{prefix}_ready" + future = getattr(self, name) + if not future or future.done(): + future = self._future_factory() + setattr(self, name, future) + try: + # call wrapped method, await, and set the result or exception. + out = await method(self, *args, **kwargs) + # Add a small sleep to ensure tests can capture the state before done + await asyncio.sleep(0.01) + future.set_result(None) + return out + except Exception as e: + future.set_exception(e) + self.log.exception(future.exception()) + raise e + + return t.cast(F, wrapper) - return t.cast(F, wrapper) + return decorator class KernelManager(ConnectionFileMixin): @@ -93,16 +96,32 @@ class KernelManager(ConnectionFileMixin): _ready: t.Union[Future, CFuture] + @default("event_logger") + def _default_event_logger(self): + if self.parent and hasattr(self.parent, "event_logger"): + return self.parent.event_logger + else: + # If parent does not have an event logger, create one. + logger = EventLogger() + schema_path = DEFAULT_EVENTS_SCHEMA_PATH / "kernel_manager" / "v1.yaml" + logger.register_event_schema(schema_path) + return logger + + def _emit(self, *, action: str) -> None: + """Emit event using the core event schema from Jupyter Server's Contents Manager.""" + self.event_logger.emit( + schema_id=self.event_schema_id, + data={"action": action, "kernel_id": self.kernel_id, "caller": "kernel_manager"}, + ) + + _ready: t.Optional[CFuture] + _shutdown_ready: t.Optional[CFuture] + def __init__(self, *args, **kwargs): super().__init__(**kwargs) self._shutdown_status = _ShutdownStatus.Unset - # Create a place holder future. - try: - asyncio.get_running_loop() - self._ready = Future() - except RuntimeError: - # No event loop running, use concurrent future - self._ready = CFuture() + self._ready = None + self._shutdown_ready = None _created_context: Bool = Bool(False) @@ -120,6 +139,8 @@ def _context_default(self) -> zmq.Context: ) client_factory: Type = Type(klass="jupyter_client.KernelClient") + _future_factory: t.Type[CFuture] = CFuture + @default("client_factory") # type:ignore[misc] def _client_factory_default(self) -> Type: return import_item(self.client_class) @@ -185,10 +206,21 @@ def _default_cache_ports(self) -> bool: return self.transport == "tcp" @property - def ready(self) -> t.Union[CFuture, Future]: - """A future that resolves when the kernel process has started for the first time""" + def ready(self) -> CFuture: + """A future that resolves when the kernel process has started.""" + if not self._ready: + self._ready = self._future_factory() + assert self._ready is not None return self._ready + @property + def shutdown_ready(self) -> CFuture: + """A future that resolves when the kernel process has shut down.""" + if not self._shutdown_ready: + self._shutdown_ready = self._future_factory() + assert self._shutdown_ready is not None + return self._shutdown_ready + @property def ipykernel(self) -> bool: return self.kernel_name in {"python", "python2", "python3"} @@ -369,7 +401,7 @@ async def _async_post_start_kernel(self, **kw: t.Any) -> None: post_start_kernel = run_sync(_async_post_start_kernel) - @in_pending_state + @in_pending_state() async def _async_start_kernel(self, **kw: t.Any) -> None: """Starts a kernel on this host in a separate process. @@ -462,7 +494,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None: cleanup_resources = run_sync(_async_cleanup_resources) - @in_pending_state + @in_pending_state('_shutdown') async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) -> None: """Attempts to stop the kernel process cleanly. @@ -481,6 +513,8 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) Will this kernel be restarted after it is shutdown. When this is True, connection files will not be cleaned up. """ + # Reset the start ready future. + self._ready = self._future_factory() self.shutting_down = True # Used by restarter to prevent race condition # Stop monitoring for restarting while we shutdown. self.stop_restarter() @@ -643,6 +677,10 @@ class AsyncKernelManager(KernelManager): ) client_factory: Type = Type(klass="jupyter_client.asynchronous.AsyncKernelClient") + # The PyZMQ Context to use for communication with the kernel. + context: Instance = Instance(zmq.asyncio.Context) + + _future_factory: t.Type[Future] = Future # type:ignore[assignment] _launch_kernel = KernelManager._async_launch_kernel start_kernel = KernelManager._async_start_kernel pre_start_kernel = KernelManager._async_pre_start_kernel diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index 7dceb9448..682b5e097 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -221,9 +221,7 @@ async def _async_start_kernel( self._kernels[kernel_id] = km else: await task - # raise an exception if one occurred during kernel startup. - if km.ready.exception(): - raise km.ready.exception() # type: ignore + await asyncio.wrap_future(km.ready) return kernel_id @@ -253,7 +251,7 @@ async def _async_shutdown_kernel( try: await task km = self.get_kernel(kernel_id) - await t.cast(asyncio.Future, km.ready) + await asyncio.wrap_future(km.ready) except asyncio.CancelledError: pass except Exception: @@ -261,7 +259,9 @@ async def _async_shutdown_kernel( return km = self.get_kernel(kernel_id) # If a pending kernel raised an exception, remove it. - if not km.ready.cancelled() and km.ready.exception(): + try: + await asyncio.wrap_future(km.ready) + except Exception: self.remove_kernel(kernel_id) return stopper = ensure_async(km.shutdown_kernel(now, restart)) @@ -270,9 +270,7 @@ async def _async_shutdown_kernel( # Await the kernel if not using pending kernels. if not self._using_pending_kernels(): await fut - # raise an exception if one occurred during kernel shutdown. - if km.ready.exception(): - raise km.ready.exception() # type: ignore + await asyncio.wrap_future(km.shutdown_ready) shutdown_kernel = run_sync(_async_shutdown_kernel) @@ -315,7 +313,7 @@ async def _async_shutdown_all(self, now: bool = False) -> None: if self._using_pending_kernels(): for km in kms: try: - await km.ready + await km.shutdown_ready except asyncio.CancelledError: self._pending_kernels[km.kernel_id].cancel() except Exception: diff --git a/jupyter_client/tests/test_kernelmanager.py b/jupyter_client/tests/test_kernelmanager.py index e1198597b..6bac48459 100644 --- a/jupyter_client/tests/test_kernelmanager.py +++ b/jupyter_client/tests/test_kernelmanager.py @@ -112,13 +112,13 @@ def zmq_context(): @pytest.fixture(params=[AsyncKernelManager, AsyncKMSubclass]) -def async_km(request, config): +async def async_km(request, config): km = request.param(config=config) return km @pytest.fixture -def async_km_subclass(config): +async def async_km_subclass(config): km = AsyncKMSubclass(config=config) return km @@ -451,11 +451,11 @@ async def test_lifecycle(self, async_km): await async_km.start_kernel(stdout=PIPE, stderr=PIPE) is_alive = await async_km.is_alive() assert is_alive - is_ready = async_km.ready.done() - assert is_ready + await async_km.ready await async_km.restart_kernel(now=True) is_alive = await async_km.is_alive() assert is_alive + await async_km.ready await async_km.interrupt_kernel() assert isinstance(async_km, AsyncKernelManager) await async_km.shutdown_kernel(now=True) diff --git a/jupyter_client/tests/test_multikernelmanager.py b/jupyter_client/tests/test_multikernelmanager.py index 0437e5349..d61c2830d 100644 --- a/jupyter_client/tests/test_multikernelmanager.py +++ b/jupyter_client/tests/test_multikernelmanager.py @@ -395,7 +395,7 @@ async def test_use_pending_kernels(self): assert isinstance(k, AsyncKernelManager) await ensure_future(km.shutdown_kernel(kid, now=True)) # Wait for the kernel to shutdown - await kernel.ready + await kernel.shutdown_ready assert kid not in km, f"{kid} not in {km}" @gen_test @@ -409,7 +409,7 @@ async def test_use_pending_kernels_early_restart(self): await kernel.ready await ensure_future(km.shutdown_kernel(kid, now=True)) # Wait for the kernel to shutdown - await kernel.ready + await kernel.shutdown_ready assert kid not in km, f"{kid} not in {km}" @gen_test @@ -421,7 +421,7 @@ async def test_use_pending_kernels_early_shutdown(self): # Try shutting down while the kernel is pending await ensure_future(km.shutdown_kernel(kid, now=True)) # Wait for the kernel to shutdown - await kernel.ready + await kernel.shutdown_ready assert kid not in km, f"{kid} not in {km}" @gen_test @@ -436,7 +436,7 @@ async def test_use_pending_kernels_early_interrupt(self): await kernel.ready await ensure_future(km.shutdown_kernel(kid, now=True)) # Wait for the kernel to shutdown - await kernel.ready + await kernel.shutdown_ready assert kid not in km, f"{kid} not in {km}" @gen_test From 7190a1d7b6a936a43ea0834c0ca7af02c65619c6 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 11 Oct 2022 08:29:18 -0500 Subject: [PATCH 2/4] remove extra changes --- jupyter_client/manager.py | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index 0a5886f49..4176a3dfa 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -95,26 +95,6 @@ class KernelManager(ConnectionFileMixin): """ _ready: t.Union[Future, CFuture] - - @default("event_logger") - def _default_event_logger(self): - if self.parent and hasattr(self.parent, "event_logger"): - return self.parent.event_logger - else: - # If parent does not have an event logger, create one. - logger = EventLogger() - schema_path = DEFAULT_EVENTS_SCHEMA_PATH / "kernel_manager" / "v1.yaml" - logger.register_event_schema(schema_path) - return logger - - def _emit(self, *, action: str) -> None: - """Emit event using the core event schema from Jupyter Server's Contents Manager.""" - self.event_logger.emit( - schema_id=self.event_schema_id, - data={"action": action, "kernel_id": self.kernel_id, "caller": "kernel_manager"}, - ) - - _ready: t.Optional[CFuture] _shutdown_ready: t.Optional[CFuture] def __init__(self, *args, **kwargs): @@ -677,9 +657,6 @@ class AsyncKernelManager(KernelManager): ) client_factory: Type = Type(klass="jupyter_client.asynchronous.AsyncKernelClient") - # The PyZMQ Context to use for communication with the kernel. - context: Instance = Instance(zmq.asyncio.Context) - _future_factory: t.Type[Future] = Future # type:ignore[assignment] _launch_kernel = KernelManager._async_launch_kernel start_kernel = KernelManager._async_start_kernel From 162cc7139dea4f8427f7e4fcad49ac391ddfc9d4 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 11 Oct 2022 08:31:47 -0500 Subject: [PATCH 3/4] ignore warnings in prereleases --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 4c3540ec8..157da7506 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -130,7 +130,7 @@ jobs: pip check - name: Run the tests run: | - pytest -vv jupyter_client || pytest -vv jupyter_client --lf + pytest -vv -W default jupyter_client || pytest -vv -W default jupyter_client --lf make_sdist: name: Make SDist From 7496beac0869f64efb18eda2110ea48cba635920 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 11 Oct 2022 08:35:38 -0500 Subject: [PATCH 4/4] lint --- jupyter_client/manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index 4176a3dfa..426acf69d 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -94,7 +94,7 @@ class KernelManager(ConnectionFileMixin): This version starts kernels with Popen. """ - _ready: t.Union[Future, CFuture] + _ready: t.Optional[t.Union[Future, CFuture]] _shutdown_ready: t.Optional[CFuture] def __init__(self, *args, **kwargs): @@ -191,7 +191,7 @@ def ready(self) -> CFuture: if not self._ready: self._ready = self._future_factory() assert self._ready is not None - return self._ready + return self._ready # type:ignore[return-value] @property def shutdown_ready(self) -> CFuture: