Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(jupyter): add handling for comms #24250

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ ignore = "0.4"
import_map = { version = "=0.19.0", features = ["ext"] }
indexmap.workspace = true
jsonc-parser.workspace = true
jupyter_runtime = { package = "runtimelib", version = "=0.11.0" }
jupyter_runtime = { package = "runtimelib", git = "https://github.com/runtimed/runtimed.git", branch = "comm-id-public" }
lazy-regex.workspace = true
libc.workspace = true
libz-sys.workspace = true
Expand Down
113 changes: 97 additions & 16 deletions cli/js/40_jupyter.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,14 @@ function isDataFrameLike(obj) {
return false;
}
const df = obj;
return df.schema !== void 0 && typeof df.schema === "object" &&
df.head !== void 0 && typeof df.head === "function" &&
df.toRecords !== void 0 && typeof df.toRecords === "function";
return (
df.schema !== void 0 &&
typeof df.schema === "object" &&
df.head !== void 0 &&
typeof df.head === "function" &&
df.toRecords !== void 0 &&
typeof df.toRecords === "function"
);
bartlomieju marked this conversation as resolved.
Show resolved Hide resolved
}
/**
* Map Polars DataType to JSON Schema data types.
Expand Down Expand Up @@ -158,13 +163,15 @@ function extractDataFrame(df) {
});
htmlTable += "</tr></thead>";
htmlTable += "<tbody>";
df.head(10).toRecords().forEach((row) => {
htmlTable += "<tr>";
schema.fields.forEach((field) => {
htmlTable += `<td>${escapeHTML(String(row[field.name]))}</td>`;
df.head(10)
.toRecords()
.forEach((row) => {
htmlTable += "<tr>";
schema.fields.forEach((field) => {
htmlTable += `<td>${escapeHTML(String(row[field.name]))}</td>`;
});
htmlTable += "</tr>";
});
htmlTable += "</tr>";
});
htmlTable += "</tbody></table>";
return {
"application/vnd.dataresource+json": { data, schema },
Expand All @@ -179,19 +186,32 @@ function isCanvasLike(obj) {

/** Possible HTML and SVG Elements */
function isSVGElementLike(obj) {
return obj !== null && typeof obj === "object" && "outerHTML" in obj &&
typeof obj.outerHTML === "string" && obj.outerHTML.startsWith("<svg");
return (
obj !== null &&
typeof obj === "object" &&
"outerHTML" in obj &&
typeof obj.outerHTML === "string" &&
obj.outerHTML.startsWith("<svg")
);
}

function isHTMLElementLike(obj) {
return obj !== null && typeof obj === "object" && "outerHTML" in obj &&
typeof obj.outerHTML === "string";
return (
obj !== null &&
typeof obj === "object" &&
"outerHTML" in obj &&
typeof obj.outerHTML === "string"
);
}

/** Check to see if an object already contains a `Symbol.for("Jupyter.display")` */
function hasDisplaySymbol(obj) {
return obj !== null && typeof obj === "object" && $display in obj &&
typeof obj[$display] === "function";
return (
obj !== null &&
typeof obj === "object" &&
$display in obj &&
typeof obj[$display] === "function"
);
}

function makeDisplayable(obj) {
Expand Down Expand Up @@ -336,8 +356,67 @@ async function formatInner(obj, raw) {

internals.jupyter = { formatInner };

/**
class CommMessage(TypedDict):
header: dict
# typically UUID, must be unique per message
msg_id: str
msg_type: str
parent_header: dict
metadata: dict
content: <custom payload>
buffers: list[memoryview]

(async () => {
const data = await Deno.jupyter.comms.recv("1234-5678");
})();
(async () => {
const data = await Deno.jupyter.comms.recv("1234-5678");
})();

const comm = await Deno.jupyter.comms.open("1234-5678");
const data = await comm.recv();

const data = await Deno.jupyter.comms.recv("1234-5678");

c = Comm("1234-5678")

c.on("update", data => {
console.log(data);
Deno.jupyter.broadcast(...);
});


{
msg_type: "comm_msg",
content: {
comm_id: "1234-5678",
data: {

}
}
}
*/

function enableJupyter() {
const { op_jupyter_broadcast } = core.ops;
const { op_jupyter_broadcast, op_jupyter_comm_recv, op_jupyter_comm_open } =
core.ops;

/**
 * Open a comm by forwarding its id and target name to the
 * `op_jupyter_comm_open` op. Returns nothing.
 *
 * @param commId Identifier of the comm to open.
 * @param targetName Target name to associate with the comm.
 */
function commOpen(commId, targetName) {
op_jupyter_comm_open(commId, targetName);
}

/**
 * Await the next message for the comm identified by `commId`.
 *
 * The `op_jupyter_comm_recv` op resolves to a `[data, buffers]` pair. When
 * `data` is falsy this resolves to `undefined`; otherwise it resolves to a
 * copy of `data` with the `buffers` attached under the `buffers` key.
 *
 * @param commId Identifier of the comm to receive from.
 */
async function commRecv(commId) {
  const [payload, attachments] = await op_jupyter_comm_recv(commId);

  return payload ? { ...payload, buffers: attachments } : undefined;
}

async function broadcast(
msgType,
Expand Down Expand Up @@ -414,6 +493,8 @@ function enableJupyter() {

globalThis.Deno.jupyter = {
broadcast,
commRecv,
commOpen,
display,
format,
md,
Expand Down
100 changes: 100 additions & 0 deletions cli/ops/jupyter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,32 @@ use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;

use bytes::Bytes;

use jupyter_runtime::CommId;
use jupyter_runtime::CommInfo;
use jupyter_runtime::CommMsg;
use jupyter_runtime::JupyterMessage;
use jupyter_runtime::JupyterMessageContent;
use jupyter_runtime::KernelIoPubConnection;
use jupyter_runtime::StreamContent;

use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::parking_lot::Mutex as PlMutex;
use deno_core::serde_json;
use deno_core::OpState;
use deno_core::ToJsBuffer;
use std::collections::HashMap;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::Mutex;

deno_core::extension!(deno_jupyter,
ops = [
op_jupyter_broadcast,
op_jupyter_comm_recv,
op_jupyter_comm_open,
],
options = {
sender: mpsc::UnboundedSender<StreamContent>,
Expand All @@ -32,6 +43,95 @@ deno_core::extension!(deno_jupyter,
},
);

/// State kept for one comm: its target name plus both halves of the
/// broadcast channel carrying `(CommMsg, Vec<Bytes>)` pairs.
pub struct CommChannel {
/// Target name the comm was created with.
pub target_name: String,
/// Sending half of the comm's broadcast channel.
pub sender: broadcast::Sender<(CommMsg, Vec<Bytes>)>,
/// Receiving half stored next to the sender; `op_jupyter_comm_recv` calls
/// `resubscribe()` on it to obtain its own receiver.
pub receiver: broadcast::Receiver<(CommMsg, Vec<Bytes>)>,
}

/// Registry of comms keyed by comm id.
///
/// The map lives behind an `Arc` and a `parking_lot` mutex (`PlMutex`), so
/// cloning a `CommContainer` yields another handle to the same map.
#[derive(Clone, Default)]
pub struct CommContainer(pub Arc<PlMutex<HashMap<String, CommChannel>>>);

impl CommContainer {
  /// Registers a comm under `comm_id` with the given `target_name`.
  ///
  /// A fresh broadcast channel (capacity 16) is created for the comm. If a
  /// comm with the same id is already registered, the existing entry is kept
  /// and this call does nothing.
  pub fn create(
    &mut self,
    comm_id: &str,
    target_name: &str,
    // For pulling off the metadata and buffers
    _msg: Option<&JupyterMessage>,
  ) {
    use std::collections::hash_map::Entry;

    let mut registry = self.0.lock();

    // Existing comms are never replaced: only a vacant slot gets a channel.
    if let Entry::Vacant(slot) = registry.entry(comm_id.to_string()) {
      let (sender, receiver) = broadcast::channel(16);
      slot.insert(CommChannel {
        target_name: target_name.to_string(),
        sender,
        receiver,
      });
    }
  }

  /// Returns a snapshot mapping every registered comm id to a `CommInfo`
  /// holding that comm's target name.
  pub fn comms(&self) -> HashMap<CommId, CommInfo> {
    let registry = self.0.lock();

    let mut snapshot = HashMap::with_capacity(registry.len());
    for (id, channel) in registry.iter() {
      let info = CommInfo {
        target_name: channel.target_name.clone(),
      };
      snapshot.insert(CommId(id.clone()), info);
    }
    snapshot
  }
}

/// Op backing comm creation from JavaScript: registers `comm_id` with
/// `target_name` in the `CommContainer` stored in the op state. No
/// originating Jupyter message is passed along (`None`).
#[op2(fast)]
pub fn op_jupyter_comm_open(
  state: &mut OpState,
  #[string] comm_id: String,
  #[string] target_name: String,
) {
  state
    .borrow_mut::<CommContainer>()
    .create(&comm_id, &target_name, None);
}

/// Async op that waits for the next message on the comm named by `comm_id`.
///
/// Returns `(serde_json::Value::Null, vec![])` when no comm with that id is
/// registered. Otherwise returns the received `CommMsg` serialized to JSON
/// together with its buffers, each copied into a `ToJsBuffer`.
///
/// NOTE(review): both `unwrap()` calls below panic on failure: the first if
/// `recv()` returns an error, the second if the message cannot be
/// serialized. Confirm whether these should be surfaced to JS instead.
#[op2(async)]
#[serde]
pub async fn op_jupyter_comm_recv(
state: Rc<RefCell<OpState>>,
#[string] comm_id: String,
) -> (serde_json::Value, Vec<ToJsBuffer>) {
// Scoped block: the `OpState` borrow and the container lock guard are
// dropped at the end of the block, before the `.await` below.
let mut receiver = {
let state = state.borrow();
let container = state.borrow::<CommContainer>();
let container = container.0.lock();
let maybe_comm = container.get(&comm_id);
let Some(comm) = maybe_comm else {
return (serde_json::Value::Null, vec![]);
};
// NOTE(review): per tokio's broadcast docs a resubscribed receiver only
// sees values sent after this call; confirm earlier messages may be missed.
comm.receiver.resubscribe()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@manzt @rgbkrk if we go with EventListener approach for a comm then we can expect more than one consumer of the "comm". That means we need to use tokio::sync::broadcast channel which is buffered. We can go with a rather big number for the buffer like 1024 or 65536. If we expect only a single consumer then we could use an mpsc channel that is unbounded. Which one should we go with?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For widgets, I think it will basically only be one comm listener. There is new comm created for each new widget instance.

};

let (msg, buffers) = receiver.recv().await.unwrap();

(
serde_json::to_value(msg).unwrap(),
buffers
.into_iter()
.map(|b| ToJsBuffer::from(b.to_vec()))
.collect(),
)
}

#[op2(async)]
pub async fn op_jupyter_broadcast(
state: Rc<RefCell<OpState>>,
Expand Down
4 changes: 1 addition & 3 deletions cli/tools/jupyter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,7 @@ pub async fn kernel(
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self
.0
.send(StreamContent::stdout(
String::from_utf8_lossy(buf).into_owned(),
))
.send(StreamContent::stdout(&String::from_utf8_lossy(buf)))
bartlomieju marked this conversation as resolved.
Show resolved Hide resolved
.ok();
Ok(buf.len())
}
Expand Down
Loading
Loading