Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support external kernels #961

Merged
merged 4 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion jupyter_client/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ async def wrapper(self, *args, **kwargs):
out = await method(self, *args, **kwargs)
# Add a small sleep to ensure tests can capture the state before done
await asyncio.sleep(0.01)
self._ready.set_result(None)
if self.owns_kernel:
self._ready.set_result(None)
return out
except Exception as e:
self._ready.set_exception(e)
Expand All @@ -105,6 +106,7 @@ class KernelManager(ConnectionFileMixin):

def __init__(self, *args, **kwargs):
"""Initialize a kernel manager."""
self._owns_kernel = kwargs.pop("owns_kernel", True)
super().__init__(**kwargs)
self._shutdown_status = _ShutdownStatus.Unset
self._attempted_start = False
Expand Down Expand Up @@ -495,6 +497,9 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
Will this kernel be restarted after it is shutdown. When this
is True, connection files will not be cleaned up.
"""
if not self.owns_kernel:
return

self.shutting_down = True # Used by restarter to prevent race condition
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()
Expand Down Expand Up @@ -558,6 +563,10 @@ async def _async_restart_kernel(

restart_kernel = run_sync(_async_restart_kernel)

@property
def owns_kernel(self) -> bool:
return self._owns_kernel

@property
def has_kernel(self) -> bool:
"""Has a kernel process been started that we are actively managing."""
Expand Down Expand Up @@ -646,6 +655,9 @@ async def _async_signal_kernel(self, signum: int) -> None:

async def _async_is_alive(self) -> bool:
"""Is the kernel process still running?"""
if not self.owns_kernel:
return True

if self.has_kernel:
assert self.provisioner is not None
ret = await self.provisioner.poll()
Expand Down
55 changes: 54 additions & 1 deletion jupyter_client/multikernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import asyncio
import json
import os
import socket
import typing as t
import uuid
from functools import wraps
from pathlib import Path

import zmq
from traitlets import Any, Bool, Dict, DottedObjectName, Instance, Unicode, default, observe
from traitlets.config.configurable import LoggingConfigurable
from traitlets.utils.importstring import import_item

from .connect import KernelConnectionInfo
from .kernelspec import NATIVE_KERNEL_NAME, KernelSpecManager
from .manager import KernelManager
from .utils import ensure_async, run_sync
from .utils import ensure_async, run_sync, utcnow


class DuplicateKernelError(Exception):
Expand Down Expand Up @@ -105,9 +108,14 @@ def _context_default(self) -> zmq.Context:
return zmq.Context()

connection_dir = Unicode("")
external_connection_dir = Unicode(None, allow_none=True)
davidbrochart marked this conversation as resolved.
Show resolved Hide resolved

_kernels = Dict()

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.kernel_id_to_connection_file = {}

def __del__(self):
"""Handle garbage collection. Destroy context if applicable."""
if self._created_context and self.context and not self.context.closed:
Expand All @@ -123,6 +131,51 @@ def __del__(self):

def list_kernel_ids(self) -> t.List[str]:
"""Return a list of the kernel ids of the active kernels."""
if self.external_connection_dir is not None:
external_connection_dir = Path(self.external_connection_dir)
if external_connection_dir.is_dir():
connection_files = [p for p in external_connection_dir.iterdir() if p.is_file()]

# remove kernels (whose connection file has disappeared) from our list
k = list(self.kernel_id_to_connection_file.keys())
v = list(self.kernel_id_to_connection_file.values())
for connection_file in list(self.kernel_id_to_connection_file.values()):
if connection_file not in connection_files:
kernel_id = k[v.index(connection_file)]
del self.kernel_id_to_connection_file[kernel_id]
del self._kernels[kernel_id]

# add kernels (whose connection file appeared) to our list
for connection_file in connection_files:
if connection_file in self.kernel_id_to_connection_file.values():
continue
try:
connection_info: KernelConnectionInfo = json.loads(
connection_file.read_text()
)
except Exception: # noqa: S112
continue
self.log.debug("Loading connection file %s", connection_file)
if not ("kernel_name" in connection_info and "key" in connection_info):
continue
# it looks like a connection file
kernel_id = self.new_kernel_id()
self.kernel_id_to_connection_file[kernel_id] = connection_file
km = self.kernel_manager_factory(
parent=self,
log=self.log,
owns_kernel=False,
)
km.load_connection_info(connection_info)
km.last_activity = utcnow()
km.execution_state = "idle"
km.connections = 1
km.kernel_id = kernel_id
km.kernel_name = connection_info["kernel_name"]
km.ready.set_result(None)

self._kernels[kernel_id] = km

# Create a copy so we can iterate over kernels in operations
# that delete keys.
return list(self._kernels.keys())
Expand Down
33 changes: 33 additions & 0 deletions jupyter_client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- vendor functions from ipython_genutils that should be retired at some point.
"""
import os
from datetime import datetime, timedelta, tzinfo

from jupyter_core.utils import ensure_async, run_sync # noqa: F401 # noqa: F401

Expand Down Expand Up @@ -83,3 +84,35 @@ def _expand_path(s):
if os.name == "nt":
s = s.replace("IPYTHON_TEMP", "$\\")
return s


# constant for zero offset
ZERO = timedelta(0)


class tzUTC(tzinfo): # noqa
"""tzinfo object for UTC (zero offset)"""

def utcoffset(self, d):
"""Compute utcoffset."""
return ZERO

def dst(self, d):
"""Compute dst."""
return ZERO


UTC = tzUTC() # type:ignore


def utc_aware(unaware):
"""decorator for adding UTC tzinfo to datetime's utcfoo methods"""

def utc_method(*args, **kwargs):
dt = unaware(*args, **kwargs)
return dt.replace(tzinfo=UTC)

return utc_method


utcnow = utc_aware(datetime.utcnow)