Skip to content

Commit

Permalink
re_datastore: dump to flat dataframe (#645)
Browse files Browse the repository at this point in the history
* get is supposed to return a row, not a [row]

* unwrap note

* the bench too

* self review

* doc test also

* and re_query ofc!

* slicing is _very_ slow, don't do it if you don't have to

* no more col_arrays in re_query

* there's actually no need for concatenating at all

* incrementally compute and cache bucket sizes

* cleaning up and documenting existing limitations

* introducing bucket retirement

* issue ref

* some more doc stuff

* self-review

* polars/fmt should always be there for tests

* streamlining batch support

* take list header into account

* it's fine

* self-review

* just something i want to keep around for later

* (un)wrapping lists is a bit slow... and slicing them is _extremely_ slow!

* merge cmc/datastore/get_a_single_row (#590)

* no more col_arrays in re_query

* introducing the notion of clustering key, thankfully breaking all tests by design

* making good use of that shiny new Instance component

* merge cmc/datastore/get_rid_of_copies (#584)

* missed one

* introducing arrow_util with is_dense_array()

* finding the clustering comp of the row... or creating it!

* rebasin'

* post rebase clean up

* addressing PR comments, I hope

* ensure that clustering components are properly sorted, failing the existing test suite

* build_instances now generate sorted ids, thus greenlighting the test suite

* missed a couple

* addressed PR comments

* going for the ArrayExt route

* completing the quadrifecta of checks

* the unavoidable typed error revolution is on its way, and it's early

* where we're going we don't need polars

* update everything for the new APIs

* error for unsupported clustering key types

* clean up and actually testing our error paths

* move those nasty internal tests into their own dirty corner

* finally some high-level tests in here

* i happen to like where this is going

* shuffling things

* demonstrating that implicit instances are somehow broken

* fully working implicit clustering keys, but demonstrating a sorting issue somewhere

* there is still something weird going on tho

* latest_at behaving as one would expect

* automatically cache generated cluster instances

* time to clean up en masse

* still want to put some stress on the bucketing

* make ArrayExt::is_dense a little more friendly, just in case...

* TimeType::format_range

* independent latest_at query and using appropriate types everywhere

* re_query: use polars/fmt in tests

* re_query: remove implicit instances

* fixing the u32 vs u64 instance drama

* really starting to like how this looks

* cluster-aware polars helpers :>

* cleanin up tests

* continuing cleanup and doc

* updating visuals for this brave new world

* docs

* self-review

* bruh

* bruh...

* ...

* outdated comment

* no reason to search for it multiple times

* polars_helpers => polars_util for consistency's sake

* addressing PR comments and a couple other things

* xxx

* post-merge fixes

* TimeInt should be nohash

* high-level polar range tools + making first half of range impl pass

* implement the streaming half

* finally defeated all demons

* still passes?

* it looks like we've made it out alive

* polars util: join however you wish

* fixed formatting

* point2d's PoVs working as expected

* passing full ranges

* docs and such part 1, the semantics are hell

* fixing the filtering mess in tests

* me stoopid

* polars docs

* addressing the clones

* xxx

* missed a gazillon conflict somehow

* polars util spring cleaning

* do indicate and demonstrate that range_components is _not_ a real streaming join

* fixed some comments

* bruh

* screw it, going for the real deal: full streaming joins

* YESgit sgit s FINALLY SEMANTICS I ACTUALLY LIKE

* yep yep i like this

* I hereby declare myself _satisfied_

* initiating the great cleanup

* add notes for upcoming terminology pr

* bringing IndexRowNr into the mix and slowly starting to fix terminology mess

* improving range_components ergonomics

* putting it all in self-reviewable state

* self-review

* add bench

* xxx

* having some casual fun with dataframes :>

* now with components!

* just some experiments im not too found of... keeping around just in case

* Revert "just some experiments im not too found of... keeping around just in case"

This reverts commit 15f6487.

* playing around with insert_id-as-data... which turns out to be quite helpful

* going into store_insert_ids for real

* full impl

* add example

* self-review

* reviewable

* addressing PR comments

* is that readable

* Revert "is that readable"

This reverts commit f802ff5.

* standalone examples for all dataframe APIs

* burn all todos

* update doc: you can do that now!

* demonstrating nasty edge-case with streaming-joins

* update streaming-join merging rules to fix said edge case

* addressed PR comments
  • Loading branch information
teh-cmc authored Jan 2, 2023
1 parent 853b714 commit 78b6486
Showing 12 changed files with 559 additions and 215 deletions.
29 changes: 29 additions & 0 deletions crates/re_arrow_store/Cargo.toml
Original file line number Diff line number Diff line change
@@ -6,13 +6,15 @@ rust-version.workspace = true
license.workspace = true
publish = false


[features]
default = []
## Enables `parking_lot`'s deadlock detection background thread.
deadlock_detection = ["parking_lot/deadlock_detection"]
## Integration with `polars`, to efficiently use the datastore with dataframes.
polars = ["dep:polars-core"]


[dependencies]
# Rerun
re_format = { path = "../re_format" }
@@ -36,11 +38,15 @@ thiserror.workspace = true

# Optional
polars-core = { workspace = true, optional = true, features = [
"diagonal_concat",
"dtype-date",
"dtype-datetime",
"dtype-time",
"dtype-struct",
"sort_multiple",
] }


[dev-dependencies]
criterion = "0.4"
mimalloc = "0.1"
@@ -52,9 +58,32 @@ polars-core = { workspace = true, features = [
] }
tracing-subscriber = "0.3"


[lib]
bench = false


[[example]]
name = "dump_dataframe"
path = "examples/dump_dataframe.rs"
required-features = ["polars"]

[[example]]
name = "latest_component"
path = "examples/latest_component.rs"
required-features = ["polars"]

[[example]]
name = "latest_components"
path = "examples/latest_components.rs"
required-features = ["polars"]

[[example]]
name = "range_components"
path = "examples/range_components.rs"
required-features = ["polars"]


[[bench]]
name = "data_store"
harness = false
71 changes: 71 additions & 0 deletions crates/re_arrow_store/examples/dump_dataframe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//! Demonstrates usage of [`DataStore::to_dataframe`].
//!
//! ```text
//! POLARS_FMT_MAX_ROWS=100 cargo r -p re_arrow_store --example dump_dataframe
//! ```
use re_arrow_store::{test_bundle, DataStore};
use re_log_types::{
datagen::{
build_frame_nr, build_log_time, build_some_instances, build_some_instances_from,
build_some_point2d, build_some_rects,
},
field_types::Instance,
msg_bundle::Component as _,
ObjPath as EntityPath, Time,
};

// ---

fn main() {
let mut store = DataStore::new(Instance::name(), Default::default());

let ent_paths = [
EntityPath::from("this/that"),
EntityPath::from("and/this/other/thing"),
];

for ent_path in &ent_paths {
let bundle1 = test_bundle!(ent_path @ [
build_frame_nr(1.into()), build_log_time(Time::now()),
] => [build_some_instances(2), build_some_rects(2)]);
store.insert(&bundle1).unwrap();
}

for ent_path in &ent_paths {
let bundle2 = test_bundle!(ent_path @ [
build_frame_nr(2.into())
] => [build_some_instances(2), build_some_point2d(2)]);
store.insert(&bundle2).unwrap();

let bundle3 = test_bundle!(ent_path @ [
build_frame_nr(3.into()), build_log_time(Time::now()),
] => [build_some_instances_from(25..29), build_some_point2d(4)]);
store.insert(&bundle3).unwrap();
}

for ent_path in &ent_paths {
let bundle4_1 = test_bundle!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => [build_some_instances_from(20..23), build_some_rects(3)]);
store.insert(&bundle4_1).unwrap();

let bundle4_15 = test_bundle!(ent_path @ [
build_frame_nr(4.into()),
] => [build_some_instances_from(20..23), build_some_point2d(3)]);
store.insert(&bundle4_15).unwrap();

let bundle4_2 = test_bundle!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => [build_some_instances_from(25..28), build_some_rects(3)]);
store.insert(&bundle4_2).unwrap();

let bundle4_25 = test_bundle!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => [build_some_instances_from(25..28), build_some_point2d(3)]);
store.insert(&bundle4_25).unwrap();
}

let df = store.to_dataframe();
println!("{df}");
}
54 changes: 54 additions & 0 deletions crates/re_arrow_store/examples/latest_component.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//! Demonstrates usage of [`re_arrow_store::polars_util::latest_component`].
//!
//! ```text
//! POLARS_FMT_MAX_ROWS=100 cargo r -p re_arrow_store --example latest_component
//! ```
use re_arrow_store::polars_util::latest_component;
use re_arrow_store::{test_bundle, DataStore, LatestAtQuery, TimeType, Timeline};
use re_log_types::datagen::build_some_rects;
use re_log_types::field_types::Rect2D;
use re_log_types::{
datagen::{build_frame_nr, build_some_point2d},
field_types::{Instance, Point2D},
msg_bundle::Component,
ObjPath as EntityPath,
};

fn main() {
let mut store = DataStore::new(Instance::name(), Default::default());

let ent_path = EntityPath::from("my/entity");

let bundle = test_bundle!(ent_path @ [build_frame_nr(2.into())] => [build_some_rects(4)]);
store.insert(&bundle).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(3.into())] => [build_some_point2d(2)]);
store.insert(&bundle).unwrap();

let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);

println!("Store contents:\n{}", store.to_dataframe());

println!("\n-----\n");

let df = latest_component(
&store,
&LatestAtQuery::new(timeline_frame_nr, 10.into()),
&ent_path,
Rect2D::name(),
)
.unwrap();
println!("Query results from {:?}'s PoV:\n{df}", Rect2D::name());

println!("\n-----\n");

let df = latest_component(
&store,
&LatestAtQuery::new(timeline_frame_nr, 10.into()),
&ent_path,
Point2D::name(),
)
.unwrap();
println!("Query results from {:?}'s PoV:\n{df}", Point2D::name());
}
40 changes: 40 additions & 0 deletions crates/re_arrow_store/examples/latest_components.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//! Demonstrates usage of [`re_arrow_store::polars_util::latest_components`].
//!
//! ```text
//! POLARS_FMT_MAX_ROWS=100 cargo r -p re_arrow_store --example latest_components
//! ```
use polars_core::prelude::*;
use re_arrow_store::polars_util::latest_components;
use re_arrow_store::{test_bundle, DataStore, LatestAtQuery, TimeType, Timeline};
use re_log_types::{
datagen::{build_frame_nr, build_some_point2d, build_some_rects},
field_types::{Instance, Point2D, Rect2D},
msg_bundle::Component,
ObjPath as EntityPath,
};

fn main() {
let mut store = DataStore::new(Instance::name(), Default::default());

let ent_path = EntityPath::from("my/entity");

let bundle = test_bundle!(ent_path @ [build_frame_nr(2.into())] => [build_some_rects(4)]);
store.insert(&bundle).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(3.into())] => [build_some_point2d(2)]);
store.insert(&bundle).unwrap();

let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let df = latest_components(
&store,
&LatestAtQuery::new(timeline_frame_nr, 10.into()),
&ent_path,
&[Point2D::name(), Rect2D::name()],
&JoinType::Outer,
)
.unwrap();

println!("Store contents:\n{}", store.to_dataframe());
println!("Query results:\n{df}");
}
92 changes: 92 additions & 0 deletions crates/re_arrow_store/examples/range_components.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
//! Demonstrates usage of [`re_arrow_store::polars_util::range_components`].
//!
//! ```text
//! POLARS_FMT_MAX_ROWS=100 cargo r -p re_arrow_store --example range_components
//! ```
use polars_core::prelude::JoinType;
use re_arrow_store::{polars_util, test_bundle, DataStore, RangeQuery, TimeRange};
use re_log_types::{
datagen::{build_frame_nr, build_some_point2d, build_some_rects},
field_types::{Instance, Point2D, Rect2D},
msg_bundle::Component as _,
ObjPath as EntityPath, TimeType, Timeline,
};

fn main() {
let mut store = DataStore::new(Instance::name(), Default::default());

let ent_path = EntityPath::from("this/that");

let frame1 = 1.into();
let frame2 = 2.into();
let frame3 = 3.into();
let frame4 = 4.into();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame1)] => [build_some_rects(2)]);
store.insert(&bundle).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [build_some_point2d(2)]);
store.insert(&bundle).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [build_some_point2d(4)]);
store.insert(&bundle).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(3)]);
store.insert(&bundle).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_point2d(1)]);
store.insert(&bundle).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(3)]);
store.insert(&bundle).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_point2d(3)]);
store.insert(&bundle).unwrap();

let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let query = RangeQuery {
timeline: timeline_frame_nr,
range: TimeRange::new(2.into(), 4.into()),
};

println!("Store contents:\n{}", store.to_dataframe());

println!("\n-----\n");

let dfs = polars_util::range_components(
&store,
&query,
&ent_path,
Rect2D::name(),
[Instance::name(), Rect2D::name(), Point2D::name()],
&JoinType::Outer,
);
for (time, df) in dfs.map(Result::unwrap) {
eprintln!(
"Found data at time {} from {}'s PoV (outer-joining):\n{}",
TimeType::Sequence.format(time),
Rect2D::name(),
df,
);
}

println!("\n-----\n");

let dfs = polars_util::range_components(
&store,
&query,
&ent_path,
Point2D::name(),
[Instance::name(), Rect2D::name(), Point2D::name()],
&JoinType::Outer,
);
for (time, df) in dfs.map(Result::unwrap) {
eprintln!(
"Found data at time {} from {}'s PoV (outer-joining):\n{}",
TimeType::Sequence.format(time),
Point2D::name(),
df,
);
}
}
3 changes: 3 additions & 0 deletions crates/re_arrow_store/src/lib.rs
Original file line number Diff line number Diff line change
@@ -14,6 +14,9 @@ mod store;
mod store_read;
mod store_write;

#[cfg(feature = "polars")]
mod store_polars;

#[cfg(feature = "polars")]
pub mod polars_util;

Loading

1 comment on commit 78b6486

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rust Benchmark

Benchmark suite Current: 78b6486 Previous: 853b714 Ratio
datastore/insert/batch/rects/insert 175706 ns/iter (± 481) 169114 ns/iter (± 254) 1.04
datastore/latest_at/batch/rects/query 713 ns/iter (± 1) 728 ns/iter (± 1) 0.98
datastore/latest_at/missing_components/primary 298 ns/iter (± 1) 306 ns/iter (± 0) 0.97
datastore/latest_at/missing_components/secondaries 383 ns/iter (± 0) 389 ns/iter (± 0) 0.98
datastore/range/batch/rects/query 46339 ns/iter (± 147) 45077 ns/iter (± 42) 1.03
obj_mono_points/insert 955040379 ns/iter (± 3470260) 992010058 ns/iter (± 5381404) 0.96
obj_mono_points/query 365255 ns/iter (± 1643) 362032 ns/iter (± 1396) 1.01
obj_batch_points/insert 93523083 ns/iter (± 500933) 94465228 ns/iter (± 465520) 0.99
obj_batch_points/query 11303 ns/iter (± 40) 11294 ns/iter (± 23) 1.00
obj_batch_points_sequential/insert 23337416 ns/iter (± 301376) 22838540 ns/iter (± 256885) 1.02
obj_batch_points_sequential/query 7890 ns/iter (± 17) 7881 ns/iter (± 28) 1.00
mono_points_classic/generate_messages 4594160 ns/iter (± 189655) 4357290 ns/iter (± 28474) 1.05
mono_points_classic/encode_log_msg 11238624 ns/iter (± 926713) 10245119 ns/iter (± 353308) 1.10
mono_points_classic/encode_total 16017163 ns/iter (± 1084451) 14569072 ns/iter (± 349489) 1.10
mono_points_classic/decode_total 37337228 ns/iter (± 760089) 35153401 ns/iter (± 625964) 1.06
mono_points_arrow/generate_message_bundles 31779172 ns/iter (± 1163774) 25132270 ns/iter (± 1545286) 1.26
mono_points_arrow/generate_messages 102257529 ns/iter (± 1201648) 101495966 ns/iter (± 1395652) 1.01
mono_points_arrow/encode_log_msg 130567576 ns/iter (± 876238) 128951170 ns/iter (± 934009) 1.01
mono_points_arrow/encode_total 258902176 ns/iter (± 2554611) 260810335 ns/iter (± 1917860) 0.99
mono_points_arrow/decode_log_msg 143341792 ns/iter (± 958919) 145011501 ns/iter (± 1074818) 0.99
mono_points_arrow/decode_message_bundles 54599444 ns/iter (± 1568695) 56050404 ns/iter (± 1583016) 0.97
mono_points_arrow/decode_total 197966026 ns/iter (± 1894680) 199648350 ns/iter (± 1916117) 0.99
batch_points_classic/generate_messages 3391 ns/iter (± 29) 3580 ns/iter (± 24) 0.95
batch_points_classic/encode_log_msg 394647 ns/iter (± 632) 393309 ns/iter (± 650) 1.00
batch_points_classic/encode_total 402392 ns/iter (± 1277) 396671 ns/iter (± 556) 1.01
batch_points_classic/decode_total 744849 ns/iter (± 2932) 743895 ns/iter (± 29862) 1.00
batch_points_arrow/generate_message_bundles 243717 ns/iter (± 817) 243679 ns/iter (± 324) 1.00
batch_points_arrow/generate_messages 4579 ns/iter (± 14) 4562 ns/iter (± 7) 1.00
batch_points_arrow/encode_log_msg 266763 ns/iter (± 897) 265977 ns/iter (± 626) 1.00
batch_points_arrow/encode_total 539320 ns/iter (± 1509) 534835 ns/iter (± 1754) 1.01
batch_points_arrow/decode_log_msg 206928 ns/iter (± 3918) 205868 ns/iter (± 325) 1.01
batch_points_arrow/decode_message_bundles 1668 ns/iter (± 1) 1681 ns/iter (± 7) 0.99
batch_points_arrow/decode_total 213554 ns/iter (± 900) 212120 ns/iter (± 598) 1.01
arrow_mono_points/insert 3989775979 ns/iter (± 12681978) 4052297893 ns/iter (± 10555742) 0.98
arrow_mono_points/query 1724686 ns/iter (± 24401) 1647100 ns/iter (± 10576) 1.05
arrow_batch_points/insert 1517300 ns/iter (± 10406) 1490507 ns/iter (± 10297) 1.02
arrow_batch_points/query 15538 ns/iter (± 9) 16429 ns/iter (± 33) 0.95
obj_batch_points_sequential/Tuid::random 37 ns/iter (± 0) 38 ns/iter (± 0) 0.97

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.