-
Notifications
You must be signed in to change notification settings - Fork 310
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Basic Event Bus #820
Merged
Merged
Basic Event Bus #820
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
13922f6
basic event bus
Zsailer accadd0
remove unnecessary super call
Zsailer 2696bd2
add auth to the websocket
Zsailer 411a7cf
Clean up the eventlog singleton on server shutdown
Zsailer 8de4704
get tests working
Zsailer 4e194f6
allow empty event bus
Zsailer 71747af
ignore flake 8 error in tests
Zsailer dd0d807
Update jupyter_server/services/events/handlers.py
Zsailer cadc063
review comments
Zsailer File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
"""An EventBus for use in the Jupyter server. | ||
|
||
.. versionadded:: 2.0 | ||
""" | ||
# Copyright (c) Jupyter Development Team. | ||
# Distributed under the terms of the Modified BSD License. | ||
from jupyter_telemetry.eventlog import EventLog | ||
from traitlets.config import SingletonConfigurable | ||
|
||
|
||
class EventBus(EventLog, SingletonConfigurable): | ||
"""A singleton eventlog that behaves as an event | ||
bus for emitting Jupyter Server (and extension) | ||
event data. | ||
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
"""A Websocket Handler for emitting Jupyter server events. | ||
|
||
.. versionadded:: 2.0 | ||
""" | ||
import logging | ||
|
||
from jupyter_telemetry.eventlog import _skip_message | ||
from pythonjsonlogger import jsonlogger | ||
from tornado import web, websocket | ||
|
||
from jupyter_server.base.handlers import JupyterHandler | ||
|
||
AUTH_RESOURCE = "events" | ||
|
||
|
||
class WebSocketLoggingHandler(logging.Handler): | ||
"""Python logging handler that routes records to a Tornado websocket.""" | ||
|
||
def __init__(self, websocket, *args, **kwargs): | ||
super().__init__(*args, **kwargs) | ||
self.websocket = websocket | ||
|
||
def emit(self, record): | ||
"""Emit the message across the websocket""" | ||
self.websocket.write_message(record.msg) | ||
|
||
|
||
class SubscribeWebsocket( | ||
JupyterHandler, | ||
websocket.WebSocketHandler, | ||
): | ||
"""Websocket handler for subscribing to events""" | ||
|
||
auth_resource = AUTH_RESOURCE | ||
|
||
def pre_get(self): | ||
"""Handles authentication/authorization when | ||
attempting to subscribe to events emitted by | ||
Jupyter Server's eventbus. | ||
""" | ||
# authenticate the request before opening the websocket | ||
user = self.current_user | ||
if user is None: | ||
self.log.warning("Couldn't authenticate WebSocket connection") | ||
raise web.HTTPError(403) | ||
|
||
# authorize the user. | ||
if not self.authorizer.is_authorized(self, user, "execute", "events"): | ||
raise web.HTTPError(403) | ||
|
||
async def get(self, *args, **kwargs): | ||
self.pre_get() | ||
res = super().get(*args, **kwargs) | ||
await res | ||
|
||
@property | ||
def event_bus(self): | ||
"""Jupyter Server's event bus that emits structured event data.""" | ||
return self.settings["event_bus"] | ||
|
||
def open(self): | ||
"""Routes events that are emitted by Jupyter Server's | ||
EventBus to a WebSocket client in the browser. | ||
""" | ||
self.logging_handler = WebSocketLoggingHandler(self) | ||
# Add a JSON formatter to the handler. | ||
formatter = jsonlogger.JsonFormatter(json_serializer=_skip_message) | ||
self.logging_handler.setFormatter(formatter) | ||
Zsailer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# To do: add an eventlog.add_handler method to jupyter_telemetry. | ||
self.event_bus.log.addHandler(self.logging_handler) | ||
self.event_bus.handlers.append(self.logging_handler) | ||
Zsailer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def on_close(self): | ||
self.event_bus.log.removeHandler(self.logging_handler) | ||
Zsailer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.event_bus.handlers.remove(self.logging_handler) | ||
|
||
|
||
default_handlers = [ | ||
(r"/api/events/subscribe", SubscribeWebsocket), | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
$id: event.mock.jupyter.com/message | ||
version: 1 | ||
title: Message | ||
description: | | ||
Emit a message | ||
type: object | ||
properties: | ||
event_message: | ||
title: Event Messages | ||
description: | | ||
Mock event message to read. | ||
required: | ||
- event_message |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
from .mock_extension import _load_jupyter_server_extension # noqa: F401 | ||
|
||
# Function that makes these extensions discoverable | ||
# by the test functions. | ||
|
||
|
||
def _jupyter_server_extension_points(): | ||
return [ | ||
{"module": "tests.services.events.mockextension"}, | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
import pathlib | ||
|
||
from jupyter_server.base.handlers import JupyterHandler | ||
from jupyter_server.utils import url_path_join | ||
|
||
|
||
class MockEventHandler(JupyterHandler): | ||
def get(self): | ||
# Emit an event. | ||
self.event_bus.record_event( | ||
schema_name="event.mockextension.jupyter.com/message", | ||
version=1, | ||
event={"event_message": "Hello world, from mock extension!"}, | ||
) | ||
|
||
|
||
def _load_jupyter_server_extension(serverapp): | ||
# Register a schema with the EventBus | ||
schema_file = pathlib.Path(__file__).parent / "mock_extension_event.yaml" | ||
serverapp.event_bus.register_schema_file(schema_file) | ||
serverapp.web_app.add_handlers( | ||
".*$", [(url_path_join(serverapp.base_url, "/mock/event"), MockEventHandler)] | ||
) |
13 changes: 13 additions & 0 deletions
13
tests/services/events/mockextension/mock_extension_event.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
$id: event.mockextension.jupyter.com/message | ||
version: 1 | ||
title: Message | ||
description: | | ||
Emit a message | ||
type: object | ||
properties: | ||
event_message: | ||
title: Event Message | ||
description: | | ||
Mock event message to read. | ||
required: | ||
- event_message |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
import json | ||
import pathlib | ||
|
||
import pytest | ||
|
||
|
||
@pytest.fixture | ||
def event_bus(jp_serverapp): | ||
event_bus = jp_serverapp.event_bus | ||
# Register the event schema defined in this directory. | ||
schema_file = pathlib.Path(__file__).parent / "mock_event.yaml" | ||
event_bus.register_schema_file(schema_file) | ||
# | ||
event_bus.allowed_schemas = ["event.mock.jupyter.com/message"] | ||
return event_bus | ||
|
||
|
||
async def test_subscribe_websocket(jp_ws_fetch, event_bus): | ||
# Open a websocket connection. | ||
ws = await jp_ws_fetch("/api/events/subscribe") | ||
|
||
event_bus.record_event( | ||
schema_name="event.mock.jupyter.com/message", | ||
version=1, | ||
event={"event_message": "Hello, world!"}, | ||
) | ||
message = await ws.read_message() | ||
event_data = json.loads(message) | ||
# Close websocket | ||
ws.close() | ||
|
||
assert event_data.get("event_message") == "Hello, world!" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
import json | ||
|
||
import pytest | ||
|
||
|
||
@pytest.fixture | ||
def jp_server_config(): | ||
config = { | ||
"ServerApp": { | ||
"jpserver_extensions": {"tests.services.events.mockextension": True}, | ||
}, | ||
"EventBus": {"allowed_schemas": ["event.mockextension.jupyter.com/message"]}, | ||
} | ||
return config | ||
|
||
|
||
async def test_subscribe_websocket(jp_ws_fetch, jp_fetch): | ||
# Open an event listener websocket | ||
ws = await jp_ws_fetch("/api/events/subscribe") | ||
|
||
# Hit the extension endpoint that emits an event | ||
await jp_fetch("/mock/event") | ||
|
||
# Check the event listener for a message | ||
message = await ws.read_message() | ||
event_data = json.loads(message) | ||
|
||
# Close websocket | ||
ws.close() | ||
|
||
# Verify that an event message was received. | ||
assert event_data.get("event_message") == "Hello world, from mock extension!" |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Event bus is instantiated down on line 1925. What is the purpose of this statement?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is merely defining the trait type and validation that traitlets should do once the trait is created on line 1925. This doesn't do the actual instantiation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for explaining.