Skip to content

Commit

Permalink
Updating kernelMonitor to cull kernels (jupyter-server#370)
Browse files Browse the repository at this point in the history
  • Loading branch information
Akshay Chitneni authored and GitHub Enterprise committed May 18, 2022
1 parent 040f34d commit 3560d6a
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 6 deletions.
2 changes: 1 addition & 1 deletion data_studio_jupyter_extensions/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"default_provisioner_name": "notebook-service-provisioner"
},
"KernelManager": {
"restarter_class": "data_studio_jupyter_extensions.configurables.kernel_restarter.DataStudioKernelRestarter"
"restarter_class": "data_studio_jupyter_extensions.configurables.kernel_monitor.DataStudioKernelMonitor"
},
"KernelRestarter": {"time_to_dead": 1},
"Session": {"key": b"notebooks"},
Expand Down
25 changes: 25 additions & 0 deletions data_studio_jupyter_extensions/configurables/kernel_manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from jupyter_client.channels import HBChannel
from jupyter_client.ioloop.manager import AsyncIOLoopKernelManager
from traitlets import Any
from traitlets import Instance
from traitlets import Int
from traitlets import TraitError
from traitlets import Unicode
from traitlets import validate

from data_studio_jupyter_extensions import constants
from data_studio_jupyter_extensions.configurables.connection import (
Expand All @@ -19,6 +23,27 @@ class DataStudioKernelManager(
config=True
)

execution_state = Unicode(
allow_none=True,
help=f"The kernel's execution state, e.g. {constants.EXECUTION_STATES}",
)

reason = Unicode(default_value="", help="The kernel's execution state reason")

last_activity = Any(allow_none=True, help="The kernel's last activity timestamp")

kernel_connections = Int(
default_value=0, help="The kernel's active websocket connection count"
)

@validate("execution_state")
def _valid_execution_state(self, proposal):
if proposal["value"] in constants.EXECUTION_STATES:
return proposal["value"]
raise TraitError(
f"Invalid execution state, {proposal['value']}. Valid states: {constants.EXECUTION_STATES}"
)

@property
def process_id(self):
if self.provisioner:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,51 @@
import time
from datetime import timedelta

from jupyter_client.ioloop.restarter import AsyncIOLoopKernelRestarter
from notebook._tz import utcnow
from traitlets import Bool
from traitlets import Float
from traitlets import Int

from data_studio_jupyter_extensions import constants
from data_studio_jupyter_extensions.configurables.kernel_status import KernelStatusMixin
from data_studio_jupyter_extensions.traits import BoolFromEnv
from data_studio_jupyter_extensions.traits import IntFromEnv


class DataStudioKernelRestarter(AsyncIOLoopKernelRestarter, KernelStatusMixin):
"""Replaces the default Restarter with an object that *does not* restart kernels, but
just polls them.
"""
class DataStudioKernelMonitor(AsyncIOLoopKernelRestarter, KernelStatusMixin):
"""A Kernel Monitor that handles everything polling."""

_attempt_count = Int(0)
_connected_once = Bool(False)
_start_time = Float()

cull_idle_timeout = IntFromEnv(
name=constants.DS_CULL_IDLE_TIMEOUT,
default_value=0,
help="""Timeout (in seconds) after which a kernel is considered idle and ready to be culled.
Values of 0 or lower disable culling. Very short timeouts may result in kernels being culled
for users with poor network connections.""",
).tag(config=True)
cull_interval = IntFromEnv(
name=constants.DS_CULL_INTERVAL,
default_value=300,
help="""The interval (in seconds) on which to check for idle kernels exceeding the
cull timeout value.""",
).tag(config=True)
cull_connected = BoolFromEnv(
name=constants.DS_CULL_CONNECTED,
default_value=False,
help="""Whether to consider culling kernels which have one or more connections.
Only effective if cull_idle_timeout > 0.""",
).tag(config=True)
cull_busy = BoolFromEnv(
name=constants.DS_CULL_BUSY,
default_value=False,
help="""Whether to consider culling kernels which are busy.
Only effective if cull_idle_timeout > 0.""",
).tag(config=True)

@property
def kernel_id(self):
return self.kernel_manager.kernel_id
Expand Down Expand Up @@ -61,6 +89,10 @@ async def poll(self):
"""Poll for Kernel liveliness"""
km = self.kernel_manager

if self.is_culling_enabled:
await self.cull_if_idle()
return

# If the kernel is communicating, we're good here.
if km.is_communicating():
if not self._connected_once or self._attempt_count > 0:
Expand Down Expand Up @@ -107,3 +139,33 @@ async def poll(self):
self._disconnected_state()
await self.kernel_manager.shutdown_kernel()
self._attempt_count = 0

def is_culling_enabled(self) -> bool:
"""check if culling is enabled for this kernel.
TODO: fetch this info from remote metastore based on kernelSpecId
"""
return self.cull_idle_timeout > 0

async def cull_if_idle(self):
"""Cull kernel if inactive."""
km = self.kernel_manager
dt_now = utcnow()
dt_idle = dt_now - km.last_activity
# Compute idle properties
is_idle_time = dt_idle > timedelta(seconds=self.cull_idle_timeout)
is_idle_execute = self.cull_busy or (
km.execution_state != "busy" and km.execution_state != "starting"
)
connections = km.kernel_connections
is_idle_connected = self.cull_connected or not connections
# Cull the kernel if all three criteria are met
if is_idle_time and is_idle_execute and is_idle_connected:
idle_duration = int(dt_idle.total_seconds())
msg = (
f"Culling {km.execution_state} kernel (kernel_id={km.kernel_id} process_id={km.process_id}) "
f"with {connections} connections due to {idle_duration} seconds of inactivity."
)
# self._emit_console("Culling", msg) ## TODO: relay culled msg to user
self.log.info(msg)
self.stop()
await km.shutdown_kernel() ## TODO: clear sessionInfo for this kernel
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,13 @@ def kernel_model(self, kernel_id):
model = super().kernel_model(kernel_id)
model["process_id"] = self._kernels[kernel_id].process_id
return model

def notify_connect(self, kernel_id):
"""Notice a new connection to a kernel"""
super().notify_connect(kernel_id)
self._kernels[kernel_id].kernel_connections += 1

def notify_disconnect(self, kernel_id):
"""Notice a disconnection from a kernel"""
super().notify_disconnect(kernel_id)
self._kernels[kernel_id].kernel_connections -= 1
7 changes: 6 additions & 1 deletion data_studio_jupyter_extensions/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
SHARED_ENCRYPT_KEY = "shared_encrypt_key"
SHARED_SEED = "shared_seed"
DS_KERNEL_ID = "kernel_id"

DS_CULL_IDLE_TIMEOUT = "DS_CULL_IDLE_TIMEOUT"
DS_CULL_INTERVAL = "DS_CULL_INTERVAL"
DS_CULL_CONNECTED = "DS_CULL_CONNECTED"
DS_CULL_BUSY = "DS_CULL_BUSY"

_states = namedtuple(
"KERNEL_STATES",
Expand Down Expand Up @@ -54,3 +57,5 @@
"disconnected",
"reconnecting",
)
# states are listed here: https://jupyter-client.readthedocs.io/en/stable/messaging.html?highlight=execution_state#kernel-status
EXECUTION_STATES = {"busy", "idle", "starting"}

0 comments on commit 3560d6a

Please sign in to comment.