Commit
re_query: support for range queries (#653)
* get is supposed to return a row, not a [row]

* unwrap note

* the bench too

* self review

* doc test also

* and re_query ofc!

* slicing is _very_ slow, don't do it if you don't have to

* no more col_arrays in re_query

* there's actually no need for concatenating at all

* incrementally compute and cache bucket sizes

* cleaning up and documenting existing limitations

* introducing bucket retirement

* issue ref

* some more doc stuff

* self-review

* polars/fmt should always be there for tests

* streamlining batch support

* take list header into account

* it's fine

* self-review

* just something i want to keep around for later

* (un)wrapping lists is a bit slow... and slicing them is _extremely_ slow!

* merge cmc/datastore/get_a_single_row (#590)

* no more col_arrays in re_query

* introducing the notion of clustering key, thankfully breaking all tests by design

* making good use of that shiny new Instance component

* merge cmc/datastore/get_rid_of_copies (#584)

* missed one

* introducing arrow_util with is_dense_array()

* finding the clustering comp of the row... or creating it!

* rebasin'

* post rebase clean up

* addressing PR comments, I hope

* ensure that clustering components are properly sorted, failing the existing test suite

* build_instances now generates sorted ids, thus greenlighting the test suite

* missed a couple

* addressed PR comments

* going for the ArrayExt route

* completing the quadrifecta of checks

* the unavoidable typed error revolution is on its way, and it's early

* where we're going we don't need polars

* update everything for the new APIs

* error for unsupported clustering key types

* clean up and actually testing our error paths

* move those nasty internal tests into their own dirty corner

* finally some high-level tests in here

* i happen to like where this is going

* shuffling things

* demonstrating that implicit instances are somehow broken

* fully working implicit clustering keys, but demonstrating a sorting issue somewhere

* there is still something weird going on tho

* latest_at behaving as one would expect

* automatically cache generated cluster instances

* time to clean up en masse

* still want to put some stress on the bucketing

* make ArrayExt::is_dense a little more friendly, just in case...

* TimeType::format_range

* independent latest_at query and using appropriate types everywhere

* re_query: use polars/fmt in tests

* re_query: remove implicit instances

* fixing the u32 vs u64 instance drama

* really starting to like how this looks

* cluster-aware polars helpers :>

* cleanin up tests

* continuing cleanup and doc

* updating visuals for this brave new world

* docs

* self-review

* bruh

* bruh...

* ...

* outdated comment

* no reason to search for it multiple times

* polars_helpers => polars_util for consistency's sake

* addressing PR comments and a couple other things

* xxx

* post-merge fixes

* TimeInt should be nohash

* high-level polar range tools + making first half of range impl pass

* implement the streaming half

* finally defeated all demons

* still passes?

* it looks like we've made it out alive

* polars util: join however you wish

* fixed formatting

* point2d's PoVs working as expected

* passing full ranges

* docs and such part 1, the semantics are hell

* fixing the filtering mess in tests

* me stoopid

* polars docs

* addressing the clones

* xxx

* missed a gazillion conflicts somehow

* polars util spring cleaning

* do indicate and demonstrate that range_components is _not_ a real streaming join

* fixed some comments

* bruh

* screw it, going for the real deal: full streaming joins

* YES, FINALLY: SEMANTICS I ACTUALLY LIKE

* yep yep i like this

* I hereby declare myself _satisfied_

* initiating the great cleanup

* add notes for upcoming terminology pr

* bringing IndexRowNr into the mix and slowly starting to fix terminology mess

* improving range_components ergonomics

* putting it all in self-reviewable state

* self-review

* add bench

* xxx

* addressing PR comments

* first impl

* ported simple_query() to simple_range

* doc and such

* added e2e example for range queries

* self-review

* support for new EntityView

* demonstrating nasty edge-case with streaming-joins

* update streaming-join merging rules to fix said edge case

* mimicking range_components' new merging rules

* implement PoV-less, always-yield lower-level API + adapt higher-level one

* addressing PR comments

* ported to new low-level APIs

* xxx

* addressed PR comments

* self and not-so-self reviews

* the future is quite literally here
teh-cmc authored Jan 2, 2023
1 parent f4c7d43 commit 1d28875
Showing 5 changed files with 464 additions and 1 deletion.
12 changes: 11 additions & 1 deletion crates/re_query/Cargo.toml
@@ -6,9 +6,11 @@ rust-version.workspace = true
license.workspace = true
publish = false


[features]
default = []
polars = ["polars-core"]
polars = ["dep:polars-core", "re_arrow_store/polars"]


[dependencies]
# Rerun
@@ -49,9 +51,17 @@ polars-core = { workspace = true, features = [
] }
tracing-subscriber = "0.3"


[lib]
bench = false


[[example]]
name = "range"
path = "examples/range.rs"
required-features = ["polars"]


[[bench]]
name = "obj_query_benchmark"
harness = false
79 changes: 79 additions & 0 deletions crates/re_query/examples/range.rs
@@ -0,0 +1,79 @@
//! Demonstrates usage of [`re_query::range_entity_with_primary`].
//!
//! ```text
//! POLARS_FMT_MAX_ROWS=100 cargo r -p re_query --all-features --example range
//! ```
use re_arrow_store::{DataStore, RangeQuery, TimeRange};
use re_log_types::{
    datagen::{build_frame_nr, build_some_point2d, build_some_rects},
    field_types::{Instance, Point2D, Rect2D},
    msg_bundle::{try_build_msg_bundle1, Component as _},
    MsgId, ObjPath as EntityPath, TimeType,
};
use re_query::range_entity_with_primary;

fn main() {
    let mut store = DataStore::new(Instance::name(), Default::default());

    let ent_path: EntityPath = "point".into();

    let frame1 = [build_frame_nr(1.into())];
    let frame2 = [build_frame_nr(2.into())];
    let frame3 = [build_frame_nr(3.into())];
    let frame4 = [build_frame_nr(4.into())];

    let rects = build_some_rects(2);
    let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame1, &rects).unwrap();
    store.insert(&bundle).unwrap();

    let points = build_some_point2d(2);
    let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame2, &points).unwrap();
    store.insert(&bundle).unwrap();

    let points = build_some_point2d(4);
    let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame3, &points).unwrap();
    store.insert(&bundle).unwrap();

    let rects = build_some_rects(3);
    let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame4, &rects).unwrap();
    store.insert(&bundle).unwrap();

    let points = build_some_point2d(3);
    let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame4, &points).unwrap();
    store.insert(&bundle).unwrap();

    let rects = build_some_rects(3);
    let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame4, &rects).unwrap();
    store.insert(&bundle).unwrap();

    let query = RangeQuery::new(frame2[0].0, TimeRange::new(frame2[0].1, frame4[0].1));

    println!("Store contents:\n{}", store.to_dataframe());

    println!("\n-----\n");

    let components = [Instance::name(), Rect2D::name(), Point2D::name()];
    let ent_views = range_entity_with_primary::<Rect2D, 3>(&store, &query, &ent_path, components);
    for (time, ent_view) in ent_views {
        eprintln!(
            "Found data at time {} from {}'s PoV:\n{}",
            TimeType::Sequence.format(time),
            Rect2D::name(),
            &ent_view.as_df2::<Point2D>().unwrap()
        );
    }

    println!("\n-----\n");

    let components = [Instance::name(), Rect2D::name(), Point2D::name()];
    let ent_views = range_entity_with_primary::<Point2D, 3>(&store, &query, &ent_path, components);
    for (time, ent_view) in ent_views {
        eprintln!(
            "Found data at time {} from {}'s PoV:\n{}",
            TimeType::Sequence.format(time),
            Point2D::name(),
            &ent_view.as_df2::<Rect2D>().unwrap()
        );
    }
}
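For reference, the insertions above lay out the following data on the `frame_nr` timeline (summarized from the code above, not program output):

```text
frame 1: 2 rects
frame 2: 2 points
frame 3: 4 points
frame 4: 3 rects, then 3 points, then 3 rects
```

The query then ranges from frame 2 to frame 4. As described by the doc-comment in `src/range.rs` below, the data from frame 1 is still folded in through the initial latest-at state, and each of the two loops only yields a new `EntityView` when its own primary component changes (`Rect2D` in the first loop, `Point2D` in the second), so the two loops print views at different times even though they run the exact same query.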
2 changes: 2 additions & 0 deletions crates/re_query/src/lib.rs
@@ -3,13 +3,15 @@
mod entity_view;
mod query;
mod range;
mod visit;

#[cfg(feature = "polars")]
pub mod dataframe_util;

pub use self::entity_view::{ComponentWithInstances, EntityView};
pub use self::query::{get_component_with_instances, query_entity_with_primary};
pub use self::range::range_entity_with_primary;

// Used for doc-tests
#[doc(hidden)]
135 changes: 135 additions & 0 deletions crates/re_query/src/range.rs
@@ -0,0 +1,135 @@
use itertools::Itertools as _;
use re_arrow_store::{DataStore, LatestAtQuery, RangeQuery, TimeInt};
use re_log_types::{msg_bundle::Component, ComponentName, ObjPath};

use crate::{get_component_with_instances, ComponentWithInstances, EntityView};

// ---

/// Iterates over the rows of any number of components and their respective cluster keys, all from
/// the single point-of-view of the `primary` component, returning an iterator of `EntityView`s.
///
/// An initial entity-view is yielded with the latest-at state at the start of the time range, if
/// there is any.
///
/// The iterator only ever yields entity-views iff the `primary` component has changed: a change
/// affecting only secondary components will not yield an entity-view.
/// However, the changes in those secondary components will be accumulated into the next yielded
/// entity-view.
///
/// This is a streaming-join: every yielded entity-view will be the result of joining the latest
/// known state of all components, from their respective point-of-views.
///
/// ⚠ The semantics are subtle! See `examples/range.rs` for an example of use.
pub fn range_entity_with_primary<'a, Primary: Component + 'a, const N: usize>(
    store: &'a DataStore,
    query: &'a RangeQuery,
    ent_path: &'a ObjPath,
    components: [ComponentName; N],
) -> impl Iterator<Item = (TimeInt, EntityView<Primary>)> + 'a {
    let primary = Primary::name();
    let cluster_key = store.cluster_key();

    // TODO(cmc): Ideally, we'd want to simply add the cluster and primary key to the `components`
    // array if they are missing, yielding either `[ComponentName; N+1]` or `[ComponentName; N+2]`.
    // Unfortunately this is not supported on stable at the moment, and requires
    // feature(generic_const_exprs) on nightly.
    //
    // The alternative to these assertions (and thus putting the burden on the caller), for now,
    // would be to drop the constant sizes all the way down, which would be way more painful to
    // deal with.
    assert!(components.contains(&cluster_key));
    assert!(components.contains(&primary));

    let cluster_col = components
        .iter()
        .find_position(|component| **component == cluster_key)
        .map(|(col, _)| col)
        .unwrap(); // asserted on entry
    let primary_col = components
        .iter()
        .find_position(|component| **component == primary)
        .map(|(col, _)| col)
        .unwrap(); // asserted on entry

    let mut state: Vec<_> = std::iter::repeat_with(|| None)
        .take(components.len())
        .collect();

    let latest_time = query.range.min.as_i64().checked_sub(1).map(Into::into);

    if let Some(latest_time) = latest_time {
        // Fetch the latest data for every single component from their respective point-of-views,
        // this will allow us to build up the initial state and send an initial latest-at
        // entity-view if needed.
        for (i, primary) in components.iter().enumerate() {
            let cwi = get_component_with_instances(
                store,
                &LatestAtQuery::new(query.timeline, latest_time),
                ent_path,
                *primary,
            );
            state[i] = cwi.ok();
        }
    }

    // Iff the primary component has some initial state, then we want to be sending out an initial
    // entity-view.
    let ent_view_latest =
        if let (Some(latest_time), Some(cwi_prim)) = (latest_time, &state[primary_col]) {
            let ent_view = EntityView {
                primary: cwi_prim.clone(),
                components: components
                    .iter()
                    .copied()
                    .zip(state.iter().cloned() /* shallow */)
                    .filter(|(component, _)| *component != Primary::name())
                    .filter_map(|(component, cwi)| cwi.map(|cwi| (component, cwi)))
                    .collect(),
                phantom: std::marker::PhantomData,
            };
            Some((latest_time, ent_view))
        } else {
            None
        };

    let range = store
        .range(query, ent_path, components)
        .map(move |(time, _, row_indices)| {
            let results = store.get(&components, &row_indices);
            (
                time,
                row_indices[primary_col].is_some(), // is_primary
                results,
            )
        });

    ent_view_latest
        .into_iter()
        .chain(range.filter_map(move |(time, is_primary, results)| {
            for (i, component) in components.iter().copied().enumerate() {
                if let Some(res) = results[i].as_ref() {
                    state[i] = Some(ComponentWithInstances {
                        name: component,
                        instance_keys: results[cluster_col].clone(), // shallow
                        values: res.clone(), // shallow
                    });
                }
            }

            // We only yield if the primary component has been updated!
            is_primary.then(|| {
                let ent_view = EntityView {
                    // safe to unwrap, set just above
                    primary: state[primary_col].clone().unwrap(), // shallow
                    components: components
                        .iter()
                        .zip(state.iter().cloned() /* shallow */)
                        .filter_map(|(component, cwi)| cwi.map(|cwi| (*component, cwi)))
                        .collect(),
                    phantom: std::marker::PhantomData,
                };
                (time, ent_view)
            })
        }))
}
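To make the calling contract above concrete, here is a minimal sketch (not part of this diff) of how a caller might drive `range_entity_with_primary`, mirroring `examples/range.rs`. It assumes a store whose cluster key is `Instance`, a sequence timeline, and the `polars` feature for `as_df2`; the helper function name is made up for illustration.

```rust
use re_arrow_store::{DataStore, RangeQuery};
use re_log_types::{
    field_types::{Instance, Point2D, Rect2D},
    msg_bundle::Component as _,
    ObjPath as EntityPath, TimeType,
};
use re_query::range_entity_with_primary;

/// Prints every view yielded from `Rect2D`'s point of view over `query`'s time range.
fn print_rects_over_time(store: &DataStore, query: &RangeQuery, ent_path: &EntityPath) {
    // Both the store's cluster key (`Instance` here) and the primary component
    // (`Rect2D`) must be listed, or the `assert!`s at the top of
    // `range_entity_with_primary` will fire.
    let components = [Instance::name(), Rect2D::name(), Point2D::name()];

    for (time, ent_view) in
        range_entity_with_primary::<Rect2D, 3>(store, query, ent_path, components)
    {
        // Each view joins the latest known state of all listed components as of `time`,
        // from `Rect2D`'s point of view.
        println!(
            "{}:\n{}",
            TimeType::Sequence.format(time),
            ent_view.as_df2::<Point2D>().unwrap()
        );
    }
}
```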
(1 more file not shown)

1 comment on commit 1d28875

@github-actions

Rust Benchmark

| Benchmark suite | Current: 1d28875 | Previous: f4c7d43 | Ratio |
|---|---|---|---|
| datastore/insert/batch/rects/insert | 170871 ns/iter (± 1144) | 172198 ns/iter (± 573) | 0.99 |
| datastore/latest_at/batch/rects/query | 726 ns/iter (± 1) | 732 ns/iter (± 4) | 0.99 |
| datastore/latest_at/missing_components/primary | 299 ns/iter (± 0) | 298 ns/iter (± 1) | 1.00 |
| datastore/latest_at/missing_components/secondaries | 384 ns/iter (± 0) | 383 ns/iter (± 0) | 1.00 |
| datastore/range/batch/rects/query | 45826 ns/iter (± 64) | 45707 ns/iter (± 84) | 1.00 |
| obj_mono_points/insert | 984012092 ns/iter (± 5857099) | 844107087 ns/iter (± 4108346) | 1.17 |
| obj_mono_points/query | 351515 ns/iter (± 1547) | 365166 ns/iter (± 3017) | 0.96 |
| obj_batch_points/insert | 95383856 ns/iter (± 513603) | 86515767 ns/iter (± 322674) | 1.10 |
| obj_batch_points/query | 11312 ns/iter (± 47) | 11345 ns/iter (± 25) | 1.00 |
| obj_batch_points_sequential/insert | 23146684 ns/iter (± 236515) | 22972935 ns/iter (± 163465) | 1.01 |
| obj_batch_points_sequential/query | 7909 ns/iter (± 19) | 7900 ns/iter (± 35) | 1.00 |
| mono_points_classic/generate_messages | 4381574 ns/iter (± 117545) | 4460557 ns/iter (± 98276) | 0.98 |
| mono_points_classic/encode_log_msg | 10687902 ns/iter (± 661819) | 10992254 ns/iter (± 490935) | 0.97 |
| mono_points_classic/encode_total | 15207799 ns/iter (± 956548) | 16087941 ns/iter (± 1247907) | 0.95 |
| mono_points_classic/decode_total | 36342832 ns/iter (± 949868) | 36034636 ns/iter (± 821158) | 1.01 |
| mono_points_arrow/generate_message_bundles | 27691863 ns/iter (± 1768002) | 28177100 ns/iter (± 1020898) | 0.98 |
| mono_points_arrow/generate_messages | 102563364 ns/iter (± 1046854) | 94086421 ns/iter (± 857424) | 1.09 |
| mono_points_arrow/encode_log_msg | 129161710 ns/iter (± 951038) | 120267805 ns/iter (± 994832) | 1.07 |
| mono_points_arrow/encode_total | 262380439 ns/iter (± 1762644) | 241927627 ns/iter (± 1419060) | 1.08 |
| mono_points_arrow/decode_log_msg | 146318512 ns/iter (± 969580) | 138261775 ns/iter (± 624091) | 1.06 |
| mono_points_arrow/decode_message_bundles | 59214198 ns/iter (± 923106) | 51319149 ns/iter (± 711426) | 1.15 |
| mono_points_arrow/decode_total | 201380477 ns/iter (± 1758582) | 187226736 ns/iter (± 2144223) | 1.08 |
| batch_points_classic/generate_messages | 3458 ns/iter (± 13) | 3399 ns/iter (± 23) | 1.02 |
| batch_points_classic/encode_log_msg | 393779 ns/iter (± 646) | 391060 ns/iter (± 1098) | 1.01 |
| batch_points_classic/encode_total | 400243 ns/iter (± 805) | 400926 ns/iter (± 1126) | 1.00 |
| batch_points_classic/decode_total | 745873 ns/iter (± 4368) | 748069 ns/iter (± 2272) | 1.00 |
| batch_points_arrow/generate_message_bundles | 243846 ns/iter (± 693) | 244852 ns/iter (± 382) | 1.00 |
| batch_points_arrow/generate_messages | 4550 ns/iter (± 18) | 4580 ns/iter (± 16) | 0.99 |
| batch_points_arrow/encode_log_msg | 264572 ns/iter (± 899) | 264407 ns/iter (± 741) | 1.00 |
| batch_points_arrow/encode_total | 542300 ns/iter (± 1701) | 535540 ns/iter (± 1998) | 1.01 |
| batch_points_arrow/decode_log_msg | 205247 ns/iter (± 444) | 206944 ns/iter (± 449) | 0.99 |
| batch_points_arrow/decode_message_bundles | 1662 ns/iter (± 3) | 1702 ns/iter (± 7) | 0.98 |
| batch_points_arrow/decode_total | 210884 ns/iter (± 527) | 210707 ns/iter (± 447) | 1.00 |
| arrow_mono_points/insert | 4066392993 ns/iter (± 12559278) | 3532698352 ns/iter (± 8983778) | 1.15 |
| arrow_mono_points/query | 1658457 ns/iter (± 23021) | 1657753 ns/iter (± 20414) | 1.00 |
| arrow_batch_points/insert | 1477736 ns/iter (± 5292) | 1480920 ns/iter (± 9062) | 1.00 |
| arrow_batch_points/query | 16874 ns/iter (± 31) | 15628 ns/iter (± 97) | 1.08 |
| obj_batch_points_sequential/Tuid::random | 37 ns/iter (± 0) | 38 ns/iter (± 0) | 0.97 |

This comment was automatically generated by workflow using github-action-benchmark.
