From 3cd534ff0a13e38cb302f780afcfe0a693a7a9fd Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Fri, 14 Apr 2023 11:50:15 -0400 Subject: [PATCH] MVP Support for inline-rendering of Rerun within jupyter notebooks (#1798) (#1834) (#1844) * Introduce the ability to push an rrd binary via iframe.contentWindow.postMessage * New API to output the current buffered messages as a cell in jupyter * Example notebook with the cube demo * Track that we need to send another recording msg after draining the backlog * Dynamically resolve the app location based on git commit. Allow override to use self-hosted assets * Add some crude timeout logic in case the iframe fails to load * Don't persist app state in notebooks * Introduce new MemoryRecording for use with Jupyter notebooks (#1834) * Refactor the relationship between the assorted web / websocket servers (#1844) * Rename RemoteViewerServer to WebViewerSink * CLI arguments for specifying ports * Proper typing for the ports --- Cargo.lock | 8 +- crates/re_log_encoding/src/encoder.rs | 2 +- .../src/stream_rrd_from_http.rs | 54 +++- crates/re_sdk/src/lib.rs | 4 +- crates/re_sdk/src/log_sink.rs | 47 +++ crates/re_smart_channel/src/lib.rs | 7 +- crates/re_viewer/Cargo.toml | 2 +- crates/re_viewer/src/app.rs | 9 +- crates/re_viewer/src/remote_viewer_app.rs | 2 +- crates/re_viewer/src/viewer_analytics.rs | 1 + crates/re_viewer/src/web.rs | 35 ++- crates/re_web_viewer_server/Cargo.toml | 2 +- crates/re_web_viewer_server/src/lib.rs | 118 +++++++- crates/re_web_viewer_server/src/main.rs | 3 +- crates/re_ws_comms/Cargo.toml | 1 + crates/re_ws_comms/src/client.rs | 3 +- crates/re_ws_comms/src/lib.rs | 69 ++++- crates/re_ws_comms/src/server.rs | 90 ++++-- crates/rerun/src/clap.rs | 11 +- crates/rerun/src/run.rs | 46 ++- crates/rerun/src/web_viewer.rs | 106 +++---- examples/python/notebook/.gitignore | 1 + examples/python/notebook/README.md | 31 ++ examples/python/notebook/cube.ipynb | 272 ++++++++++++++++++ examples/python/notebook/requirements.txt | 2 + rerun_py/Cargo.toml | 6 +- rerun_py/rerun_sdk/rerun/__init__.py | 52 +++- rerun_py/rerun_sdk/rerun/recording.py | 90 ++++++ rerun_py/src/python_bridge.rs | 83 +++++- rerun_py/src/python_session.rs | 69 ++++- web_viewer/index.html | 4 + web_viewer/index_bundled.html | 4 + 32 files changed, 1104 insertions(+), 130 deletions(-) create mode 100644 examples/python/notebook/.gitignore create mode 100644 examples/python/notebook/README.md create mode 100644 examples/python/notebook/cube.ipynb create mode 100644 examples/python/notebook/requirements.txt create mode 100644 rerun_py/rerun_sdk/rerun/recording.py diff --git a/Cargo.lock b/Cargo.lock index 587df51f948f..24bf0e2dd7f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4207,6 +4207,7 @@ dependencies = [ "uuid", "vec1", "wasm-bindgen-futures", + "web-sys", "wgpu", "winapi", ] @@ -4215,7 +4216,6 @@ dependencies = [ name = "re_web_viewer_server" version = "0.4.0" dependencies = [ - "anyhow", "cargo_metadata", "ctrlc", "document-features", @@ -4225,6 +4225,7 @@ dependencies = [ "re_analytics", "re_build_web_viewer", "re_log", + "thiserror", "tokio", ] @@ -4242,6 +4243,7 @@ dependencies = [ "re_log", "re_log_types", "re_smart_channel", + "thiserror", "tokio", "tokio-tungstenite", "tungstenite", @@ -4352,9 +4354,13 @@ dependencies = [ "re_build_info", "re_error", "re_log", + "re_log_encoding", "re_log_types", "re_memory", + "re_web_viewer_server", + "re_ws_comms", "rerun", + "thiserror", "tokio", "uuid", ] diff --git a/crates/re_log_encoding/src/encoder.rs b/crates/re_log_encoding/src/encoder.rs index 98f39603e5ac..6d444746b6e1 100644 --- a/crates/re_log_encoding/src/encoder.rs +++ b/crates/re_log_encoding/src/encoder.rs @@ -90,7 +90,7 @@ impl Encoder { pub fn encode<'a>( messages: impl Iterator, - write: impl std::io::Write, + write: &mut impl std::io::Write, ) -> Result<(), EncodeError> { let mut encoder = Encoder::new(write)?; for message in messages { diff --git a/crates/re_log_encoding/src/stream_rrd_from_http.rs b/crates/re_log_encoding/src/stream_rrd_from_http.rs index 007105f37f53..065365d3258d 100644 --- a/crates/re_log_encoding/src/stream_rrd_from_http.rs +++ b/crates/re_log_encoding/src/stream_rrd_from_http.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use re_log_types::LogMsg; pub fn stream_rrd_from_http_to_channel(url: String) -> re_smart_channel::Receiver { @@ -6,14 +8,14 @@ pub fn stream_rrd_from_http_to_channel(url: String) -> re_smart_channel::Receive }); stream_rrd_from_http( url, - Box::new(move |msg| { + Arc::new(move |msg| { tx.send(msg).ok(); }), ); rx } -pub fn stream_rrd_from_http(url: String, on_msg: Box) { +pub fn stream_rrd_from_http(url: String, on_msg: Arc) { re_log::debug!("Downloading .rrd file from {url:?}…"); // TODO(emilk): stream the http request, progressively decoding the .rrd file. @@ -36,9 +38,50 @@ pub fn stream_rrd_from_http(url: String, on_msg: Box) { }); } +#[cfg(target_arch = "wasm32")] +mod web_event_listener { + use js_sys::Uint8Array; + use re_log_types::LogMsg; + use std::sync::Arc; + use wasm_bindgen::{closure::Closure, JsCast, JsValue}; + use web_sys::MessageEvent; + + /// Install an event-listener on `window` which will decode the incoming event as an rrd + /// + /// From javascript you can send an rrd using: + /// ``` ignore + /// var rrd = new Uint8Array(...); // Get an RRD from somewhere + /// window.postMessage(rrd, "*") + /// ``` + pub fn stream_rrd_from_event_listener(on_msg: Arc) { + let window = web_sys::window().expect("no global `window` exists"); + let closure = + Closure::wrap(Box::new( + move |event: JsValue| match event.dyn_into::() { + Ok(message_event) => { + let uint8_array = Uint8Array::new(&message_event.data()); + let result: Vec = uint8_array.to_vec(); + + crate::stream_rrd_from_http::decode_rrd(result, on_msg.clone()); + } + Err(js_val) => { + re_log::error!("Incoming event was not a MessageEvent. {:?}", js_val); + } + }, + ) as Box); + window + .add_event_listener_with_callback("message", closure.as_ref().unchecked_ref()) + .unwrap(); + closure.forget(); + } +} + +#[cfg(target_arch = "wasm32")] +pub use web_event_listener::stream_rrd_from_event_listener; + #[cfg(not(target_arch = "wasm32"))] #[allow(clippy::needless_pass_by_value)] // must match wasm version -fn decode_rrd(rrd_bytes: Vec, on_msg: Box) { +fn decode_rrd(rrd_bytes: Vec, on_msg: Arc) { match crate::decoder::Decoder::new(rrd_bytes.as_slice()) { Ok(decoder) => { for msg in decoder { @@ -61,15 +104,16 @@ fn decode_rrd(rrd_bytes: Vec, on_msg: Box) { #[cfg(target_arch = "wasm32")] mod web_decode { use re_log_types::LogMsg; + use std::sync::Arc; - pub fn decode_rrd(rrd_bytes: Vec, on_msg: Box) { + pub fn decode_rrd(rrd_bytes: Vec, on_msg: Arc) { wasm_bindgen_futures::spawn_local(decode_rrd_async(rrd_bytes, on_msg)); } /// Decodes the file in chunks, with an yield between each chunk. /// /// This is cooperative multi-tasking. - async fn decode_rrd_async(rrd_bytes: Vec, on_msg: Box) { + async fn decode_rrd_async(rrd_bytes: Vec, on_msg: Arc) { let mut last_yield = instant::Instant::now(); match crate::decoder::Decoder::new(rrd_bytes.as_slice()) { diff --git a/crates/re_sdk/src/lib.rs b/crates/re_sdk/src/lib.rs index ce3f2baf5009..a7e7fe27607f 100644 --- a/crates/re_sdk/src/lib.rs +++ b/crates/re_sdk/src/lib.rs @@ -49,7 +49,9 @@ pub mod demo_util; /// This is how you select whether the log stream ends up /// sent over TCP, written to file, etc. pub mod sink { - pub use crate::log_sink::{disabled, BufferedSink, LogSink, TcpSink}; + pub use crate::log_sink::{ + disabled, BufferedSink, LogSink, MemorySink, MemorySinkStorage, TcpSink, + }; #[cfg(not(target_arch = "wasm32"))] pub use re_log_encoding::{FileSink, FileSinkError}; diff --git a/crates/re_sdk/src/log_sink.rs b/crates/re_sdk/src/log_sink.rs index 2dd7fa8bf75f..639c682aaf21 100644 --- a/crates/re_sdk/src/log_sink.rs +++ b/crates/re_sdk/src/log_sink.rs @@ -76,6 +76,53 @@ impl LogSink for BufferedSink { } } +/// Store log messages directly in memory +/// +/// Although very similar to `BufferedSink` this sink is a real-endpoint. When creating +/// a new sink the logged messages stay with the `MemorySink` (`drain_backlog` does nothing). +/// +/// Additionally the raw storage can be accessed and used to create an in-memory RRD. +/// This is useful for things like the inline rrd-viewer in Jupyter notebooks. +#[derive(Default)] +pub struct MemorySink(MemorySinkStorage); + +impl MemorySink { + /// Access the raw `MemorySinkStorage` + pub fn buffer(&self) -> MemorySinkStorage { + self.0.clone() + } +} + +impl LogSink for MemorySink { + fn send(&self, msg: LogMsg) { + self.0.lock().push(msg); + } + + fn send_all(&self, mut messages: Vec) { + self.0.lock().append(&mut messages); + } +} + +/// The storage used by [`MemorySink`] +#[derive(Default, Clone)] +pub struct MemorySinkStorage(std::sync::Arc>>); + +/// +impl MemorySinkStorage { + /// Lock the contained buffer + fn lock(&self) -> parking_lot::MutexGuard<'_, Vec> { + self.0.lock() + } + + /// Convert the stored messages into an in-memory Rerun log file + pub fn rrd_as_bytes(&self) -> Result, re_log_encoding::encoder::EncodeError> { + let messages = self.lock(); + let mut buffer = std::io::Cursor::new(Vec::new()); + re_log_encoding::encoder::encode(messages.iter(), &mut buffer)?; + Ok(buffer.into_inner()) + } +} + // ---------------------------------------------------------------------------- /// Stream log messages to a Rerun TCP server. diff --git a/crates/re_smart_channel/src/lib.rs b/crates/re_smart_channel/src/lib.rs index 0a278dc9e078..38fcb352dfdf 100644 --- a/crates/re_smart_channel/src/lib.rs +++ b/crates/re_smart_channel/src/lib.rs @@ -18,6 +18,11 @@ pub enum Source { /// Streaming an `.rrd` file over http. RrdHttpStream { url: String }, + /// Loading an `.rrd` file from a `postMessage` js event + /// + /// Only applicable to web browser iframes + RrdWebEventListener, + /// The source is the logging sdk directly, same process. Sdk, @@ -36,7 +41,7 @@ pub enum Source { impl Source { pub fn is_network(&self) -> bool { match self { - Self::File { .. } | Self::Sdk => false, + Self::File { .. } | Self::Sdk | Self::RrdWebEventListener => false, Self::RrdHttpStream { .. } | Self::WsClient { .. } | Self::TcpServer { .. } => true, } } diff --git a/crates/re_viewer/Cargo.toml b/crates/re_viewer/Cargo.toml index 22ebab8f6ffc..8ec7e70ddc1b 100644 --- a/crates/re_viewer/Cargo.toml +++ b/crates/re_viewer/Cargo.toml @@ -117,7 +117,7 @@ winapi = "0.3.9" [target.'cfg(target_arch = "wasm32")'.dependencies] console_error_panic_hook = "0.1.6" wasm-bindgen-futures = "0.4" - +web-sys = { version = "0.3.52", features = ["Window"] } [build-dependencies] re_build_build_info.workspace = true diff --git a/crates/re_viewer/src/app.rs b/crates/re_viewer/src/app.rs index b001c12fbbef..41ebad070304 100644 --- a/crates/re_viewer/src/app.rs +++ b/crates/re_viewer/src/app.rs @@ -640,6 +640,9 @@ fn wait_screen_ui(ui: &mut egui::Ui, rx: &Receiver) { re_smart_channel::Source::RrdHttpStream { url } => { ui.strong(format!("Loading {url}…")); } + re_smart_channel::Source::RrdWebEventListener => { + ready_and_waiting(ui, "Waiting for logging data…"); + } re_smart_channel::Source::Sdk => { ready_and_waiting(ui, "Waiting for logging data from SDK"); } @@ -1885,9 +1888,9 @@ fn new_recording_confg( let play_state = match data_source { // Play files from the start by default - it feels nice and alive./ // RrdHttpStream downloads the whole file before decoding it, so we treat it the same as a file. - re_smart_channel::Source::File { .. } | re_smart_channel::Source::RrdHttpStream { .. } => { - PlayState::Playing - } + re_smart_channel::Source::File { .. } + | re_smart_channel::Source::RrdHttpStream { .. } + | re_smart_channel::Source::RrdWebEventListener => PlayState::Playing, // Live data - follow it! re_smart_channel::Source::Sdk diff --git a/crates/re_viewer/src/remote_viewer_app.rs b/crates/re_viewer/src/remote_viewer_app.rs index bcd6f37e609d..2012512f4f2f 100644 --- a/crates/re_viewer/src/remote_viewer_app.rs +++ b/crates/re_viewer/src/remote_viewer_app.rs @@ -56,7 +56,7 @@ impl RemoteViewerApp { } } Err(err) => { - re_log::error!("Failed to parse message: {}", re_error::format(&err)); + re_log::error!("Failed to parse message: {err}"); std::ops::ControlFlow::Break(()) } } diff --git a/crates/re_viewer/src/viewer_analytics.rs b/crates/re_viewer/src/viewer_analytics.rs index 1f847057f026..8269f3c19cc4 100644 --- a/crates/re_viewer/src/viewer_analytics.rs +++ b/crates/re_viewer/src/viewer_analytics.rs @@ -184,6 +184,7 @@ impl ViewerAnalytics { let data_source = match data_source { re_smart_channel::Source::File { .. } => "file", // .rrd re_smart_channel::Source::RrdHttpStream { .. } => "http", + re_smart_channel::Source::RrdWebEventListener { .. } => "web_event", re_smart_channel::Source::Sdk => "sdk", // show() re_smart_channel::Source::WsClient { .. } => "ws_client", // spawn() re_smart_channel::Source::TcpServer { .. } => "tcp_server", // connect() diff --git a/crates/re_viewer/src/web.rs b/crates/re_viewer/src/web.rs index 2f214e1073a4..58dcec080ce2 100644 --- a/crates/re_viewer/src/web.rs +++ b/crates/re_viewer/src/web.rs @@ -1,4 +1,5 @@ use eframe::wasm_bindgen::{self, prelude::*}; +use std::sync::Arc; use re_memory::AccountingAllocator; @@ -54,7 +55,30 @@ pub async fn start( let egui_ctx = cc.egui_ctx.clone(); re_log_encoding::stream_rrd_from_http::stream_rrd_from_http( url, - Box::new(move |msg| { + Arc::new(move |msg| { + egui_ctx.request_repaint(); // wake up ui thread + tx.send(msg).ok(); + }), + ); + + Box::new(crate::App::from_receiver( + build_info, + &app_env, + startup_options, + re_ui, + cc.storage, + rx, + std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), + )) + } + EndpointCategory::WebEventListener => { + // Process an rrd when it's posted via `window.postMessage` + let (tx, rx) = re_smart_channel::smart_channel( + re_smart_channel::Source::RrdWebEventListener, + ); + let egui_ctx = cc.egui_ctx.clone(); + re_log_encoding::stream_rrd_from_http::stream_rrd_from_event_listener( + Arc::new(move |msg| { egui_ctx.request_repaint(); // wake up ui thread tx.send(msg).ok(); }), @@ -95,13 +119,18 @@ enum EndpointCategory { /// A remote Rerun server. WebSocket(String), + + /// An eventListener for rrd posted from containing html + WebEventListener, } fn categorize_uri(mut uri: String) -> EndpointCategory { if uri.starts_with("http") || uri.ends_with(".rrd") { EndpointCategory::HttpRrd(uri) - } else if uri.starts_with("ws") { + } else if uri.starts_with("ws:") || uri.starts_with("wss:") { EndpointCategory::WebSocket(uri) + } else if uri.starts_with("web_event:") { + EndpointCategory::WebEventListener } else { // If this is sometyhing like `foo.com` we can't know what it is until we connect to it. // We could/should connect and see what it is, but for now we just take a wild guess instead: @@ -119,7 +148,7 @@ fn get_url(info: &eframe::IntegrationInfo) -> String { url = param.clone(); } if url.is_empty() { - re_ws_comms::default_server_url(&info.web_info.location.hostname) + re_ws_comms::server_url(&info.web_info.location.hostname, Default::default()) } else { url } diff --git a/crates/re_web_viewer_server/Cargo.toml b/crates/re_web_viewer_server/Cargo.toml index 3631ce230645..2c519db466fe 100644 --- a/crates/re_web_viewer_server/Cargo.toml +++ b/crates/re_web_viewer_server/Cargo.toml @@ -46,11 +46,11 @@ analytics = ["dep:re_analytics"] [dependencies] re_log.workspace = true -anyhow.workspace = true ctrlc.workspace = true document-features = "0.2" futures-util = "0.3" hyper = { version = "0.14", features = ["full"] } +thiserror.workspace = true tokio = { workspace = true, default-features = false, features = [ "macros", "rt-multi-thread", diff --git a/crates/re_web_viewer_server/src/lib.rs b/crates/re_web_viewer_server/src/lib.rs index 7e97775a02a7..4d8f29172588 100644 --- a/crates/re_web_viewer_server/src/lib.rs +++ b/crates/re_web_viewer_server/src/lib.rs @@ -7,11 +7,17 @@ #![forbid(unsafe_code)] #![warn(clippy::all, rust_2018_idioms)] -use std::task::{Context, Poll}; +use std::{ + fmt::Display, + str::FromStr, + task::{Context, Poll}, +}; use futures_util::future; use hyper::{server::conn::AddrIncoming, service::Service, Body, Request, Response}; +pub const DEFAULT_WEB_VIEWER_SERVER_PORT: u16 = 9090; + #[cfg(not(feature = "__ci"))] mod data { // If you add/remove/change the paths here, also update the include-list in `Cargo.toml`! @@ -32,6 +38,21 @@ mod data { pub const VIEWER_WASM_RELEASE: &[u8] = include_bytes!("../web_viewer/re_viewer_bg.wasm"); } +#[derive(thiserror::Error, Debug)] +pub enum WebViewerServerError { + #[error("Could not parse address: {0}")] + AddrParseFailed(#[from] std::net::AddrParseError), + + #[error("failed to bind to port {0}: {1}")] + BindFailed(WebViewerServerPort, hyper::Error), + + #[error("failed to join web viewer server task: {0}")] + JoinError(#[from] tokio::task::JoinError), + + #[error("failed to serve web viewer: {0}")] + ServeFailed(hyper::Error), +} + struct Svc { // NOTE: Optional because it is possible to have the `analytics` feature flag enabled // while at the same time opting-out of analytics at run-time. @@ -149,27 +170,108 @@ impl Service for MakeSvc { // ---------------------------------------------------------------------------- -/// Hosts the Web Viewer Wasm+HTML +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +/// Typed port for use with [`WebViewerServer`] +pub struct WebViewerServerPort(pub u16); + +impl Default for WebViewerServerPort { + fn default() -> Self { + Self(DEFAULT_WEB_VIEWER_SERVER_PORT) + } +} + +impl Display for WebViewerServerPort { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +// Needed for clap +impl FromStr for WebViewerServerPort { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.parse::() { + Ok(port) => Ok(WebViewerServerPort(port)), + Err(err) => Err(format!("Failed to parse port: {err}")), + } + } +} + +/// HTTP host for the Rerun Web Viewer application +/// This serves the HTTP+Wasm+JS files that make up the web-viewer. pub struct WebViewerServer { server: hyper::Server, } impl WebViewerServer { - pub fn new(port: u16) -> Self { - let bind_addr = format!("0.0.0.0:{port}").parse().unwrap(); - let server = hyper::Server::bind(&bind_addr).serve(MakeSvc); - Self { server } + /// Create new [`WebViewerServer`] to host the Rerun Web Viewer on a specified port. + /// + /// A port of 0 will let the OS choose a free port. + pub fn new(port: WebViewerServerPort) -> Result { + let bind_addr = format!("0.0.0.0:{port}").parse()?; + let server = hyper::Server::try_bind(&bind_addr) + .map_err(|e| WebViewerServerError::BindFailed(port, e))? + .serve(MakeSvc); + Ok(Self { server }) } pub async fn serve( self, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, - ) -> anyhow::Result<()> { + ) -> Result<(), WebViewerServerError> { self.server .with_graceful_shutdown(async { shutdown_rx.recv().await.ok(); }) - .await?; + .await + .map_err(WebViewerServerError::ServeFailed)?; Ok(()) } + + pub fn port(&self) -> WebViewerServerPort { + WebViewerServerPort(self.server.local_addr().port()) + } +} + +/// Sync handle for the [`WebViewerServer`] +/// +/// When dropped, the server will be shut down. +pub struct WebViewerServerHandle { + port: WebViewerServerPort, + shutdown_tx: tokio::sync::broadcast::Sender<()>, +} + +impl Drop for WebViewerServerHandle { + fn drop(&mut self) { + re_log::info!("Shutting down web server on port {}.", self.port); + self.shutdown_tx.send(()).ok(); + } +} + +impl WebViewerServerHandle { + /// Create new [`WebViewerServer`] to host the Rerun Web Viewer on a specified port. + /// Returns a [`WebViewerServerHandle`] that will shutdown the server when dropped. + /// + /// A port of 0 will let the OS choose a free port. + /// + /// The caller needs to ensure that there is a `tokio` runtime running. + pub fn new(requested_port: WebViewerServerPort) -> Result { + let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1); + + let web_server = WebViewerServer::new(requested_port)?; + + let port = web_server.port(); + + tokio::spawn(async move { web_server.serve(shutdown_rx).await }); + + re_log::info!("Started web server on port {}.", port); + + Ok(Self { port, shutdown_tx }) + } + + /// Get the port where the HTTP server is listening + pub fn port(&self) -> WebViewerServerPort { + self.port + } } diff --git a/crates/re_web_viewer_server/src/main.rs b/crates/re_web_viewer_server/src/main.rs index 84bb08a376d6..df2c41d3a2a9 100644 --- a/crates/re_web_viewer_server/src/main.rs +++ b/crates/re_web_viewer_server/src/main.rs @@ -4,7 +4,7 @@ #[tokio::main] async fn main() { re_log::setup_native_logging(); - let port = 9090; + let port = Default::default(); eprintln!("Hosting web-viewer on http://127.0.0.1:{port}"); // Shutdown server via Ctrl+C @@ -16,6 +16,7 @@ async fn main() { .expect("Error setting Ctrl-C handler"); re_web_viewer_server::WebViewerServer::new(port) + .expect("Could not create web server") .serve(shutdown_rx) .await .unwrap(); diff --git a/crates/re_ws_comms/Cargo.toml b/crates/re_ws_comms/Cargo.toml index 55c22735d041..3713fb0cce8a 100644 --- a/crates/re_ws_comms/Cargo.toml +++ b/crates/re_ws_comms/Cargo.toml @@ -45,6 +45,7 @@ re_log_types = { workspace = true, features = ["serde"] } anyhow.workspace = true bincode = "1.3" document-features = "0.2" +thiserror.workspace = true # Client: ewebsock = { version = "0.2", optional = true } diff --git a/crates/re_ws_comms/src/client.rs b/crates/re_ws_comms/src/client.rs index 6faaa54dea3d..4c72dfa540a4 100644 --- a/crates/re_ws_comms/src/client.rs +++ b/crates/re_ws_comms/src/client.rs @@ -2,7 +2,8 @@ use std::ops::ControlFlow; use ewebsock::{WsEvent, WsMessage, WsSender}; -use crate::Result; +// TODO(jleibs): use thiserror +pub type Result = anyhow::Result; /// Represents a connection to the server. /// Disconnects on drop. diff --git a/crates/re_ws_comms/src/lib.rs b/crates/re_ws_comms/src/lib.rs index 41c4d77d2de4..e38527f04dc4 100644 --- a/crates/re_ws_comms/src/lib.rs +++ b/crates/re_ws_comms/src/lib.rs @@ -6,18 +6,18 @@ #[cfg(feature = "client")] mod client; +use std::{fmt::Display, str::FromStr}; + #[cfg(feature = "client")] pub use client::Connection; #[cfg(feature = "server")] mod server; #[cfg(feature = "server")] -pub use server::Server; +pub use server::{RerunServer, RerunServerHandle}; use re_log_types::LogMsg; -pub type Result = anyhow::Result; - pub const DEFAULT_WS_SERVER_PORT: u16 = 9877; #[cfg(feature = "tls")] @@ -26,8 +26,58 @@ pub const PROTOCOL: &str = "wss"; #[cfg(not(feature = "tls"))] pub const PROTOCOL: &str = "ws"; -pub fn default_server_url(hostname: &str) -> String { - format!("{PROTOCOL}://{hostname}:{DEFAULT_WS_SERVER_PORT}") +// ---------------------------------------------------------------------------- + +#[derive(thiserror::Error, Debug)] +pub enum RerunServerError { + #[error("failed to bind to port {0}: {1}")] + BindFailed(RerunServerPort, std::io::Error), + + #[error("received an invalid message")] + InvalidMessagePrefix, + + #[error("received an invalid message")] + InvalidMessage(#[from] bincode::Error), + + #[cfg(feature = "server")] + #[error("failed to join web viewer server task: {0}")] + JoinError(#[from] tokio::task::JoinError), + + #[cfg(feature = "server")] + #[error("tokio error: {0}")] + TokioIoError(#[from] tokio::io::Error), +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +/// Typed port for use with [`RerunServer`] +pub struct RerunServerPort(pub u16); + +impl Default for RerunServerPort { + fn default() -> Self { + Self(DEFAULT_WS_SERVER_PORT) + } +} + +impl Display for RerunServerPort { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +// Needed for clap +impl FromStr for RerunServerPort { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.parse::() { + Ok(port) => Ok(RerunServerPort(port)), + Err(err) => Err(format!("Failed to parse port: {err}")), + } + } +} + +pub fn server_url(hostname: &str, port: RerunServerPort) -> String { + format!("{PROTOCOL}://{hostname}:{port}") } const PREFIX: [u8; 4] = *b"RR00"; @@ -41,14 +91,11 @@ pub fn encode_log_msg(log_msg: &LogMsg) -> Vec { bytes } -pub fn decode_log_msg(data: &[u8]) -> Result { +pub fn decode_log_msg(data: &[u8]) -> Result { let payload = data .strip_prefix(&PREFIX) - .ok_or_else(|| anyhow::format_err!("Message didn't start with the correct prefix"))?; + .ok_or(RerunServerError::InvalidMessagePrefix)?; - use anyhow::Context as _; use bincode::Options as _; - bincode::DefaultOptions::new() - .deserialize(payload) - .context("bincode") + Ok(bincode::DefaultOptions::new().deserialize(payload)?) } diff --git a/crates/re_ws_comms/src/server.rs b/crates/re_ws_comms/src/server.rs index 6830cd5d57a1..d6fde09d2851 100644 --- a/crates/re_ws_comms/src/server.rs +++ b/crates/re_ws_comms/src/server.rs @@ -16,28 +16,34 @@ use tokio_tungstenite::{accept_async, tungstenite::Error}; use re_log_types::LogMsg; use re_smart_channel::Receiver; -// ---------------------------------------------------------------------------- +use crate::{server_url, RerunServerError, RerunServerPort}; -pub struct Server { +/// Websocket host for relaying [`LogMsg`]s to a web viewer. +pub struct RerunServer { listener: TcpListener, + port: RerunServerPort, } -impl Server { - /// Start a pub-sub server listening on the given port - pub async fn new(port: u16) -> anyhow::Result { - use anyhow::Context as _; - +impl RerunServer { + /// Create new [`RerunServer`] to relay [`LogMsg`]s to a websocket. + /// The websocket will be available at `port`. + /// + /// A port of 0 will let the OS choose a free port. + pub async fn new(port: RerunServerPort) -> Result { let bind_addr = format!("0.0.0.0:{port}"); let listener = TcpListener::bind(&bind_addr) .await - .with_context(|| format!("Can't listen on {bind_addr:?}"))?; + .map_err(|e| RerunServerError::BindFailed(port, e))?; + + let port = RerunServerPort(listener.local_addr()?.port()); re_log::info!( - "Listening for websocket traffic on {bind_addr}. Connect with a web Rerun Viewer." + "Listening for websocket traffic on {}. Connect with a Rerun Web Viewer.", + listener.local_addr()? ); - Ok(Self { listener }) + Ok(Self { listener, port }) } /// Accept new connections until we get a message on `shutdown_rx` @@ -45,9 +51,7 @@ impl Server { self, rx: Receiver, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, - ) -> anyhow::Result<()> { - use anyhow::Context as _; - + ) -> Result<(), RerunServerError> { let history = Arc::new(Mutex::new(Vec::new())); let log_stream = to_broadcast_stream(rx, history.clone()); @@ -60,9 +64,7 @@ impl Server { } }; - let peer = tcp_stream - .peer_addr() - .context("connected streams should have a peer address")?; + let peer = tcp_stream.peer_addr()?; tokio::spawn(accept_connection( log_stream.clone(), peer, @@ -71,6 +73,62 @@ impl Server { )); } } + + pub fn server_url(&self) -> String { + server_url("localhost", self.port) + } +} + +/// Sync handle for the [`RerunServer`] +/// +/// When dropped, the server will be shut down. +pub struct RerunServerHandle { + port: RerunServerPort, + shutdown_tx: tokio::sync::broadcast::Sender<()>, +} + +impl Drop for RerunServerHandle { + fn drop(&mut self) { + re_log::info!("Shutting down Rerun server on port {}.", self.port); + self.shutdown_tx.send(()).ok(); + } +} + +impl RerunServerHandle { + /// Create new [`RerunServer`] to relay [`LogMsg`]s to a websocket. + /// Returns a [`RerunServerHandle`] that will shutdown the server when dropped. + /// + /// A port of 0 will let the OS choose a free port. + /// + /// The caller needs to ensure that there is a `tokio` runtime running. + pub fn new( + rerun_rx: Receiver, + requested_port: RerunServerPort, + ) -> Result { + let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1); + + let rt = tokio::runtime::Handle::current(); + + let ws_server = rt.block_on(tokio::spawn(async move { + let ws_server = RerunServer::new(requested_port).await; + ws_server + }))??; + + let port = ws_server.port; + + tokio::spawn(async move { ws_server.listen(rerun_rx, shutdown_rx).await }); + + Ok(Self { port, shutdown_tx }) + } + + /// Get the port where the websocket server is listening + pub fn port(&self) -> RerunServerPort { + self.port + } + + pub fn server_url(&self) -> String { + server_url("localhost", self.port) + } } fn to_broadcast_stream( diff --git a/crates/rerun/src/clap.rs b/crates/rerun/src/clap.rs index 29ba016d81ca..fddb1de26aa6 100644 --- a/crates/rerun/src/clap.rs +++ b/crates/rerun/src/clap.rs @@ -2,6 +2,11 @@ use std::{net::SocketAddr, path::PathBuf}; +#[cfg(feature = "web_viewer")] +use re_web_viewer_server::WebViewerServerPort; +#[cfg(feature = "web_viewer")] +use re_ws_comms::RerunServerPort; + use crate::Session; // --- @@ -103,7 +108,11 @@ impl RerunArgs { #[cfg(feature = "web_viewer")] RerunBehavior::Serve => { let open_browser = true; - crate::web_viewer::new_sink(open_browser) + crate::web_viewer::new_sink( + open_browser, + WebViewerServerPort::default(), + RerunServerPort::default(), + )? } #[cfg(feature = "native_viewer")] diff --git a/crates/rerun/src/run.rs b/crates/rerun/src/run.rs index 0cbebd6ed75b..979c05705ff7 100644 --- a/crates/rerun/src/run.rs +++ b/crates/rerun/src/run.rs @@ -5,6 +5,10 @@ use re_smart_channel::Receiver; use anyhow::Context as _; use clap::Subcommand; +#[cfg(feature = "web_viewer")] +use re_web_viewer_server::WebViewerServerPort; +#[cfg(feature = "web_viewer")] +use re_ws_comms::RerunServerPort; #[cfg(feature = "web_viewer")] use crate::web_viewer::host_web_viewer; @@ -67,7 +71,7 @@ struct Args { #[clap(long, default_value_t = true)] persist_state: bool, - /// What TCP port do we listen to (for SDK:s to connect to)? + /// What TCP port do we listen to (for SDKs to connect to)? #[cfg(feature = "server")] #[clap(long, default_value_t = re_sdk_comms::DEFAULT_SERVER_PORT)] port: u16, @@ -106,6 +110,18 @@ struct Args { /// Requires Rerun to have been compiled with the 'web_viewer' feature. #[clap(long)] web_viewer: bool, + + /// What port do we listen to for hosting the web viewer over HTTP. + /// A port of 0 will pick a random port. + #[cfg(feature = "web_viewer")] + #[clap(long, default_value_t = Default::default())] + web_viewer_port: WebViewerServerPort, + + /// What port do we listen to for incoming websocket connections from the viewer + /// A port of 0 will pick a random port. + #[cfg(feature = "web_viewer")] + #[clap(long, default_value_t = Default::default())] + ws_server_port: RerunServerPort, } #[derive(Debug, Clone, Subcommand)] @@ -303,8 +319,14 @@ async fn run_impl( if args.web_viewer { #[cfg(feature = "web_viewer")] { - let web_viewer = - host_web_viewer(true, rerun_server_ws_url, shutdown_rx.resubscribe()); + let web_viewer = host_web_viewer( + args.web_viewer_port, + true, + rerun_server_ws_url, + shutdown_rx.resubscribe(), + ); + // We return here because the running [`WebViewerServer`] is all we need. + // The page we open will be pointed at a websocket url hosted by a *different* server. return web_viewer.await; } #[cfg(not(feature = "web_viewer"))] @@ -356,7 +378,9 @@ async fn run_impl( #[cfg(feature = "web_viewer")] { #[cfg(feature = "server")] - if args.url_or_path.is_none() && args.port == re_ws_comms::DEFAULT_WS_SERVER_PORT { + if args.url_or_path.is_none() + && (args.port == args.web_viewer_port.0 || args.port == args.ws_server_port.0) + { anyhow::bail!( "Trying to spawn a websocket server on {}, but this port is \ already used by the server we're connecting to. Please specify a different port.", @@ -369,17 +393,21 @@ async fn run_impl( let shutdown_web_viewer = shutdown_rx.resubscribe(); // This is the server which the web viewer will talk to: - let ws_server = re_ws_comms::Server::new(re_ws_comms::DEFAULT_WS_SERVER_PORT).await?; + let ws_server = re_ws_comms::RerunServer::new(args.ws_server_port).await?; + let ws_server_url = ws_server.server_url(); let ws_server_handle = tokio::spawn(ws_server.listen(rx, shutdown_ws_server)); - let ws_server_url = re_ws_comms::default_server_url("127.0.0.1"); // This is the server that serves the Wasm+HTML: - let web_server_handle = - tokio::spawn(host_web_viewer(true, ws_server_url, shutdown_web_viewer)); + let web_server_handle = tokio::spawn(host_web_viewer( + args.web_viewer_port, + true, + ws_server_url, + shutdown_web_viewer, + )); // Wait for both servers to shutdown. web_server_handle.await?.ok(); - return ws_server_handle.await?; + return ws_server_handle.await?.map_err(anyhow::Error::from); } #[cfg(not(feature = "web_viewer"))] diff --git a/crates/rerun/src/web_viewer.rs b/crates/rerun/src/web_viewer.rs index feb606bb91b0..bc1fc6f3396e 100644 --- a/crates/rerun/src/web_viewer.rs +++ b/crates/rerun/src/web_viewer.rs @@ -1,80 +1,78 @@ use re_log_types::LogMsg; +use re_web_viewer_server::{WebViewerServerHandle, WebViewerServerPort}; +use re_ws_comms::{RerunServerHandle, RerunServerPort}; -/// Hosts two servers: -/// * A web-server, serving the web-viewer -/// * A `WebSocket` server, server [`LogMsg`]es to remote viewer(s). -struct RemoteViewerServer { +/// A [`crate::sink::LogSink`] tied to a hosted Rerun web viewer. This internally stores two servers: +/// * A [`re_ws_comms::RerunServer`] to relay messages from the sink to a websocket connection +/// * A [`re_web_viewer_server::WebViewerServer`] to serve the Wasm+HTML +struct WebViewerSink { + /// Sender to send messages to the [`re_ws_comms::RerunServer`] sender: re_smart_channel::Sender, - shutdown_tx: tokio::sync::broadcast::Sender<()>, -} -impl Drop for RemoteViewerServer { - fn drop(&mut self) { - re_log::info!("Shutting down web server."); - self.shutdown_tx.send(()).ok(); - } + /// Handle to keep the [`re_ws_comms::RerunServer`] alive + _rerun_server: RerunServerHandle, + + /// Handle to keep the [`re_web_viewer_server::WebViewerServer`] alive + _webviewer_server: WebViewerServerHandle, } -impl RemoteViewerServer { - pub fn new(open_browser: bool) -> Self { +impl WebViewerSink { + pub fn new( + open_browser: bool, + web_port: WebViewerServerPort, + ws_port: RerunServerPort, + ) -> anyhow::Result { let (rerun_tx, rerun_rx) = re_smart_channel::smart_channel(re_smart_channel::Source::Sdk); - let (shutdown_tx, shutdown_rx_ws_server) = tokio::sync::broadcast::channel(1); - let shutdown_rx_web_server = shutdown_tx.subscribe(); - tokio::spawn(async move { - // This is the server which the web viewer will talk to: - let ws_server = re_ws_comms::Server::new(re_ws_comms::DEFAULT_WS_SERVER_PORT) - .await - .unwrap(); - let ws_server_handle = tokio::spawn(ws_server.listen(rerun_rx, shutdown_rx_ws_server)); - let ws_server_url = re_ws_comms::default_server_url("127.0.0.1"); + let rerun_server = RerunServerHandle::new(rerun_rx, ws_port)?; + let webviewer_server = WebViewerServerHandle::new(web_port)?; - // This is the server that serves the Wasm+HTML: - let web_server_handle = tokio::spawn(host_web_viewer( - open_browser, - ws_server_url, - shutdown_rx_web_server, - )); + let web_port = webviewer_server.port(); + let server_url = rerun_server.server_url(); + let viewer_url = format!("http://127.0.0.1:{web_port}?url={server_url}"); - ws_server_handle.await.unwrap().unwrap(); - web_server_handle.await.unwrap().unwrap(); - }); + re_log::info!("Web server is running - view it at {viewer_url}"); + if open_browser { + webbrowser::open(&viewer_url).ok(); + } - Self { + Ok(Self { sender: rerun_tx, - shutdown_tx, - } + _rerun_server: rerun_server, + _webviewer_server: webviewer_server, + }) } } -/// Hosts two servers: -/// * A web-server, serving the web-viewer -/// * A `WebSocket` server, server [`LogMsg`]es to remote viewer(s). +/// Async helper to spawn an instance of the [`re_web_viewer_server::WebViewerServer`]. +/// This serves the HTTP+Wasm+JS files that make up the web-viewer. +/// +/// Optionally opens a browser with the web-viewer and connects to the provided `target_url`. +/// This url could be a hosted RRD file or a `ws://` url to a running [`re_ws_comms::RerunServer`]. /// -/// Optionally opens a browser with the web-viewer. +/// Note: this does not include the websocket server. #[cfg(feature = "web_viewer")] pub async fn host_web_viewer( + web_port: WebViewerServerPort, open_browser: bool, - ws_server_url: String, + source_url: String, shutdown_rx: tokio::sync::broadcast::Receiver<()>, ) -> anyhow::Result<()> { - let web_port = 9090; - let viewer_url = format!("http://127.0.0.1:{web_port}?url={ws_server_url}"); + let web_server = re_web_viewer_server::WebViewerServer::new(web_port)?; + let port = web_server.port(); + let web_server_handle = web_server.serve(shutdown_rx); - let web_server = re_web_viewer_server::WebViewerServer::new(web_port); - let web_server_handle = tokio::spawn(web_server.serve(shutdown_rx)); + let viewer_url = format!("http://127.0.0.1:{port}?url={source_url}"); re_log::info!("Web server is running - view it at {viewer_url}"); if open_browser { webbrowser::open(&viewer_url).ok(); - } else { - re_log::info!("Web server is running - view it at {viewer_url}"); } - web_server_handle.await? + web_server_handle.await.map_err(anyhow::Error::msg) } -impl crate::sink::LogSink for RemoteViewerServer { +impl crate::sink::LogSink for WebViewerSink { fn send(&self, msg: LogMsg) { if let Err(err) = self.sender.send(msg) { re_log::error_once!("Failed to send log message to web server: {err}"); @@ -96,7 +94,15 @@ impl crate::sink::LogSink for RemoteViewerServer { /// This function returns immediately. /// /// The caller needs to ensure that there is a `tokio` runtime running. -#[must_use] -pub fn new_sink(open_browser: bool) -> Box { - Box::new(RemoteViewerServer::new(open_browser)) +#[must_use = "the sink must be kept around to keep the servers running"] +pub fn new_sink( + open_browser: bool, + web_port: WebViewerServerPort, + ws_port: RerunServerPort, +) -> anyhow::Result> { + Ok(Box::new(WebViewerSink::new( + open_browser, + web_port, + ws_port, + )?)) } diff --git a/examples/python/notebook/.gitignore b/examples/python/notebook/.gitignore new file mode 100644 index 000000000000..87620ac7e74e --- /dev/null +++ b/examples/python/notebook/.gitignore @@ -0,0 +1 @@ +.ipynb_checkpoints/ diff --git a/examples/python/notebook/README.md b/examples/python/notebook/README.md new file mode 100644 index 000000000000..2ccb5d2f8ece --- /dev/null +++ b/examples/python/notebook/README.md @@ -0,0 +1,31 @@ +# Overview + +Rerun has limited support for running directly embedded within a [jupyter][https://jupyter.org/] notebook. +Many additional environments beyond jupyter are supported such as [Google Colab][https://colab.research.google.com/] +or [VSCode](https://code.visualstudio.com/blogs/2021/08/05/notebooks). + +In order to show a rerun viewer inline within the notebook you need to use a special in-memory +recording: +``` +rec = rr.memory_recording() +``` + +After creating this recording all the normal rerun commands will work as expected and log +to this recording instance. When you are ready to show it you can return it at the end of your cell +or call `rec.show()`. + +# Running in Jupyter + +The easiest way to get a feel for working with notebooks is to use it: + +Install jupyter +``` +pip install -r requirements.txt +``` + +Open the notebook +``` +jupyyter notebook cube.ipynb +``` + +Follow along in the browser that opens. diff --git a/examples/python/notebook/cube.ipynb b/examples/python/notebook/cube.ipynb new file mode 100644 index 000000000000..34dea7f23083 --- /dev/null +++ b/examples/python/notebook/cube.ipynb @@ -0,0 +1,272 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "b31c0a84", + "metadata": {}, + "source": [ + "## Rerun imports and initialization" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1076c3a0", + "metadata": {}, + "outputs": [], + "source": [ + "from collections import namedtuple\n", + "from math import cos, sin, tau\n", + "import math\n", + "\n", + "import numpy as np\n", + "\n", + "import rerun as rr\n", + "\n", + "rr.init(\"cube\")" + ] + }, + { + "cell_type": "markdown", + "id": "f3c194db", + "metadata": {}, + "source": [ + "## Optional: start a local web-viewer server\n", + "\n", + "By default, Rerun will use a copy of the viewer hosted at [https://app.rerun.io](https://app.rerun.io).\n", + "This is generally preferable as it will work more seamlessly even if you\n", + "are connected to a notebook instance on a remote machine. However there\n", + "are some cases where this won't work such as running from source, or\n", + "using your notebook in an offline environment.\n", + "\n", + "In these cases you can start a local viewer server by uncommenting the following\n", + "line:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "63f80605", + "metadata": {}, + "outputs": [], + "source": [ + "rr.start_web_viewer_server()" + ] + }, + { + "cell_type": "markdown", + "id": "bf894a1f", + "metadata": {}, + "source": [ + "## Helper to the colored cube\n", + "\n", + "This is the same as the color cube demo from `rerun -m rerun_demo`, but the code\n", + "is repeated here for context." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f709925e", + "metadata": {}, + "outputs": [], + "source": [ + "ColorGrid = namedtuple(\"ColorGrid\", [\"positions\", \"colors\"])\n", + "\n", + "\n", + "def build_color_grid(x_count=10, y_count=10, z_count=10, twist=0):\n", + " \"\"\"\n", + " Create a cube of points with colors.\n", + "\n", + " The total point cloud will have x_count * y_count * z_count points.\n", + "\n", + " Parameters\n", + " ----------\n", + " x_count, y_count, z_count:\n", + " Number of points in each dimension.\n", + " twist:\n", + " Angle to twist from bottom to top of the cube\n", + "\n", + " \"\"\"\n", + "\n", + " grid = np.mgrid[\n", + " slice(-10, 10, x_count * 1j),\n", + " slice(-10, 10, y_count * 1j),\n", + " slice(-10, 10, z_count * 1j),\n", + " ]\n", + "\n", + " angle = np.linspace(-float(twist) / 2, float(twist) / 2, z_count)\n", + " for z in range(z_count):\n", + " xv, yv, zv = grid[:, :, :, z]\n", + " rot_xv = xv * cos(angle[z]) - yv * sin(angle[z])\n", + " rot_yv = xv * sin(angle[z]) + yv * cos(angle[z])\n", + " grid[:, :, :, z] = [rot_xv, rot_yv, zv]\n", + "\n", + " positions = np.vstack([xyz.ravel() for xyz in grid])\n", + "\n", + " colors = np.vstack(\n", + " [\n", + " xyz.ravel()\n", + " for xyz in np.mgrid[\n", + " slice(0, 255, x_count * 1j),\n", + " slice(0, 255, y_count * 1j),\n", + " slice(0, 255, z_count * 1j),\n", + " ]\n", + " ]\n", + " )\n", + "\n", + " return ColorGrid(positions.T, colors.T.astype(np.uint8))" + ] + }, + { + "cell_type": "markdown", + "id": "b9a75269", + "metadata": {}, + "source": [ + "## Start a new recording\n", + "\n", + "To start a new recording all you need to do is call rr.memory_recording()." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f4e1caf9", + "metadata": {}, + "outputs": [], + "source": [ + "rec = rr.memory_recording()" + ] + }, + { + "cell_type": "markdown", + "id": "6e4f945b", + "metadata": {}, + "source": [ + "## Showing the recording\n", + "\n", + "At any point you can show this recording by returning it as the last item in the cell.\n", + "In this case the recording simply does not have any data in it yet." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d586d222", + "metadata": {}, + "outputs": [], + "source": [ + "rec" + ] + }, + { + "cell_type": "markdown", + "id": "04c095ef", + "metadata": {}, + "source": [ + "## Logging some data\n", + "\n", + "Now we can create some data and add it to the recording before we show it again." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "92871ea1", + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "STEPS = 100\n", + "twists = math.pi * np.sin(np.linspace(0, math.tau, STEPS)) / 4\n", + "for t in range(STEPS):\n", + " rr.set_time_sequence(\"step\", t)\n", + " cube = build_color_grid(10, 10, 10, twist=twists[t])\n", + " rr.log_points(\"cube\", positions=cube.positions, colors=cube.colors, radii=0.5)\n", + "\n", + "rec" + ] + }, + { + "cell_type": "markdown", + "id": "31d392a8", + "metadata": {}, + "source": [ + "## Adjusting the view\n", + "\n", + "The recording also as a `show` method that lets you adjust properties such as width and height.\n", + "In the future this will support additional blueprint and layout options." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1a1b0f66-4287-4705-8be5-ae837ffe3f90", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "rec.show(width=400, height=400)" + ] + }, + { + "cell_type": "markdown", + "id": "36f9f61b", + "metadata": {}, + "source": [ + "## Stating a new recording\n", + "\n", + "You can always start another recording by calling `rr.memory_recording()` again." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c4cc33fd", + "metadata": {}, + "outputs": [], + "source": [ + "rec2 = rr.memory_recording()\n", + "\n", + "STEPS = 1\n", + "twists = math.pi * np.sin(np.linspace(0, math.tau, STEPS)) / 4\n", + "for t in range(STEPS):\n", + " rr.set_time_sequence(\"step\", t)\n", + " cube = build_color_grid(50, 50, 50, twist=twists[t])\n", + " rr.log_points(\"cube\", positions=cube.positions, colors=cube.colors, radii=0.5)\n", + "\n", + "rec2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "eb8f7701", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.7" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/python/notebook/requirements.txt b/examples/python/notebook/requirements.txt new file mode 100644 index 000000000000..1e5f56badec1 --- /dev/null +++ b/examples/python/notebook/requirements.txt @@ -0,0 +1,2 @@ +jupyter +rerun-sdk diff --git a/rerun_py/Cargo.toml b/rerun_py/Cargo.toml index a7b89a43f450..9a48f157f713 100644 --- a/rerun_py/Cargo.toml +++ b/rerun_py/Cargo.toml @@ -35,13 +35,14 @@ native_viewer = ["rerun/native_viewer"] ## ## You also need to install some additional tools, which you can do by running ## [`scripts/setup_web.sh`](https://github.com/rerun-io/rerun/blob/main/scripts/setup_web.sh). -web_viewer = ["rerun/web_viewer"] +web_viewer = ["rerun/web_viewer", "dep:re_web_viewer_server", "dep:re_ws_comms"] [dependencies] re_build_info.workspace = true re_error.workspace = true re_log.workspace = true +re_log_encoding.workspace = true re_log_types.workspace = true re_memory.workspace = true rerun = { workspace = true, default-features = false, features = [ @@ -49,6 +50,8 @@ rerun = { workspace = true, default-features = false, features = [ "server", "sdk", ] } +re_web_viewer_server = { workspace = true, optional = true } +re_ws_comms = { workspace = true, optional = true } arrow2 = { workspace = true, features = ["io_ipc", "io_print"] } document-features = "0.2" @@ -62,6 +65,7 @@ once_cell = "1.12" parking_lot = "0.12" pyo3 = { version = "0.18.0", features = ["abi3-py38"] } rand = { version = "0.8", features = ["std_rng"] } +thiserror.workspace = true tokio = { workspace = true, features = ["rt-multi-thread"] } uuid = "1.1" diff --git a/rerun_py/rerun_sdk/rerun/__init__.py b/rerun_py/rerun_sdk/rerun/__init__.py index f58664c7f72a..5385dda58c9a 100644 --- a/rerun_py/rerun_sdk/rerun/__init__.py +++ b/rerun_py/rerun_sdk/rerun/__init__.py @@ -21,6 +21,7 @@ from rerun.log.tensor import log_tensor from rerun.log.text import LoggingHandler, LogLevel, log_text_entry from rerun.log.transform import log_rigid3, log_unknown_transform, log_view_coordinates +from rerun.recording import MemoryRecording from rerun.script_helpers import script_add_args, script_setup, script_teardown __all__ = [ @@ -29,6 +30,7 @@ "LoggingHandler", "bindings", "components", + "inline_show", "ImageFormat", "log_annotation_context", "log_arrow", @@ -56,6 +58,7 @@ "log_text_entry", "log_unknown_transform", "log_view_coordinates", + "notebook", "LogLevel", "MeshFormat", "RectFormat", @@ -335,7 +338,7 @@ def spawn(port: int = 9876, connect: bool = True) -> None: _spawn = spawn # we need this because Python scoping is horrible -def serve(open_browser: bool = True) -> None: +def serve(open_browser: bool = True, web_port: Optional[int] = None, ws_port: Optional[int] = None) -> None: """ Serve log-data over WebSockets and serve a Rerun web viewer over HTTP. @@ -349,14 +352,42 @@ def serve(open_browser: bool = True) -> None: ---------- open_browser Open the default browser to the viewer. - + web_port: + The port to serve the web viewer on (defaults to 9090). + ws_port: + The port to serve the WebSocket server on (defaults to 9877) """ if not bindings.is_enabled(): print("Rerun is disabled - serve() call ignored") return - bindings.serve(open_browser) + bindings.serve(open_browser, web_port, ws_port) + + +def start_web_viewer_server(port: int = 0) -> None: + """ + Start an HTTP server that hosts the rerun web viewer. + + This only provides the web-server that makes the viewer available and + does not otherwise provide a rerun websocket server or facilitate any routing of + data. + + This is generally only necessary for application such as running a jupyter notebook + in a context where app.rerun.io is unavailable, or does not having the matching + resources for your build (such as when running from source.) + + Parameters + ---------- + port + Port to serve assets on. Defaults to 0 (random port). + """ + + if not bindings.is_enabled(): + print("Rerun is disabled - self_host_assets() call ignored") + return + + bindings.start_web_viewer_server(port) def disconnect() -> None: @@ -382,12 +413,25 @@ def save(path: str) -> None: """ if not bindings.is_enabled(): - print("Rerun is disabled - serve() call ignored") + print("Rerun is disabled - save() call ignored") return bindings.save(path) +def memory_recording() -> MemoryRecording: + """ + Streams all log-data to a memory buffer. + + Returns + ------- + MemoryRecording + A memory recording object that can be used to read the data. + """ + + return MemoryRecording(bindings.memory_recording()) + + def set_time_sequence(timeline: str, sequence: Optional[int]) -> None: """ Set the current time for this thread as an integer sequence. diff --git a/rerun_py/rerun_sdk/rerun/recording.py b/rerun_py/rerun_sdk/rerun/recording.py new file mode 100644 index 000000000000..108321ecfcb4 --- /dev/null +++ b/rerun_py/rerun_sdk/rerun/recording.py @@ -0,0 +1,90 @@ +"""Helper functions for displaying Rerun in a Jupyter notebook.""" + +import base64 +import logging +import random +import string +from typing import Any, Optional + +from rerun import bindings + + +class MemoryRecording: + def __init__(self, storage: bindings.PyMemorySinkStorage) -> None: + self.storage = storage + + def as_html( + self, width: int = 950, height: int = 712, app_location: Optional[str] = None, timeout_ms: int = 2000 + ) -> str: + """ + Show the Rerun viewer in a Jupyter notebook. + + Parameters + ---------- + width : int + The width of the viewer in pixels. + height : int + The height of the viewer in pixels. + app_location : str + The location of the Rerun web viewer. + timeout_ms : int + The number of milliseconds to wait for the Rerun web viewer to load. + """ + + if app_location is None: + app_location = bindings.get_app_url() + + random_string = "".join(random.choice(string.ascii_letters) for i in range(6)) + + base64_data = base64.b64encode(self.storage.get_rrd_as_bytes()).decode("utf-8") + + html_template = f""" + + + + + """ + + return html_template + + def show(self, **kwargs: Any) -> Any: + html = self.as_html(**kwargs) + try: + from IPython.core.display import HTML + + return HTML(html) # type: ignore[no-untyped-call] + except ImportError: + logging.warning("Could not import IPython.core.display. Returning raw HTML string instead.") + return html + + def _repr_html_(self) -> Any: + return self.as_html() diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 2d9c72677619..c7035761d336 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -8,12 +8,13 @@ use itertools::izip; use pyo3::{ exceptions::{PyRuntimeError, PyTypeError, PyValueError}, prelude::*, - types::PyDict, + types::{PyBytes, PyDict}, }; use re_log_types::{ArrowMsg, DataRow, DataTableError}; use rerun::{ log::{PathOp, RowId}, + sink::MemorySinkStorage, time::{Time, TimeInt, TimePoint, TimeType, Timeline}, ApplicationId, EntityPath, RecordingId, }; @@ -29,6 +30,9 @@ pub use rerun::{ coordinates::{Axis3, Handedness, Sign, SignedAxis3}, }; +use re_web_viewer_server::WebViewerServerPort; +use re_ws_comms::RerunServerPort; + use crate::{arrow::get_registered_component_names, python_session::PythonSession}; // ---------------------------------------------------------------------------- @@ -114,6 +118,7 @@ fn rerun_bindings(py: Python<'_>, m: &PyModule) -> PyResult<()> { // TODO(jleibs): Refactor import logic so all we need is main m.add_function(wrap_pyfunction!(get_registered_component_names, m)?)?; m.add_class::()?; + m.add_class::()?; // If this is a special RERUN_APP_ONLY context (launched via .spawn), we // can bypass everything else, which keeps us from preparing an SDK session @@ -135,9 +140,12 @@ fn rerun_bindings(py: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(connect, m)?)?; m.add_function(wrap_pyfunction!(disconnect, m)?)?; m.add_function(wrap_pyfunction!(flush, m)?)?; + m.add_function(wrap_pyfunction!(get_app_url, m)?)?; m.add_function(wrap_pyfunction!(init, m)?)?; m.add_function(wrap_pyfunction!(is_enabled, m)?)?; + m.add_function(wrap_pyfunction!(memory_recording, m)?)?; m.add_function(wrap_pyfunction!(save, m)?)?; + m.add_function(wrap_pyfunction!(start_web_viewer_server, m)?)?; m.add_function(wrap_pyfunction!(serve, m)?)?; m.add_function(wrap_pyfunction!(set_enabled, m)?)?; m.add_function(wrap_pyfunction!(shutdown, m)?)?; @@ -189,7 +197,6 @@ fn default_recording_id(py: Python<'_>) -> RecordingId { } fn authkey(py: Python<'_>) -> Vec { - use pyo3::types::PyBytes; let locals = PyDict::new(py); py.run( r#" @@ -297,10 +304,18 @@ fn connect(addr: Option) -> PyResult<()> { Ok(()) } +#[must_use = "the tokio_runtime guard must be kept alive while using tokio"] +fn enter_tokio_runtime() -> tokio::runtime::EnterGuard<'static> { + use once_cell::sync::Lazy; + static TOKIO_RUNTIME: Lazy = + Lazy::new(|| tokio::runtime::Runtime::new().expect("Failed to create tokio runtime")); + TOKIO_RUNTIME.enter() +} + /// Serve a web-viewer. #[allow(clippy::unnecessary_wraps)] // False positive #[pyfunction] -fn serve(open_browser: bool) -> PyResult<()> { +fn serve(open_browser: bool, web_port: Option, ws_port: Option) -> PyResult<()> { #[cfg(feature = "web_viewer")] { let mut session = python_session(); @@ -310,11 +325,16 @@ fn serve(open_browser: bool) -> PyResult<()> { return Ok(()); } - use once_cell::sync::Lazy; - static TOKIO_RUNTIME: Lazy = - Lazy::new(|| tokio::runtime::Runtime::new().expect("Failed to create tokio runtime")); - let _guard = TOKIO_RUNTIME.enter(); - session.set_sink(rerun::web_viewer::new_sink(open_browser)); + let _guard = enter_tokio_runtime(); + + session.set_sink( + rerun::web_viewer::new_sink( + open_browser, + web_port.map(WebViewerServerPort).unwrap_or_default(), + ws_port.map(RerunServerPort).unwrap_or_default(), + ) + .map_err(|err| PyRuntimeError::new_err(err.to_string()))?, + ); Ok(()) } @@ -328,6 +348,33 @@ fn serve(open_browser: bool) -> PyResult<()> { } } +#[pyfunction] +// TODO(jleibs) expose this as a python type +fn start_web_viewer_server(port: u16) -> PyResult<()> { + #[cfg(feature = "web_viewer")] + { + let mut session = python_session(); + let _guard = enter_tokio_runtime(); + session + .start_web_viewer_server(WebViewerServerPort(port)) + .map_err(|err| PyRuntimeError::new_err(err.to_string())) + } + + #[cfg(not(feature = "web_viewer"))] + { + _ = open_browser; + Err(PyRuntimeError::new_err( + "The Rerun SDK was not compiled with the 'web_viewer' feature", + )) + } +} + +#[pyfunction] +fn get_app_url() -> String { + let session = python_session(); + session.get_app_url() +} + #[pyfunction] fn shutdown(py: Python<'_>) { re_log::debug!("Shutting down the Rerun SDK"); @@ -382,6 +429,26 @@ fn save(path: &str) -> PyResult<()> { .map_err(|err| PyRuntimeError::new_err(err.to_string())) } +#[pyclass] +struct PyMemorySinkStorage(MemorySinkStorage); + +#[pymethods] +impl PyMemorySinkStorage { + fn get_rrd_as_bytes<'p>(&self, py: Python<'p>) -> PyResult<&'p PyBytes> { + self.0 + .rrd_as_bytes() + .map(|bytes| PyBytes::new(py, bytes.as_slice())) + .map_err(|err| PyRuntimeError::new_err(err.to_string())) + } +} + +/// Create an in-memory rrd file +#[pyfunction] +fn memory_recording() -> PyMemorySinkStorage { + let mut session = python_session(); + PyMemorySinkStorage(session.memory_recording()) +} + // ---------------------------------------------------------------------------- /// Set the current time globally. Used for all subsequent logging, diff --git a/rerun_py/src/python_session.rs b/rerun_py/src/python_session.rs index e69fb9a5ed65..696857b7b8a6 100644 --- a/rerun_py/src/python_session.rs +++ b/rerun_py/src/python_session.rs @@ -6,10 +6,20 @@ use re_log_types::{ RecordingId, RecordingInfo, RecordingSource, RowId, Time, TimePoint, }; +use re_web_viewer_server::WebViewerServerPort; use rerun::sink::LogSink; - // ---------------------------------------------------------------------------- +#[derive(thiserror::Error, Debug)] +pub enum PythonSessionError { + #[allow(dead_code)] + #[error("The Rerun SDK was not compiled with the '{0}' feature")] + FeatureNotEnabled(&'static str), + + #[error("Could not start the WebViewerServer: '{0}'")] + WebViewerServerError(#[from] re_web_viewer_server::WebViewerServerError), +} + /// Used to construct a [`RecordingInfo`]: struct RecordingMetaData { recording_source: RecordingSource, @@ -68,6 +78,13 @@ pub struct PythonSession { /// Where we put the log messages. sink: Box, + + build_info: re_build_info::BuildInfo, + + /// Used to serve the web viewer assets. + /// TODO(jleibs): Potentially use this for serve as well + #[cfg(feature = "web_viewer")] + web_viewer_server: Option, } impl Default for PythonSession { @@ -78,6 +95,9 @@ impl Default for PythonSession { has_sent_begin_recording_msg: false, recording_meta_data: Default::default(), sink: Box::new(rerun::sink::BufferedSink::new()), + build_info: re_build_info::build_info!(), + #[cfg(feature = "web_viewer")] + web_viewer_server: None, } } } @@ -183,7 +203,7 @@ impl PythonSession { self.set_sink(Box::new(rerun::sink::TcpSink::new(addr))); } - /// Drains all pending log messages and saves them to disk into an rrd file. + /// Send all pending and future log messages to disk as an rrd file pub fn save( &mut self, path: impl Into, @@ -197,9 +217,26 @@ impl PythonSession { Ok(()) } + /// Send all pending and future log messages to an in-memory store + pub fn memory_recording(&mut self) -> rerun::sink::MemorySinkStorage { + if !self.enabled { + re_log::debug!("Rerun disabled - call to memory_recording() ignored"); + return Default::default(); + } + + let memory_sink = rerun::sink::MemorySink::default(); + let buffer = memory_sink.buffer(); + + self.set_sink(Box::new(memory_sink)); + self.has_sent_begin_recording_msg = false; + + buffer + } + /// Disconnects any TCP connection, shuts down any server, and closes any file. pub fn disconnect(&mut self) { self.set_sink(Box::new(rerun::sink::BufferedSink::new())); + self.has_sent_begin_recording_msg = false; } /// Wait until all logged data have been sent to the remove server (if any). @@ -271,4 +308,32 @@ impl PythonSession { }, )); } + + /// Get a url to an instance of the web-viewer + /// + /// This may point to app.rerun.io or localhost depending on + /// whether `host_assets` was called. + pub fn get_app_url(&self) -> String { + #[cfg(feature = "web_viewer")] + if let Some(hosted_assets) = &self.web_viewer_server { + return format!("http://localhost:{}", hosted_assets.port()); + } + + let short_git_hash = &self.build_info.git_hash[..7]; + format!("https://app.rerun.io/commit/{short_git_hash}") + } + + /// Start a web server to host the run web-asserts + /// + /// The caller needs to ensure that there is a `tokio` runtime running. + #[allow(clippy::unnecessary_wraps)] + #[cfg(feature = "web_viewer")] + pub fn start_web_viewer_server( + &mut self, + _web_port: WebViewerServerPort, + ) -> Result<(), PythonSessionError> { + self.web_viewer_server = Some(re_web_viewer_server::WebViewerServerHandle::new(_web_port)?); + + Ok(()) + } } diff --git a/web_viewer/index.html b/web_viewer/index.html index ab1b610c96b8..23694aaef5b3 100644 --- a/web_viewer/index.html +++ b/web_viewer/index.html @@ -178,6 +178,10 @@ console.debug("app started."); document.getElementById("center_text").remove(); + + if (window.location !== window.parent.location) { + window.parent.postMessage("READY", "*"); + } } function determine_url() { diff --git a/web_viewer/index_bundled.html b/web_viewer/index_bundled.html index 212ad4f84c62..decce42c663a 100644 --- a/web_viewer/index_bundled.html +++ b/web_viewer/index_bundled.html @@ -176,6 +176,10 @@ console.debug("app started."); document.getElementById("center_text").remove(); + + if (window.location !== window.parent.location) { + window.parent.postMessage("READY", "*"); + } } function on_wasm_error(error) {