diff --git a/crates/re_arrow_store/Cargo.toml b/crates/re_arrow_store/Cargo.toml index 5948230d22c8..8b1b1c8b62a5 100644 --- a/crates/re_arrow_store/Cargo.toml +++ b/crates/re_arrow_store/Cargo.toml @@ -6,6 +6,7 @@ rust-version.workspace = true license.workspace = true publish = false + [features] default = [] ## Enables `parking_lot`'s deadlock detection background thread. @@ -13,6 +14,7 @@ deadlock_detection = ["parking_lot/deadlock_detection"] ## Integration with `polars`, to efficiently use the datastore with dataframes. polars = ["dep:polars-core"] + [dependencies] # Rerun re_format = { path = "../re_format" } @@ -36,11 +38,15 @@ thiserror.workspace = true # Optional polars-core = { workspace = true, optional = true, features = [ + "diagonal_concat", "dtype-date", + "dtype-datetime", "dtype-time", "dtype-struct", + "sort_multiple", ] } + [dev-dependencies] criterion = "0.4" mimalloc = "0.1" @@ -52,9 +58,32 @@ polars-core = { workspace = true, features = [ ] } tracing-subscriber = "0.3" + [lib] bench = false + +[[example]] +name = "dump_dataframe" +path = "examples/dump_dataframe.rs" +required-features = ["polars"] + +[[example]] +name = "latest_component" +path = "examples/latest_component.rs" +required-features = ["polars"] + +[[example]] +name = "latest_components" +path = "examples/latest_components.rs" +required-features = ["polars"] + +[[example]] +name = "range_components" +path = "examples/range_components.rs" +required-features = ["polars"] + + [[bench]] name = "data_store" harness = false diff --git a/crates/re_arrow_store/examples/dump_dataframe.rs b/crates/re_arrow_store/examples/dump_dataframe.rs new file mode 100644 index 000000000000..3095258230a5 --- /dev/null +++ b/crates/re_arrow_store/examples/dump_dataframe.rs @@ -0,0 +1,71 @@ +//! Demonstrates usage of [`DataStore::to_dataframe`]. +//! +//! ```text +//! POLARS_FMT_MAX_ROWS=100 cargo r -p re_arrow_store --example dump_dataframe +//! 
``` + +use re_arrow_store::{test_bundle, DataStore}; +use re_log_types::{ + datagen::{ + build_frame_nr, build_log_time, build_some_instances, build_some_instances_from, + build_some_point2d, build_some_rects, + }, + field_types::Instance, + msg_bundle::Component as _, + ObjPath as EntityPath, Time, +}; + +// --- + +fn main() { + let mut store = DataStore::new(Instance::name(), Default::default()); + + let ent_paths = [ + EntityPath::from("this/that"), + EntityPath::from("and/this/other/thing"), + ]; + + for ent_path in &ent_paths { + let bundle1 = test_bundle!(ent_path @ [ + build_frame_nr(1.into()), build_log_time(Time::now()), + ] => [build_some_instances(2), build_some_rects(2)]); + store.insert(&bundle1).unwrap(); + } + + for ent_path in &ent_paths { + let bundle2 = test_bundle!(ent_path @ [ + build_frame_nr(2.into()) + ] => [build_some_instances(2), build_some_point2d(2)]); + store.insert(&bundle2).unwrap(); + + let bundle3 = test_bundle!(ent_path @ [ + build_frame_nr(3.into()), build_log_time(Time::now()), + ] => [build_some_instances_from(25..29), build_some_point2d(4)]); + store.insert(&bundle3).unwrap(); + } + + for ent_path in &ent_paths { + let bundle4_1 = test_bundle!(ent_path @ [ + build_frame_nr(4.into()), build_log_time(Time::now()), + ] => [build_some_instances_from(20..23), build_some_rects(3)]); + store.insert(&bundle4_1).unwrap(); + + let bundle4_15 = test_bundle!(ent_path @ [ + build_frame_nr(4.into()), + ] => [build_some_instances_from(20..23), build_some_point2d(3)]); + store.insert(&bundle4_15).unwrap(); + + let bundle4_2 = test_bundle!(ent_path @ [ + build_frame_nr(4.into()), build_log_time(Time::now()), + ] => [build_some_instances_from(25..28), build_some_rects(3)]); + store.insert(&bundle4_2).unwrap(); + + let bundle4_25 = test_bundle!(ent_path @ [ + build_frame_nr(4.into()), build_log_time(Time::now()), + ] => [build_some_instances_from(25..28), build_some_point2d(3)]); + store.insert(&bundle4_25).unwrap(); + } + + let df = 
store.to_dataframe(); + println!("{df}"); +} diff --git a/crates/re_arrow_store/examples/latest_component.rs b/crates/re_arrow_store/examples/latest_component.rs new file mode 100644 index 000000000000..a36d543a141a --- /dev/null +++ b/crates/re_arrow_store/examples/latest_component.rs @@ -0,0 +1,54 @@ +//! Demonstrates usage of [`re_arrow_store::polars_util::latest_component`]. +//! +//! ```text +//! POLARS_FMT_MAX_ROWS=100 cargo r -p re_arrow_store --example latest_component +//! ``` + +use re_arrow_store::polars_util::latest_component; +use re_arrow_store::{test_bundle, DataStore, LatestAtQuery, TimeType, Timeline}; +use re_log_types::datagen::build_some_rects; +use re_log_types::field_types::Rect2D; +use re_log_types::{ + datagen::{build_frame_nr, build_some_point2d}, + field_types::{Instance, Point2D}, + msg_bundle::Component, + ObjPath as EntityPath, +}; + +fn main() { + let mut store = DataStore::new(Instance::name(), Default::default()); + + let ent_path = EntityPath::from("my/entity"); + + let bundle = test_bundle!(ent_path @ [build_frame_nr(2.into())] => [build_some_rects(4)]); + store.insert(&bundle).unwrap(); + + let bundle = test_bundle!(ent_path @ [build_frame_nr(3.into())] => [build_some_point2d(2)]); + store.insert(&bundle).unwrap(); + + let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); + + println!("Store contents:\n{}", store.to_dataframe()); + + println!("\n-----\n"); + + let df = latest_component( + &store, + &LatestAtQuery::new(timeline_frame_nr, 10.into()), + &ent_path, + Rect2D::name(), + ) + .unwrap(); + println!("Query results from {:?}'s PoV:\n{df}", Rect2D::name()); + + println!("\n-----\n"); + + let df = latest_component( + &store, + &LatestAtQuery::new(timeline_frame_nr, 10.into()), + &ent_path, + Point2D::name(), + ) + .unwrap(); + println!("Query results from {:?}'s PoV:\n{df}", Point2D::name()); +} diff --git a/crates/re_arrow_store/examples/latest_components.rs 
b/crates/re_arrow_store/examples/latest_components.rs new file mode 100644 index 000000000000..23110d213bb1 --- /dev/null +++ b/crates/re_arrow_store/examples/latest_components.rs @@ -0,0 +1,40 @@ +//! Demonstrates usage of [`re_arrow_store::polars_util::latest_components`]. +//! +//! ```text +//! POLARS_FMT_MAX_ROWS=100 cargo r -p re_arrow_store --example latest_components +//! ``` + +use polars_core::prelude::*; +use re_arrow_store::polars_util::latest_components; +use re_arrow_store::{test_bundle, DataStore, LatestAtQuery, TimeType, Timeline}; +use re_log_types::{ + datagen::{build_frame_nr, build_some_point2d, build_some_rects}, + field_types::{Instance, Point2D, Rect2D}, + msg_bundle::Component, + ObjPath as EntityPath, +}; + +fn main() { + let mut store = DataStore::new(Instance::name(), Default::default()); + + let ent_path = EntityPath::from("my/entity"); + + let bundle = test_bundle!(ent_path @ [build_frame_nr(2.into())] => [build_some_rects(4)]); + store.insert(&bundle).unwrap(); + + let bundle = test_bundle!(ent_path @ [build_frame_nr(3.into())] => [build_some_point2d(2)]); + store.insert(&bundle).unwrap(); + + let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); + let df = latest_components( + &store, + &LatestAtQuery::new(timeline_frame_nr, 10.into()), + &ent_path, + &[Point2D::name(), Rect2D::name()], + &JoinType::Outer, + ) + .unwrap(); + + println!("Store contents:\n{}", store.to_dataframe()); + println!("Query results:\n{df}"); +} diff --git a/crates/re_arrow_store/examples/range_components.rs b/crates/re_arrow_store/examples/range_components.rs new file mode 100644 index 000000000000..ea6b07468aca --- /dev/null +++ b/crates/re_arrow_store/examples/range_components.rs @@ -0,0 +1,92 @@ +//! Demonstrates usage of [`re_arrow_store::polars_util::range_components`]. +//! +//! ```text +//! POLARS_FMT_MAX_ROWS=100 cargo r -p re_arrow_store --example range_components +//! 
``` + +use polars_core::prelude::JoinType; +use re_arrow_store::{polars_util, test_bundle, DataStore, RangeQuery, TimeRange}; +use re_log_types::{ + datagen::{build_frame_nr, build_some_point2d, build_some_rects}, + field_types::{Instance, Point2D, Rect2D}, + msg_bundle::Component as _, + ObjPath as EntityPath, TimeType, Timeline, +}; + +fn main() { + let mut store = DataStore::new(Instance::name(), Default::default()); + + let ent_path = EntityPath::from("this/that"); + + let frame1 = 1.into(); + let frame2 = 2.into(); + let frame3 = 3.into(); + let frame4 = 4.into(); + + let bundle = test_bundle!(ent_path @ [build_frame_nr(frame1)] => [build_some_rects(2)]); + store.insert(&bundle).unwrap(); + + let bundle = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [build_some_point2d(2)]); + store.insert(&bundle).unwrap(); + + let bundle = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [build_some_point2d(4)]); + store.insert(&bundle).unwrap(); + + let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(3)]); + store.insert(&bundle).unwrap(); + + let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_point2d(1)]); + store.insert(&bundle).unwrap(); + + let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(3)]); + store.insert(&bundle).unwrap(); + + let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_point2d(3)]); + store.insert(&bundle).unwrap(); + + let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); + let query = RangeQuery { + timeline: timeline_frame_nr, + range: TimeRange::new(2.into(), 4.into()), + }; + + println!("Store contents:\n{}", store.to_dataframe()); + + println!("\n-----\n"); + + let dfs = polars_util::range_components( + &store, + &query, + &ent_path, + Rect2D::name(), + [Instance::name(), Rect2D::name(), Point2D::name()], + &JoinType::Outer, + ); + for (time, df) in dfs.map(Result::unwrap) { + eprintln!( + "Found data at time 
{} from {}'s PoV (outer-joining):\n{}", + TimeType::Sequence.format(time), + Rect2D::name(), + df, + ); + } + + println!("\n-----\n"); + + let dfs = polars_util::range_components( + &store, + &query, + &ent_path, + Point2D::name(), + [Instance::name(), Rect2D::name(), Point2D::name()], + &JoinType::Outer, + ); + for (time, df) in dfs.map(Result::unwrap) { + eprintln!( + "Found data at time {} from {}'s PoV (outer-joining):\n{}", + TimeType::Sequence.format(time), + Point2D::name(), + df, + ); + } +} diff --git a/crates/re_arrow_store/src/lib.rs b/crates/re_arrow_store/src/lib.rs index 7bd3336eedba..381b87a368bd 100644 --- a/crates/re_arrow_store/src/lib.rs +++ b/crates/re_arrow_store/src/lib.rs @@ -14,6 +14,9 @@ mod store; mod store_read; mod store_write; +#[cfg(feature = "polars")] +mod store_polars; + #[cfg(feature = "polars")] pub mod polars_util; diff --git a/crates/re_arrow_store/src/polars_util.rs b/crates/re_arrow_store/src/polars_util.rs index 2b3f33082a21..2adebe2ddb4e 100644 --- a/crates/re_arrow_store/src/polars_util.rs +++ b/crates/re_arrow_store/src/polars_util.rs @@ -20,48 +20,7 @@ pub type SharedResult = ::std::result::Result; /// with any number of other dataframes returned by this function [`latest_component`] and /// [`latest_components`]. 
/// -/// Usage: -/// ``` -/// # use re_arrow_store::{test_bundle, DataStore, LatestAtQuery, TimeType, Timeline}; -/// # use re_arrow_store::polars_util::latest_component; -/// # use re_log_types::{ -/// # datagen::{build_frame_nr, build_some_point2d}, -/// # field_types::{Instance, Point2D}, -/// # msg_bundle::Component, -/// # ObjPath as EntityPath, -/// # }; -/// -/// let mut store = DataStore::new(Instance::name(), Default::default()); -/// -/// let ent_path = EntityPath::from("my/entity"); -/// -/// let bundle3 = test_bundle!(ent_path @ [build_frame_nr(3.into())] => [build_some_point2d(2)]); -/// store.insert(&bundle3).unwrap(); -/// -/// let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); -/// let df = latest_component( -/// &store, -/// &LatestAtQuery::new(timeline_frame_nr, 10.into()), -/// &ent_path, -/// Point2D::name(), -/// ) -/// .unwrap(); -/// -/// println!("{df:?}"); -/// ``` -/// -/// Outputs: -/// ```text -/// ┌────────────────┬─────────────────────┐ -/// │ rerun.instance ┆ rerun.point2d │ -/// │ --- ┆ --- │ -/// │ u64 ┆ struct[2] │ -/// ╞════════════════╪═════════════════════╡ -/// │ 0 ┆ {3.339503,6.287318} │ -/// ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -/// │ 1 ┆ {2.813822,9.160795} │ -/// └────────────────┴─────────────────────┘ -/// ``` +/// See `examples/latest_component.rs` for an example of use. // // TODO(cmc): can this really fail though? pub fn latest_component( @@ -88,57 +47,7 @@ pub fn latest_component( /// with any number of other dataframes returned by this function [`latest_component`] and /// [`latest_components`].
/// -/// Usage: -/// ``` -/// # use polars_core::prelude::*; -/// # use re_arrow_store::{test_bundle, DataStore, LatestAtQuery, TimeType, Timeline}; -/// # use re_arrow_store::polars_util::latest_components; -/// # use re_log_types::{ -/// # datagen::{build_frame_nr, build_some_point2d, build_some_rects}, -/// # field_types::{Instance, Point2D, Rect2D}, -/// # msg_bundle::Component, -/// # ObjPath as EntityPath, -/// # }; -/// -/// let mut store = DataStore::new(Instance::name(), Default::default()); -/// -/// let ent_path = EntityPath::from("my/entity"); -/// -/// let bundle = test_bundle!(ent_path @ [build_frame_nr(3.into())] => [build_some_point2d(2)]); -/// store.insert(&bundle).unwrap(); -/// -/// let bundle = test_bundle!(ent_path @ [build_frame_nr(5.into())] => [build_some_rects(4)]); -/// store.insert(&bundle).unwrap(); -/// -/// let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); -/// let df = latest_components( -/// &store, -/// &LatestAtQuery::new(timeline_frame_nr, 10.into()), -/// &ent_path, -/// &[Point2D::name(), Rect2D::name()], -/// &JoinType::Outer, -/// ) -/// .unwrap(); -/// -/// println!("{df:?}"); -/// ``` -/// -/// Outputs: -/// ```text -/// ┌────────────────┬─────────────────────┬───────────────────┐ -/// │ rerun.instance ┆ rerun.point2d ┆ rerun.rect2d │ -/// │ --- ┆ --- ┆ --- │ -/// │ u64 ┆ struct[2] ┆ struct[4] │ -/// ╞════════════════╪═════════════════════╪═══════════════════╡ -/// │ 0 ┆ {2.936338,1.308388} ┆ {0.0,0.0,0.0,0.0} │ -/// ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -/// │ 1 ┆ {0.924683,7.757691} ┆ {1.0,1.0,0.0,0.0} │ -/// ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -/// │ 2 ┆ {null,null} ┆ {2.0,2.0,1.0,1.0} │ -/// ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -/// │ 3 ┆ {null,null} ┆ {3.0,3.0,1.0,1.0} │ -/// └────────────────┴─────────────────────┴───────────────────┘ -/// ``` +/// See `examples/latest_components.rs` for an example of use.
// // TODO(cmc): can this really fail though? pub fn latest_components( @@ -172,115 +81,7 @@ pub fn latest_components( /// This is a streaming-join: every yielded dataframe will be the result of joining the latest /// known state of all components, from their respective point-of-views. /// -/// ⚠ The semantics are subtle! Study carefully the example below. -/// -/// Usage: -/// ``` -/// use polars_core::prelude::JoinType; -/// use re_arrow_store::{polars_util, test_bundle, DataStore, RangeQuery, TimeRange}; -/// use re_log_types::{ -/// datagen::{build_frame_nr, build_some_point2d, build_some_rects}, -/// field_types::{Instance, Point2D, Rect2D}, -/// msg_bundle::Component as _, -/// ObjPath as EntityPath, TimeType, Timeline, -/// }; -/// -/// let mut store = DataStore::new(Instance::name(), Default::default()); -/// -/// let ent_path = EntityPath::from("this/that"); -/// -/// let frame1 = 1.into(); -/// let frame2 = 2.into(); -/// let frame3 = 3.into(); -/// let frame4 = 4.into(); -/// -/// let bundle = test_bundle!(ent_path @ [build_frame_nr(frame1)] => [build_some_rects(2)]); -/// store.insert(&bundle).unwrap(); -/// -/// let bundle = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [build_some_point2d(2)]); -/// store.insert(&bundle).unwrap(); -/// -/// let bundle = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [build_some_point2d(4)]); -/// store.insert(&bundle).unwrap(); -/// -/// let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(3)]); -/// store.insert(&bundle).unwrap(); -/// -/// let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_point2d(1)]); -/// store.insert(&bundle).unwrap(); -/// -/// let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(3)]); -/// store.insert(&bundle).unwrap(); -/// -/// let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_point2d(3)]); -/// store.insert(&bundle).unwrap(); -/// -/// let timeline_frame_nr = 
Timeline::new("frame_nr", TimeType::Sequence); -/// let query = RangeQuery { -/// timeline: timeline_frame_nr, -/// range: TimeRange::new(2.into(), 4.into()), -/// }; -/// -/// let dfs = polars_util::range_components( -/// &store, -/// &query, -/// &ent_path, -/// Rect2D::name(), -/// [Instance::name(), Rect2D::name(), Point2D::name()], -/// &JoinType::Outer, -/// ); -/// -/// for (time, df) in dfs.map(Result::unwrap) { -/// eprintln!( -/// "Found data at time {} from {}'s PoV (outer-joining):\n{:?}", -/// TimeType::Sequence.format(time), -/// Rect2D::name(), -/// df, -/// ); -/// } -/// ``` -/// -/// Outputs: -/// ```text -/// Found data at time #1 from rerun.rect2d's PoV (outer-joining): -/// ┌────────────────┬───────────────────┐ -/// │ rerun.instance ┆ rerun.rect2d │ -/// │ --- ┆ --- │ -/// │ u64 ┆ struct[4] │ -/// ╞════════════════╪═══════════════════╡ -/// │ 0 ┆ {0.0,0.0,0.0,0.0} │ -/// ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -/// │ 1 ┆ {1.0,1.0,0.0,0.0} │ -/// └────────────────┴───────────────────┘ -/// -/// Found data at time #4 from rerun.rect2d's PoV (outer-joining): -/// ┌────────────────┬───────────────────────┬───────────────┐ -/// │ rerun.instance ┆ rerun.rect2d ┆ rerun.point2d │ -/// │ --- ┆ --- ┆ --- │ -/// │ u64 ┆ struct[4] ┆ struct[2] │ -/// ╞════════════════╪═══════════════════════╪═══════════════╡ -/// │ 0 ┆ {0.0,0.0,0.0,0.0} ┆ {20.0,20.0} │ -/// ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -/// │ 1 ┆ {1.0,1.0,0.0,0.0} ┆ {21.0,21.0} │ -/// ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -/// │ 2 ┆ {2.0,2.0,1.0,1.0} ┆ {22.0,22.0} │ -/// ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -/// │ 3 ┆ {null,null,null,null} ┆ {23.0,23.0} │ -/// └────────────────┴───────────────────────┴───────────────┘ -/// -/// Found data at time #4 from rerun.rect2d's PoV (outer-joining): -/// ┌────────────────┬───────────────────┬───────────────┐ -/// │ rerun.instance ┆ rerun.rect2d ┆ rerun.point2d │ -/// │ --- ┆ --- ┆ --- │ -/// │ 
u64 ┆ struct[4] ┆ struct[2] │ -/// ╞════════════════╪═══════════════════╪═══════════════╡ -/// │ 0 ┆ {0.0,0.0,0.0,0.0} ┆ {30.0,30.0} │ -/// ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -/// │ 1 ┆ {1.0,1.0,0.0,0.0} ┆ {null,null} │ -/// ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -/// │ 2 ┆ {2.0,2.0,1.0,1.0} ┆ {null,null} │ -/// └────────────────┴───────────────────┴───────────────┘ -/// `` +/// ⚠ The semantics are subtle! See `examples/range_components.rs` for an example of use. pub fn range_components<'a, const N: usize>( store: &'a DataStore, query: &'a RangeQuery, diff --git a/crates/re_arrow_store/src/store.rs b/crates/re_arrow_store/src/store.rs index a079905cd7b0..6b984094f20b 100644 --- a/crates/re_arrow_store/src/store.rs +++ b/crates/re_arrow_store/src/store.rs @@ -138,6 +138,16 @@ pub struct DataStoreConfig { /// /// See [`Self::DEFAULT`] for defaults. pub index_bucket_nb_rows: u64, + + /// If enabled, will store the ID of the write request alongside the inserted data. + /// + /// This can make inspecting the data within the store much easier, at the cost of an extra + /// `u64` value stored per row. + /// + /// Enabled by default in debug builds. + /// + /// See [`DataStore::insert_id_key`]. + pub store_insert_ids: bool, } impl Default for DataStoreConfig { @@ -152,6 +162,7 @@ impl DataStoreConfig { component_bucket_nb_rows: u64::MAX, index_bucket_size_bytes: 32 * 1024, // 32kiB index_bucket_nb_rows: 1024, + store_insert_ids: cfg!(debug_assertions), }; } @@ -159,10 +170,15 @@ impl DataStoreConfig { /// A complete data store: covers all timelines, all entities, everything. /// +/// ## Debugging +/// /// `DataStore` provides a very thorough `Display` implementation that makes it manageable to /// know what's going on internally. /// For even more information, you can set `RERUN_DATA_STORE_DISPLAY_SCHEMAS=1` in your /// environment, which will result in additional schema information being printed out.
+/// +/// Additionally, if the `polars` feature is enabled, you can dump the entire datastore as a +/// flat denormalized dataframe using [`Self::to_dataframe`]. pub struct DataStore { /// The cluster key specifies a column/component that is guaranteed to always be present for /// every single row of data within the store. @@ -181,7 +197,7 @@ pub struct DataStore { pub(crate) config: DataStoreConfig, /// Used to cache auto-generated cluster components, i.e. `[0]`, `[0, 1]`, `[0, 1, 2]`, etc - /// so that they properly deduplicated. + /// so that they can be properly deduplicated. pub(crate) cluster_comp_cache: IntMap, /// Maps an entity to its index, for a specific timeline. @@ -214,6 +230,16 @@ impl DataStore { } } + /// The column name used for storing insert requests' IDs alongside the data. + /// + /// The insert IDs are stored as-is directly into the index tables, this is _not_ an + /// indirection into an associated component table! + /// + /// See [`DataStoreConfig::store_insert_ids`]. + pub fn insert_id_key() -> ComponentName { + "rerun.insert_id".into() + } + /// See [`Self::cluster_key`] for more information about the cluster key. pub fn cluster_key(&self) -> ComponentName { self.cluster_key diff --git a/crates/re_arrow_store/src/store_polars.rs b/crates/re_arrow_store/src/store_polars.rs new file mode 100644 index 000000000000..9e008150dbbc --- /dev/null +++ b/crates/re_arrow_store/src/store_polars.rs @@ -0,0 +1,229 @@ +use arrow2::{ + array::{Array, ListArray, UInt64Array, Utf8Array}, + bitmap::Bitmap, + buffer::Buffer, + compute::concatenate::concatenate, +}; +use nohash_hasher::IntMap; +use polars_core::{functions::diag_concat_df, prelude::*}; +use re_log_types::ComponentName; + +use crate::{ComponentTable, DataStore, DataStoreConfig, IndexBucket, IndexBucketIndices}; + +// --- + +impl DataStore { + /// Dumps the entire datastore as a flat, denormalized dataframe. 
+ /// + /// This cannot fail: it always tries to yield as much valuable information as it can, even in + /// the face of errors. + pub fn to_dataframe(&self) -> DataFrame { + let dfs: Vec<DataFrame> = self + .indices + .values() + .map(|index| { + let dfs: Vec<_> = index + .buckets + .values() + .map(|bucket| (index.ent_path.clone(), bucket)) + .map(|(ent_path, bucket)| { + let mut df = bucket.to_dataframe(&self.config, &self.components); + let nb_rows = df.height(); + + // Add a column where every row is the entity path. + let entities = { + let ent_path = ent_path.to_string(); + let ent_path = Some(ent_path.as_str()); + let entities = Utf8Array::<i32>::from(vec![ent_path; nb_rows]).boxed(); + new_infallible_series("entity", entities, nb_rows) + }; + df.with_column(entities).unwrap(); // cannot fail + + df + }) + .collect(); + + // Concatenate all buckets of the index together. + // + // This has to be done diagonally since each bucket can and will have different + // numbers of columns (== components) and rows. + diag_concat_df(dfs.as_slice()) + // TODO(cmc): is there any way this can fail in this case? + .unwrap() + }) + .collect(); + + // Concatenate all indices together. + // + // This has to be done diagonally since these indices refer to different entities with + // potentially wildly different sets of components and lengths. + let df = diag_concat_df(&dfs) + // TODO(cmc): is there any way this can fail in this case? + .unwrap(); + + let df = sort_df_columns(&df, self.config.store_insert_ids); + + if self.config.store_insert_ids { + // If insert IDs are available, sort rows based on those. + df.sort(vec![DataStore::insert_id_key().as_str()], vec![false]) + .unwrap() + } else { + df + } + } +} + +impl IndexBucket { + /// Dumps the entire bucket as a flat, denormalized dataframe. + /// + /// This cannot fail: it always tries to yield as much valuable information as it can, even in + /// the face of errors.
+ pub fn to_dataframe( + &self, + config: &DataStoreConfig, + components: &IntMap<ComponentName, ComponentTable>, + ) -> DataFrame { + let (_, times) = self.times(); + let nb_rows = times.len(); + + let IndexBucketIndices { + is_sorted: _, + time_range: _, + times: _, + indices, + } = &*self.indices.read(); + + let insert_ids = config + .store_insert_ids + .then(|| { + indices.get(&DataStore::insert_id_key()).map(|insert_ids| { + let insert_ids = insert_ids + .iter() + .map(|id| id.map(|id| id.0.get())) + .collect::<Vec<_>>(); + let insert_ids = UInt64Array::from(insert_ids); + new_infallible_series( + DataStore::insert_id_key().as_str(), + insert_ids.boxed(), + nb_rows, + ) + }) + }) + .flatten(); + + // Need to create one `Series` for the time index and one for each component index. + let comp_series = [ + // One column for insert IDs, if they are available. + insert_ids, + // One column for the time index. + Some(new_infallible_series( + self.timeline.name().as_str(), + times.boxed(), + nb_rows, + )), + ] + .into_iter() + .flatten() // filter options + // One column for each component index. + .chain(indices.iter().filter_map(|(component, comp_row_nrs)| { + let comp_table = components.get(component)?; + + // For each row in the index, grab the associated data from the component tables. + let comp_rows: Vec<Option<_>> = comp_row_nrs + .iter() + .map(|comp_row_nr| comp_row_nr.and_then(|comp_row_nr| comp_table.get(comp_row_nr))) + .collect(); + + // Computing the validity bitmap is just a matter of checking whether the data was + // available in the component tables. + let comp_validity: Vec<_> = comp_rows.iter().map(|row| row.is_some()).collect(); + + // Each cell is actually a list, so we need to compute offsets one cell at a time.
+ let mut offset = 0i32; + let comp_offsets: Vec<_> = std::iter::once(0) + .chain(comp_rows.iter().map(|row| { + offset += row.as_ref().map_or(0, |row| row.len()) as i32; + offset + })) + .collect(); + let comp_values: Vec<_> = comp_rows.iter().flatten().map(|row| row.as_ref()).collect(); + + // Bring everything together into one big list. + let comp_values = ListArray::<i32>::from_data( + ListArray::<i32>::default_datatype(comp_table.datatype.clone()), + Buffer::from(comp_offsets), + concatenate(comp_values.as_slice()).unwrap().to_boxed(), + Some(Bitmap::from(comp_validity)), + ) + .boxed(); + + Some(new_infallible_series( + component.as_str(), + comp_values, + nb_rows, + )) + })); + + DataFrame::new(comp_series.collect::<Vec<_>>()) + // This cannot fail at this point, all series are guaranteed to have data and be of + // same length. + .unwrap() + } +} + +// --- + +fn new_infallible_series(name: &str, data: Box<dyn Array>, len: usize) -> Series { + Series::try_from((name, data)).unwrap_or_else(|_| { + let errs = Utf8Array::<i32>::from(vec![Some(""); len]); + Series::try_from((name, errs.boxed())).unwrap() // cannot fail + }) +} + +/// Sorts the columns of the given dataframe according to the following rules: +/// - insert ID comes first if it's available, +/// - followed by lexically sorted timelines, +/// - followed by the entity path, +/// - and finally all components in lexical order.
+fn sort_df_columns(df: &DataFrame, store_insert_ids: bool) -> DataFrame { + let columns: Vec<_> = { + let mut all = df.get_column_names(); + all.sort(); + + all.remove(all.binary_search(&"entity").expect("has to exist")); + + if store_insert_ids { + all.remove( + all.binary_search(&DataStore::insert_id_key().as_str()) + .expect("has to exist"), + ); + } + + let timelines = all + .iter() + .copied() + .filter(|name| !name.starts_with("rerun.")) + .map(Some) + .collect::<Vec<_>>(); + + let components = all + .iter() + .copied() + .filter(|name| name.starts_with("rerun.")) + .map(Some) + .collect::<Vec<_>>(); + + [ + vec![store_insert_ids.then(|| DataStore::insert_id_key().as_str())], + timelines, + vec![Some("entity")], + components, + ] + .into_iter() + .flatten() // flatten vectors + .flatten() // filter options + .collect() + }; + + df.select(columns).unwrap() +} diff --git a/crates/re_arrow_store/src/store_read.rs b/crates/re_arrow_store/src/store_read.rs index 84b12fa50485..833036208122 100644 --- a/crates/re_arrow_store/src/store_read.rs +++ b/crates/re_arrow_store/src/store_read.rs @@ -161,12 +161,6 @@ impl DataStore { /// This is what our `latest_components` polars helper does. /// /// For more information about working with dataframes, see the `polars` feature. - // - // TODO(#640): add a short "store-contains-this/latest-at-returns-that" once we have support - // for DataStore::dump_as_df(). - // - // TODO(cmc): at one point we're gonna need a high-level documentation/user-guide of the - // semantics of latest-at PoV queries, giving readers a walkthrough of a real-world query. pub fn latest_at( &self, query: &LatestAtQuery, @@ -306,12 +300,6 @@ impl DataStore { /// This is what our `range_components` polars helper does. /// /// For more information about working with dataframes, see the `polars` feature. - // - // TODO(#640): add a short "store-contains-this/range-returns-that" once we have support for - // DataStore::dump_as_df().
- // - // TODO(cmc): at one point we're gonna need a high-level documentation/user-guide of the - // semantics of range queries, giving readers a walkthrough of a real-world query. pub fn range<'a, const N: usize>( &'a self, query: &RangeQuery, diff --git a/crates/re_arrow_store/src/store_write.rs b/crates/re_arrow_store/src/store_write.rs index 1d5496307f17..2c7ad6cbdb36 100644 --- a/crates/re_arrow_store/src/store_write.rs +++ b/crates/re_arrow_store/src/store_write.rs @@ -169,6 +169,16 @@ impl DataStore { // Always insert the cluster component. row_indices.insert(self.cluster_key, cluster_row_idx); + if self.config.store_insert_ids { + // Store the ID of the write request alongside the data. + // + // This is _not_ an actual `RowIndex`, there isn't even a component table associated + // with insert IDs! + // We're just abusing the fact that any value we push here as a `RowIndex` will end up + // as-is in the index. + row_indices.insert(Self::insert_id_key(), RowIndex::from_u64(self.insert_id)); + } + for bundle in components .iter() .filter(|bundle| bundle.name != self.cluster_key) diff --git a/crates/re_arrow_store/src/test_util.rs b/crates/re_arrow_store/src/test_util.rs index faa07b07a571..c44f66be0ca3 100644 --- a/crates/re_arrow_store/src/test_util.rs +++ b/crates/re_arrow_store/src/test_util.rs @@ -103,6 +103,7 @@ pub fn all_configs() -> impl Iterator { component_bucket_nb_rows: comp.component_bucket_nb_rows, index_bucket_size_bytes: idx.index_bucket_size_bytes, index_bucket_nb_rows: idx.index_bucket_nb_rows, + store_insert_ids: comp.store_insert_ids || idx.store_insert_ids, }) }) }