Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Jupyter Events and emit events from the Kernel Manager #832

Merged
merged 9 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ jobs:
- name: Run the tests on pypy and windows
if: ${{ startsWith(matrix.python-version, 'pypy') || startsWith(matrix.os, 'windows') }}
run: |
python -m pytest -vv || python -m pytest -vv --lf
python -m pytest --pyargs jupyter_client -vv || python -m pytest -vv --lf
blink1073 marked this conversation as resolved.
Show resolved Hide resolved

- name: Code coverage
run: codecov
Expand Down
5 changes: 5 additions & 0 deletions jupyter_client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
"""Client-side implementations of the Jupyter protocol"""
import pathlib

from ._version import __version__ # noqa
from ._version import protocol_version # noqa
from ._version import protocol_version_info # noqa
from ._version import version_info # noqa

JUPYTER_CLIENT_EVENTS_URI = "https://events.jupyter.org/jupyter_client"
DEFAULT_EVENTS_SCHEMA_PATH = pathlib.Path(__file__).parent / "event_schemas"

try:
from .asynchronous import AsyncKernelClient # noqa
from .blocking import BlockingKernelClient # noqa
Expand Down
36 changes: 36 additions & 0 deletions jupyter_client/event_schemas/kernel_manager/v1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"$id": https://events.jupyter.org/jupyter_client/kernel_manager/v1
version: 1
title: Kernel Manager Events
description: |
Record actions on kernels by the KernelManager.
type: object
required:
- kernel_id
- action
properties:
kernel_id:
oneOf:
- type: string
- type: "null"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good @Zsailer - just have one question.

Is this in anticipation of supporting other actions, because all of the currently defined actions fire with a non-None kernel_id? If we retained that approach, we could then apply a format or pattern, allowing only UUID formats (and remove the oneOf and null). This might allow event consumers to have an easier experience as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because all of the currently defined actions fire with a non-None kernel_id?

I don't think start_kernel or pre_start_kernel methods strictly require a kernel_id to start, right? This logic seems to suggest that if None is provided, it creates a uuid on the fly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct (on both counts; non-required kernel_id and auto-create logic). The provisioners should have access to the kernel-id from the get-go so they can do indexing, etc. as necessary.

description: The kernel's unique ID.
action:
enum:
- pre_start
- launch
- post_start
- interrupt
- restart
- kill
- request_shutdown
- finish_shutdown
- cleanup_resources
- restart_started
- restart_finished
- shutdown_started
- shutdown_finished
description: |
Action performed by the KernelManager API.
caller:
type: string
enum:
- kernel_manager
36 changes: 36 additions & 0 deletions jupyter_client/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from enum import Enum

import zmq
from jupyter_events import EventLogger
from traitlets import Any
from traitlets import Bool
from traitlets import default
Expand All @@ -33,6 +34,8 @@
from .provisioning import KernelProvisionerFactory as KPF
from .utils import ensure_async
from .utils import run_sync
from jupyter_client import DEFAULT_EVENTS_SCHEMA_PATH
from jupyter_client import JUPYTER_CLIENT_EVENTS_URI
from jupyter_client import KernelClient
from jupyter_client import kernelspec

Expand Down Expand Up @@ -91,6 +94,27 @@ class KernelManager(ConnectionFileMixin):
This version starts kernels with Popen.
"""

event_schema_id = JUPYTER_CLIENT_EVENTS_URI + "/kernel_manager/v1"
event_logger = Instance(EventLogger).tag(config=True)

@default("event_logger")
def _default_event_logger(self):
if self.parent and hasattr(self.parent, "event_logger"):
return self.parent.event_logger
else:
# If parent does not have an event logger, create one.
logger = EventLogger()
schema_path = DEFAULT_EVENTS_SCHEMA_PATH / "kernel_manager" / "v1.yaml"
logger.register_event_schema(schema_path)
return logger

def _emit(self, *, action: str):
"""Emit event using the core event schema from Jupyter Server's Contents Manager."""
self.event_logger.emit(
schema_id=self.event_schema_id,
data={"action": action, "kernel_id": self.kernel_id, "caller": "kernel_manager"},
)

_ready: t.Union[Future, CFuture]

def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -308,6 +332,7 @@ async def _async_launch_kernel(self, kernel_cmd: t.List[str], **kw: t.Any) -> No
assert self.provisioner.has_process
# Provisioner provides the connection information. Load into kernel manager and write file.
self._force_connection_info(connection_info)
self._emit(action="launch")

_launch_kernel = run_sync(_async_launch_kernel)

Expand Down Expand Up @@ -350,6 +375,7 @@ async def _async_pre_start_kernel(
)
kw = await self.provisioner.pre_launch(**kw)
kernel_cmd = kw.pop('cmd')
self._emit(action="pre_start")
return kernel_cmd, kw

pre_start_kernel = run_sync(_async_pre_start_kernel)
Expand All @@ -366,6 +392,7 @@ async def _async_post_start_kernel(self, **kw: t.Any) -> None:
self._connect_control_socket()
assert self.provisioner is not None
await self.provisioner.post_launch(**kw)
self._emit(action="post_start")

post_start_kernel = run_sync(_async_post_start_kernel)

Expand Down Expand Up @@ -401,6 +428,7 @@ async def _async_request_shutdown(self, restart: bool = False) -> None:
assert self.provisioner is not None
await self.provisioner.shutdown_requested(restart=restart)
self._shutdown_status = _ShutdownStatus.ShutdownRequest
self._emit(action="request_shutdown")

request_shutdown = run_sync(_async_request_shutdown)

Expand Down Expand Up @@ -442,6 +470,7 @@ async def _async_finish_shutdown(
if self.has_kernel:
assert self.provisioner is not None
await self.provisioner.wait()
self._emit(action="finish_shutdown")

finish_shutdown = run_sync(_async_finish_shutdown)

Expand All @@ -459,6 +488,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None:

if self.provisioner:
await self.provisioner.cleanup(restart=restart)
self._emit(action="cleanup_resources")

cleanup_resources = run_sync(_async_cleanup_resources)

Expand All @@ -481,6 +511,7 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
Will this kernel be restarted after it is shutdown. When this
is True, connection files will not be cleaned up.
"""
self._emit(action="shutdown_started")
self.shutting_down = True # Used by restarter to prevent race condition
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()
Expand All @@ -498,6 +529,7 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
await ensure_async(self.finish_shutdown(restart=restart))

await ensure_async(self.cleanup_resources(restart=restart))
self._emit(action="shutdown_finished")

shutdown_kernel = run_sync(_async_shutdown_kernel)

Expand Down Expand Up @@ -528,6 +560,7 @@ async def _async_restart_kernel(
Any options specified here will overwrite those used to launch the
kernel.
"""
self._emit(action="restart_started")
if self._launch_args is None:
raise RuntimeError("Cannot restart the kernel. No previous call to 'start_kernel'.")

Expand All @@ -540,6 +573,7 @@ async def _async_restart_kernel(
# Start new kernel.
self._launch_args.update(kw)
await ensure_async(self.start_kernel(**self._launch_args))
self._emit(action="restart_finished")

restart_kernel = run_sync(_async_restart_kernel)

Expand Down Expand Up @@ -576,6 +610,7 @@ async def _async_kill_kernel(self, restart: bool = False) -> None:
# Process is no longer alive, wait and clear
if self.has_kernel:
await self.provisioner.wait()
self._emit(action="kill")

_kill_kernel = run_sync(_async_kill_kernel)

Expand All @@ -597,6 +632,7 @@ async def _async_interrupt_kernel(self) -> None:
self.session.send(self._control_socket, msg)
else:
raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
self._emit(action="interrupt")

interrupt_kernel = run_sync(_async_interrupt_kernel)

Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies = [
"pyzmq>=23.0",
"tornado>=6.2",
"traitlets",
"jupyter_events>=0.5.0"
]

[[project.authors]]
Expand Down Expand Up @@ -59,6 +60,7 @@ test = [
"pytest-asyncio>=0.18",
"pytest-cov",
"pytest-timeout",
"jupyter_events[test]"
]
doc = [
"ipykernel",
Expand Down
3 changes: 3 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
pjoin = os.path.join


pytest_plugins = ["jupyter_events.pytest_plugin"]


# Handle resource limit
# Ensure a minimal soft limit of DEFAULT_SOFT if the current hard limit is at least that much.
if resource is not None:
Expand Down
62 changes: 52 additions & 10 deletions tests/test_kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .utils import AsyncKMSubclass
from .utils import SyncKMSubclass
from jupyter_client import AsyncKernelManager
from jupyter_client import DEFAULT_EVENTS_SCHEMA_PATH
from jupyter_client import KernelManager
from jupyter_client.manager import _ShutdownStatus
from jupyter_client.manager import start_new_async_kernel
Expand Down Expand Up @@ -92,14 +93,14 @@ def start_kernel():


@pytest.fixture
def km(config):
km = KernelManager(config=config)
def km(config, jp_event_logger):
km = KernelManager(config=config, event_logger=jp_event_logger)
return km


@pytest.fixture
def km_subclass(config):
km = SyncKMSubclass(config=config)
def km_subclass(config, jp_event_logger):
km = SyncKMSubclass(config=config, event_logger=jp_event_logger)
return km


Expand All @@ -112,15 +113,36 @@ def zmq_context():
ctx.term()


@pytest.fixture
def jp_event_schemas():
return [DEFAULT_EVENTS_SCHEMA_PATH / "kernel_manager" / "v1.yaml"]


@pytest.fixture
def check_emitted_events(jp_read_emitted_events):
"""Check the given events where emitted"""

def _(*expected_list):
read_events = jp_read_emitted_events()
events = [e for e in read_events if e["caller"] == "kernel_manager"]
# Ensure that the number of read events match the expected events.
assert len(events) == len(expected_list)
# Loop through the events and make sure they are in order of expected.
for i, action in enumerate(expected_list):
assert "action" in events[i] and action == events[i]["action"]

return _


@pytest.fixture(params=[AsyncKernelManager, AsyncKMSubclass])
def async_km(request, config):
km = request.param(config=config)
def async_km(request, config, jp_event_logger):
km = request.param(config=config, event_logger=jp_event_logger)
return km


@pytest.fixture
def async_km_subclass(config):
km = AsyncKMSubclass(config=config)
def async_km_subclass(config, jp_event_logger):
km = AsyncKMSubclass(config=config, event_logger=jp_event_logger)
return km


Expand Down Expand Up @@ -193,18 +215,35 @@ async def test_async_signal_kernel_subprocesses(self, name, install, expected):


class TestKernelManager:
def test_lifecycle(self, km):
def test_lifecycle(self, km, jp_read_emitted_events, check_emitted_events):
km.start_kernel(stdout=PIPE, stderr=PIPE)
check_emitted_events("pre_start", "launch", "post_start")
kc = km.client()
assert km.is_alive()
is_done = km.ready.done()
assert is_done
km.restart_kernel(now=True)
check_emitted_events(
"restart_started",
"shutdown_started",
"interrupt",
"kill",
"cleanup_resources",
"shutdown_finished",
"pre_start",
"launch",
"post_start",
"restart_finished",
)
assert km.is_alive()
km.interrupt_kernel()
check_emitted_events("interrupt")
assert isinstance(km, KernelManager)
kc.stop_channels()
km.shutdown_kernel(now=True)
check_emitted_events(
"shutdown_started", "interrupt", "kill", "cleanup_resources", "shutdown_finished"
)
assert km.context.closed

def test_get_connect_info(self, km):
Expand Down Expand Up @@ -448,7 +487,10 @@ def execute(cmd):

@pytest.mark.asyncio
class TestAsyncKernelManager:
async def test_lifecycle(self, async_km):
async def test_lifecycle(
self,
async_km,
):
await async_km.start_kernel(stdout=PIPE, stderr=PIPE)
is_alive = await async_km.is_alive()
assert is_alive
Expand Down
10 changes: 10 additions & 0 deletions tests/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,13 @@ def test_connection_file_real_path():
km._launch_args = {}
cmds = km.format_kernel_cmd()
assert cmds[4] == "foobar"


def test_kernel_manager_event_logger(jp_event_handler, jp_read_emitted_events):
action = "pre_start"
km = KernelManager()
km.event_logger.register_handler(jp_event_handler)
km._emit(action=action)
output = jp_read_emitted_events()[0]
assert "kernel_id" in output and output["kernel_id"] == None
assert "action" in output and output["action"] == action