From 78fc9fcb293e238214c9e8960369eb1bdaadd9eb Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Thu, 6 Apr 2023 11:57:52 -0400 Subject: [PATCH] Always send recording_id as part of LogMsg (#1778) * Always send recording_id as part of LogMsg * Rename build_chunk_from_components -> build_data_table_from_components * Don't make RecordingInfo optional * Always default the recording id * Log an error if we hit the initialization issue --- crates/re_data_store/examples/memory_usage.rs | 13 +++- crates/re_data_store/src/log_db.rs | 4 +- .../benches/msg_encode_benchmark.rs | 29 +++---- crates/re_log_types/src/lib.rs | 20 +++-- crates/re_sdk/src/msg_sender.rs | 31 ++++++-- crates/re_sdk/src/session.rs | 36 +++++++-- crates/re_sdk_comms/src/server.rs | 6 +- crates/re_viewer/src/app.rs | 53 +++++++------ crates/re_viewer/src/ui/data_ui/log_msg.rs | 4 +- crates/re_viewer/src/ui/event_log_view.rs | 4 +- rerun_py/src/arrow.rs | 17 ++-- rerun_py/src/python_bridge.rs | 37 +++++---- rerun_py/src/python_session.rs | 77 +++++++++++-------- 13 files changed, 205 insertions(+), 126 deletions(-) diff --git a/crates/re_data_store/examples/memory_usage.rs b/crates/re_data_store/examples/memory_usage.rs index 601b9152cdeca..69ca6ef5d6a14 100644 --- a/crates/re_data_store/examples/memory_usage.rs +++ b/crates/re_data_store/examples/memory_usage.rs @@ -48,7 +48,7 @@ fn live_bytes() -> usize { // ---------------------------------------------------------------------------- -use re_log_types::{entity_path, DataRow, MsgId}; +use re_log_types::{entity_path, DataRow, MsgId, RecordingId}; fn main() { log_messages(); @@ -91,6 +91,7 @@ fn log_messages() { const NUM_POINTS: usize = 1_000; + let recording_id = RecordingId::random(); let timeline = Timeline::new_sequence("frame_nr"); let mut time_point = TimePoint::default(); time_point.insert(timeline, TimeInt::from(0)); @@ -116,7 +117,10 @@ fn log_messages() { .into_table(), ); let table_bytes = live_bytes() - used_bytes_start; - let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap())); + let log_msg = Box::new(LogMsg::ArrowMsg( + recording_id, + ArrowMsg::try_from(&*table).unwrap(), + )); let log_msg_bytes = live_bytes() - used_bytes_start; println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM"); let encoded = encode_log_msg(&log_msg); @@ -139,7 +143,10 @@ fn log_messages() { .into_table(), ); let table_bytes = live_bytes() - used_bytes_start; - let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap())); + let log_msg = Box::new(LogMsg::ArrowMsg( + recording_id, + ArrowMsg::try_from(&*table).unwrap(), + )); let log_msg_bytes = live_bytes() - used_bytes_start; println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM"); let encoded = encode_log_msg(&log_msg); diff --git a/crates/re_data_store/src/log_db.rs b/crates/re_data_store/src/log_db.rs index 50a5ce57a7036..4ae4002aae073 100644 --- a/crates/re_data_store/src/log_db.rs +++ b/crates/re_data_store/src/log_db.rs @@ -235,7 +235,7 @@ impl LogDb { match &msg { LogMsg::BeginRecordingMsg(msg) => self.add_begin_recording_msg(msg), - LogMsg::EntityPathOpMsg(msg) => { + LogMsg::EntityPathOpMsg(_, msg) => { let EntityPathOpMsg { msg_id, time_point, @@ -243,7 +243,7 @@ impl LogDb { } = msg; self.entity_db.add_path_op(*msg_id, time_point, path_op); } - LogMsg::ArrowMsg(inner) => self.entity_db.try_add_arrow_msg(inner)?, + LogMsg::ArrowMsg(_, inner) => self.entity_db.try_add_arrow_msg(inner)?, LogMsg::Goodbye(_) => {} } diff --git a/crates/re_log_encoding/benches/msg_encode_benchmark.rs b/crates/re_log_encoding/benches/msg_encode_benchmark.rs index 90a4b323feb28..fb36e7bc34d74 100644 --- a/crates/re_log_encoding/benches/msg_encode_benchmark.rs +++ b/crates/re_log_encoding/benches/msg_encode_benchmark.rs @@ -6,7 +6,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; use re_log_types::{ datagen::{build_frame_nr, build_some_colors, build_some_point2d}, - entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId, + entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId, RecordingId, }; use criterion::{criterion_group, criterion_main, Criterion}; @@ -42,10 +42,10 @@ fn decode_log_msgs(mut bytes: &[u8]) -> Vec { messages } -fn generate_messages(tables: &[DataTable]) -> Vec { +fn generate_messages(recording_id: RecordingId, tables: &[DataTable]) -> Vec { tables .iter() - .map(|table| LogMsg::ArrowMsg(ArrowMsg::try_from(table).unwrap())) + .map(|table| LogMsg::ArrowMsg(recording_id, ArrowMsg::try_from(table).unwrap())) .collect() } @@ -53,7 +53,7 @@ fn decode_tables(messages: &[LogMsg]) -> Vec { messages .iter() .map(|log_msg| { - if let LogMsg::ArrowMsg(arrow_msg) = log_msg { + if let LogMsg::ArrowMsg(_, arrow_msg) = log_msg { DataTable::try_from(arrow_msg).unwrap() } else { unreachable!() @@ -81,6 +81,7 @@ fn mono_points_arrow(c: &mut Criterion) { } { + let recording_id = RecordingId::random(); let mut group = c.benchmark_group("mono_points_arrow"); group.throughput(criterion::Throughput::Elements(NUM_POINTS as _)); group.bench_function("generate_message_bundles", |b| { @@ -88,14 +89,14 @@ fn mono_points_arrow(c: &mut Criterion) { }); let tables = generate_tables(); group.bench_function("generate_messages", |b| { - b.iter(|| generate_messages(&tables)); + b.iter(|| generate_messages(recording_id, &tables)); }); - let messages = generate_messages(&tables); + let messages = generate_messages(recording_id, &tables); group.bench_function("encode_log_msg", |b| { b.iter(|| encode_log_msgs(&messages)); }); group.bench_function("encode_total", |b| { - b.iter(|| encode_log_msgs(&generate_messages(&generate_tables()))); + b.iter(|| encode_log_msgs(&generate_messages(recording_id, &generate_tables()))); }); let encoded = encode_log_msgs(&messages); @@ -136,6 +137,7 @@ fn mono_points_arrow_batched(c: &mut Criterion) { } { + let recording_id = RecordingId::random(); let mut group = c.benchmark_group("mono_points_arrow_batched"); group.throughput(criterion::Throughput::Elements(NUM_POINTS as _)); group.bench_function("generate_message_bundles", |b| { @@ -143,14 +145,14 @@ fn mono_points_arrow_batched(c: &mut Criterion) { }); let tables = [generate_table()]; group.bench_function("generate_messages", |b| { - b.iter(|| generate_messages(&tables)); + b.iter(|| generate_messages(recording_id, &tables)); }); - let messages = generate_messages(&tables); + let messages = generate_messages(recording_id, &tables); group.bench_function("encode_log_msg", |b| { b.iter(|| encode_log_msgs(&messages)); }); group.bench_function("encode_total", |b| { - b.iter(|| encode_log_msgs(&generate_messages(&[generate_table()]))); + b.iter(|| encode_log_msgs(&generate_messages(recording_id, &[generate_table()]))); }); let encoded = encode_log_msgs(&messages); @@ -192,6 +194,7 @@ fn batch_points_arrow(c: &mut Criterion) { } { + let recording_id = RecordingId::random(); let mut group = c.benchmark_group("batch_points_arrow"); group.throughput(criterion::Throughput::Elements(NUM_POINTS as _)); group.bench_function("generate_message_bundles", |b| { @@ -199,14 +202,14 @@ fn batch_points_arrow(c: &mut Criterion) { }); let tables = generate_tables(); group.bench_function("generate_messages", |b| { - b.iter(|| generate_messages(&tables)); + b.iter(|| generate_messages(recording_id, &tables)); }); - let messages = generate_messages(&tables); + let messages = generate_messages(recording_id, &tables); group.bench_function("encode_log_msg", |b| { b.iter(|| encode_log_msgs(&messages)); }); group.bench_function("encode_total", |b| { - b.iter(|| encode_log_msgs(&generate_messages(&generate_tables()))); + b.iter(|| encode_log_msgs(&generate_messages(recording_id, &generate_tables()))); }); let encoded = encode_log_msgs(&messages); diff --git a/crates/re_log_types/src/lib.rs b/crates/re_log_types/src/lib.rs index e7c45feba34cf..bf258e417ad1d 100644 --- a/crates/re_log_types/src/lib.rs +++ b/crates/re_log_types/src/lib.rs @@ -169,10 +169,10 @@ pub enum LogMsg { BeginRecordingMsg(BeginRecordingMsg), /// Server-backed operation on an [`EntityPath`]. - EntityPathOpMsg(EntityPathOpMsg), + EntityPathOpMsg(RecordingId, EntityPathOpMsg), /// Log an entity using an [`ArrowMsg`]. - ArrowMsg(ArrowMsg), + ArrowMsg(RecordingId, ArrowMsg), /// Sent when the client shuts down the connection. Goodbye(MsgId), @@ -182,19 +182,27 @@ impl LogMsg { pub fn id(&self) -> MsgId { match self { Self::BeginRecordingMsg(msg) => msg.msg_id, - Self::EntityPathOpMsg(msg) => msg.msg_id, + Self::EntityPathOpMsg(_, msg) => msg.msg_id, Self::Goodbye(msg_id) => *msg_id, // TODO(#1619): the following only makes sense because, while we support sending and // receiving batches, we don't actually do so yet. // We need to stop storing raw `LogMsg`s before we can benefit from our batching. - Self::ArrowMsg(msg) => msg.table_id, + Self::ArrowMsg(_, msg) => msg.table_id, + } + } + + pub fn recording_id(&self) -> Option<&RecordingId> { + match self { + Self::BeginRecordingMsg(msg) => Some(&msg.info.recording_id), + Self::EntityPathOpMsg(recording_id, _) | Self::ArrowMsg(recording_id, _) => { + Some(recording_id) + } + Self::Goodbye(_) => None, } } } impl_into_enum!(BeginRecordingMsg, LogMsg, BeginRecordingMsg); -impl_into_enum!(EntityPathOpMsg, LogMsg, EntityPathOpMsg); -impl_into_enum!(ArrowMsg, LogMsg, ArrowMsg); // ---------------------------------------------------------------------------- diff --git a/crates/re_sdk/src/msg_sender.rs b/crates/re_sdk/src/msg_sender.rs index 790a06e71bd41..9ab867a3d452f 100644 --- a/crates/re_sdk/src/msg_sender.rs +++ b/crates/re_sdk/src/msg_sender.rs @@ -1,11 +1,13 @@ -use re_log_types::{component_types::InstanceKey, DataRow, DataTableError}; +use std::borrow::Borrow; + +use re_log_types::{component_types::InstanceKey, DataRow, DataTableError, RecordingId}; use crate::{ components::Transform, log::{DataCell, LogMsg, MsgId}, sink::LogSink, time::{Time, TimeInt, TimePoint, Timeline}, - Component, EntityPath, SerializableComponent, + Component, EntityPath, SerializableComponent, Session, }; // TODO(#1619): Rust SDK batching @@ -229,13 +231,17 @@ impl MsgSender { /// Consumes, packs, sanity checks and finally sends the message to the currently configured /// target of the SDK. - pub fn send(self, sink: &impl std::borrow::Borrow) -> Result<(), DataTableError> { - self.send_to_sink(sink.borrow()) + pub fn send(self, session: &Session) -> Result<(), DataTableError> { + self.send_to_sink(session.recording_id(), session.borrow()) } /// Consumes, packs, sanity checks and finally sends the message to the currently configured /// target of the SDK. - fn send_to_sink(self, sink: &dyn LogSink) -> Result<(), DataTableError> { + fn send_to_sink( + self, + recording_id: RecordingId, + sink: &dyn LogSink, + ) -> Result<(), DataTableError> { if !sink.is_enabled() { return Ok(()); // silently drop the message } @@ -243,15 +249,24 @@ impl MsgSender { let [row_standard, row_transforms, row_splats] = self.into_rows(); if let Some(row_transforms) = row_transforms { - sink.send(LogMsg::ArrowMsg((&row_transforms.into_table()).try_into()?)); + sink.send(LogMsg::ArrowMsg( + recording_id, + (&row_transforms.into_table()).try_into()?, + )); } if let Some(row_splats) = row_splats { - sink.send(LogMsg::ArrowMsg((&row_splats.into_table()).try_into()?)); + sink.send(LogMsg::ArrowMsg( + recording_id, + (&row_splats.into_table()).try_into()?, + )); } // Always the primary component last so range-based queries will include the other data. // Since the primary component can't be splatted it must be in msg_standard, see(#1215). if let Some(row_standard) = row_standard { - sink.send(LogMsg::ArrowMsg((&row_standard.into_table()).try_into()?)); + sink.send(LogMsg::ArrowMsg( + recording_id, + (&row_standard.into_table()).try_into()?, + )); } Ok(()) diff --git a/crates/re_sdk/src/session.rs b/crates/re_sdk/src/session.rs index 7b8dbd60b74ff..94d336b30fccb 100644 --- a/crates/re_sdk/src/session.rs +++ b/crates/re_sdk/src/session.rs @@ -189,6 +189,7 @@ impl SessionBuilder { #[must_use] #[derive(Clone)] pub struct Session { + recording_info: RecordingInfo, sink: Arc, // TODO(emilk): add convenience `TimePoint` here so that users can // do things like `session.set_time_sequence("frame", frame_idx);` @@ -222,13 +223,16 @@ impl Session { sink.send( re_log_types::BeginRecordingMsg { msg_id: re_log_types::MsgId::random(), - info: recording_info, + info: recording_info.clone(), } .into(), ); } - Self { sink: sink.into() } + Self { + recording_info, + sink: sink.into(), + } } /// Construct a new session with a disabled "dummy" sink that drops all logging messages. @@ -236,6 +240,16 @@ impl Session { /// [`Self::is_enabled`] will return `false`. pub fn disabled() -> Self { Self { + recording_info: RecordingInfo { + application_id: ApplicationId::unknown(), + recording_id: Default::default(), + is_official_example: crate::called_from_official_rust_example(), + started: Time::now(), + recording_source: RecordingSource::RustSdk { + rustc_version: env!("RE_BUILD_RUSTC_VERSION").into(), + llvm_version: env!("RE_BUILD_LLVM_VERSION").into(), + }, + }, sink: crate::sink::disabled().into(), } } @@ -272,17 +286,25 @@ impl Session { time_point: &re_log_types::TimePoint, path_op: re_log_types::PathOp, ) { - self.send(LogMsg::EntityPathOpMsg(re_log_types::EntityPathOpMsg { - msg_id: re_log_types::MsgId::random(), - time_point: time_point.clone(), - path_op, - })); + self.send(LogMsg::EntityPathOpMsg( + self.recording_id(), + re_log_types::EntityPathOpMsg { + msg_id: re_log_types::MsgId::random(), + time_point: time_point.clone(), + path_op, + }, + )); } /// Drain all buffered [`LogMsg`]es and return them. pub fn drain_backlog(&self) -> Vec { self.sink.drain_backlog() } + + /// The current [`RecordingId`]. + pub fn recording_id(&self) -> RecordingId { + self.recording_info.recording_id + } } impl AsRef for Session { diff --git a/crates/re_sdk_comms/src/server.rs b/crates/re_sdk_comms/src/server.rs index 71c7c786f7637..fd1ceca50ea07 100644 --- a/crates/re_sdk_comms/src/server.rs +++ b/crates/re_sdk_comms/src/server.rs @@ -209,9 +209,11 @@ impl CongestionManager { #[allow(clippy::match_same_arms)] match msg { // we don't want to drop any of these - LogMsg::BeginRecordingMsg(_) | LogMsg::EntityPathOpMsg(_) | LogMsg::Goodbye(_) => true, + LogMsg::BeginRecordingMsg(_) | LogMsg::EntityPathOpMsg(_, _) | LogMsg::Goodbye(_) => { + true + } - LogMsg::ArrowMsg(arrow_msg) => self.should_send_time_point(&arrow_msg.timepoint_max), + LogMsg::ArrowMsg(_, arrow_msg) => self.should_send_time_point(&arrow_msg.timepoint_max), } } diff --git a/crates/re_viewer/src/app.rs b/crates/re_viewer/src/app.rs index 7f868f1d1692e..ad5459df9dc54 100644 --- a/crates/re_viewer/src/app.rs +++ b/crates/re_viewer/src/app.rs @@ -677,34 +677,37 @@ impl App { let start = instant::Instant::now(); while let Ok(msg) = self.rx.try_recv() { - let is_new_recording = if let LogMsg::BeginRecordingMsg(msg) = &msg { - re_log::debug!("Opening a new recording: {:?}", msg.info); - self.state.selected_rec_id = msg.info.recording_id; - true - } else { - false - }; + // All messages except [`LogMsg::GoodBye`] should have an associated recording id + if let Some(recording_id) = msg.recording_id() { + let is_new_recording = if let LogMsg::BeginRecordingMsg(msg) = &msg { + re_log::debug!("Opening a new recording: {:?}", msg.info); + self.state.selected_rec_id = msg.info.recording_id; + true + } else { + false + }; - let log_db = self.log_dbs.entry(self.state.selected_rec_id).or_default(); + let log_db = self.log_dbs.entry(*recording_id).or_default(); - if log_db.data_source.is_none() { - log_db.data_source = Some(self.rx.source().clone()); - } + if log_db.data_source.is_none() { + log_db.data_source = Some(self.rx.source().clone()); + } - if let Err(err) = log_db.add(msg) { - re_log::error!("Failed to add incoming msg: {err}"); - }; + if let Err(err) = log_db.add(msg) { + re_log::error!("Failed to add incoming msg: {err}"); + }; - if is_new_recording { - // Do analytics after ingesting the new message, - // because thats when the `log_db.recording_info` is set, - // which we use in the analytics call. - self.analytics.on_open_recording(log_db); - } + if is_new_recording { + // Do analytics after ingesting the new message, + // because thats when the `log_db.recording_info` is set, + // which we use in the analytics call. + self.analytics.on_open_recording(log_db); + } - if start.elapsed() > instant::Duration::from_millis(10) { - egui_ctx.request_repaint(); // make sure we keep receiving messages asap - break; // don't block the main thread for too long + if start.elapsed() > instant::Duration::from_millis(10) { + egui_ctx.request_repaint(); // make sure we keep receiving messages asap + break; // don't block the main thread for too long + } } } } @@ -1767,7 +1770,7 @@ fn save_database_to_file( LogMsg::BeginRecordingMsg(_) | LogMsg::Goodbye(_) => { true // timeless } - LogMsg::EntityPathOpMsg(EntityPathOpMsg { time_point, .. }) => { + LogMsg::EntityPathOpMsg(_, EntityPathOpMsg { time_point, .. }) => { time_point.is_timeless() || { let is_within_range = time_point .get(&timeline) @@ -1775,7 +1778,7 @@ fn save_database_to_file( is_within_range } } - LogMsg::ArrowMsg(_) => { + LogMsg::ArrowMsg(_, _) => { // TODO(john) false } diff --git a/crates/re_viewer/src/ui/data_ui/log_msg.rs b/crates/re_viewer/src/ui/data_ui/log_msg.rs index b536284f6eebe..5c19a09fa3b8d 100644 --- a/crates/re_viewer/src/ui/data_ui/log_msg.rs +++ b/crates/re_viewer/src/ui/data_ui/log_msg.rs @@ -16,8 +16,8 @@ impl DataUi for LogMsg { ) { match self { LogMsg::BeginRecordingMsg(msg) => msg.data_ui(ctx, ui, verbosity, query), - LogMsg::EntityPathOpMsg(msg) => msg.data_ui(ctx, ui, verbosity, query), - LogMsg::ArrowMsg(msg) => msg.data_ui(ctx, ui, verbosity, query), + LogMsg::EntityPathOpMsg(_, msg) => msg.data_ui(ctx, ui, verbosity, query), + LogMsg::ArrowMsg(_, msg) => msg.data_ui(ctx, ui, verbosity, query), LogMsg::Goodbye(_) => { ui.label("Goodbye"); } diff --git a/crates/re_viewer/src/ui/event_log_view.rs b/crates/re_viewer/src/ui/event_log_view.rs index 94c96edddfe8a..89aea702faa2d 100644 --- a/crates/re_viewer/src/ui/event_log_view.rs +++ b/crates/re_viewer/src/ui/event_log_view.rs @@ -141,7 +141,7 @@ fn table_row( ui.monospace(format!("{application_id} - {recording_id:?}")); }); } - LogMsg::EntityPathOpMsg(msg) => { + LogMsg::EntityPathOpMsg(_, msg) => { let EntityPathOpMsg { msg_id, time_point, @@ -176,7 +176,7 @@ fn table_row( // NOTE: This really only makes sense because we don't yet have batches with more than a // single row at the moment... and by the time we do, the event log view will have // disappeared entirely. - LogMsg::ArrowMsg(msg) => match DataTable::try_from(msg) { + LogMsg::ArrowMsg(_, msg) => match DataTable::try_from(msg) { Ok(table) => { for datarow in table.as_rows() { row.col(|ui| { diff --git a/rerun_py/src/arrow.rs b/rerun_py/src/arrow.rs index 8e75cc60bc4d1..24cf0e43beb1b 100644 --- a/rerun_py/src/arrow.rs +++ b/rerun_py/src/arrow.rs @@ -9,9 +9,7 @@ use pyo3::{ types::{IntoPyDict, PyString}, PyAny, PyResult, }; -use re_log_types::{ - component_types, DataCell, DataRow, DataTableError, EntityPath, LogMsg, MsgId, TimePoint, -}; +use re_log_types::{component_types, DataCell, DataRow, DataTable, EntityPath, MsgId, TimePoint}; /// Perform conversion between a pyarrow array to arrow2 types. /// @@ -82,13 +80,12 @@ pub fn get_registered_component_names(py: pyo3::Python<'_>) -> PyResult<&PyDict> Ok(fields.into_py_dict(py)) } -/// Build a [`LogMsg`] and vector of [`Field`] given a '**kwargs'-style dictionary of -/// component arrays. -pub fn build_chunk_from_components( +/// Build a [`DataTable`] given a '**kwargs'-style dictionary of component arrays. +pub fn build_data_table_from_components( entity_path: &EntityPath, components: &PyDict, time_point: &TimePoint, -) -> PyResult { +) -> PyResult { let (arrays, fields): (Vec>, Vec) = itertools::process_results( components.iter().map(|(name, array)| { let name = name.downcast::()?.to_str()?; @@ -112,9 +109,7 @@ pub fn build_chunk_from_components( cells, ); - let msg = (&row.into_table()) - .try_into() - .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; + let data_table = row.into_table(); - Ok(LogMsg::ArrowMsg(msg)) + Ok(data_table) } diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index d42f3ef61a1a8..02926a8b51cef 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -11,9 +11,9 @@ use pyo3::{ types::PyDict, }; -use re_log_types::{DataRow, DataTableError}; +use re_log_types::{ArrowMsg, DataRow, DataTableError}; use rerun::{ - log::{LogMsg, MsgId, PathOp}, + log::{MsgId, PathOp}, time::{Time, TimeInt, TimePoint, TimeType, Timeline}, ApplicationId, EntityPath, RecordingId, }; @@ -243,10 +243,13 @@ fn main(py: Python<'_>, argv: Vec) -> PyResult { #[pyfunction] fn get_recording_id() -> PyResult { - python_session() - .recording_id() - .ok_or_else(|| PyTypeError::new_err("module has not been initialized")) - .map(|recording_id| recording_id.to_string()) + let recording_id = python_session().recording_id(); + + if recording_id == RecordingId::ZERO { + Err(PyTypeError::new_err("module has not been initialized")) + } else { + Ok(recording_id.to_string()) + } } #[pyfunction] @@ -485,7 +488,7 @@ fn log_transform( .try_into() .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; - session.send(LogMsg::ArrowMsg(msg)); + session.send_arrow_msg(msg); Ok(()) } @@ -569,7 +572,7 @@ fn log_view_coordinates( .try_into() .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; - session.send(LogMsg::ArrowMsg(msg)); + session.send_arrow_msg(msg); Ok(()) } @@ -703,7 +706,7 @@ fn log_meshes( .try_into() .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; - session.send(LogMsg::ArrowMsg(msg)); + session.send_arrow_msg(msg); Ok(()) } @@ -784,7 +787,7 @@ fn log_mesh_file( .try_into() .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; - session.send(LogMsg::ArrowMsg(msg)); + session.send_arrow_msg(msg); Ok(()) } @@ -876,7 +879,7 @@ fn log_image_file( .try_into() .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; - session.send(LogMsg::ArrowMsg(msg)); + session.send_arrow_msg(msg); Ok(()) } @@ -955,7 +958,7 @@ fn log_annotation_context( .try_into() .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; - session.send(LogMsg::ArrowMsg(msg)); + session.send_arrow_msg(msg); Ok(()) } @@ -979,10 +982,16 @@ fn log_arrow_msg(entity_path: &str, components: &PyDict, timeless: bool) -> PyRe // It's important that we don't hold the session lock while building our arrow component. // the API we call to back through pyarrow temporarily releases the GIL, which can cause // cause a deadlock. - let msg = crate::arrow::build_chunk_from_components(&entity_path, components, &time(timeless))?; + let data_table = + crate::arrow::build_data_table_from_components(&entity_path, components, &time(timeless))?; let mut session = python_session(); - session.send(msg); + + let msg: ArrowMsg = (&data_table) + .try_into() + .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; + + session.send_arrow_msg(msg); Ok(()) } diff --git a/rerun_py/src/python_session.rs b/rerun_py/src/python_session.rs index 9b4a1da61c7eb..a44a7619747b2 100644 --- a/rerun_py/src/python_session.rs +++ b/rerun_py/src/python_session.rs @@ -1,7 +1,7 @@ use std::net::SocketAddr; use re_log_types::{ - ApplicationId, BeginRecordingMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, + ApplicationId, ArrowMsg, BeginRecordingMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, RecordingSource, Time, TimePoint, }; @@ -13,7 +13,7 @@ use rerun::sink::LogSink; struct RecordingMetaData { recording_source: RecordingSource, application_id: Option, - recording_id: Option, + recording_id: RecordingId, is_official_example: Option, } @@ -23,28 +23,30 @@ impl Default for RecordingMetaData { // Will be filled in when we initialize the `rerun` python module. recording_source: RecordingSource::Unknown, application_id: Default::default(), - recording_id: Default::default(), + // TODO(https://github.com/rerun-io/rerun/issues/1792): ZERO is not a great choice + // here. Ideally we would use `default_recording_id(py)` instead. + recording_id: RecordingId::ZERO, is_official_example: Default::default(), } } } impl RecordingMetaData { - pub fn to_recording_info(&self) -> Option { - let recording_id = self.recording_id?; + pub fn to_recording_info(&self) -> RecordingInfo { + let recording_id = self.recording_id; let application_id = self .application_id .clone() .unwrap_or_else(ApplicationId::unknown); - Some(RecordingInfo { + RecordingInfo { application_id, recording_id, is_official_example: self.is_official_example.unwrap_or(false), started: Time::now(), recording_source: self.recording_source.clone(), - }) + } } } @@ -116,8 +118,8 @@ impl PythonSession { } } - /// The current [`RecordingId`], if set. - pub fn recording_id(&self) -> Option { + /// The current [`RecordingId`]. + pub fn recording_id(&self) -> RecordingId { self.recording_meta_data.recording_id } @@ -130,8 +132,8 @@ impl PythonSession { /// Note that many recordings can share the same [`ApplicationId`], but /// they all have unique [`RecordingId`]s. pub fn set_recording_id(&mut self, recording_id: RecordingId) { - if self.recording_meta_data.recording_id != Some(recording_id) { - self.recording_meta_data.recording_id = Some(recording_id); + if self.recording_meta_data.recording_id != recording_id { + self.recording_meta_data.recording_id = recording_id; self.has_sent_begin_recording_msg = false; } } @@ -205,33 +207,46 @@ impl PythonSession { } if !self.has_sent_begin_recording_msg { - if let Some(info) = self.recording_meta_data.to_recording_info() { - re_log::debug!( - "Beginning new recording with application_id {:?} and recording id {}", - info.application_id.0, - info.recording_id - ); - - self.sink.send( - BeginRecordingMsg { - msg_id: MsgId::random(), - info, - } - .into(), - ); - self.has_sent_begin_recording_msg = true; + let info = self.recording_meta_data.to_recording_info(); + + // This shouldn't happen, but at least log an error if it does. + // See: https://github.com/rerun-io/rerun/issues/1792 + if info.recording_id == RecordingId::ZERO { + re_log::error_once!("RecordingId was still ZERO when sent to server. This is a python initialization bug."); } + + re_log::debug!( + "Beginning new recording with application_id {:?} and recording id {}", + info.application_id.0, + info.recording_id + ); + + self.sink.send( + BeginRecordingMsg { + msg_id: MsgId::random(), + info, + } + .into(), + ); + self.has_sent_begin_recording_msg = true; } self.sink.send(log_msg); } + pub fn send_arrow_msg(&mut self, arrow_msg: ArrowMsg) { + self.send(LogMsg::ArrowMsg(self.recording_id(), arrow_msg)); + } + /// Send a [`PathOp`]. pub fn send_path_op(&mut self, time_point: &TimePoint, path_op: PathOp) { - self.send(LogMsg::EntityPathOpMsg(re_log_types::EntityPathOpMsg { - msg_id: MsgId::random(), - time_point: time_point.clone(), - path_op, - })); + self.send(LogMsg::EntityPathOpMsg( + self.recording_id(), + re_log_types::EntityPathOpMsg { + msg_id: MsgId::random(), + time_point: time_point.clone(), + path_op, + }, + )); } }