Skip to content

Commit

Permalink
MVP Support for inline-rendering of Rerun within jupyter notebooks (#…
Browse files Browse the repository at this point in the history
…1798) (#1834) (#1844)

* Introduce the ability to push an rrd binary via iframe.contentWindow.postMessage
* New API to output the current buffered messages as a cell in jupyter
* Example notebook with the cube demo
* Track that we need to send another recording msg after draining the backlog
* Dynamically resolve the app location based on git commit. Allow override to use self-hosted assets
* Add some crude timeout logic in case the iframe fails to load
* Don't persist app state in notebooks
* Introduce new MemoryRecording for use with Jupyter notebooks (#1834)
* Refactor the relationship between the assorted web / websocket servers (#1844)
* Rename RemoteViewerServer to WebViewerSink
* CLI arguments for specifying ports
* Proper typing for the ports
  • Loading branch information
jleibs authored Apr 14, 2023
1 parent d33dab6 commit 3cd534f
Show file tree
Hide file tree
Showing 32 changed files with 1,104 additions and 130 deletions.
8 changes: 7 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/re_log_encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl<W: std::io::Write> Encoder<W> {

pub fn encode<'a>(
messages: impl Iterator<Item = &'a LogMsg>,
write: impl std::io::Write,
write: &mut impl std::io::Write,
) -> Result<(), EncodeError> {
let mut encoder = Encoder::new(write)?;
for message in messages {
Expand Down
54 changes: 49 additions & 5 deletions crates/re_log_encoding/src/stream_rrd_from_http.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use re_log_types::LogMsg;

pub fn stream_rrd_from_http_to_channel(url: String) -> re_smart_channel::Receiver<LogMsg> {
Expand All @@ -6,14 +8,14 @@ pub fn stream_rrd_from_http_to_channel(url: String) -> re_smart_channel::Receive
});
stream_rrd_from_http(
url,
Box::new(move |msg| {
Arc::new(move |msg| {
tx.send(msg).ok();
}),
);
rx
}

pub fn stream_rrd_from_http(url: String, on_msg: Box<dyn Fn(LogMsg) + Send>) {
pub fn stream_rrd_from_http(url: String, on_msg: Arc<dyn Fn(LogMsg) + Send + Sync>) {
re_log::debug!("Downloading .rrd file from {url:?}…");

// TODO(emilk): stream the http request, progressively decoding the .rrd file.
Expand All @@ -36,9 +38,50 @@ pub fn stream_rrd_from_http(url: String, on_msg: Box<dyn Fn(LogMsg) + Send>) {
});
}

#[cfg(target_arch = "wasm32")]
mod web_event_listener {
use js_sys::Uint8Array;
use re_log_types::LogMsg;
use std::sync::Arc;
use wasm_bindgen::{closure::Closure, JsCast, JsValue};
use web_sys::MessageEvent;

/// Install an event-listener on `window` which will decode the incoming event as an rrd
///
/// From javascript you can send an rrd using:
/// ``` ignore
/// var rrd = new Uint8Array(...); // Get an RRD from somewhere
/// window.postMessage(rrd, "*")
/// ```
pub fn stream_rrd_from_event_listener(on_msg: Arc<dyn Fn(LogMsg) + Send>) {
let window = web_sys::window().expect("no global `window` exists");
let closure =
Closure::wrap(Box::new(
move |event: JsValue| match event.dyn_into::<MessageEvent>() {
Ok(message_event) => {
let uint8_array = Uint8Array::new(&message_event.data());
let result: Vec<u8> = uint8_array.to_vec();

crate::stream_rrd_from_http::decode_rrd(result, on_msg.clone());
}
Err(js_val) => {
re_log::error!("Incoming event was not a MessageEvent. {:?}", js_val);
}
},
) as Box<dyn FnMut(_)>);
window
.add_event_listener_with_callback("message", closure.as_ref().unchecked_ref())
.unwrap();
closure.forget();
}
}

#[cfg(target_arch = "wasm32")]
pub use web_event_listener::stream_rrd_from_event_listener;

#[cfg(not(target_arch = "wasm32"))]
#[allow(clippy::needless_pass_by_value)] // must match wasm version
fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Box<dyn Fn(LogMsg) + Send>) {
fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Arc<dyn Fn(LogMsg) + Send>) {
match crate::decoder::Decoder::new(rrd_bytes.as_slice()) {
Ok(decoder) => {
for msg in decoder {
Expand All @@ -61,15 +104,16 @@ fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Box<dyn Fn(LogMsg) + Send>) {
#[cfg(target_arch = "wasm32")]
mod web_decode {
use re_log_types::LogMsg;
use std::sync::Arc;

pub fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Box<dyn Fn(LogMsg) + Send>) {
pub fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Arc<dyn Fn(LogMsg) + Send>) {
wasm_bindgen_futures::spawn_local(decode_rrd_async(rrd_bytes, on_msg));
}

/// Decodes the file in chunks, with an yield between each chunk.
///
/// This is cooperative multi-tasking.
async fn decode_rrd_async(rrd_bytes: Vec<u8>, on_msg: Box<dyn Fn(LogMsg) + Send>) {
async fn decode_rrd_async(rrd_bytes: Vec<u8>, on_msg: Arc<dyn Fn(LogMsg) + Send>) {
let mut last_yield = instant::Instant::now();

match crate::decoder::Decoder::new(rrd_bytes.as_slice()) {
Expand Down
4 changes: 3 additions & 1 deletion crates/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ pub mod demo_util;
/// This is how you select whether the log stream ends up
/// sent over TCP, written to file, etc.
pub mod sink {
pub use crate::log_sink::{disabled, BufferedSink, LogSink, TcpSink};
pub use crate::log_sink::{
disabled, BufferedSink, LogSink, MemorySink, MemorySinkStorage, TcpSink,
};

#[cfg(not(target_arch = "wasm32"))]
pub use re_log_encoding::{FileSink, FileSinkError};
Expand Down
47 changes: 47 additions & 0 deletions crates/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,53 @@ impl LogSink for BufferedSink {
}
}

/// Store log messages directly in memory
///
/// Although very similar to `BufferedSink` this sink is a real-endpoint. When creating
/// a new sink the logged messages stay with the `MemorySink` (`drain_backlog` does nothing).
///
/// Additionally the raw storage can be accessed and used to create an in-memory RRD.
/// This is useful for things like the inline rrd-viewer in Jupyter notebooks.
#[derive(Default)]
pub struct MemorySink(MemorySinkStorage);

impl MemorySink {
/// Access the raw `MemorySinkStorage`
pub fn buffer(&self) -> MemorySinkStorage {
self.0.clone()
}
}

impl LogSink for MemorySink {
fn send(&self, msg: LogMsg) {
self.0.lock().push(msg);
}

fn send_all(&self, mut messages: Vec<LogMsg>) {
self.0.lock().append(&mut messages);
}
}

/// The storage used by [`MemorySink`]
#[derive(Default, Clone)]
pub struct MemorySinkStorage(std::sync::Arc<parking_lot::Mutex<Vec<LogMsg>>>);

///
impl MemorySinkStorage {
/// Lock the contained buffer
fn lock(&self) -> parking_lot::MutexGuard<'_, Vec<LogMsg>> {
self.0.lock()
}

/// Convert the stored messages into an in-memory Rerun log file
pub fn rrd_as_bytes(&self) -> Result<Vec<u8>, re_log_encoding::encoder::EncodeError> {
let messages = self.lock();
let mut buffer = std::io::Cursor::new(Vec::new());
re_log_encoding::encoder::encode(messages.iter(), &mut buffer)?;
Ok(buffer.into_inner())
}
}

// ----------------------------------------------------------------------------

/// Stream log messages to a Rerun TCP server.
Expand Down
7 changes: 6 additions & 1 deletion crates/re_smart_channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ pub enum Source {
/// Streaming an `.rrd` file over http.
RrdHttpStream { url: String },

/// Loading an `.rrd` file from a `postMessage` js event
///
/// Only applicable to web browser iframes
RrdWebEventListener,

/// The source is the logging sdk directly, same process.
Sdk,

Expand All @@ -36,7 +41,7 @@ pub enum Source {
impl Source {
pub fn is_network(&self) -> bool {
match self {
Self::File { .. } | Self::Sdk => false,
Self::File { .. } | Self::Sdk | Self::RrdWebEventListener => false,
Self::RrdHttpStream { .. } | Self::WsClient { .. } | Self::TcpServer { .. } => true,
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/re_viewer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ winapi = "0.3.9"
[target.'cfg(target_arch = "wasm32")'.dependencies]
console_error_panic_hook = "0.1.6"
wasm-bindgen-futures = "0.4"

web-sys = { version = "0.3.52", features = ["Window"] }

[build-dependencies]
re_build_build_info.workspace = true
9 changes: 6 additions & 3 deletions crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,9 @@ fn wait_screen_ui(ui: &mut egui::Ui, rx: &Receiver<LogMsg>) {
re_smart_channel::Source::RrdHttpStream { url } => {
ui.strong(format!("Loading {url}…"));
}
re_smart_channel::Source::RrdWebEventListener => {
ready_and_waiting(ui, "Waiting for logging data…");
}
re_smart_channel::Source::Sdk => {
ready_and_waiting(ui, "Waiting for logging data from SDK");
}
Expand Down Expand Up @@ -1885,9 +1888,9 @@ fn new_recording_confg(
let play_state = match data_source {
// Play files from the start by default - it feels nice and alive./
// RrdHttpStream downloads the whole file before decoding it, so we treat it the same as a file.
re_smart_channel::Source::File { .. } | re_smart_channel::Source::RrdHttpStream { .. } => {
PlayState::Playing
}
re_smart_channel::Source::File { .. }
| re_smart_channel::Source::RrdHttpStream { .. }
| re_smart_channel::Source::RrdWebEventListener => PlayState::Playing,

// Live data - follow it!
re_smart_channel::Source::Sdk
Expand Down
2 changes: 1 addition & 1 deletion crates/re_viewer/src/remote_viewer_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl RemoteViewerApp {
}
}
Err(err) => {
re_log::error!("Failed to parse message: {}", re_error::format(&err));
re_log::error!("Failed to parse message: {err}");
std::ops::ControlFlow::Break(())
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/re_viewer/src/viewer_analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ impl ViewerAnalytics {
let data_source = match data_source {
re_smart_channel::Source::File { .. } => "file", // .rrd
re_smart_channel::Source::RrdHttpStream { .. } => "http",
re_smart_channel::Source::RrdWebEventListener { .. } => "web_event",
re_smart_channel::Source::Sdk => "sdk", // show()
re_smart_channel::Source::WsClient { .. } => "ws_client", // spawn()
re_smart_channel::Source::TcpServer { .. } => "tcp_server", // connect()
Expand Down
35 changes: 32 additions & 3 deletions crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use eframe::wasm_bindgen::{self, prelude::*};
use std::sync::Arc;

use re_memory::AccountingAllocator;

Expand Down Expand Up @@ -54,7 +55,30 @@ pub async fn start(
let egui_ctx = cc.egui_ctx.clone();
re_log_encoding::stream_rrd_from_http::stream_rrd_from_http(
url,
Box::new(move |msg| {
Arc::new(move |msg| {
egui_ctx.request_repaint(); // wake up ui thread
tx.send(msg).ok();
}),
);

Box::new(crate::App::from_receiver(
build_info,
&app_env,
startup_options,
re_ui,
cc.storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
))
}
EndpointCategory::WebEventListener => {
// Process an rrd when it's posted via `window.postMessage`
let (tx, rx) = re_smart_channel::smart_channel(
re_smart_channel::Source::RrdWebEventListener,
);
let egui_ctx = cc.egui_ctx.clone();
re_log_encoding::stream_rrd_from_http::stream_rrd_from_event_listener(
Arc::new(move |msg| {
egui_ctx.request_repaint(); // wake up ui thread
tx.send(msg).ok();
}),
Expand Down Expand Up @@ -95,13 +119,18 @@ enum EndpointCategory {

/// A remote Rerun server.
WebSocket(String),

/// An eventListener for rrd posted from containing html
WebEventListener,
}

fn categorize_uri(mut uri: String) -> EndpointCategory {
if uri.starts_with("http") || uri.ends_with(".rrd") {
EndpointCategory::HttpRrd(uri)
} else if uri.starts_with("ws") {
} else if uri.starts_with("ws:") || uri.starts_with("wss:") {
EndpointCategory::WebSocket(uri)
} else if uri.starts_with("web_event:") {
EndpointCategory::WebEventListener
} else {
// If this is sometyhing like `foo.com` we can't know what it is until we connect to it.
// We could/should connect and see what it is, but for now we just take a wild guess instead:
Expand All @@ -119,7 +148,7 @@ fn get_url(info: &eframe::IntegrationInfo) -> String {
url = param.clone();
}
if url.is_empty() {
re_ws_comms::default_server_url(&info.web_info.location.hostname)
re_ws_comms::server_url(&info.web_info.location.hostname, Default::default())
} else {
url
}
Expand Down
2 changes: 1 addition & 1 deletion crates/re_web_viewer_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ analytics = ["dep:re_analytics"]
[dependencies]
re_log.workspace = true

anyhow.workspace = true
ctrlc.workspace = true
document-features = "0.2"
futures-util = "0.3"
hyper = { version = "0.14", features = ["full"] }
thiserror.workspace = true
tokio = { workspace = true, default-features = false, features = [
"macros",
"rt-multi-thread",
Expand Down
Loading

1 comment on commit 3cd534f

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rust Benchmark

Benchmark suite Current: 3cd534f Previous: d33dab6 Ratio
datastore/num_rows=1000/num_instances=1000/packed=false/insert/default 3763364 ns/iter (± 94875) 3010738 ns/iter (± 65920) 1.25
datastore/num_rows=1000/num_instances=1000/packed=false/latest_at/default 373 ns/iter (± 12) 371 ns/iter (± 2) 1.01
datastore/num_rows=1000/num_instances=1000/packed=false/latest_at_missing/primary/default 263 ns/iter (± 2) 261 ns/iter (± 1) 1.01
datastore/num_rows=1000/num_instances=1000/packed=false/latest_at_missing/secondaries/default 423 ns/iter (± 0) 422 ns/iter (± 0) 1.00
datastore/num_rows=1000/num_instances=1000/packed=false/range/default 3958807 ns/iter (± 107245) 3330134 ns/iter (± 112525) 1.19
datastore/num_rows=1000/num_instances=1000/gc/default 2456376 ns/iter (± 8122) 2424916 ns/iter (± 4701) 1.01
mono_points_arrow/generate_message_bundles 29570786 ns/iter (± 173076) 28432068 ns/iter (± 297731) 1.04
mono_points_arrow/generate_messages 115987279 ns/iter (± 849332) 114112970 ns/iter (± 886245) 1.02
mono_points_arrow/encode_log_msg 148749674 ns/iter (± 2692081) 145899870 ns/iter (± 1004942) 1.02
mono_points_arrow/encode_total 284652453 ns/iter (± 2421780) 289080503 ns/iter (± 1272415) 0.98
mono_points_arrow/decode_log_msg 176054401 ns/iter (± 607732) 179424574 ns/iter (± 722632) 0.98
mono_points_arrow/decode_message_bundles 59317900 ns/iter (± 2249038) 62185032 ns/iter (± 620422) 0.95
mono_points_arrow/decode_total 236176653 ns/iter (± 1711565) 240383305 ns/iter (± 1265142) 0.98
mono_points_arrow_batched/generate_message_bundles 18970248 ns/iter (± 934018) 23470439 ns/iter (± 1057275) 0.81
mono_points_arrow_batched/generate_messages 4029170 ns/iter (± 84826) 4586860 ns/iter (± 170445) 0.88
mono_points_arrow_batched/encode_log_msg 1341700 ns/iter (± 9135) 1375933 ns/iter (± 3621) 0.98
mono_points_arrow_batched/encode_total 25609081 ns/iter (± 890607) 30564909 ns/iter (± 904055) 0.84
mono_points_arrow_batched/decode_log_msg 778758 ns/iter (± 1588) 785594 ns/iter (± 2193) 0.99
mono_points_arrow_batched/decode_message_bundles 7657980 ns/iter (± 42302) 7990733 ns/iter (± 219325) 0.96
mono_points_arrow_batched/decode_total 8305891 ns/iter (± 100850) 9926223 ns/iter (± 391846) 0.84
batch_points_arrow/generate_message_bundles 193586 ns/iter (± 398) 195680 ns/iter (± 257) 0.99
batch_points_arrow/generate_messages 5144 ns/iter (± 10) 5108 ns/iter (± 12) 1.01
batch_points_arrow/encode_log_msg 258005 ns/iter (± 662) 265634 ns/iter (± 1424) 0.97
batch_points_arrow/encode_total 484781 ns/iter (± 1677) 491914 ns/iter (± 2123) 0.99
batch_points_arrow/decode_log_msg 211108 ns/iter (± 490) 212014 ns/iter (± 944) 1.00
batch_points_arrow/decode_message_bundles 1897 ns/iter (± 4) 1907 ns/iter (± 5) 0.99
batch_points_arrow/decode_total 220223 ns/iter (± 422) 224269 ns/iter (± 2163) 0.98
arrow_mono_points/insert 2310867913 ns/iter (± 3200071) 2316354742 ns/iter (± 4325193) 1.00
arrow_mono_points/query 1169816 ns/iter (± 9328) 1213093 ns/iter (± 15728) 0.96
arrow_batch_points/insert 1157855 ns/iter (± 13845) 1154639 ns/iter (± 3975) 1.00
arrow_batch_points/query 14374 ns/iter (± 98) 14532 ns/iter (± 93) 0.99
arrow_batch_vecs/insert 26376 ns/iter (± 96) 26419 ns/iter (± 45) 1.00
arrow_batch_vecs/query 325356 ns/iter (± 1011) 325444 ns/iter (± 452) 1.00
tuid/Tuid::random 34 ns/iter (± 0) 34 ns/iter (± 0) 1

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.