diff --git a/Cargo.lock b/Cargo.lock index 1d8d51aa53ae..ac50a526c3e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4066,6 +4066,7 @@ name = "re_sdk" version = "0.6.0-alpha.0" dependencies = [ "arrow2_convert", + "crossbeam", "document-features", "ndarray", "ndarray-rand", @@ -4078,6 +4079,7 @@ dependencies = [ "re_log_types", "re_memory", "re_sdk_comms", + "similar-asserts", "thiserror", ] diff --git a/crates/re_arrow_store/src/polars_util.rs b/crates/re_arrow_store/src/polars_util.rs index 4e4beabbb69c..e70aa8541942 100644 --- a/crates/re_arrow_store/src/polars_util.rs +++ b/crates/re_arrow_store/src/polars_util.rs @@ -197,7 +197,7 @@ pub fn range_components<'a, const N: usize>( // --- Joins --- -// TODO(#1619): none of this mess should be here +// TODO(#1759): none of this mess should be here pub fn dataframe_from_cells( cells: &[Option; N], diff --git a/crates/re_arrow_store/src/store_dump.rs b/crates/re_arrow_store/src/store_dump.rs index a3255657e61b..e83690a478cf 100644 --- a/crates/re_arrow_store/src/store_dump.rs +++ b/crates/re_arrow_store/src/store_dump.rs @@ -1,6 +1,6 @@ -use ahash::HashMapExt; +use std::collections::BTreeMap; + use arrow2::Either; -use nohash_hasher::IntMap; use re_log_types::{ DataCellColumn, DataTable, ErasedTimeVec, RowIdVec, TableId, TimeRange, Timeline, }; @@ -51,7 +51,7 @@ impl DataStore { .take(table.num_rows() as _) .collect(), col_num_instances: col_num_instances.clone(), - columns: columns.clone(), // shallow + columns: columns.clone().into_iter().collect(), // shallow } }) } @@ -92,7 +92,7 @@ impl DataStore { .take(col_row_id.len()) .collect(), col_num_instances: col_num_instances.clone(), - columns: columns.clone(), // shallow + columns: columns.clone().into_iter().collect(), // shallow } }) }) @@ -163,7 +163,7 @@ impl DataStore { let col_num_instances = filter_column(col_time, col_num_instances.iter(), time_filter).collect(); - let mut columns2 = IntMap::with_capacity(columns.len()); + let mut columns2 = BTreeMap::default(); for (component, column) in columns { let column = filter_column(col_time, column.iter(), time_filter).collect(); columns2.insert(*component, DataCellColumn(column)); diff --git a/crates/re_arrow_store/src/store_polars.rs b/crates/re_arrow_store/src/store_polars.rs index 00a233c70d6f..73ad746c48c4 100644 --- a/crates/re_arrow_store/src/store_polars.rs +++ b/crates/re_arrow_store/src/store_polars.rs @@ -17,7 +17,7 @@ use crate::{ }; // TODO(#1692): all of this stuff should be defined by Data{Cell,Row,Table}, not the store. -// TODO(#1619): remove this and reimplement it on top of store serialization +// TODO(#1759): remove this and reimplement it on top of store serialization // --- diff --git a/crates/re_arrow_store/src/store_write.rs b/crates/re_arrow_store/src/store_write.rs index 677310e5043d..bc2413678027 100644 --- a/crates/re_arrow_store/src/store_write.rs +++ b/crates/re_arrow_store/src/store_write.rs @@ -15,9 +15,8 @@ use crate::{ IndexedTable, PersistentIndexedTable, }; -// TODO(#1619): -// - The store should insert column-per-column rather than row-per-row (purely a performance -// matter) +// TODO(cmc): the store should insert column-per-column rather than row-per-row (purely a +// performance matter). // --- Data store --- diff --git a/crates/re_arrow_store/tests/data_store.rs b/crates/re_arrow_store/tests/data_store.rs index 5ba2a722bf80..b7d62a44d425 100644 --- a/crates/re_arrow_store/tests/data_store.rs +++ b/crates/re_arrow_store/tests/data_store.rs @@ -24,8 +24,6 @@ use re_log_types::{ Timeline, }; -// TODO(#1619): introduce batching in the testing matrix - // --- LatestComponentsAt --- #[test] diff --git a/crates/re_data_store/src/log_db.rs b/crates/re_data_store/src/log_db.rs index 9af248b9c8e0..92dc5f7b86cc 100644 --- a/crates/re_data_store/src/log_db.rs +++ b/crates/re_data_store/src/log_db.rs @@ -61,7 +61,7 @@ impl EntityDb { let mut table = DataTable::from_arrow_msg(msg)?; table.compute_all_size_bytes(); - // TODO(#1619): batch all of this + // TODO(cmc): batch all of this for row in table.to_rows() { self.try_add_data_row(&row)?; } diff --git a/crates/re_log_types/src/arrow_msg.rs b/crates/re_log_types/src/arrow_msg.rs index b83e24266d29..e81b52777e66 100644 --- a/crates/re_log_types/src/arrow_msg.rs +++ b/crates/re_log_types/src/arrow_msg.rs @@ -11,9 +11,6 @@ use arrow2::{array::Array, chunk::Chunk, datatypes::Schema}; #[derive(Clone, Debug, PartialEq)] pub struct ArrowMsg { /// Unique identifier for the [`crate::DataTable`] in this message. - /// - /// NOTE(#1619): While we're in the process of transitioning towards end-to-end batching, the - /// `table_id` is always the same as the `row_id` as the first and only row. pub table_id: TableId, /// The maximum values for all timelines across the entire batch of data. diff --git a/crates/re_log_types/src/data_cell.rs b/crates/re_log_types/src/data_cell.rs index d8b19a9eabc4..56f39a55863c 100644 --- a/crates/re_log_types/src/data_cell.rs +++ b/crates/re_log_types/src/data_cell.rs @@ -134,7 +134,7 @@ pub struct DataCellInner { } // TODO(cmc): We should be able to build a cell from non-reference types. -// TODO(#1619): We shouldn't have to specify the component name separately, this should be +// TODO(#1696): We shouldn't have to specify the component name separately, this should be // part of the metadata by using an extension. // TODO(#1696): Check that the array is indeed a leaf / component type when building a cell from an // arrow payload. @@ -533,6 +533,9 @@ impl DataCell { inner.compute_size_bytes(); return true; } + + re_log::error_once!("cell size could _not_ be computed"); + false } } diff --git a/crates/re_log_types/src/data_row.rs b/crates/re_log_types/src/data_row.rs index 1850c8b913c8..bd780e01188f 100644 --- a/crates/re_log_types/src/data_row.rs +++ b/crates/re_log_types/src/data_row.rs @@ -127,12 +127,6 @@ impl RowId { pub fn random() -> Self { Self(re_tuid::Tuid::random()) } - - /// Temporary utility while we transition to batching. See #1619. - #[doc(hidden)] - pub fn into_table_id(self) -> TableId { - TableId(self.0) - } } impl SizeBytes for RowId { @@ -322,13 +316,10 @@ impl DataRow { Self::try_from_cells(row_id, timepoint, entity_path, num_instances, cells).unwrap() } - /// Turns the `DataRow` into a single-row [`DataTable`] that carries the same ID. - /// - /// This only makes sense as part of our transition to batching. See #1619. - #[doc(hidden)] + /// Turns the `DataRow` into a single-row [`DataTable`]. #[inline] pub fn into_table(self) -> DataTable { - DataTable::from_rows(self.row_id.into_table_id(), [self]) + DataTable::from_rows(TableId::random(), [self]) } } diff --git a/crates/re_log_types/src/data_table.rs b/crates/re_log_types/src/data_table.rs index d4fffde8f947..30d2fcf84ba5 100644 --- a/crates/re_log_types/src/data_table.rs +++ b/crates/re_log_types/src/data_table.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use ahash::HashMap; use itertools::Itertools as _; -use nohash_hasher::{IntMap, IntSet}; +use nohash_hasher::IntSet; use smallvec::SmallVec; use crate::{ @@ -156,12 +156,6 @@ impl TableId { pub fn random() -> Self { Self(re_tuid::Tuid::random()) } - - /// Temporary utility while we transition to batching. See #1619. - #[doc(hidden)] - pub fn into_row_id(self) -> RowId { - RowId(self.0) - } } impl SizeBytes for TableId { @@ -353,7 +347,7 @@ pub struct DataTable { /// /// The cells are optional since not all rows will have data for every single component /// (i.e. the table is sparse). - pub columns: IntMap, + pub columns: BTreeMap, } impl DataTable { @@ -424,7 +418,7 @@ impl DataTable { } // Pre-allocate all columns (one per component). - let mut columns = IntMap::default(); + let mut columns = BTreeMap::default(); for component in components { columns.insert( component, @@ -441,12 +435,6 @@ impl DataTable { } } - if col_row_id.len() > 1 { - re_log::warn_once!( - "batching features are not ready for use, use single-row data tables instead!" - ); - } - Self { table_id, col_row_id, diff --git a/crates/re_log_types/src/datagen.rs b/crates/re_log_types/src/datagen.rs index 153545bc05d8..3cc2f144dc05 100644 --- a/crates/re_log_types/src/datagen.rs +++ b/crates/re_log_types/src/datagen.rs @@ -1,6 +1,6 @@ //! Generate random data for tests and benchmarks. -// TODO(#1619): It really is time for whole module to disappear. +// TODO(#1810): It really is time for whole module to disappear. use crate::{ component_types::{self, InstanceKey}, diff --git a/crates/re_log_types/src/lib.rs b/crates/re_log_types/src/lib.rs index 22e6f734b3f2..a3f0d5df77f4 100644 --- a/crates/re_log_types/src/lib.rs +++ b/crates/re_log_types/src/lib.rs @@ -195,18 +195,6 @@ pub enum LogMsg { } impl LogMsg { - pub fn id(&self) -> RowId { - match self { - Self::BeginRecordingMsg(msg) => msg.row_id, - Self::EntityPathOpMsg(_, msg) => msg.row_id, - Self::Goodbye(row_id) => *row_id, - // TODO(#1619): the following only makes sense because, while we support sending and - // receiving batches, we don't actually do so yet. - // We need to stop storing raw `LogMsg`s before we can benefit from our batching. - Self::ArrowMsg(_, msg) => msg.table_id.into_row_id(), - } - } - pub fn recording_id(&self) -> Option<&RecordingId> { match self { Self::BeginRecordingMsg(msg) => Some(&msg.info.recording_id), diff --git a/crates/re_sdk/Cargo.toml b/crates/re_sdk/Cargo.toml index ba8501c9d4a5..430a60322e9a 100644 --- a/crates/re_sdk/Cargo.toml +++ b/crates/re_sdk/Cargo.toml @@ -41,6 +41,7 @@ re_log.workspace = true re_memory.workspace = true re_sdk_comms = { workspace = true, features = ["client"] } +crossbeam.workspace = true document-features = "0.2" parking_lot.workspace = true thiserror.workspace = true @@ -54,6 +55,7 @@ arrow2_convert.workspace = true ndarray.workspace = true ndarray-rand = "0.14" rand = "0.8" +similar-asserts = "1.4.2" [build-dependencies] diff --git a/crates/re_sdk/src/global.rs b/crates/re_sdk/src/global.rs deleted file mode 100644 index 9e4d6d398a3c..000000000000 --- a/crates/re_sdk/src/global.rs +++ /dev/null @@ -1,23 +0,0 @@ -use crate::session::Session; - -/// Access a global [`Session`] singleton for convenient logging. -/// -/// The default [`Session`] is a disabled dummy-session that ignore all log calls, -/// so you need to explicitly set the global session for it to be useful -/// -/// Example usage: -/// -/// ``` -/// use re_sdk::{global_session, SessionBuilder, default_server_addr}; -/// -/// *global_session() = SessionBuilder::new("my_app").connect(default_server_addr()); -/// -/// // Call code that logs using `global_session`. -/// ``` -pub fn global_session() -> parking_lot::MutexGuard<'static, Session> { - use once_cell::sync::OnceCell; - use parking_lot::Mutex; - static INSTANCE: OnceCell> = OnceCell::new(); - let mutex = INSTANCE.get_or_init(|| Mutex::new(Session::disabled())); - mutex.lock() -} diff --git a/crates/re_sdk/src/lib.rs b/crates/re_sdk/src/lib.rs index a7e7fe27607f..fcf8e5c31f6c 100644 --- a/crates/re_sdk/src/lib.rs +++ b/crates/re_sdk/src/lib.rs @@ -9,21 +9,15 @@ // ---------------- // Private modules: -#[cfg(feature = "global_session")] -mod global; - mod log_sink; mod msg_sender; -mod session; +mod recording_stream; // ------------- // Public items: -#[cfg(feature = "global_session")] -pub use self::global::global_session; - pub use self::msg_sender::{MsgSender, MsgSenderError}; -pub use self::session::{Session, SessionBuilder}; +pub use self::recording_stream::{RecordingStream, RecordingStreamBuilder}; pub use re_sdk_comms::default_server_addr; @@ -36,6 +30,9 @@ impl crate::sink::LogSink for re_log_encoding::FileSink { fn send(&self, msg: re_log_types::LogMsg) { re_log_encoding::FileSink::send(self, msg); } + + #[inline] + fn flush_blocking(&self) {} } // --------------- @@ -49,9 +46,7 @@ pub mod demo_util; /// This is how you select whether the log stream ends up /// sent over TCP, written to file, etc. pub mod sink { - pub use crate::log_sink::{ - disabled, BufferedSink, LogSink, MemorySink, MemorySinkStorage, TcpSink, - }; + pub use crate::log_sink::{BufferedSink, LogSink, MemorySink, MemorySinkStorage, TcpSink}; #[cfg(not(target_arch = "wasm32"))] pub use re_log_encoding::{FileSink, FileSinkError}; @@ -153,7 +148,7 @@ pub fn decide_logging_enabled(default_enabled: bool) -> bool { // ---------------------------------------------------------------------------- -/// Creates a new [`re_log_types::RecordingInfo`] which can be used with [`Session::new`]. +/// Creates a new [`re_log_types::RecordingInfo`] which can be used with [`RecordingStream::new`]. #[track_caller] // track_caller so that we can see if we are being called from an official example. pub fn new_recording_info( application_id: impl Into, diff --git a/crates/re_sdk/src/log_sink.rs b/crates/re_sdk/src/log_sink.rs index 639c682aaf21..5719e58090cc 100644 --- a/crates/re_sdk/src/log_sink.rs +++ b/crates/re_sdk/src/log_sink.rs @@ -1,3 +1,6 @@ +use std::sync::Arc; + +use parking_lot::RwLock; use re_log_types::LogMsg; /// Where the SDK sends its log messages. @@ -6,6 +9,7 @@ pub trait LogSink: Send + Sync + 'static { fn send(&self, msg: LogMsg); /// Send all these log messages. + #[inline] fn send_all(&self, messages: Vec) { for msg in messages { self.send(msg); @@ -13,40 +17,22 @@ pub trait LogSink: Send + Sync + 'static { } /// Drain all buffered [`LogMsg`]es and return them. + /// + /// Only applies to sinks that maintain a backlog. + #[inline] fn drain_backlog(&self) -> Vec { vec![] } - /// Wait until all logged data have been sent to the remove server (if any). - fn flush(&self) {} - - /// If the TCP session is disconnected, allow it to quit early and drop unsent messages. - fn drop_msgs_if_disconnected(&self) {} - - /// Returns `false` if this sink just discards all messages. - fn is_enabled(&self) -> bool { - true - } -} - -// ---------------------------------------------------------------------------- - -struct DisabledSink; - -impl LogSink for DisabledSink { - fn send(&self, _msg: LogMsg) { - // It's intended that the logging SDK should drop messages earlier than this if logging is disabled. - re_log::debug_once!("Logging is disabled, dropping message(s)."); - } - - fn is_enabled(&self) -> bool { - false - } -} + /// Blocks until all pending data in the sink's send buffers has been fully flushed. + /// + /// See also [`LogSink::drop_if_disconnected`]. + fn flush_blocking(&self); -/// A sink that does nothing. All log messages are just dropped. -pub fn disabled() -> Box { - Box::new(DisabledSink) + /// Drops all pending data currently sitting in the sink's send buffers if it is unable to + /// flush it for any reason (e.g. a broken TCP connection for a [`TcpSink`]). + #[inline] + fn drop_if_disconnected(&self) {} } // ---------------------------------------------------------------------------- @@ -57,26 +43,33 @@ pub struct BufferedSink(parking_lot::Mutex>); impl BufferedSink { /// An empty buffer. + #[inline] pub fn new() -> Self { Self::default() } } impl LogSink for BufferedSink { + #[inline] fn send(&self, msg: LogMsg) { self.0.lock().push(msg); } + #[inline] fn send_all(&self, mut messages: Vec) { self.0.lock().append(&mut messages); } + #[inline] fn drain_backlog(&self) -> Vec { std::mem::take(&mut self.0.lock()) } + + #[inline] + fn flush_blocking(&self) {} } -/// Store log messages directly in memory +/// Store log messages directly in memory. /// /// Although very similar to `BufferedSink` this sink is a real-endpoint. When creating /// a new sink the logged messages stay with the `MemorySink` (`drain_backlog` does nothing). @@ -88,37 +81,55 @@ pub struct MemorySink(MemorySinkStorage); impl MemorySink { /// Access the raw `MemorySinkStorage` + #[inline] pub fn buffer(&self) -> MemorySinkStorage { self.0.clone() } } impl LogSink for MemorySink { + #[inline] fn send(&self, msg: LogMsg) { - self.0.lock().push(msg); + self.0.write().push(msg); } + #[inline] fn send_all(&self, mut messages: Vec) { - self.0.lock().append(&mut messages); + self.0.write().append(&mut messages); } + + #[inline] + fn flush_blocking(&self) {} } -/// The storage used by [`MemorySink`] +/// The storage used by [`MemorySink`]. #[derive(Default, Clone)] -pub struct MemorySinkStorage(std::sync::Arc>>); +pub struct MemorySinkStorage(Arc>>); -/// impl MemorySinkStorage { - /// Lock the contained buffer - fn lock(&self) -> parking_lot::MutexGuard<'_, Vec> { - self.0.lock() + /// Write access to the inner array of [`LogMsg`]. + #[inline] + fn write(&self) -> parking_lot::RwLockWriteGuard<'_, Vec> { + self.0.write() + } + + /// Read access to the inner array of [`LogMsg`]. + #[inline] + pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, Vec> { + self.0.read() + } + + /// Consumes and returns the inner array of [`LogMsg`]. + #[inline] + pub fn take(&self) -> Vec { + std::mem::take(&mut *self.0.write()) } - /// Convert the stored messages into an in-memory Rerun log file + /// Convert the stored messages into an in-memory Rerun log file. + #[inline] pub fn rrd_as_bytes(&self) -> Result, re_log_encoding::encoder::EncodeError> { - let messages = self.lock(); let mut buffer = std::io::Cursor::new(Vec::new()); - re_log_encoding::encoder::encode(messages.iter(), &mut buffer)?; + re_log_encoding::encoder::encode(self.read().iter(), &mut buffer)?; Ok(buffer.into_inner()) } } @@ -133,6 +144,7 @@ pub struct TcpSink { impl TcpSink { /// Connect to the given address in a background thread. /// Retries until successful. + #[inline] pub fn new(addr: std::net::SocketAddr) -> Self { Self { client: re_sdk_comms::Client::new(addr), @@ -141,15 +153,18 @@ impl TcpSink { } impl LogSink for TcpSink { + #[inline] fn send(&self, msg: LogMsg) { self.client.send(msg); } - fn flush(&self) { + #[inline] + fn flush_blocking(&self) { self.client.flush(); } - fn drop_msgs_if_disconnected(&self) { + #[inline] + fn drop_if_disconnected(&self) { self.client.drop_if_disconnected(); } } diff --git a/crates/re_sdk/src/msg_sender.rs b/crates/re_sdk/src/msg_sender.rs index 0bd6832a3f2f..438fd7a54764 100644 --- a/crates/re_sdk/src/msg_sender.rs +++ b/crates/re_sdk/src/msg_sender.rs @@ -1,17 +1,12 @@ -use std::borrow::Borrow; - -use re_log_types::{component_types::InstanceKey, DataRow, DataTableError, RecordingId, RowId}; +use re_log_types::{component_types::InstanceKey, DataRow, DataTableError, RowId}; use crate::{ components::Transform, - log::{DataCell, LogMsg}, - sink::LogSink, + log::DataCell, time::{Time, TimeInt, TimePoint, Timeline}, - Component, EntityPath, SerializableComponent, Session, + Component, EntityPath, RecordingStream, SerializableComponent, }; -// TODO(#1619): Rust SDK batching - // --- /// Errors that can occur when constructing or sending messages @@ -35,7 +30,7 @@ pub enum MsgSenderError { /// /// ```ignore /// fn log_coordinate_space( -/// session: &Session, +/// rec_stream: &RecordingStream, /// ent_path: impl Into, /// axes: &str, /// ) -> anyhow::Result<()> { @@ -46,15 +41,11 @@ pub enum MsgSenderError { /// MsgSender::new(ent_path) /// .with_timeless(true) /// .with_component(&[view_coords])? -/// .send(session) +/// .send(rec_stream) /// .map_err(Into::into) /// } /// ``` -// TODO(#1619): this whole thing needs to be rethought to incorporate batching and datatables. pub struct MsgSender { - // TODO(cmc): At the moment, a `MsgBundle` can only contain data for a single entity, so - // this must be known as soon as we spawn the builder. - // This won't be true anymore once batch insertions land. entity_path: EntityPath, /// All the different timestamps for this message. @@ -166,8 +157,6 @@ impl MsgSender { /// The SDK does not yet support batch insertions, which are semantically identical to adding /// the same component type multiple times in a single message. /// Doing so will return an error when trying to `send()` the message. - // - // TODO(#589): batch insertions pub fn with_component<'a, C: SerializableComponent>( mut self, data: impl IntoIterator, @@ -200,8 +189,6 @@ impl MsgSender { /// The SDK does not yet support batch insertions, which are semantically identical to adding /// the same component type multiple times in a single message. /// Doing so will return an error when trying to `send()` the message. - // - // TODO(#589): batch insertions pub fn with_splat(mut self, data: C) -> Result { if C::name() == InstanceKey::name() { return Err(MsgSenderError::SplattedInstanceKeys); @@ -231,42 +218,23 @@ impl MsgSender { /// Consumes, packs, sanity checks and finally sends the message to the currently configured /// target of the SDK. - pub fn send(self, session: &Session) -> Result<(), DataTableError> { - self.send_to_sink(session.recording_id(), session.borrow()) - } - - /// Consumes, packs, sanity checks and finally sends the message to the currently configured - /// target of the SDK. - fn send_to_sink( - self, - recording_id: RecordingId, - sink: &dyn LogSink, - ) -> Result<(), DataTableError> { - if !sink.is_enabled() { + pub fn send(self, rec_stream: &RecordingStream) -> Result<(), DataTableError> { + if !rec_stream.is_enabled() { return Ok(()); // silently drop the message } let [row_standard, row_transforms, row_splats] = self.into_rows(); if let Some(row_transforms) = row_transforms { - sink.send(LogMsg::ArrowMsg( - recording_id, - row_transforms.into_table().to_arrow_msg()?, - )); + rec_stream.record_row(row_transforms); } if let Some(row_splats) = row_splats { - sink.send(LogMsg::ArrowMsg( - recording_id, - row_splats.into_table().to_arrow_msg()?, - )); + rec_stream.record_row(row_splats); } // Always the primary component last so range-based queries will include the other data. // Since the primary component can't be splatted it must be in msg_standard, see(#1215). if let Some(row_standard) = row_standard { - sink.send(LogMsg::ArrowMsg( - recording_id, - row_standard.into_table().to_arrow_msg()?, - )); + rec_stream.record_row(row_standard); } Ok(()) diff --git a/crates/re_sdk/src/recording_stream.rs b/crates/re_sdk/src/recording_stream.rs new file mode 100644 index 000000000000..2c77ddfe4548 --- /dev/null +++ b/crates/re_sdk/src/recording_stream.rs @@ -0,0 +1,1014 @@ +use std::sync::Arc; + +use crossbeam::channel::{Receiver, Sender}; +use re_log_types::{ + ApplicationId, DataRow, DataTable, DataTableBatcher, DataTableBatcherConfig, + DataTableBatcherError, LogMsg, RecordingId, RecordingInfo, RecordingSource, Time, +}; + +use crate::sink::{LogSink, MemorySinkStorage}; + +// --- + +/// Errors that can occur when creating/manipulating a [`RecordingStream`]. +#[derive(thiserror::Error, Debug)] +pub enum RecordingStreamError { + /// Error within the underlying file sink. + #[error("Failed to create the underlying file sink: {0}")] + FileSink(#[from] re_log_encoding::FileSinkError), + + /// Error within the underlying table batcher. + #[error("Failed to spawn the underlying batcher: {0}")] + DataTableBatcher(#[from] DataTableBatcherError), + + /// Error spawning one of the background threads. + #[error("Failed to spawn background thread '{name}': {err}")] + SpawnThread { + name: &'static str, + err: Box, + }, +} + +pub type RecordingStreamResult = Result; + +// --- + +/// Construct a [`RecordingStream`]. +/// +/// ``` no_run +/// # use re_sdk::RecordingStreamBuilder; +/// let rec_stream = RecordingStreamBuilder::new("my_app").save("my_recording.rrd")?; +/// # Ok::<(), Box>(()) +/// ``` +pub struct RecordingStreamBuilder { + application_id: ApplicationId, + recording_id: Option, + recording_source: Option, + + default_enabled: bool, + enabled: Option, + + batcher_config: Option, + + is_official_example: bool, +} + +impl RecordingStreamBuilder { + /// Create a new [`RecordingStreamBuilder`] with the given [`ApplicationId`]. + /// + /// The [`ApplicationId`] is usually the name of your app. + /// + /// ```no_run + /// # use re_sdk::RecordingStreamBuilder; + /// let rec_stream = RecordingStreamBuilder::new("my_app").save("my_recording.rrd")?; + /// # Ok::<(), Box>(()) + /// ``` + // + // NOTE: track_caller so that we can see if we are being called from an official example. + #[track_caller] + pub fn new(application_id: impl Into) -> Self { + let application_id = application_id.into(); + let is_official_example = crate::called_from_official_rust_example(); + + Self { + application_id, + recording_id: None, + recording_source: None, + + default_enabled: true, + enabled: None, + + batcher_config: None, + is_official_example, + } + } + + /// Set whether or not Rerun is enabled by default. + /// + /// If the `RERUN` environment variable is set, it will override this. + /// + /// Set also: [`Self::enabled`]. + pub fn default_enabled(mut self, default_enabled: bool) -> Self { + self.default_enabled = default_enabled; + self + } + + /// Set whether or not Rerun is enabled. + /// + /// Setting this will ignore the `RERUN` environment variable. + /// + /// Set also: [`Self::default_enabled`]. + pub fn enabled(mut self, enabled: bool) -> Self { + self.enabled = Some(enabled); + self + } + + /// Set the [`RecordingId`] for this context. + /// + /// If you're logging from multiple processes and want all the messages to end up as the same + /// recording, you must make sure they all set the same [`RecordingId`] using this function. + /// + /// Note that many recordings can share the same [`ApplicationId`], but they all have + /// unique [`RecordingId`]s. + /// + /// The default is to use a random [`RecordingId`]. + pub fn recording_id(mut self, recording_id: RecordingId) -> Self { + self.recording_id = Some(recording_id); + self + } + + /// Specifies the configuration of the internal data batching mechanism. + /// + /// See [`DataTableBatcher`] & [`DataTableBatcherConfig`] for more information. + pub fn batcher_config(mut self, config: DataTableBatcherConfig) -> Self { + self.batcher_config = Some(config); + self + } + + #[doc(hidden)] + pub fn recording_source(mut self, recording_source: RecordingSource) -> Self { + self.recording_source = Some(recording_source); + self + } + + #[doc(hidden)] + pub fn is_official_example(mut self, is_official_example: bool) -> Self { + self.is_official_example = is_official_example; + self + } + + /// Creates a new [`RecordingStream`] that starts in a buffering state (RAM). + /// + /// ## Example + /// + /// ```no_run + /// let rec_stream = re_sdk::RecordingStreamBuilder::new("my_app").buffered()?; + /// # Ok::<(), Box>(()) + /// ``` + pub fn buffered(self) -> RecordingStreamResult { + let (enabled, recording_info, batcher_config) = self.into_args(); + if enabled { + RecordingStream::new( + recording_info, + batcher_config, + Box::new(crate::log_sink::BufferedSink::new()), + ) + } else { + re_log::debug!("Rerun disabled - call to buffered() ignored"); + Ok(RecordingStream::disabled()) + } + } + + /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a + /// [`crate::log_sink::MemorySink`]. + /// + /// ## Example + /// + /// ```no_run + /// let (rec_stream, storage) = re_sdk::RecordingStreamBuilder::new("my_app").memory()?; + /// # Ok::<(), Box>(()) + /// ``` + pub fn memory( + self, + ) -> RecordingStreamResult<(RecordingStream, crate::log_sink::MemorySinkStorage)> { + let sink = crate::log_sink::MemorySink::default(); + let storage = sink.buffer(); + + let (enabled, recording_info, batcher_config) = self.into_args(); + if enabled { + RecordingStream::new(recording_info, batcher_config, Box::new(sink)) + .map(|rec_stream| (rec_stream, storage)) + } else { + re_log::debug!("Rerun disabled - call to memory() ignored"); + Ok((RecordingStream::disabled(), Default::default())) + } + } + + /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a + /// remote Rerun instance. + /// + /// ## Example + /// + /// ```no_run + /// let rec_stream = re_sdk::RecordingStreamBuilder::new("my_app") + /// .connect(re_sdk::default_server_addr())?; + /// # Ok::<(), Box>(()) + /// ``` + pub fn connect(self, addr: std::net::SocketAddr) -> RecordingStreamResult { + let (enabled, recording_info, batcher_config) = self.into_args(); + if enabled { + RecordingStream::new( + recording_info, + batcher_config, + Box::new(crate::log_sink::TcpSink::new(addr)), + ) + } else { + re_log::debug!("Rerun disabled - call to connect() ignored"); + Ok(RecordingStream::disabled()) + } + } + + /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to an + /// RRD file on disk. + /// + /// ## Example + /// + /// ```no_run + /// let rec_stream = re_sdk::RecordingStreamBuilder::new("my_app").save("my_recording.rrd")?; + /// # Ok::<(), Box>(()) + /// ``` + #[cfg(not(target_arch = "wasm32"))] + pub fn save( + self, + path: impl Into, + ) -> RecordingStreamResult { + let (enabled, recording_info, batcher_config) = self.into_args(); + + if enabled { + RecordingStream::new( + recording_info, + batcher_config, + Box::new(crate::sink::FileSink::new(path)?), + ) + } else { + re_log::debug!("Rerun disabled - call to save() ignored"); + Ok(RecordingStream::disabled()) + } + } + + /// Returns whether or not logging is enabled, a [`RecordingInfo`] and the associated batcher + /// configuration. + /// + /// This can be used to then construct a [`RecordingStream`] manually using + /// [`RecordingStream::new`]. + pub fn into_args(self) -> (bool, RecordingInfo, DataTableBatcherConfig) { + let Self { + application_id, + recording_id, + recording_source, + default_enabled, + enabled, + batcher_config, + is_official_example, + } = self; + + let enabled = enabled.unwrap_or_else(|| crate::decide_logging_enabled(default_enabled)); + let recording_id = recording_id.unwrap_or_else(RecordingId::random); + let recording_source = recording_source.unwrap_or_else(|| RecordingSource::RustSdk { + rustc_version: env!("RE_BUILD_RUSTC_VERSION").into(), + llvm_version: env!("RE_BUILD_LLVM_VERSION").into(), + }); + + let recording_info = RecordingInfo { + application_id, + recording_id, + is_official_example, + started: Time::now(), + recording_source, + }; + + let batcher_config = batcher_config + .unwrap_or_else(|| DataTableBatcherConfig::from_env().unwrap_or_default()); + + (enabled, recording_info, batcher_config) + } +} + +// ---------------------------------------------------------------------------- + +/// A [`RecordingStream`] handles everything related to logging data into Rerun. +/// +/// You can construct a new [`RecordingStream`] using [`RecordingStreamBuilder`] or +/// [`RecordingStream::new`]. +/// +/// ## Sinks +/// +/// Data is logged into Rerun via [`LogSink`]s. +/// +/// The underlying [`LogSink`] of a [`RecordingStream`] can be changed at any point during its +/// lifetime by calling [`RecordingStream::set_sink`] or one of the higher level helpers +/// ([`RecordingStream::connect`], [`RecordingStream::memory`], +/// [`RecordingStream::save`], [`RecordingStream::disconnect`]). +/// +/// See [`RecordingStream::set_sink`] for more information. +/// +/// ## Multithreading and ordering +/// +/// [`RecordingStream`] can be cheaply cloned and used freely across any number of threads. +/// +/// Internally, all operations are linearized into a pipeline: +/// - All operations sent by a given thread will take effect in the same exact order as that +/// thread originally sent them in, from its point of view. +/// - There isn't any well defined global order across multiple threads. +/// +/// This means that e.g. flushing the pipeline ([`Self::flush_blocking`]) guarantees that all +/// previous data sent by the calling thread has been recorded; no more, no less. +/// +/// ## Shutdown +/// +/// The [`RecordingStream`] can only be shutdown by dropping all instances of it, at which point +/// it will automatically take care of flushing any pending data that might remain in the pipeline. +/// +/// Shutting down cannot ever block. +#[derive(Clone)] +pub struct RecordingStream { + inner: Arc>, +} + +struct RecordingStreamInner { + info: RecordingInfo, + + /// The one and only entrypoint into the pipeline: this is _never_ cloned nor publicly exposed, + /// therefore the `Drop` implementation is guaranteed that no more data can come in while it's + /// running. + cmds_tx: Sender, + + batcher: DataTableBatcher, + batcher_to_sink_handle: Option>, + // + // TODO(emilk): add convenience `TimePoint` here so that users can + // do things like `session.set_time_sequence("frame", frame_idx);` +} + +impl Drop for RecordingStreamInner { + fn drop(&mut self) { + // NOTE: The command channel is private, if we're here, nothing is currently capable of + // sending data down the pipeline. + self.batcher.flush_blocking(); + self.cmds_tx.send(Command::PopPendingTables).ok(); + self.cmds_tx.send(Command::Shutdown).ok(); + if let Some(handle) = self.batcher_to_sink_handle.take() { + handle.join().ok(); + } + } +} + +impl RecordingStreamInner { + fn new( + info: RecordingInfo, + batcher_config: DataTableBatcherConfig, + sink: Box, + ) -> RecordingStreamResult { + let batcher = DataTableBatcher::new(batcher_config)?; + + // TODO(cmc): BeginRecordingMsg is a misnomer; it's idempotent. + { + re_log::debug!( + app_id = %info.application_id, + rec_id = %info.recording_id, + "setting recording info", + ); + sink.send( + re_log_types::BeginRecordingMsg { + row_id: re_log_types::RowId::random(), + info: info.clone(), + } + .into(), + ); + } + + let (cmds_tx, cmds_rx) = crossbeam::channel::unbounded(); + + let batcher_to_sink_handle = { + const NAME: &str = "RecordingStream::batcher_to_sink"; + std::thread::Builder::new() + .name(NAME.into()) + .spawn({ + let info = info.clone(); + let batcher = batcher.clone(); + move || forwarding_thread(info, sink, cmds_rx, batcher.tables()) + }) + .map_err(|err| RecordingStreamError::SpawnThread { + name: NAME, + err: Box::new(err), + })? + }; + + Ok(RecordingStreamInner { + info, + cmds_tx, + batcher, + batcher_to_sink_handle: Some(batcher_to_sink_handle), + }) + } +} + +enum Command { + RecordMsg(LogMsg), + SwapSink(Box), + Flush(Sender<()>), + PopPendingTables, + Shutdown, +} + +impl Command { + fn flush() -> (Self, Receiver<()>) { + let (tx, rx) = crossbeam::channel::bounded(0); // oneshot + (Self::Flush(tx), rx) + } +} + +impl RecordingStream { + /// Creates a new [`RecordingStream`] with a given [`RecordingInfo`] and [`LogSink`]. + /// + /// You can create a [`RecordingInfo`] with [`crate::new_recording_info`]; + /// + /// The [`RecordingInfo`] is immediately sent to the sink in the form of a + /// [`re_log_types::BeginRecordingMsg`]. + /// + /// You can find sinks in [`crate::sink`]. + /// + /// See also: [`RecordingStreamBuilder`]. + #[must_use = "Recording will get closed automatically once all instances of this object have been dropped"] + pub fn new( + info: RecordingInfo, + batcher_config: DataTableBatcherConfig, + sink: Box, + ) -> RecordingStreamResult { + RecordingStreamInner::new(info, batcher_config, sink).map(|inner| Self { + inner: Arc::new(Some(inner)), + }) + } + + /// Creates a new no-op [`RecordingStream`] that drops all logging messages, doesn't allocate + /// any memory and doesn't spawn any threads. + /// + /// [`Self::is_enabled`] will return `false`. + pub fn disabled() -> Self { + Self { + inner: Arc::new(None), + } + } +} + +#[allow(clippy::needless_pass_by_value)] +fn forwarding_thread( + info: RecordingInfo, + mut sink: Box, + cmds_rx: Receiver, + tables: Receiver, +) { + /// Returns `true` to indicate that processing can continue; i.e. `false` means immediate + /// shutdown. + fn handle_cmd(info: &RecordingInfo, cmd: Command, sink: &mut Box) -> bool { + match cmd { + Command::RecordMsg(msg) => { + sink.send(msg); + } + Command::SwapSink(new_sink) => { + let backlog = { + // Capture the backlog if it exists. + let backlog = sink.drain_backlog(); + + // Flush the underlying sink if possible. + sink.drop_if_disconnected(); + sink.flush_blocking(); + + backlog + }; + + // Send the recording info to the new sink. This is idempotent. + { + re_log::debug!( + app_id = %info.application_id, + rec_id = %info.recording_id, + "setting recording info", + ); + new_sink.send( + re_log_types::BeginRecordingMsg { + row_id: re_log_types::RowId::random(), + info: info.clone(), + } + .into(), + ); + new_sink.send_all(backlog); + } + + *sink = new_sink; + } + Command::Flush(oneshot) => { + // Flush the underlying sink if possible. + sink.drop_if_disconnected(); + sink.flush_blocking(); + drop(oneshot); // signals the oneshot + } + Command::PopPendingTables => { + // Wake up and skip the current iteration so that we can drain all pending tables + // before handling the next command. + } + Command::Shutdown => return false, + } + + true + } + + use crossbeam::select; + loop { + // NOTE: Always pop tables first, this is what makes `Command::PopPendingTables` possible, + // which in turns makes `RecordingStream::flush_blocking` well defined. + while let Ok(table) = tables.try_recv() { + let table = match table.to_arrow_msg() { + Ok(table) => table, + Err(err) => { + re_log::error!(%err, + "couldn't serialize table; data dropped (this is a bug in Rerun!)"); + continue; + } + }; + sink.send(LogMsg::ArrowMsg(info.recording_id, table)); + } + + select! { + recv(tables) -> res => { + let Ok(table) = res else { + // The batcher is gone, which can only happen if the `RecordingStream` itself + // has been dropped. + break; + }; + let table = match table.to_arrow_msg() { + Ok(table) => table, + Err(err) => { + re_log::error!(%err, + "couldn't serialize table; data dropped (this is a bug in Rerun!)"); + continue; + } + }; + sink.send(LogMsg::ArrowMsg(info.recording_id, table)); + } + recv(cmds_rx) -> res => { + let Ok(cmd) = res else { + // All command senders are gone, which can only happen if the + // `RecordingStream` itself has been dropped. + break; + }; + if !handle_cmd(&info, cmd, &mut sink) { + break; // shutdown + } + } + } + + // NOTE: The receiving end of the command stream is owned solely by this thread. + // Past this point, all command writes will return `ErrDisconnected`. + } +} + +impl RecordingStream { + /// Check if logging is enabled on this `RecordingStream`. + /// + /// If not, all recording calls will be ignored. + #[inline] + pub fn is_enabled(&self) -> bool { + self.inner.is_some() + } + + /// The [`RecordingInfo`] associated with this `RecordingStream`. + #[inline] + pub fn recording_info(&self) -> Option<&RecordingInfo> { + (*self.inner).as_ref().map(|inner| &inner.info) + } +} + +impl RecordingStream { + /// Records an arbitrary [`LogMsg`]. + #[inline] + pub fn record_msg(&self, msg: LogMsg) { + let Some(this) = &*self.inner else { + re_log::debug!("Recording disabled - call to record_msg() ignored"); + return; + }; + + // NOTE: Internal channels can never be closed outside of the `Drop` impl, this send cannot + // fail. + + this.cmds_tx.send(Command::RecordMsg(msg)).ok(); + } + + /// Records a [`re_log_types::PathOp`]. + /// + /// This is a convenience wrapper for [`Self::record_msg`]. + #[inline] + pub fn record_path_op( + &self, + timepoint: re_log_types::TimePoint, + path_op: re_log_types::PathOp, + ) { + let Some(this) = &*self.inner else { + re_log::debug!("Recording disabled - call to record_path_op() ignored"); + return; + }; + + self.record_msg(LogMsg::EntityPathOpMsg( + this.info.recording_id, + re_log_types::EntityPathOpMsg { + row_id: re_log_types::RowId::random(), + time_point: timepoint, + path_op, + }, + )); + } + + /// Records a single [`DataRow`]. + /// + /// Internally, incoming [`DataRow`]s are automatically coalesced into larger [`DataTable`]s to + /// optimize for transport. + #[inline] + pub fn record_row(&self, row: DataRow) { + let Some(this) = &*self.inner else { + re_log::debug!("Recording disabled - call to record_row() ignored"); + return; + }; + + this.batcher.push_row(row); + } + + /// Swaps the underlying sink for a new one. + /// + /// This guarantees that: + /// 1. all pending rows and tables are batched, collected and sent down the current sink, + /// 2. the current sink is flushed if it has pending data in its buffers, + /// 3. the current sink's backlog, if there's any, is forwarded to the new sink. + /// + /// When this function returns, the calling thread is guaranteed that all future record calls + /// will end up in the new sink. + /// + /// ## Data loss + /// + /// If the current sink is in a broken state (e.g. a TCP sink with a broken connection that + /// cannot be repaired), all pending data in its buffers will be dropped. + pub fn set_sink(&self, sink: Box) { + let Some(this) = &*self.inner else { + re_log::debug!("Recording disabled - call to set_sink() ignored"); + return; + }; + + // NOTE: Internal channels can never be closed outside of the `Drop` impl, all these sends + // are safe. + + // 1. Flush the batcher down the table channel + this.batcher.flush_blocking(); + + // 2. Receive pending tables from the batcher's channel + this.cmds_tx.send(Command::PopPendingTables).ok(); + + // 3. Swap the sink, which will internally make sure to re-ingest the backlog if needed + this.cmds_tx.send(Command::SwapSink(sink)).ok(); + + // 4. Before we give control back to the caller, we need to make sure that the swap has + // taken place: we don't want the user to send data to the old sink! + let (cmd, oneshot) = Command::flush(); + this.cmds_tx.send(cmd).ok(); + oneshot.recv().ok(); + } + + /// Initiates a flush of the pipeline and returns immediately. + /// + /// This does **not** wait for the flush to propagate (see [`Self::flush_blocking`]). + /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees. + pub fn flush_async(&self) { + let Some(this) = &*self.inner else { + re_log::debug!("Recording disabled - call to flush_async() ignored"); + return; + }; + + // NOTE: Internal channels can never be closed outside of the `Drop` impl, all these sends + // are safe. + + // 1. Synchronously flush the batcher down the table channel + // + // NOTE: This _has_ to be done synchronously as we need to be guaranteed that all tables + // are ready to be drained by the time this call returns. + // It cannot block indefinitely and is fairly fast as it only requires compute (no I/O). + this.batcher.flush_blocking(); + + // 2. Drain all pending tables from the batcher's channel _before_ any other future command + this.cmds_tx.send(Command::PopPendingTables).ok(); + + // 3. Asynchronously flush everything down the sink + let (cmd, _) = Command::flush(); + this.cmds_tx.send(cmd).ok(); + } + + /// Initiates a flush the batching pipeline and waits for it to propagate. + /// + /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees. + pub fn flush_blocking(&self) { + let Some(this) = &*self.inner else { + re_log::debug!("Recording disabled - call to flush_blocking() ignored"); + return; + }; + + // NOTE: Internal channels can never be closed outside of the `Drop` impl, all these sends + // are safe. + + // 1. Flush the batcher down the table channel + this.batcher.flush_blocking(); + + // 2. Drain all pending tables from the batcher's channel _before_ any other future command + this.cmds_tx.send(Command::PopPendingTables).ok(); + + // 3. Wait for all tables to have been forwarded down the sink + let (cmd, oneshot) = Command::flush(); + this.cmds_tx.send(cmd).ok(); + oneshot.recv().ok(); + } +} + +impl RecordingStream { + /// Swaps the underlying sink for a [`crate::log_sink::TcpSink`] sink pre-configured to use + /// the specified address. + /// + /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in + /// terms of data durability and ordering. + /// See [`Self::set_sink`] for more information. + pub fn connect(&self, addr: std::net::SocketAddr) { + self.set_sink(Box::new(crate::log_sink::TcpSink::new(addr))); + } + + /// Swaps the underlying sink for a [`crate::sink::MemorySink`] sink and returns the associated + /// [`MemorySinkStorage`]. + /// + /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in + /// terms of data durability and ordering. + /// See [`Self::set_sink`] for more information. + pub fn memory(&self) -> MemorySinkStorage { + let sink = crate::sink::MemorySink::default(); + let buffer = sink.buffer(); + self.set_sink(Box::new(sink)); + buffer + } + + /// Swaps the underlying sink for a [`crate::sink::FileSink`] at the specified `path`. + /// + /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in + /// terms of data durability and ordering. + /// See [`Self::set_sink`] for more information. + pub fn save( + &self, + path: impl Into, + ) -> Result<(), crate::sink::FileSinkError> { + let sink = crate::sink::FileSink::new(path)?; + self.set_sink(Box::new(sink)); + Ok(()) + } + + /// Swaps the underlying sink for a [`crate::sink::BufferedSink`]. + /// + /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in + /// terms of data durability and ordering. + /// See [`Self::set_sink`] for more information. + pub fn disconnect(&self) { + self.set_sink(Box::new(crate::sink::BufferedSink::new())); + } +} + +#[cfg(test)] +mod tests { + use re_log_types::RowId; + + use super::*; + + #[test] + fn impl_send_sync() { + fn assert_send_sync() {} + assert_send_sync::(); + } + + #[test] + fn never_flush() { + let rec_stream = RecordingStreamBuilder::new("never_flush") + .enabled(true) + .batcher_config(DataTableBatcherConfig::NEVER) + .buffered() + .unwrap(); + + let rec_info = rec_stream.recording_info().cloned().unwrap(); + + let mut table = DataTable::example(false); + table.compute_all_size_bytes(); + for row in table.to_rows() { + rec_stream.record_row(row); + } + + let storage = rec_stream.memory(); + let mut msgs = { + let mut msgs = storage.take(); + msgs.reverse(); + msgs + }; + + // First message should be a set_recording_info resulting from the original sink swap to + // buffered mode. + match msgs.pop().unwrap() { + LogMsg::BeginRecordingMsg(msg) => { + assert!(msg.row_id != RowId::ZERO); + similar_asserts::assert_eq!(rec_info, msg.info); + } + _ => panic!("expected BeginRecordingMsg"), + } + + // Second message should be a set_recording_info resulting from the later sink swap from + // buffered mode into in-memory mode. + // This arrives _before_ the data itself since we're using manual flushing. + match msgs.pop().unwrap() { + LogMsg::BeginRecordingMsg(msg) => { + assert!(msg.row_id != RowId::ZERO); + similar_asserts::assert_eq!(rec_info, msg.info); + } + _ => panic!("expected BeginRecordingMsg"), + } + + // Third message is the batched table itself, which was sent as a result of the implicit + // flush when swapping the underlying sink from buffered to in-memory. + match msgs.pop().unwrap() { + LogMsg::ArrowMsg(rid, msg) => { + assert_eq!(rec_info.recording_id, rid); + + let mut got = DataTable::from_arrow_msg(&msg).unwrap(); + // TODO(1760): we shouldn't have to (re)do this! + got.compute_all_size_bytes(); + // NOTE: Override the resulting table's ID so they can be compared. + got.table_id = table.table_id; + + similar_asserts::assert_eq!(table, got); + } + _ => panic!("expected ArrowMsg"), + } + + // That's all. + assert!(msgs.pop().is_none()); + } + + #[test] + fn always_flush() { + let rec_stream = RecordingStreamBuilder::new("always_flush") + .enabled(true) + .batcher_config(DataTableBatcherConfig::ALWAYS) + .buffered() + .unwrap(); + + let rec_info = rec_stream.recording_info().cloned().unwrap(); + + let mut table = DataTable::example(false); + table.compute_all_size_bytes(); + for row in table.to_rows() { + rec_stream.record_row(row); + } + + let storage = rec_stream.memory(); + let mut msgs = { + let mut msgs = storage.take(); + msgs.reverse(); + msgs + }; + + // First message should be a set_recording_info resulting from the original sink swap to + // buffered mode. + match msgs.pop().unwrap() { + LogMsg::BeginRecordingMsg(msg) => { + assert!(msg.row_id != RowId::ZERO); + similar_asserts::assert_eq!(rec_info, msg.info); + } + _ => panic!("expected BeginRecordingMsg"), + } + + // Second message should be a set_recording_info resulting from the later sink swap from + // buffered mode into in-memory mode. + // This arrives _before_ the data itself since we're using manual flushing. + match msgs.pop().unwrap() { + LogMsg::BeginRecordingMsg(msg) => { + assert!(msg.row_id != RowId::ZERO); + similar_asserts::assert_eq!(rec_info, msg.info); + } + _ => panic!("expected BeginRecordingMsg"), + } + + let mut rows = { + let mut rows: Vec<_> = table.to_rows().collect(); + rows.reverse(); + rows + }; + + let mut assert_next_row = || { + match msgs.pop().unwrap() { + LogMsg::ArrowMsg(rid, msg) => { + assert_eq!(rec_info.recording_id, rid); + + let mut got = DataTable::from_arrow_msg(&msg).unwrap(); + // TODO(1760): we shouldn't have to (re)do this! + got.compute_all_size_bytes(); + // NOTE: Override the resulting table's ID so they can be compared. + got.table_id = table.table_id; + + let expected = DataTable::from_rows(got.table_id, [rows.pop().unwrap()]); + + similar_asserts::assert_eq!(expected, got); + } + _ => panic!("expected ArrowMsg"), + } + }; + + // 3rd, 4th and 5th messages are all the single-row batched tables themselves, which were + // sent as a result of the implicit flush when swapping the underlying sink from buffered + // to in-memory. + assert_next_row(); + assert_next_row(); + assert_next_row(); + + // That's all. + assert!(msgs.pop().is_none()); + } + + #[test] + fn flush_hierarchy() { + let (rec_stream, storage) = RecordingStreamBuilder::new("flush_hierarchy") + .enabled(true) + .batcher_config(DataTableBatcherConfig::NEVER) + .memory() + .unwrap(); + + let rec_info = rec_stream.recording_info().cloned().unwrap(); + + let mut table = DataTable::example(false); + table.compute_all_size_bytes(); + for row in table.to_rows() { + rec_stream.record_row(row); + } + + { + let mut msgs = { + let mut msgs = storage.take(); + msgs.reverse(); + msgs + }; + + // First message should be a set_recording_info resulting from the original sink swap + // to in-memory mode. + match msgs.pop().unwrap() { + LogMsg::BeginRecordingMsg(msg) => { + assert!(msg.row_id != RowId::ZERO); + similar_asserts::assert_eq!(rec_info, msg.info); + } + _ => panic!("expected BeginRecordingMsg"), + } + + // The underlying batcher is never flushing: there's nothing else. + assert!(msgs.pop().is_none()); + } + + // The underlying batcher is never flushing: there's nothing else. + assert!(storage.take().is_empty()); + + rec_stream.flush_blocking(); // flush the entire hierarchy + + { + let mut msgs = { + let mut msgs = storage.take(); + msgs.reverse(); + msgs + }; + + // The batched table itself, which was sent as a result of the explicit flush above. + match msgs.pop().unwrap() { + LogMsg::ArrowMsg(rid, msg) => { + assert_eq!(rec_info.recording_id, rid); + + let mut got = DataTable::from_arrow_msg(&msg).unwrap(); + // TODO(1760): we shouldn't have to (re)do this! + got.compute_all_size_bytes(); + // NOTE: Override the resulting table's ID so they can be compared. + got.table_id = table.table_id; + + similar_asserts::assert_eq!(table, got); + } + _ => panic!("expected ArrowMsg"), + } + + // That's all. + assert!(msgs.pop().is_none()); + } + } + + #[test] + fn disabled() { + let (rec_stream, storage) = RecordingStreamBuilder::new("disabled") + .enabled(false) + .batcher_config(DataTableBatcherConfig::ALWAYS) + .memory() + .unwrap(); + + let mut table = DataTable::example(false); + table.compute_all_size_bytes(); + for row in table.to_rows() { + rec_stream.record_row(row); + } + + let mut msgs = { + let mut msgs = storage.take(); + msgs.reverse(); + msgs + }; + + // That's all. + assert!(msgs.pop().is_none()); + } +} diff --git a/crates/re_sdk/src/session.rs b/crates/re_sdk/src/session.rs deleted file mode 100644 index b9fbc1d419d3..000000000000 --- a/crates/re_sdk/src/session.rs +++ /dev/null @@ -1,320 +0,0 @@ -use std::sync::Arc; - -use re_log_types::{ApplicationId, LogMsg, RecordingId, RecordingInfo, RecordingSource, Time}; - -use crate::sink::LogSink; - -// ---------------------------------------------------------------------------- - -/// Construct a [`Session`]. -/// -/// ``` no_run -/// # use re_sdk::SessionBuilder; -/// let session = SessionBuilder::new("my_app").save("my_recording.rrd")?; -/// # Ok::<(), Box>(()) -/// ``` -#[must_use] -pub struct SessionBuilder { - application_id: ApplicationId, - is_official_example: bool, - enabled: Option, - default_enabled: bool, - recording_id: Option, -} - -impl SessionBuilder { - /// Create a new [`SessionBuilder`] with an application id. - /// - /// The application id is usually the name of your app. - /// - /// ``` no_run - /// # use re_sdk::SessionBuilder; - /// let session = SessionBuilder::new("my_app").save("my_recording.rrd")?; - /// # Ok::<(), Box>(()) - /// ``` - #[track_caller] // track_caller so that we can see if we are being called from an official example. - pub fn new(application_id: impl Into) -> Self { - let application_id = application_id.into(); - let is_official_example = crate::called_from_official_rust_example(); - - Self { - application_id, - is_official_example, - enabled: None, - default_enabled: true, - recording_id: None, - } - } - - /// Set whether or not Rerun is enabled by default. - /// - /// If the `RERUN` environment variable is set, it will override this. - /// - /// Set also: [`Self::enabled`]. - pub fn default_enabled(mut self, default_enabled: bool) -> Self { - self.default_enabled = default_enabled; - self - } - - /// Set whether or not Rerun is enabled. - /// - /// Setting this will ignore the `RERUN` environment variable. - /// - /// Set also: [`Self::default_enabled`]. - pub fn enabled(mut self, enabled: bool) -> Self { - self.enabled = Some(enabled); - self - } - - /// Set the [`RecordingId`] for this session. - /// - /// If you're logging from multiple processes and want all the messages - /// to end up as the same recording, you must make sure they all set the same - /// [`RecordingId`] using this function. - /// - /// Note that many recordings can share the same [`ApplicationId`], but - /// they all have unique [`RecordingId`]s. - /// - /// The default is to use a random [`RecordingId`]. - pub fn recording_id(mut self, recording_id: RecordingId) -> Self { - self.recording_id = Some(recording_id); - self - } - - /// Buffer log messages in RAM. - /// - /// Retrieve them later with [`Session::drain_backlog`]. - pub fn buffered(self) -> Session { - let (rerun_enabled, recording_info) = self.finalize(); - if rerun_enabled { - Session::buffered(recording_info) - } else { - re_log::debug!("Rerun disabled - call to buffered() ignored"); - Session::disabled() - } - } - - /// Send log data to a remote viewer/server. - /// - /// Usually this is done by running the `rerun` binary (`cargo install rerun`) without arguments, - /// and then connecting to it. - /// - /// Send all currently buffered messages. - /// If we are already connected, we will re-connect to this new address. - /// - /// This function returns immediately. - /// - /// ## Example: - /// - /// ``` no_run - /// let session = re_sdk::SessionBuilder::new("my_app").connect(re_sdk::default_server_addr()); - /// ``` - pub fn connect(self, addr: std::net::SocketAddr) -> Session { - let (rerun_enabled, recording_info) = self.finalize(); - if rerun_enabled { - Session::new( - recording_info, - Box::new(crate::log_sink::TcpSink::new(addr)), - ) - } else { - re_log::debug!("Rerun disabled - call to connect() ignored"); - Session::disabled() - } - } - - /// Stream all log messages to an `.rrd` file. - /// - /// ``` no_run - /// # use re_sdk::SessionBuilder; - /// let session = SessionBuilder::new("my_app").save("my_recording.rrd")?; - /// # Ok::<(), Box>(()) - /// ``` - #[cfg(not(target_arch = "wasm32"))] - pub fn save( - self, - path: impl Into, - ) -> Result { - let (rerun_enabled, recording_info) = self.finalize(); - if rerun_enabled { - Ok(Session::new( - recording_info, - Box::new(crate::sink::FileSink::new(path)?), - )) - } else { - re_log::debug!("Rerun disabled - call to save() ignored"); - Ok(Session::disabled()) - } - } - - /// Returns whether or not logging is enabled, plus a [`RecordingInfo`]. - /// - /// This can be used to then construct a [`Session`] manually using [`Session::new`]. - pub fn finalize(self) -> (bool, RecordingInfo) { - let Self { - application_id, - is_official_example, - enabled, - default_enabled, - recording_id, - } = self; - - let enabled = enabled.unwrap_or_else(|| crate::decide_logging_enabled(default_enabled)); - let recording_id = recording_id.unwrap_or_else(RecordingId::random); - - let recording_info = RecordingInfo { - application_id, - recording_id, - is_official_example, - started: Time::now(), - recording_source: RecordingSource::RustSdk { - rustc_version: env!("RE_BUILD_RUSTC_VERSION").into(), - llvm_version: env!("RE_BUILD_LLVM_VERSION").into(), - }, - }; - - (enabled, recording_info) - } -} - -// ---------------------------------------------------------------------------- - -/// The main way to do Rerun loggning. -/// -/// You can construct a [`Session`] with [`SessionBuilder`] or [`Session::new`]. -/// -/// Cloning a [`Session`] is cheap (it's a shallow clone). -/// The clone will send its messages to the same sink as the prototype. -/// -/// `Session` also implements `Send` and `Sync`. -#[must_use] -#[derive(Clone)] -pub struct Session { - recording_info: RecordingInfo, - sink: Arc, - // TODO(emilk): add convenience `TimePoint` here so that users can - // do things like `session.set_time_sequence("frame", frame_idx);` -} - -#[test] -fn session_impl_send_sync() { - fn assert_send_sync() {} - assert_send_sync::(); -} - -impl Session { - /// Construct a new session with a given [`RecordingInfo`] and [`LogSink`]. - /// - /// You can create a [`RecordingInfo`] with [`crate::new_recording_info`]; - /// - /// The [`RecordingInfo`] is immediately sent to the sink in the form of a - /// [`re_log_types::BeginRecordingMsg`]. - /// - /// You can find sinks in [`crate::sink`]. - /// - /// See also: [`SessionBuilder`]. - pub fn new(recording_info: RecordingInfo, sink: Box) -> Self { - if sink.is_enabled() { - re_log::debug!( - "Beginning new recording with application_id {:?} and recording id {}", - recording_info.application_id.0, - recording_info.recording_id - ); - - sink.send( - re_log_types::BeginRecordingMsg { - row_id: re_log_types::RowId::random(), - info: recording_info.clone(), - } - .into(), - ); - } - - Self { - recording_info, - sink: sink.into(), - } - } - - /// Construct a new session with a disabled "dummy" sink that drops all logging messages. - /// - /// [`Self::is_enabled`] will return `false`. - pub fn disabled() -> Self { - Self { - recording_info: RecordingInfo { - application_id: ApplicationId::unknown(), - recording_id: Default::default(), - is_official_example: crate::called_from_official_rust_example(), - started: Time::now(), - recording_source: RecordingSource::RustSdk { - rustc_version: env!("RE_BUILD_RUSTC_VERSION").into(), - llvm_version: env!("RE_BUILD_LLVM_VERSION").into(), - }, - }, - sink: crate::sink::disabled().into(), - } - } - - /// Buffer log messages in RAM. - /// - /// Retrieve them later with [`Self::drain_backlog`]. - pub fn buffered(recording_info: RecordingInfo) -> Self { - Self::new(recording_info, Box::new(crate::sink::BufferedSink::new())) - } - - /// Check if logging is enabled on this `Session`. - /// - /// If not, all logging calls will be ignored. - pub fn is_enabled(&self) -> bool { - self.sink.is_enabled() - } - - /// Access the underlying log sink to where we send out log messages. - pub fn sink(&self) -> &Arc { - &self.sink - } - - /// Send a [`LogMsg`]. - pub fn send(&self, log_msg: LogMsg) { - self.sink.send(log_msg); - } - - /// Send a [`re_log_types::PathOp`]. - /// - /// This is a convenience wrapper for [`Self::send`]. - pub fn send_path_op( - &self, - time_point: &re_log_types::TimePoint, - path_op: re_log_types::PathOp, - ) { - self.send(LogMsg::EntityPathOpMsg( - self.recording_id(), - re_log_types::EntityPathOpMsg { - row_id: re_log_types::RowId::random(), - time_point: time_point.clone(), - path_op, - }, - )); - } - - /// Drain all buffered [`LogMsg`]es and return them. - pub fn drain_backlog(&self) -> Vec { - self.sink.drain_backlog() - } - - /// The current [`RecordingId`]. - pub fn recording_id(&self) -> RecordingId { - self.recording_info.recording_id - } -} - -impl AsRef for Session { - fn as_ref(&self) -> &dyn LogSink { - self.sink.as_ref() - } -} - -impl std::borrow::Borrow for Session { - fn borrow(&self) -> &dyn LogSink { - self.sink.as_ref() - } -} diff --git a/crates/rerun/src/clap.rs b/crates/rerun/src/clap.rs index 4cd0514d9ce0..ea883f0a59a4 100644 --- a/crates/rerun/src/clap.rs +++ b/crates/rerun/src/clap.rs @@ -2,13 +2,12 @@ use std::{net::SocketAddr, path::PathBuf}; +use re_sdk::RecordingStream; #[cfg(feature = "web_viewer")] use re_web_viewer_server::WebViewerServerPort; #[cfg(feature = "web_viewer")] use re_ws_comms::RerunServerPort; -use crate::Session; - // --- #[derive(Debug, Clone, PartialEq, Eq)] @@ -24,6 +23,9 @@ enum RerunBehavior { Spawn, } +// TODO(cmc): There are definitely ways of making this all nicer now (this, native_viewer and +// web_viewer).. but one thing at a time. + /// This struct implements a `clap::Parser` that defines all the arguments that a typical Rerun /// application might use, and provides helpers to evaluate those arguments and behave /// consequently. @@ -67,7 +69,7 @@ pub struct RerunArgs { } impl RerunArgs { - /// Set up Rerun, and run the given code with a [`Session`] object + /// Set up Rerun, and run the given code with a [`RecordingStream`] object /// that can be used to log data. /// /// Logging will be controlled by the `RERUN` environment variable, @@ -77,7 +79,7 @@ impl RerunArgs { &self, application_id: &str, default_enabled: bool, - run: impl FnOnce(Session) + Send + 'static, + run: impl FnOnce(RecordingStream) + Send + 'static, ) -> anyhow::Result<()> { // Ensure we have a running tokio runtime. let mut tokio_runtime = None; @@ -89,12 +91,13 @@ impl RerunArgs { }; let _tokio_runtime_guard = tokio_runtime_handle.enter(); - let (rerun_enabled, recording_info) = crate::SessionBuilder::new(application_id) - .default_enabled(default_enabled) - .finalize(); + let (rerun_enabled, recording_info, batcher_config) = + crate::RecordingStreamBuilder::new(application_id) + .default_enabled(default_enabled) + .into_args(); if !rerun_enabled { - run(Session::disabled()); + run(RecordingStream::disabled()); return Ok(()); } @@ -115,14 +118,17 @@ impl RerunArgs { #[cfg(feature = "native_viewer")] RerunBehavior::Spawn => { - crate::native_viewer::spawn(recording_info, run)?; + crate::native_viewer::spawn(recording_info, batcher_config, run)?; return Ok(()); } }; - let session = Session::new(recording_info, sink); - let _sink = session.sink().clone(); // Keep sink (and potential associated servers) alive until the end of this function scope. - run(session); + let rec_stream = RecordingStream::new(recording_info, batcher_config, sink)?; + run(rec_stream.clone()); + + // The user callback is done executing, it's a good opportunity to flush the pipeline + // independently of the current flush thresholds (which might be `NEVER`). + rec_stream.flush_async(); #[cfg(feature = "web_viewer")] if matches!(self.to_behavior(), Ok(RerunBehavior::Serve)) { diff --git a/crates/rerun/src/lib.rs b/crates/rerun/src/lib.rs index 107bba38accd..1ee5181ef3d5 100644 --- a/crates/rerun/src/lib.rs +++ b/crates/rerun/src/lib.rs @@ -34,7 +34,7 @@ //! # fn capture_image() -> image::DynamicImage { Default::default() } //! # fn positions() -> Vec { Default::default() } //! # fn colors() -> Vec { Default::default() } -//! let mut rr_session = rerun::SessionBuilder::new("my_app").buffered(); +//! let rec_stream = rerun::RecordingStreamBuilder::new("my_app").buffered()?; //! //! let points: Vec = positions(); //! let colors: Vec = colors(); @@ -43,16 +43,16 @@ //! rerun::MsgSender::new("points") //! .with_component(&points)? //! .with_component(&colors)? -//! .send(&mut rr_session)?; +//! .send(&rec_stream)?; //! //! rerun::MsgSender::new("image") //! .with_component(&[rerun::components::Tensor::from_image(image)?])? -//! .send(&mut rr_session)?; +//! .send(&rec_stream)?; //! //! # Ok::<(), Box>(()) //! ``` //! -//! See [`Session`] and [`MsgSender`] for details. +//! See [`RecordingStream`] and [`MsgSender`] for details. //! //! #### Streaming //! To stream log data to an awaiting `rerun` process, you can do this: @@ -60,18 +60,20 @@ //! //! Then do this: //! -//! ``` no_run -//! let mut rr_session = rerun::SessionBuilder::new("my_app").connect(rerun::default_server_addr()); +//! ```no_run +//! let rec_stream = rerun::RecordingStreamBuilder::new("my_app").connect(rerun::default_server_addr()); //! ``` //! //! #### Buffering //! -//! ``` no_run -//! # fn log_using(rr_session: &rerun::Session) {} +//! ```no_run +//! # fn log_using(rec_stream: &rerun::RecordingStream) {} //! -//! let mut rr_session = rerun::SessionBuilder::new("my_app").buffered(); -//! log_using(&mut rr_session); -//! rerun::native_viewer::show(&mut rr_session); +//! let (rec_stream, storage) = rerun::RecordingStreamBuilder::new("my_app").memory()?; +//! log_using(&rec_stream); +//! rerun::native_viewer::show(storage.take()); +//! +//! # Ok::<(), Box>(()) //! ``` //! //! ## Binary diff --git a/crates/rerun/src/native_viewer.rs b/crates/rerun/src/native_viewer.rs index e1d5fc1de4e0..2999879f1f92 100644 --- a/crates/rerun/src/native_viewer.rs +++ b/crates/rerun/src/native_viewer.rs @@ -1,9 +1,9 @@ use re_log_types::LogMsg; use re_log_types::RecordingInfo; -use re_sdk::Session; +use re_sdk::RecordingStream; /// Starts a Rerun viewer on the current thread and migrates the given callback, along with -/// the active `Session`, to a newly spawned thread where the callback will run until +/// the active `RecordingStream`, to a newly spawned thread where the callback will run until /// completion. /// /// All messages logged from the passed-in callback will be streamed to the viewer in @@ -14,21 +14,26 @@ use re_sdk::Session; /// ⚠️ This function must be called from the main thread since some platforms require that /// their UI runs on the main thread! ⚠️ #[cfg(not(target_arch = "wasm32"))] -pub fn spawn(recording_info: RecordingInfo, run: F) -> re_viewer::external::eframe::Result<()> +pub fn spawn( + recording_info: RecordingInfo, + batcher_config: re_log_types::DataTableBatcherConfig, + run: F, +) -> re_viewer::external::eframe::Result<()> where - F: FnOnce(Session) + Send + 'static, + F: FnOnce(RecordingStream) + Send + 'static, { let (tx, rx) = re_smart_channel::smart_channel(re_smart_channel::Source::Sdk); let sink = Box::new(NativeViewerSink(tx)); let app_env = re_viewer::AppEnvironment::from_recording_source(&recording_info.recording_source); - let session = Session::new(recording_info, sink); + let rec_stream = + RecordingStream::new(recording_info, batcher_config, sink).expect("Failed to spawn thread"); // NOTE: Forget the handle on purpose, leave that thread be. std::thread::Builder::new() .name("spawned".into()) - .spawn(move || run(session)) + .spawn(move || run(rec_stream)) .expect("Failed to spawn thread"); // NOTE: Some platforms still mandate that the UI must run on the main thread, so make sure @@ -49,32 +54,28 @@ where })) } -/// Drains all pending log messages and starts a Rerun viewer to visualize -/// everything that has been logged so far. -/// +/// Starts a Rerun viewer to visualize the contents of a given array of messages. /// The method will return when the viewer is closed. /// -/// You should use this method together with [`Session::buffered`]; -/// /// ⚠️ This function must be called from the main thread since some platforms require that /// their UI runs on the main thread! ⚠️ -pub fn show(session: &Session) -> re_viewer::external::eframe::Result<()> { - if !session.is_enabled() { - re_log::debug!("Rerun disabled - call to show() ignored"); +pub fn show(msgs: Vec) -> re_viewer::external::eframe::Result<()> { + if msgs.is_empty() { + re_log::debug!("Empty array of msgs - call to show() ignored"); return Ok(()); } + let recording_source = re_log_types::RecordingSource::RustSdk { rustc_version: env!("RE_BUILD_RUSTC_VERSION").into(), llvm_version: env!("RE_BUILD_LLVM_VERSION").into(), }; - let log_messages = session.drain_backlog(); let startup_options = re_viewer::StartupOptions::default(); re_viewer::run_native_viewer_with_messages( re_build_info::build_info!(), re_viewer::AppEnvironment::from_recording_source(&recording_source), startup_options, - log_messages, + msgs, ) } @@ -91,4 +92,7 @@ impl re_sdk::sink::LogSink for NativeViewerSink { re_log::error_once!("Failed to send log message to viewer: {err}"); } } + + #[inline] + fn flush_blocking(&self) {} } diff --git a/crates/rerun/src/web_viewer.rs b/crates/rerun/src/web_viewer.rs index bc1fc6f3396e..3910b85214cf 100644 --- a/crates/rerun/src/web_viewer.rs +++ b/crates/rerun/src/web_viewer.rs @@ -78,6 +78,9 @@ impl crate::sink::LogSink for WebViewerSink { re_log::error_once!("Failed to send log message to web server: {err}"); } } + + #[inline] + fn flush_blocking(&self) {} } // ---------------------------------------------------------------------------- diff --git a/examples/rust/api_demo/src/main.rs b/examples/rust/api_demo/src/main.rs index abc6ff2e873e..32b9cd141beb 100644 --- a/examples/rust/api_demo/src/main.rs +++ b/examples/rust/api_demo/src/main.rs @@ -29,7 +29,7 @@ use rerun::{ re_log_types::external::{arrow2, arrow2_convert}, }, time::{Time, TimePoint, TimeType, Timeline}, - Component, ComponentName, EntityPath, MsgSender, Session, + Component, ComponentName, EntityPath, MsgSender, RecordingStream, }; // --- Rerun logging --- @@ -40,7 +40,7 @@ fn sim_time(at: f64) -> TimePoint { [(timeline_sim_time, time.into())].into() } -fn demo_bbox(session: &Session) -> anyhow::Result<()> { +fn demo_bbox(rec_stream: &RecordingStream) -> anyhow::Result<()> { MsgSender::new("bbox_demo/bbox") .with_timepoint(sim_time(0 as _)) .with_component(&[Box3D::new(1.0, 0.5, 0.25)])? @@ -51,7 +51,7 @@ fn demo_bbox(session: &Session) -> anyhow::Result<()> { .with_component(&[ColorRGBA::from_rgb(0, 255, 0)])? .with_component(&[Radius(0.005)])? .with_component(&[Label("box/t0".to_owned())])? - .send(session)?; + .send(rec_stream)?; MsgSender::new("bbox_demo/bbox") .with_timepoint(sim_time(1 as _)) @@ -63,17 +63,17 @@ fn demo_bbox(session: &Session) -> anyhow::Result<()> { .with_component(&[ColorRGBA::from_rgb(255, 255, 0)])? .with_component(&[Radius(0.01)])? .with_component(&[Label("box/t1".to_owned())])? - .send(session)?; + .send(rec_stream)?; Ok(()) } -fn demo_extension_components(session: &Session) -> anyhow::Result<()> { +fn demo_extension_components(rec_stream: &RecordingStream) -> anyhow::Result<()> { // Hack to establish 2d view bounds MsgSender::new("extension_components") .with_timepoint(sim_time(0 as _)) .with_component(&[Rect2D::from_xywh(0.0, 0.0, 128.0, 128.0)])? - .send(session)?; + .send(rec_stream)?; // Separate extension component // TODO(cmc): not that great to have to dig around for arrow2-* reexports :/ @@ -95,7 +95,7 @@ fn demo_extension_components(session: &Session) -> anyhow::Result<()> { .with_component(&[Point2D::new(64.0, 64.0)])? .with_component(&[ColorRGBA::from_rgb(255, 0, 0)])? .with_component(&[Confidence(0.9)])? - .send(session)?; + .send(rec_stream)?; // Batch points with extension @@ -136,15 +136,15 @@ fn demo_extension_components(session: &Session) -> anyhow::Result<()> { Corner("lower right".into()), ])? .with_splat(Training(true))? - .send(session)?; + .send(rec_stream)?; Ok(()) } -fn demo_log_cleared(session: &Session) -> anyhow::Result<()> { +fn demo_log_cleared(rec_stream: &RecordingStream) -> anyhow::Result<()> { // TODO(cmc): need abstractions for this fn log_cleared( - session: &Session, + rec_stream: &RecordingStream, timepoint: &TimePoint, ent_path: impl Into, recursive: bool, @@ -155,7 +155,7 @@ fn demo_log_cleared(session: &Session) -> anyhow::Result<()> { (Timeline::log_time(), Time::now().into()), (*tp[0].0, *tp[0].1), ]; - session.send_path_op(&timepoint.into(), PathOp::clear(recursive, ent_path.into())); + rec_stream.record_path_op(timepoint.into(), PathOp::clear(recursive, ent_path.into())); } // sim_time = 1 @@ -164,46 +164,46 @@ fn demo_log_cleared(session: &Session) -> anyhow::Result<()> { .with_component(&[Rect2D::from_xywh(5.0, 5.0, 4.0, 4.0)])? .with_component(&[ColorRGBA::from_rgb(255, 0, 0)])? .with_component(&[Label("Rect1".into())])? - .send(session)?; + .send(rec_stream)?; MsgSender::new("null_demo/rect/1") .with_timepoint(sim_time(1 as _)) .with_component(&[Rect2D::from_xywh(10.0, 5.0, 4.0, 4.0)])? .with_component(&[ColorRGBA::from_rgb(0, 255, 0)])? .with_component(&[Label("Rect2".into())])? - .send(session)?; + .send(rec_stream)?; // sim_time = 2 - log_cleared(session, &sim_time(2 as _), "null_demo/rect/0", false); + log_cleared(rec_stream, &sim_time(2 as _), "null_demo/rect/0", false); // sim_time = 3 - log_cleared(session, &sim_time(3 as _), "null_demo/rect", true); + log_cleared(rec_stream, &sim_time(3 as _), "null_demo/rect", true); // sim_time = 4 MsgSender::new("null_demo/rect/0") .with_timepoint(sim_time(4 as _)) .with_component(&[Rect2D::from_xywh(5.0, 5.0, 4.0, 4.0)])? - .send(session)?; + .send(rec_stream)?; // sim_time = 5 MsgSender::new("null_demo/rect/1") .with_timepoint(sim_time(5 as _)) .with_component(&[Rect2D::from_xywh(10.0, 5.0, 4.0, 4.0)])? - .send(session)?; + .send(rec_stream)?; Ok(()) } -fn demo_3d_points(session: &Session) -> anyhow::Result<()> { +fn demo_3d_points(rec_stream: &RecordingStream) -> anyhow::Result<()> { MsgSender::new("3d_points/single_point_unlabeled") .with_timepoint(sim_time(1 as _)) .with_component(&[Point3D::new(10.0, 0.0, 0.0)])? - .send(session)?; + .send(rec_stream)?; MsgSender::new("3d_points/single_point_labeled") .with_timepoint(sim_time(1 as _)) .with_component(&[Point3D::new(0.0, 0.0, 0.0)])? .with_component(&[Label("labeled point".to_owned())])? - .send(session)?; + .send(rec_stream)?; fn create_points( n: usize, @@ -232,7 +232,7 @@ fn demo_3d_points(session: &Session) -> anyhow::Result<()> { .with_component(&points)? .with_component(&labels)? .with_component(&radii)? - .send(session)?; + .send(rec_stream)?; let (labels, points, _, colors) = create_points(100, |x| x * 5.0, |y| y * 5.0 - 10.0, |z| z * 0.4 - 5.0); @@ -241,12 +241,12 @@ fn demo_3d_points(session: &Session) -> anyhow::Result<()> { .with_component(&points)? .with_component(&labels)? .with_component(&colors)? - .send(session)?; + .send(rec_stream)?; Ok(()) } -fn demo_rects(session: &Session) -> anyhow::Result<()> { +fn demo_rects(rec_stream: &RecordingStream) -> anyhow::Result<()> { use ndarray::prelude::*; use ndarray_rand::{rand_distr::Uniform, RandomExt as _}; @@ -255,7 +255,7 @@ fn demo_rects(session: &Session) -> anyhow::Result<()> { MsgSender::new("rects_demo/img") .with_timepoint(sim_time(1 as _)) .with_component(&[Tensor::try_from(img.as_standard_layout().view())?])? - .send(session)?; + .send(rec_stream)?; // 20 random rectangles // TODO(cmc): shouldn't have to collect, need to fix the "must have a ref" thingy @@ -273,28 +273,32 @@ fn demo_rects(session: &Session) -> anyhow::Result<()> { .with_timepoint(sim_time(2 as _)) .with_component(&rects)? .with_component(&colors)? - .send(session)?; + .send(rec_stream)?; // Clear the rectangles by logging an empty set MsgSender::new("rects_demo/rects") .with_timepoint(sim_time(3 as _)) .with_component(&Vec::::new())? - .send(session)?; + .send(rec_stream)?; Ok(()) } -fn demo_segmentation(session: &Session) -> anyhow::Result<()> { +fn demo_segmentation(rec_stream: &RecordingStream) -> anyhow::Result<()> { // TODO(cmc): All of these text logs should really be going through `re_log` and automagically // fed back into rerun via a `tracing` backend. At the _very_ least we should have a helper // available for this. // In either case, this raises the question of tracking time at the SDK level, akin to what the // python SDK does. - fn log_info(session: &Session, timepoint: TimePoint, text: &str) -> anyhow::Result<()> { + fn log_info( + rec_stream: &RecordingStream, + timepoint: TimePoint, + text: &str, + ) -> anyhow::Result<()> { MsgSender::new("logs/seg_demo_log") .with_timepoint(timepoint) .with_component(&[TextEntry::new(text, Some("INFO".into()))])? - .send(session) + .send(rec_stream) .map_err(Into::into) } @@ -310,20 +314,20 @@ fn demo_segmentation(session: &Session) -> anyhow::Result<()> { MsgSender::new("seg_demo/img") .with_timepoint(sim_time(1 as _)) .with_component(&[tensor])? - .send(session)?; + .send(rec_stream)?; // Log a bunch of classified 2D points MsgSender::new("seg_demo/single_point") .with_timepoint(sim_time(1 as _)) .with_component(&[Point2D::new(64.0, 64.0)])? .with_component(&[ClassId(13)])? - .send(session)?; + .send(rec_stream)?; MsgSender::new("seg_demo/single_point_labeled") .with_timepoint(sim_time(1 as _)) .with_component(&[Point2D::new(90.0, 50.0)])? .with_component(&[ClassId(13)])? .with_component(&[Label("labeled point".into())])? - .send(session)?; + .send(rec_stream)?; MsgSender::new("seg_demo/several_points0") .with_timepoint(sim_time(1 as _)) .with_component(&[ @@ -332,7 +336,7 @@ fn demo_segmentation(session: &Session) -> anyhow::Result<()> { Point2D::new(60.0, 30.0), ])? .with_splat(ClassId(42))? - .send(session)?; + .send(rec_stream)?; MsgSender::new("seg_demo/several_points1") .with_timepoint(sim_time(1 as _)) .with_component(&[ @@ -341,7 +345,7 @@ fn demo_segmentation(session: &Session) -> anyhow::Result<()> { Point2D::new(80.0, 30.0), ])? .with_component(&[ClassId(13), ClassId(42), ClassId(99)])? - .send(session)?; + .send(rec_stream)?; MsgSender::new("seg_demo/many points") .with_timepoint(sim_time(1 as _)) .with_component( @@ -350,9 +354,9 @@ fn demo_segmentation(session: &Session) -> anyhow::Result<()> { .collect::>(), )? .with_splat(ClassId(42))? - .send(session)?; + .send(rec_stream)?; log_info( - session, + rec_stream, sim_time(1 as _), "no rects, default colored points, a single point has a label", )?; @@ -388,9 +392,9 @@ fn demo_segmentation(session: &Session) -> anyhow::Result<()> { .into_iter() .collect(), }])? - .send(session)?; + .send(rec_stream)?; log_info( - session, + rec_stream, sim_time(2 as _), "default colored rects, default colored points, all points except the \ bottom right clusters have labels", @@ -408,9 +412,9 @@ fn demo_segmentation(session: &Session) -> anyhow::Result<()> { .into_iter() .collect(), }])? - .send(session)?; + .send(rec_stream)?; log_info( - session, + rec_stream, sim_time(3 as _), "points/rects with user specified colors", )?; @@ -427,9 +431,9 @@ fn demo_segmentation(session: &Session) -> anyhow::Result<()> { .into_iter() .collect(), }])? - .send(session)?; + .send(rec_stream)?; log_info( - session, + rec_stream, sim_time(4 as _), "label1 disappears and everything with label3 is now default colored again", )?; @@ -437,7 +441,7 @@ fn demo_segmentation(session: &Session) -> anyhow::Result<()> { Ok(()) } -fn demo_text_logs(session: &Session) -> anyhow::Result<()> { +fn demo_text_logs(rec_stream: &RecordingStream) -> anyhow::Result<()> { // TODO(cmc): the python SDK has some magic that glues the standard logger directly into rerun // logs; we're gonna need something similar for rust (e.g. `tracing` backend). @@ -448,7 +452,7 @@ fn demo_text_logs(session: &Session) -> anyhow::Result<()> { .with_timepoint(sim_time(0 as _)) .with_component(&[TextEntry::new("Text with explicitly set color", None)])? .with_component(&[ColorRGBA::from_rgb(255, 215, 0)])? - .send(session)?; + .send(rec_stream)?; MsgSender::new("logs") .with_timepoint(sim_time(0 as _)) @@ -456,12 +460,12 @@ fn demo_text_logs(session: &Session) -> anyhow::Result<()> { "this entry has loglevel TRACE", Some("TRACE".into()), )])? - .send(session)?; + .send(rec_stream)?; Ok(()) } -fn demo_transforms_3d(session: &Session) -> anyhow::Result<()> { +fn demo_transforms_3d(rec_stream: &RecordingStream) -> anyhow::Result<()> { let sun_to_planet_distance = 6.0; let planet_to_moon_distance = 3.0; let rotation_speed_planet = 2.0; @@ -469,7 +473,7 @@ fn demo_transforms_3d(session: &Session) -> anyhow::Result<()> { // Planetary motion is typically in the XY plane. fn log_coordinate_space( - session: &Session, + rec_stream: &RecordingStream, ent_path: impl Into, ) -> anyhow::Result<()> { let view_coords = ViewCoordinates::from_up_and_handedness( @@ -480,17 +484,17 @@ fn demo_transforms_3d(session: &Session) -> anyhow::Result<()> { .with_timeless(true) .with_component(&[view_coords])? .with_component(&[ColorRGBA::from_rgb(255, 215, 0)])? - .send(session) + .send(rec_stream) .map_err(Into::into) } - log_coordinate_space(session, "transforms3d")?; - log_coordinate_space(session, "transforms3d/sun")?; - log_coordinate_space(session, "transforms3d/sun/planet")?; - log_coordinate_space(session, "transforms3d/sun/planet/moon")?; + log_coordinate_space(rec_stream, "transforms3d")?; + log_coordinate_space(rec_stream, "transforms3d/sun")?; + log_coordinate_space(rec_stream, "transforms3d/sun/planet")?; + log_coordinate_space(rec_stream, "transforms3d/sun/planet/moon")?; // All are in the center of their own space: fn log_point( - session: &Session, + rec_stream: &RecordingStream, ent_path: impl Into, radius: f32, color: [u8; 3], @@ -500,13 +504,13 @@ fn demo_transforms_3d(session: &Session) -> anyhow::Result<()> { .with_component(&[Point3D::ZERO])? .with_component(&[Radius(radius)])? .with_component(&[ColorRGBA::from_rgb(color[0], color[1], color[2])])? - .send(session) + .send(rec_stream) .map_err(Into::into) } - log_point(session, "transforms3d/sun", 1.0, [255, 200, 10])?; - log_point(session, "transforms3d/sun/planet", 0.4, [40, 80, 200])?; + log_point(rec_stream, "transforms3d/sun", 1.0, [255, 200, 10])?; + log_point(rec_stream, "transforms3d/sun/planet", 0.4, [40, 80, 200])?; log_point( - session, + rec_stream, "transforms3d/sun/planet/moon", 0.15, [180, 180, 180], @@ -533,7 +537,7 @@ fn demo_transforms_3d(session: &Session) -> anyhow::Result<()> { .with_component(&points)? .with_splat(Radius(0.025))? .with_splat(ColorRGBA::from_rgb(80, 80, 80))? - .send(session)?; + .send(rec_stream)?; // paths where the planet & moon move let create_path = |distance: f32| { @@ -549,11 +553,11 @@ fn demo_transforms_3d(session: &Session) -> anyhow::Result<()> { MsgSender::new("transforms3d/sun/planet_path") .with_timepoint(sim_time(0 as _)) .with_component(&[create_path(sun_to_planet_distance)])? - .send(session)?; + .send(rec_stream)?; MsgSender::new("transforms3d/sun/planet/moon_path") .with_timepoint(sim_time(0 as _)) .with_component(&[create_path(planet_to_moon_distance)])? - .send(session)?; + .send(rec_stream)?; for i in 0..6 * 120 { let time = i as f32 / 120.0; @@ -571,7 +575,7 @@ fn demo_transforms_3d(session: &Session) -> anyhow::Result<()> { 0.0, ), })])? - .send(session)?; + .send(rec_stream)?; MsgSender::new("transforms3d/sun/planet/moon") .with_timepoint(sim_time(time as _)) @@ -583,7 +587,7 @@ fn demo_transforms_3d(session: &Session) -> anyhow::Result<()> { 0.0, ), })])? - .send(session)?; + .send(rec_stream)?; } Ok(()) @@ -629,7 +633,7 @@ struct Args { demo: Option>, } -fn run(session: &Session, args: &Args) -> anyhow::Result<()> { +fn run(rec_stream: &RecordingStream, args: &Args) -> anyhow::Result<()> { use clap::ValueEnum as _; let demos: HashSet = args.demo.as_ref().map_or_else( || Demo::value_variants().iter().copied().collect(), @@ -638,14 +642,14 @@ fn run(session: &Session, args: &Args) -> anyhow::Result<()> { for demo in demos { match demo { - Demo::BoundingBox => demo_bbox(session)?, - Demo::ExtensionComponents => demo_extension_components(session)?, - Demo::LogCleared => demo_log_cleared(session)?, - Demo::Points3D => demo_3d_points(session)?, - Demo::Rects => demo_rects(session)?, - Demo::Segmentation => demo_segmentation(session)?, - Demo::TextLogs => demo_text_logs(session)?, - Demo::Transforms3D => demo_transforms_3d(session)?, + Demo::BoundingBox => demo_bbox(rec_stream)?, + Demo::ExtensionComponents => demo_extension_components(rec_stream)?, + Demo::LogCleared => demo_log_cleared(rec_stream)?, + Demo::Points3D => demo_3d_points(rec_stream)?, + Demo::Rects => demo_rects(rec_stream)?, + Demo::Segmentation => demo_segmentation(rec_stream)?, + Demo::TextLogs => demo_text_logs(rec_stream)?, + Demo::Transforms3D => demo_transforms_3d(rec_stream)?, } } @@ -661,7 +665,7 @@ fn main() -> anyhow::Result<()> { let default_enabled = true; args.rerun .clone() - .run("api_demo_rs", default_enabled, move |session| { - run(&session, &args).unwrap(); + .run("api_demo_rs", default_enabled, move |rec_stream| { + run(&rec_stream, &args).unwrap(); }) } diff --git a/examples/rust/dna/src/main.rs b/examples/rust/dna/src/main.rs index 488808886119..ecc97445c41c 100644 --- a/examples/rust/dna/src/main.rs +++ b/examples/rust/dna/src/main.rs @@ -9,20 +9,20 @@ use rerun::{ demo_util::{bounce_lerp, color_spiral}, external::glam, time::{Time, TimeType, Timeline}, - MsgSender, MsgSenderError, Session, + MsgSender, MsgSenderError, RecordingStream, }; const NUM_POINTS: usize = 100; fn main() -> Result<(), Box> { let recording_info = rerun::new_recording_info("DNA Abacus"); - rerun::native_viewer::spawn(recording_info, |session| { - run(&session).unwrap(); + rerun::native_viewer::spawn(recording_info, Default::default(), |rec_stream| { + run(&rec_stream).unwrap(); })?; Ok(()) } -fn run(session: &Session) -> Result<(), MsgSenderError> { +fn run(rec_stream: &RecordingStream) -> Result<(), MsgSenderError> { let stable_time = Timeline::new("stable_time", TimeType::Time); let (points1, colors1) = color_spiral(NUM_POINTS, 2.0, 0.02, 0.0, 0.1); @@ -33,14 +33,14 @@ fn run(session: &Session) -> Result<(), MsgSenderError> { .with_component(&points1.iter().copied().map(Point3D::from).collect_vec())? .with_component(&colors1.iter().copied().map(ColorRGBA::from).collect_vec())? .with_splat(Radius(0.08))? - .send(session)?; + .send(rec_stream)?; MsgSender::new("dna/structure/right") .with_time(stable_time, 0) .with_component(&points2.iter().copied().map(Point3D::from).collect_vec())? .with_component(&colors2.iter().copied().map(ColorRGBA::from).collect_vec())? .with_splat(Radius(0.08))? - .send(session)?; + .send(rec_stream)?; let scaffolding = points1 .iter() @@ -55,7 +55,7 @@ fn run(session: &Session) -> Result<(), MsgSenderError> { .with_time(stable_time, 0) .with_component(&scaffolding)? .with_splat(ColorRGBA::from([128, 128, 128, 255]))? - .send(session)?; + .send(rec_stream)?; use rand::Rng as _; let mut rng = rand::thread_rng(); @@ -86,7 +86,7 @@ fn run(session: &Session) -> Result<(), MsgSenderError> { .with_component(&beads)? .with_component(&colors)? .with_splat(Radius(0.06))? - .send(session)?; + .send(rec_stream)?; MsgSender::new("dna/structure") .with_time(stable_time, Time::from_seconds_since_epoch(time as _)) @@ -97,7 +97,7 @@ fn run(session: &Session) -> Result<(), MsgSenderError> { )), ..Default::default() })])? - .send(session)?; + .send(rec_stream)?; } Ok(()) diff --git a/examples/rust/minimal/src/main.rs b/examples/rust/minimal/src/main.rs index 6fdce0ef1a48..6909c6d2b486 100644 --- a/examples/rust/minimal/src/main.rs +++ b/examples/rust/minimal/src/main.rs @@ -4,11 +4,11 @@ use rerun::{ components::{ColorRGBA, Point3D}, demo_util::grid, external::glam, - MsgSender, SessionBuilder, + MsgSender, RecordingStreamBuilder, }; fn main() -> Result<(), Box> { - let session = SessionBuilder::new("minimal_rs").buffered(); + let (rec_stream, storage) = RecordingStreamBuilder::new("minimal_rs").memory()?; let points = grid(glam::Vec3::splat(-5.0), glam::Vec3::splat(5.0), 10) .map(Point3D::from) @@ -20,9 +20,11 @@ fn main() -> Result<(), Box> { MsgSender::new("my_points") .with_component(&points)? .with_component(&colors)? - .send(&session)?; + .send(&rec_stream)?; - rerun::native_viewer::show(&session)?; + rec_stream.flush_blocking(); + + rerun::native_viewer::show(storage.take())?; Ok(()) } diff --git a/examples/rust/minimal_options/src/main.rs b/examples/rust/minimal_options/src/main.rs index c6045998e498..726ef1650ee3 100644 --- a/examples/rust/minimal_options/src/main.rs +++ b/examples/rust/minimal_options/src/main.rs @@ -7,7 +7,7 @@ use rerun::components::{ColorRGBA, Point3D}; use rerun::time::{TimeType, Timeline}; -use rerun::{external::re_log, MsgSender, Session}; +use rerun::{external::re_log, MsgSender, RecordingStream}; use rerun::demo_util::grid; @@ -24,7 +24,7 @@ struct Args { radius: f32, } -fn run(session: &Session, args: &Args) -> anyhow::Result<()> { +fn run(rec_stream: &RecordingStream, args: &Args) -> anyhow::Result<()> { let timeline_keyframe = Timeline::new("keyframe", TimeType::Sequence); let points = grid( @@ -46,7 +46,7 @@ fn run(session: &Session, args: &Args) -> anyhow::Result<()> { .with_component(&points)? .with_component(&colors)? .with_time(timeline_keyframe, 0) - .send(session)?; + .send(rec_stream)?; Ok(()) } @@ -60,7 +60,7 @@ fn main() -> anyhow::Result<()> { let default_enabled = true; args.rerun .clone() - .run("minimal_options", default_enabled, move |session| { - run(&session, &args).unwrap(); + .run("minimal_options", default_enabled, move |rec_stream| { + run(&rec_stream, &args).unwrap(); }) } diff --git a/examples/rust/objectron/src/main.rs b/examples/rust/objectron/src/main.rs index 203b581a0ec4..a2ecf235960e 100644 --- a/examples/rust/objectron/src/main.rs +++ b/examples/rust/objectron/src/main.rs @@ -21,7 +21,7 @@ use anyhow::{anyhow, Context as _}; use rerun::{ external::re_log, time::{Time, TimePoint, TimeType, Timeline}, - MsgSender, Session, + MsgSender, RecordingStream, }; // --- Rerun logging --- @@ -73,7 +73,7 @@ impl<'a> From<&'a [objectron::FrameAnnotation]> for AnnotationsPerFrame<'a> { } fn log_coordinate_space( - session: &Session, + rec_stream: &RecordingStream, ent_path: impl Into, axes: &str, ) -> anyhow::Result<()> { @@ -83,33 +83,36 @@ fn log_coordinate_space( MsgSender::new(ent_path) .with_timeless(true) .with_component(&[view_coords])? - .send(session) + .send(rec_stream) .map_err(Into::into) } fn log_ar_frame( - session: &Session, + rec_stream: &RecordingStream, annotations: &AnnotationsPerFrame<'_>, ar_frame: &ArFrame, ) -> anyhow::Result<()> { - log_video_frame(session, ar_frame)?; + log_video_frame(rec_stream, ar_frame)?; if let Some(ar_camera) = ar_frame.data.camera.as_ref() { - log_ar_camera(session, ar_frame.timepoint.clone(), ar_camera)?; + log_ar_camera(rec_stream, ar_frame.timepoint.clone(), ar_camera)?; } if let Some(points) = ar_frame.data.raw_feature_points.as_ref() { - log_feature_points(session, ar_frame.timepoint.clone(), points)?; + log_feature_points(rec_stream, ar_frame.timepoint.clone(), points)?; } if let Some(&annotations) = annotations.0.get(&ar_frame.index) { - log_frame_annotations(session, &ar_frame.timepoint, annotations)?; + log_frame_annotations(rec_stream, &ar_frame.timepoint, annotations)?; } Ok(()) } -fn log_baseline_objects(session: &Session, objects: &[objectron::Object]) -> anyhow::Result<()> { +fn log_baseline_objects( + rec_stream: &RecordingStream, + objects: &[objectron::Object], +) -> anyhow::Result<()> { use rerun::components::{Box3D, ColorRGBA, Label, Rigid3, Transform}; let boxes = objects.iter().filter_map(|object| { @@ -143,26 +146,26 @@ fn log_baseline_objects(session: &Session, objects: &[objectron::Object]) -> any .with_component(&[transform])? .with_component(&[label])? .with_splat(ColorRGBA::from_rgb(160, 230, 130))? - .send(session)?; + .send(rec_stream)?; } Ok(()) } -fn log_video_frame(session: &Session, ar_frame: &ArFrame) -> anyhow::Result<()> { +fn log_video_frame(rec_stream: &RecordingStream, ar_frame: &ArFrame) -> anyhow::Result<()> { let image_path = ar_frame.dir.join(format!("video/{}.jpg", ar_frame.index)); let tensor = rerun::components::Tensor::tensor_from_jpeg_file(image_path)?; MsgSender::new("world/camera/video") .with_timepoint(ar_frame.timepoint.clone()) .with_component(&[tensor])? - .send(session)?; + .send(rec_stream)?; Ok(()) } fn log_ar_camera( - session: &Session, + rec_stream: &RecordingStream, timepoint: TimePoint, ar_camera: &objectron::ArCamera, ) -> anyhow::Result<()> { @@ -196,20 +199,20 @@ fn log_ar_camera( rotation: rot.into(), translation: translation.into(), })])? - .send(session)?; + .send(rec_stream)?; MsgSender::new("world/camera/video") .with_timepoint(timepoint) .with_component(&[Transform::Pinhole(Pinhole { image_from_cam: intrinsics.into(), resolution: Some(resolution.into()), })])? - .send(session)?; + .send(rec_stream)?; Ok(()) } fn log_feature_points( - session: &Session, + rec_stream: &RecordingStream, timepoint: TimePoint, points: &objectron::ArPointCloud, ) -> anyhow::Result<()> { @@ -236,13 +239,13 @@ fn log_feature_points( .with_component(&points)? .with_component(&ids)? .with_splat(ColorRGBA::from_rgb(255, 255, 255))? - .send(session)?; + .send(rec_stream)?; Ok(()) } fn log_frame_annotations( - session: &Session, + rec_stream: &RecordingStream, timepoint: &TimePoint, annotations: &objectron::FrameAnnotation, ) -> anyhow::Result<()> { @@ -306,7 +309,7 @@ fn log_frame_annotations( .with_component(&points.into_iter().map(Point2D::from).collect::>())?; } - msg.send(session)?; + msg.send(rec_stream)?; } Ok(()) @@ -360,7 +363,7 @@ fn parse_duration(arg: &str) -> Result anyhow::Result<()> { +fn run(rec_stream: &RecordingStream, args: &Args) -> anyhow::Result<()> { // Parse protobuf dataset let rec_info = args.recording.info().with_context(|| { use clap::ValueEnum as _; @@ -374,10 +377,10 @@ fn run(session: &Session, args: &Args) -> anyhow::Result<()> { let annotations = read_annotations(&rec_info.path_annotations)?; // See https://github.com/google-research-datasets/Objectron/issues/39 - log_coordinate_space(session, "world", "RUB")?; - log_coordinate_space(session, "world/camera", "RDF")?; + log_coordinate_space(rec_stream, "world", "RUB")?; + log_coordinate_space(rec_stream, "world/camera", "RDF")?; - log_baseline_objects(session, &annotations.objects)?; + log_baseline_objects(rec_stream, &annotations.objects)?; let mut global_frame_offset = 0; let mut global_time_offset = 0.0; @@ -404,7 +407,7 @@ fn run(session: &Session, args: &Args) -> anyhow::Result<()> { ar_frame, ); let annotations = annotations.frame_annotations.as_slice().into(); - log_ar_frame(session, &annotations, &ar_frame)?; + log_ar_frame(rec_stream, &annotations, &ar_frame)?; if let Some(d) = args.per_frame_sleep { std::thread::sleep(d); @@ -434,8 +437,8 @@ fn main() -> anyhow::Result<()> { let default_enabled = true; args.rerun .clone() - .run("objectron_rs", default_enabled, move |session| { - run(&session, &args).unwrap(); + .run("objectron_rs", default_enabled, move |rec_stream| { + run(&rec_stream, &args).unwrap(); }) } diff --git a/examples/rust/raw_mesh/src/main.rs b/examples/rust/raw_mesh/src/main.rs index 5e0bd26001bc..3775b7153901 100644 --- a/examples/rust/raw_mesh/src/main.rs +++ b/examples/rust/raw_mesh/src/main.rs @@ -16,7 +16,7 @@ use rerun::components::{ColorRGBA, Mesh3D, MeshId, RawMesh3D, Transform, Vec4D, use rerun::time::{TimeType, Timeline}; use rerun::{ external::{re_log, re_memory::AccountingAllocator}, - EntityPath, MsgSender, Session, + EntityPath, MsgSender, RecordingStream, }; // TODO(cmc): This example needs to support animations to showcase Rerun's time capabilities. @@ -67,7 +67,7 @@ impl From for Transform { } /// Log a glTF node with Rerun. -fn log_node(session: &Session, node: GltfNode) -> anyhow::Result<()> { +fn log_node(rec_stream: &RecordingStream, node: GltfNode) -> anyhow::Result<()> { let ent_path = EntityPath::from(node.name.as_str()); // Convert glTF objects into Rerun components. @@ -83,19 +83,19 @@ fn log_node(session: &Session, node: GltfNode) -> anyhow::Result<()> { .with_time(timeline_keyframe, 0) .with_component(&primitives)? .with_component(transform.as_ref())? - .send(session)?; + .send(rec_stream)?; // Recurse through all of the node's children! for mut child in node.children { child.name = [node.name.as_str(), child.name.as_str()].join("/"); - log_node(session, child)?; + log_node(rec_stream, child)?; } Ok(()) } fn log_coordinate_space( - session: &Session, + rec_stream: &RecordingStream, ent_path: impl Into, axes: &str, ) -> anyhow::Result<()> { @@ -106,7 +106,7 @@ fn log_coordinate_space( MsgSender::new(ent_path) .with_timeless(true) .with_component(&[view_coords])? - .send(session) + .send(rec_stream) .map_err(Into::into) } @@ -171,7 +171,7 @@ impl Args { } } -fn run(session: &Session, args: &Args) -> anyhow::Result<()> { +fn run(rec_stream: &RecordingStream, args: &Args) -> anyhow::Result<()> { // Read glTF scene let (doc, buffers, _) = gltf::import_slice(Bytes::from(std::fs::read(args.scene_path()?)?))?; let nodes = load_gltf(&doc, &buffers); @@ -179,8 +179,8 @@ fn run(session: &Session, args: &Args) -> anyhow::Result<()> { // Log raw glTF nodes and their transforms with Rerun for root in nodes { re_log::info!(scene = root.name, "logging glTF scene"); - log_coordinate_space(session, root.name.as_str(), "RUB")?; - log_node(session, root)?; + log_coordinate_space(rec_stream, root.name.as_str(), "RUB")?; + log_node(rec_stream, root)?; } Ok(()) @@ -195,8 +195,8 @@ fn main() -> anyhow::Result<()> { let default_enabled = true; args.rerun .clone() - .run("raw_mesh_rs", default_enabled, move |session| { - run(&session, &args).unwrap(); + .run("raw_mesh_rs", default_enabled, move |rec_stream| { + run(&rec_stream, &args).unwrap(); }) } diff --git a/rerun_py/src/python_session.rs b/rerun_py/src/python_session.rs index af973f69d5dc..fe56aa0e9a7a 100644 --- a/rerun_py/src/python_session.rs +++ b/rerun_py/src/python_session.rs @@ -172,8 +172,8 @@ impl PythonSession { // Before changing the sink, we set drop_if_disconnected and // flush. This ensures that any messages that are currently // buffered will be sent. - self.sink.drop_msgs_if_disconnected(); - self.sink.flush(); + self.sink.drop_if_disconnected(); + self.sink.flush_blocking(); self.sink = sink; if backlog.is_empty() { @@ -243,7 +243,7 @@ impl PythonSession { /// Wait until all logged data have been sent to the remove server (if any). pub fn flush(&mut self) { - self.sink.flush(); + self.sink.flush_blocking(); } /// Send a single [`DataRow`]. diff --git a/tests/rust/test_image_memory/src/main.rs b/tests/rust/test_image_memory/src/main.rs index 4d42358f5a49..ea6868c0597a 100644 --- a/tests/rust/test_image_memory/src/main.rs +++ b/tests/rust/test_image_memory/src/main.rs @@ -14,13 +14,13 @@ fn main() -> Result<(), Box> { ); let recording_info = rerun::new_recording_info("test_image_memory_rs"); - rerun::native_viewer::spawn(recording_info, |session| { - log_images(&session).unwrap(); + rerun::native_viewer::spawn(recording_info, Default::default(), |rec_stream| { + log_images(&rec_stream).unwrap(); })?; Ok(()) } -fn log_images(session: &rerun::Session) -> Result<(), Box> { +fn log_images(rec_stream: &rerun::RecordingStream) -> Result<(), Box> { let (w, h) = (2048, 1024); let n = 100; @@ -36,9 +36,11 @@ fn log_images(session: &rerun::Session) -> Result<(), Box for _ in 0..n { rerun::MsgSender::new("image") .with_component(&[tensor.clone()])? - .send(session)?; + .send(rec_stream)?; } + rec_stream.flush_blocking(); + eprintln!( "Logged {n} {w}x{h} RGBA images = {}", re_format::format_bytes((n * w * h * 4) as _)