Skip to content

Commit

Permalink
Initial work towards using jupyter_kernel_mgmt in jupyter server
Browse files Browse the repository at this point in the history
Cherry-pick Notebook PR jupyter/notebook#4837
  • Loading branch information
takluyver authored and kevin-bates committed Dec 6, 2019
1 parent 5c873e3 commit 494a30e
Show file tree
Hide file tree
Showing 15 changed files with 614 additions and 622 deletions.
10 changes: 5 additions & 5 deletions jupyter_server/base/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,11 @@ def contents_js_source(self):
#---------------------------------------------------------------
# Manager objects
#---------------------------------------------------------------


@property
def kernel_finder(self):
return self.settings['kernel_finder']

@property
def kernel_manager(self):
return self.settings['kernel_manager']
Expand All @@ -261,10 +265,6 @@ def session_manager(self):
@property
def terminal_manager(self):
return self.settings['terminal_manager']

@property
def kernel_spec_manager(self):
return self.settings['kernel_spec_manager']

@property
def config_manager(self):
Expand Down
174 changes: 3 additions & 171 deletions jupyter_server/base/zmqhandlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,81 +4,11 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

import json
import struct
import sys
import tornado

from urllib.parse import urlparse
from tornado import gen, ioloop, web
from tornado.websocket import WebSocketHandler

from jupyter_client.session import Session
from jupyter_client.jsonutil import date_default, extract_dates
from ipython_genutils.py3compat import cast_unicode

from .handlers import JupyterHandler
from jupyter_server.utils import maybe_future


def serialize_binary_message(msg):
"""serialize a message as a binary blob
Header:
4 bytes: number of msg parts (nbufs) as 32b int
4 * nbufs bytes: offset for each buffer as integer as 32b int
Offsets are from the start of the buffer, including the header.
Returns
-------
The message serialized to bytes.
"""
# don't modify msg or buffer list in-place
msg = msg.copy()
buffers = list(msg.pop('buffers'))
if sys.version_info < (3, 4):
buffers = [x.tobytes() for x in buffers]
bmsg = json.dumps(msg, default=date_default).encode('utf8')
buffers.insert(0, bmsg)
nbufs = len(buffers)
offsets = [4 * (nbufs + 1)]
for buf in buffers[:-1]:
offsets.append(offsets[-1] + len(buf))
offsets_buf = struct.pack('!' + 'I' * (nbufs + 1), nbufs, *offsets)
buffers.insert(0, offsets_buf)
return b''.join(buffers)


def deserialize_binary_message(bmsg):
"""deserialize a message from a binary blog
Header:
4 bytes: number of msg parts (nbufs) as 32b int
4 * nbufs bytes: offset for each buffer as integer as 32b int
Offsets are from the start of the buffer, including the header.
Returns
-------
message dictionary
"""
nbufs = struct.unpack('!i', bmsg[:4])[0]
offsets = list(struct.unpack('!' + 'I' * nbufs, bmsg[4:4*(nbufs+1)]))
offsets.append(None)
bufs = []
for start, stop in zip(offsets[:-1], offsets[1:]):
bufs.append(bmsg[start:stop])
msg = json.loads(bufs[0].decode('utf8'))
msg['header'] = extract_dates(msg['header'])
msg['parent_header'] = extract_dates(msg['parent_header'])
msg['buffers'] = bufs[1:]
return msg
from tornado import ioloop
from tornado.iostream import StreamClosedError
from tornado.websocket import WebSocketHandler, WebSocketClosedError

# ping interval for keeping websockets alive (30 seconds)
WS_PING_INTERVAL = 30000
Expand Down Expand Up @@ -188,101 +118,3 @@ def send_ping(self):
def on_pong(self, data):
self.last_pong = ioloop.IOLoop.current().time()


class ZMQStreamHandler(WebSocketMixin, WebSocketHandler):

if tornado.version_info < (4,1):
"""Backport send_error from tornado 4.1 to 4.0"""
def send_error(self, *args, **kwargs):
if self.stream is None:
super(WebSocketHandler, self).send_error(*args, **kwargs)
else:
# If we get an uncaught exception during the handshake,
# we have no choice but to abruptly close the connection.
# TODO: for uncaught exceptions after the handshake,
# we can close the connection more gracefully.
self.stream.close()


def _reserialize_reply(self, msg_or_list, channel=None):
"""Reserialize a reply message using JSON.
msg_or_list can be an already-deserialized msg dict or the zmq buffer list.
If it is the zmq list, it will be deserialized with self.session.
This takes the msg list from the ZMQ socket and serializes the result for the websocket.
This method should be used by self._on_zmq_reply to build messages that can
be sent back to the browser.
"""
if isinstance(msg_or_list, dict):
# already unpacked
msg = msg_or_list
else:
idents, msg_list = self.session.feed_identities(msg_or_list)
msg = self.session.deserialize(msg_list)
if channel:
msg['channel'] = channel
if msg['buffers']:
buf = serialize_binary_message(msg)
return buf
else:
smsg = json.dumps(msg, default=date_default)
return cast_unicode(smsg)

def _on_zmq_reply(self, stream, msg_list):
# Sometimes this gets triggered when the on_close method is scheduled in the
# eventloop but hasn't been called.
if self.ws_connection is None or stream.closed():
self.log.warning("zmq message arrived on closed channel")
self.close()
return
channel = getattr(stream, 'channel', None)
try:
msg = self._reserialize_reply(msg_list, channel=channel)
except Exception:
self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
else:
self.write_message(msg, binary=isinstance(msg, bytes))


class AuthenticatedZMQStreamHandler(ZMQStreamHandler, JupyterHandler):

def set_default_headers(self):
"""Undo the set_default_headers in JupyterHandler
which doesn't make sense for websockets
"""
pass

def pre_get(self):
"""Run before finishing the GET request
Extend this method to add logic that should fire before
the websocket finishes completing.
"""
# authenticate the request before opening the websocket
if self.get_current_user() is None:
self.log.warning("Couldn't authenticate WebSocket connection")
raise web.HTTPError(403)

if self.get_argument('session_id', False):
self.session.session = cast_unicode(self.get_argument('session_id'))
else:
self.log.warning("No session ID specified")

@gen.coroutine
def get(self, *args, **kwargs):
# pre_get can be a coroutine in subclasses
# assign and yield in two step to avoid tornado 3 issues
res = self.pre_get()
yield maybe_future(res)
res = super(AuthenticatedZMQStreamHandler, self).get(*args, **kwargs)
yield maybe_future(res)

def initialize(self):
self.log.debug("Initializing websocket connection %s", self.request.path)
self.session = Session(config=self.config)

def get_compression_options(self):
return self.settings.get('websocket_compression_options', None)
20 changes: 12 additions & 8 deletions jupyter_server/kernelspecs/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,23 @@ def initialize(self):

@web.authenticated
def get(self, kernel_name, path, include_body=True):
ksm = self.kernel_spec_manager
try:
self.root = ksm.get_kernel_spec(kernel_name).resource_dir
except KeyError:
raise web.HTTPError(404, u'Kernel spec %s not found' % kernel_name)
self.log.debug("Serving kernel resource from: %s", self.root)
return web.StaticFileHandler.get(self, path, include_body=include_body)
kf = self.kernel_finder
# TODO: Do we actually want all kernel type names to be case-insensitive?
kernel_name = kernel_name.lower()
for name, info in kf.find_kernels():
if name == kernel_name:
self.root = info['resource_dir']
self.log.debug("Serving kernel resource from: %s", self.root)
return web.StaticFileHandler.get(self, path,
include_body=include_body)

raise web.HTTPError(404, u'Kernel spec %s not found' % kernel_name)

@web.authenticated
def head(self, kernel_name, path):
return self.get(kernel_name, path, include_body=False)


default_handlers = [
(r"/kernelspecs/%s/(?P<path>.*)" % kernel_name_regex, KernelSpecResourceHandler),
]

55 changes: 27 additions & 28 deletions jupyter_server/serverapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@
)
from jupyter_core.paths import jupyter_config_path
from jupyter_client import KernelManager
from jupyter_client.kernelspec import KernelSpecManager, NoSuchKernel, NATIVE_KERNEL_NAME
from jupyter_client.session import Session
from jupyter_kernel_mgmt.discovery import KernelFinder
from nbformat.sign import NotebookNotary
from traitlets import (
Any, Dict, Unicode, Integer, List, Bool, Bytes, Instance,
Expand Down Expand Up @@ -159,21 +159,21 @@ def load_handlers(name):
class ServerWebApplication(web.Application):

def __init__(self, jupyter_app, default_services, kernel_manager, contents_manager,
session_manager, kernel_spec_manager,
session_manager, kernel_finder,
config_manager, extra_services, log,
base_url, default_url, settings_overrides, jinja_env_options):

settings = self.init_settings(
jupyter_app, kernel_manager, contents_manager,
session_manager, kernel_spec_manager, config_manager,
session_manager, kernel_finder, config_manager,
extra_services, log, base_url,
default_url, settings_overrides, jinja_env_options)
handlers = self.init_handlers(default_services, settings)

super(ServerWebApplication, self).__init__(handlers, **settings)

def init_settings(self, jupyter_app, kernel_manager, contents_manager,
session_manager, kernel_spec_manager,
session_manager, kernel_finder,
config_manager, extra_services,
log, base_url, default_url, settings_overrides,
jinja_env_options=None):
Expand Down Expand Up @@ -248,10 +248,10 @@ def init_settings(self, jupyter_app, kernel_manager, contents_manager,
local_hostnames=jupyter_app.local_hostnames,

# managers
kernel_finder=kernel_finder,
kernel_manager=kernel_manager,
contents_manager=contents_manager,
session_manager=session_manager,
kernel_spec_manager=kernel_spec_manager,
config_manager=config_manager,

# handlers
Expand Down Expand Up @@ -555,7 +555,7 @@ class ServerApp(JupyterApp):
flags = flags

classes = [
KernelManager, Session, MappingKernelManager, KernelSpecManager,
KernelManager, Session, MappingKernelManager,
ContentsManager, FileContentsManager, NotebookNotary,
GatewayKernelManager, GatewayKernelSpecManager, GatewaySessionManager, GatewayClient,
]
Expand Down Expand Up @@ -1033,6 +1033,12 @@ def template_file_path(self):
(shutdown the Jupyter server)."""
)

kernel_providers = List(config=True,
help=_('A list of kernel provider instances. '
'If not specified, all installed kernel providers are found '
'using entry points.')
)

contents_manager_class = Type(
default_value=LargeFileManager,
klass=ContentsManager,
Expand All @@ -1058,20 +1064,6 @@ def template_file_path(self):
help=_('The config manager class to use')
)

kernel_spec_manager = Instance(KernelSpecManager, allow_none=True)

kernel_spec_manager_class = Type(
default_value=KernelSpecManager,
config=True,
help="""
The kernel spec manager class to use. Should be a subclass
of `jupyter_client.kernelspec.KernelSpecManager`.
The Api of KernelSpecManager is provisional and might change
without warning between this version of Jupyter and the next stable one.
"""
)

login_handler_class = Type(
default_value=LoginHandler,
klass=web.RequestHandler,
Expand Down Expand Up @@ -1104,7 +1096,7 @@ def _default_info_file(self):
def _default_browser_open_file(self):
basename = "jpserver-%s-open.html" % os.getpid()
return os.path.join(self.runtime_dir, basename)

pylab = Unicode('disabled', config=True,
help=_("""
DISABLED: use %pylab or %matplotlib in the notebook to enable matplotlib.
Expand Down Expand Up @@ -1237,16 +1229,23 @@ def init_configurables(self):
if self.gateway_config.gateway_enabled:
self.kernel_manager_class = 'jupyter_server.gateway.managers.GatewayKernelManager'
self.session_manager_class = 'jupyter_server.gateway.managers.GatewaySessionManager'
self.kernel_spec_manager_class = 'jupyter_server.gateway.managers.GatewayKernelSpecManager'
# FIXME - no more kernel-spec-manager!
# self.kernel_spec_manager_class = 'jupyter_server.gateway.managers.GatewayKernelSpecManager'
#
# self.kernel_spec_manager = self.kernel_spec_manager_class(
# parent=self,
# )

if self.kernel_providers:
self.kernel_finder = KernelFinder(self.kernel_providers)
else:
self.kernel_finder = KernelFinder.from_entrypoints()

self.kernel_spec_manager = self.kernel_spec_manager_class(
parent=self,
)
self.kernel_manager = self.kernel_manager_class(
parent=self,
log=self.log,
connection_dir=self.runtime_dir,
kernel_spec_manager=self.kernel_spec_manager,
kernel_finder=self.kernel_finder,
)
self.contents_manager = self.contents_manager_class(
parent=self,
Expand Down Expand Up @@ -1301,7 +1300,7 @@ def init_webapp(self):

self.web_app = ServerWebApplication(
self, self.default_services, self.kernel_manager, self.contents_manager,
self.session_manager, self.kernel_spec_manager,
self.session_manager, self.kernel_finder,
self.config_manager, self.extra_services,
self.log, self.base_url, self.default_url, self.tornado_settings,
self.jinja_environment_options,
Expand Down Expand Up @@ -1490,7 +1489,7 @@ def init_server_extensions(self):
Import the module, then call the load_jupyter_server_extension function,
if one exists.
The extension API is experimental, and may change in future releases.
"""
# Initialize extensions
Expand Down
Loading

0 comments on commit 494a30e

Please sign in to comment.