Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Primary caching 14: don't bake LatestAt(T-1) results into low-level range queries #4793

Merged
merged 6 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/re_data_store/examples/range_components.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Demonstrates usage of [`re_data_store::polars_util::range_components`].
//!
//! ```text
//! POLARS_FMT_MAX_ROWS=100 cargo r -p re_data_store --example range_components
//! POLARS_FMT_MAX_ROWS=100 cargo r -p re_data_store --all-features --example range_components
//! ```

use polars_core::prelude::JoinType;
Expand Down
51 changes: 9 additions & 42 deletions crates/re_data_store/src/polars_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ pub fn latest_components(
/// Iterates over the rows of any number of components and their respective cluster keys, all from
/// the single point-of-view of the `primary` component, returning an iterator of `DataFrame`s.
///
/// An initial dataframe is yielded with the latest-at state at the start of the time range, if
/// there is any.
///
/// The iterator yields a dataframe if and only if the `primary` component has changed.
/// A change affecting only secondary components will not yield a dataframe.
///
Expand Down Expand Up @@ -126,51 +123,21 @@ pub fn range_components<'a, const N: usize>(

let mut state = None;

// NOTE: This will return none for `TimeInt::Min`, i.e. range queries that start infinitely far
// into the past don't have a latest-at state!
let latest_time = query.range.min.as_i64().checked_sub(1).map(Into::into);

let mut df_latest = None;
if let Some(latest_time) = latest_time {
let df = latest_components(
store,
&LatestAtQuery::new(query.timeline, latest_time),
ent_path,
&components,
join_type,
);

if df.as_ref().map_or(false, |df| {
// We only care about the initial state if it A) isn't empty and B) contains any data
// at all for the primary component.
!df.is_empty() && df.column(primary.as_ref()).is_ok()
}) {
df_latest = Some(df);
}
}

let primary_col = components
.iter()
.find_position(|component| **component == primary)
.map(|(col, _)| col)
.unwrap(); // asserted on entry

// send the latest-at state before anything else
df_latest
.into_iter()
.map(move |df| (latest_time, true, df))
// followed by the range
.chain(
store
.range(query, ent_path, components)
.map(move |(time, _, cells)| {
(
time,
cells[primary_col].is_some(), // is_primary
dataframe_from_cells(&cells),
)
}),
)
store
.range(query, ent_path, components)
.map(move |(time, _, cells)| {
(
time,
cells[primary_col].is_some(), // is_primary
dataframe_from_cells(&cells),
)
})
.filter_map(move |(time, is_primary, df)| {
state = Some(join_dataframes(
cluster_key,
Expand Down
109 changes: 22 additions & 87 deletions crates/re_data_store/tests/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,6 @@ fn range_impl(store: &mut DataStore) {

let ent_path = EntityPath::from("this/that");

let frame0 = TimeInt::from(0);
let frame1 = TimeInt::from(1);
let frame2 = TimeInt::from(2);
let frame3 = TimeInt::from(3);
Expand Down Expand Up @@ -554,61 +553,40 @@ fn range_impl(store: &mut DataStore) {

// Unit ranges (Color's PoV)

// NOTE: Check out [1] to see what the results would've looked like with latest-at semantics at
// T-1 baked in (like we used to do).
//
// [1]: <https://github.com/rerun-io/rerun/blob/790f391/crates/re_data_store/tests/data_store.rs#L555-L837>

assert_range_components(
TimeRange::new(frame1, frame1),
[Color::name(), Position2D::name()],
&[
(
Some(frame0),
&[(Color::name(), &row4_3), (Position2D::name(), &row4_4)],
), // timeless
(
Some(frame1),
&[
(Color::name(), &row1),
(Position2D::name(), &row4_4), // timeless
],
),
],
&[(
Some(frame1),
&[(Color::name(), &row1)], //
)],
);
assert_range_components(
TimeRange::new(frame2, frame2),
[Color::name(), Position2D::name()],
&[
(
Some(frame1),
&[
(Color::name(), &row1),
(Position2D::name(), &row4_4), // timeless
],
), //
],
&[],
);
assert_range_components(
TimeRange::new(frame3, frame3),
[Color::name(), Position2D::name()],
&[
(
Some(frame2),
&[(Color::name(), &row1), (Position2D::name(), &row2)],
), //
],
&[],
);
assert_range_components(
TimeRange::new(frame4, frame4),
[Color::name(), Position2D::name()],
&[
(
Some(frame3),
&[(Color::name(), &row1), (Position2D::name(), &row3)],
),
(
Some(frame4),
&[(Color::name(), &row4_1), (Position2D::name(), &row3)],
&[(Color::name(), &row4_1)], //
),
(
Some(frame4),
&[(Color::name(), &row4_2), (Position2D::name(), &row3)],
&[(Color::name(), &row4_2)], //
),
(
Some(frame4),
Expand All @@ -619,65 +597,38 @@ fn range_impl(store: &mut DataStore) {
assert_range_components(
TimeRange::new(frame5, frame5),
[Color::name(), Position2D::name()],
&[
(
Some(frame4),
&[(Color::name(), &row4_3), (Position2D::name(), &row4_4)], // !!!
), //
],
&[],
);

// Unit ranges (Position2D's PoV)

assert_range_components(
TimeRange::new(frame1, frame1),
[Position2D::name(), Color::name()],
&[
(
Some(frame0),
&[(Position2D::name(), &row4_4), (Color::name(), &row4_3)],
), // timeless
],
&[],
);
assert_range_components(
TimeRange::new(frame2, frame2),
[Position2D::name(), Color::name()],
&[
(
Some(frame1),
&[
(Position2D::name(), &row4_4), // timeless
(Color::name(), &row1),
],
),
(
Some(frame2),
&[(Position2D::name(), &row2), (Color::name(), &row1)],
&[(Position2D::name(), &row2)], //
), //
],
);
assert_range_components(
TimeRange::new(frame3, frame3),
[Position2D::name(), Color::name()],
&[
(
Some(frame2),
&[(Position2D::name(), &row2), (Color::name(), &row1)],
),
(
Some(frame3),
&[(Position2D::name(), &row3), (Color::name(), &row1)],
),
],
&[(
Some(frame3),
&[(Position2D::name(), &row3)], //
)],
);
assert_range_components(
TimeRange::new(frame4, frame4),
[Position2D::name(), Color::name()],
&[
(
Some(frame3),
&[(Position2D::name(), &row3), (Color::name(), &row1)],
),
(
Some(frame4),
&[(Position2D::name(), &row4_25), (Color::name(), &row4_2)],
Expand All @@ -691,12 +642,7 @@ fn range_impl(store: &mut DataStore) {
assert_range_components(
TimeRange::new(frame5, frame5),
[Position2D::name(), Color::name()],
&[
(
Some(frame4),
&[(Position2D::name(), &row4_4), (Color::name(), &row4_3)],
), //
],
&[],
);

// Full range (Color's PoV)
Expand All @@ -705,16 +651,9 @@ fn range_impl(store: &mut DataStore) {
TimeRange::new(frame1, frame5),
[Color::name(), Position2D::name()],
&[
(
Some(frame0),
&[(Color::name(), &row4_3), (Position2D::name(), &row4_4)],
), // timeless
(
Some(frame1),
&[
(Color::name(), &row1),
(Position2D::name(), &row4_4), // timeless
],
&[(Color::name(), &row1)], //
),
(
Some(frame4),
Expand All @@ -737,10 +676,6 @@ fn range_impl(store: &mut DataStore) {
TimeRange::new(frame1, frame5),
[Position2D::name(), Color::name()],
&[
(
Some(frame0),
&[(Position2D::name(), &row4_4), (Color::name(), &row4_3)],
), // timeless
(
Some(frame2),
&[(Position2D::name(), &row2), (Color::name(), &row1)],
Expand Down
80 changes: 24 additions & 56 deletions crates/re_query/src/range.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use itertools::Itertools as _;
use re_data_store::{DataStore, LatestAtQuery, RangeQuery};
use re_data_store::{DataStore, RangeQuery};
use re_log_types::EntityPath;
use re_types_core::{Archetype, ComponentName};

use crate::{get_component_with_instances, ArchetypeView, ComponentWithInstances};
use crate::{ArchetypeView, ComponentWithInstances};

// ---

Expand Down Expand Up @@ -61,61 +61,29 @@ pub fn range_archetype<'a, A: Archetype + 'a, const N: usize>(
.take(components.len())
.collect();

// NOTE: This will return none for `TimeInt::Min`, i.e. range queries that start infinitely far
// into the past don't have a latest-at state!
let query_time = query.range.min.as_i64().checked_sub(1).map(Into::into);

let mut cwis_latest = None;
if let Some(query_time) = query_time {
let mut cwis_latest_raw: Vec<_> = std::iter::repeat_with(|| None)
.take(components.len())
.collect();

// Fetch the latest data for every single component from their respective point-of-views,
// this will allow us to build up the initial state and send an initial latest-at
// entity-view if needed.
for (i, primary) in components.iter().enumerate() {
cwis_latest_raw[i] = get_component_with_instances(
store,
&LatestAtQuery::new(query.timeline, query_time),
ent_path,
*primary,
)
.map(|(_, row_id, cwi)| (row_id, cwi));
}

if cwis_latest_raw[primary_col].is_some() {
cwis_latest = Some(cwis_latest_raw);
}
}

// send the latest-at state before anything else
cwis_latest
.into_iter()
.map(move |cwis| (query_time, true, cwis))
.chain(store.range(query, ent_path, components).map(
move |(data_time, row_id, mut cells)| {
// NOTE: The unwrap cannot fail, the cluster key's presence is guaranteed
// by the store.
let instance_keys = cells[cluster_col].take().unwrap();
let is_primary = cells[primary_col].is_some();
let cwis = cells
.into_iter()
.map(|cell| {
cell.map(|cell| {
(
row_id,
ComponentWithInstances {
instance_keys: instance_keys.clone(), /* shallow */
values: cell,
},
)
})
store
.range(query, ent_path, components)
.map(move |(data_time, row_id, mut cells)| {
// NOTE: The unwrap cannot fail, the cluster key's presence is guaranteed
// by the store.
let instance_keys = cells[cluster_col].take().unwrap();
let is_primary = cells[primary_col].is_some();
let cwis = cells
.into_iter()
.map(|cell| {
cell.map(|cell| {
(
row_id,
ComponentWithInstances {
instance_keys: instance_keys.clone(), /* shallow */
values: cell,
},
)
})
.collect::<Vec<_>>();
(data_time, is_primary, cwis)
},
))
})
.collect::<Vec<_>>();
(data_time, is_primary, cwis)
})
.filter_map(move |(data_time, is_primary, cwis)| {
for (i, cwi) in cwis
.into_iter()
Expand Down
Loading
Loading