Skip to content

Commit

Permalink
latest_at now returns the _data_ time too
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Jan 8, 2024
1 parent de76817 commit 0b7e1dd
Show file tree
Hide file tree
Showing 21 changed files with 133 additions and 84 deletions.
2 changes: 1 addition & 1 deletion crates/re_data_store/benches/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ fn latest_data_at<const N: usize>(

store
.latest_at(&timeline_query, &ent_path, primary, secondaries)
.map_or_else(|| [(); N].map(|_| None), |(_, cells)| cells)
.map_or_else(|| [(); N].map(|_| None), |(_, _, cells)| cells)
}

fn range_data<const N: usize>(
Expand Down
5 changes: 4 additions & 1 deletion crates/re_data_store/src/polars_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ pub fn latest_component(
let components = &[cluster_key, primary];
let (_, cells) = store
.latest_at(query, ent_path, primary, components)
.unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
.map_or_else(
|| (RowId::ZERO, [(); 2].map(|_| None)),
|(_, row_id, cells)| (row_id, cells),
);

dataframe_from_cells(&cells)
}
Expand Down
42 changes: 28 additions & 14 deletions crates/re_data_store/src/store_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use re_log_types::{DataCell, DataRow, EntityPath, RowId, TimePoint, Timeline};
use re_log_types::{DataCell, DataRow, EntityPath, RowId, TimeInt, TimePoint, Timeline};

use re_types_core::{Component, ComponentName};

Expand Down Expand Up @@ -35,7 +35,8 @@ impl<C: Component> std::ops::Deref for VersionedComponent<C> {
}

impl DataStore {
/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`].
/// Get the latest value for a given [`re_types_core::Component`], as well as the associated
/// _data_ time and [`RowId`].
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will generate a log message of `level` otherwise.
Expand All @@ -48,10 +49,11 @@ impl DataStore {
entity_path: &EntityPath,
query: &LatestAtQuery,
level: re_log::Level,
) -> Option<VersionedComponent<C>> {
) -> Option<(Option<TimeInt>, VersionedComponent<C>)> {
re_tracing::profile_function!();

let (row_id, cells) = self.latest_at(query, entity_path, C::name(), &[C::name()])?;
let (data_time, row_id, cells) =
self.latest_at(query, entity_path, C::name(), &[C::name()])?;
let cell = cells.first()?.as_ref()?;

cell.try_to_native_mono::<C>()
Expand Down Expand Up @@ -93,10 +95,11 @@ impl DataStore {
err
})
.ok()?
.map(|c| (row_id, c).into())
.map(|c| (data_time, (row_id, c).into()))
}

/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`].
/// Get the latest value for a given [`re_types_core::Component`], as well as the associated
/// _data_ time and [`RowId`].
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will log a warning otherwise.
Expand All @@ -109,11 +112,12 @@ impl DataStore {
&self,
entity_path: &EntityPath,
query: &LatestAtQuery,
) -> Option<VersionedComponent<C>> {
) -> Option<(Option<TimeInt>, VersionedComponent<C>)> {
self.query_latest_component_with_log_level(entity_path, query, re_log::Level::Warn)
}

/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`].
/// Get the latest value for a given [`re_types_core::Component`], as well as the associated
/// _data_ time and [`RowId`].
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will return None and log a debug message otherwise.
Expand All @@ -126,7 +130,7 @@ impl DataStore {
&self,
entity_path: &EntityPath,
query: &LatestAtQuery,
) -> Option<VersionedComponent<C>> {
) -> Option<(Option<TimeInt>, VersionedComponent<C>)> {
self.query_latest_component_with_log_level(entity_path, query, re_log::Level::Debug)
}

Expand All @@ -135,20 +139,21 @@ impl DataStore {
&self,
entity_path: &EntityPath,
query: &LatestAtQuery,
) -> Option<(EntityPath, VersionedComponent<C>)> {
) -> Option<(EntityPath, Option<TimeInt>, VersionedComponent<C>)> {
re_tracing::profile_function!();

let mut cur_path = Some(entity_path.clone());
while let Some(path) = cur_path {
if let Some(c) = self.query_latest_component::<C>(&path, query) {
return Some((path, c));
if let Some((data_time, c)) = self.query_latest_component::<C>(&path, query) {
return Some((path, data_time, c));
}
cur_path = path.parent();
}
None
}

/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`], assuming it is timeless.
/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`],
/// assuming it is timeless.
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will log a warning otherwise.
Expand All @@ -164,9 +169,14 @@ impl DataStore {

let query = LatestAtQuery::latest(Timeline::default());
self.query_latest_component(entity_path, &query)
.map(|(data_time, vc)| {
debug_assert!(data_time.is_none());
vc
})
}

/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`], assuming it is timeless.
/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`],
/// assuming it is timeless.
///
/// This assumes that the row we get from the store only contains a single instance for this
/// component; it will return None and log a debug message otherwise.
Expand All @@ -182,6 +192,10 @@ impl DataStore {

let query = LatestAtQuery::latest(Timeline::default());
self.query_latest_component_quiet(entity_path, &query)
.map(|(data_time, vc)| {
debug_assert!(data_time.is_none());
vc
})
}
}

Expand Down
75 changes: 44 additions & 31 deletions crates/re_data_store/src/store_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,9 @@ impl DataStore {
/// Queries the datastore for the cells of the specified `components`, as seen from the point
/// of view of the so-called `primary` component.
///
/// Returns an array of [`DataCell`]s on success, or `None` otherwise.
/// Success is defined by one thing and thing only: whether a cell could be found for the
/// Returns an array of [`DataCell`]s (as well as the associated _data_ time and `RowId`) on
/// success.
/// Success is defined by one thing and one thing only: whether a cell could be found for the
/// `primary` component.
/// The presence or absence of secondary components has no effect on the success criteria.
///
Expand Down Expand Up @@ -266,7 +267,7 @@ impl DataStore {
ent_path: &EntityPath,
primary: ComponentName,
components: &[ComponentName; N],
) -> Option<(RowId, [Option<DataCell>; N])> {
) -> Option<(Option<TimeInt>, RowId, [Option<DataCell>; N])> {
// TODO(cmc): kind & query_id need to somehow propagate through the span system.
self.query_id.fetch_add(1, Ordering::Relaxed);

Expand All @@ -282,7 +283,7 @@ impl DataStore {
"query started…"
);

let cells = self
let results = self
.tables
.get(&(ent_path_hash, query.timeline))
.and_then(|table| {
Expand All @@ -301,11 +302,11 @@ impl DataStore {

// If we've found everything we were looking for in the temporal table, then we can
// return the results immediately.
if cells
if results
.as_ref()
.map_or(false, |(_, cells)| cells.iter().all(Option::is_some))
.map_or(false, |(_, _, cells)| cells.iter().all(Option::is_some))
{
return cells;
return results.map(|(data_time, row_id, cells)| (Some(data_time), row_id, cells));
}

let cells_timeless = self.timeless_tables.get(&ent_path_hash).and_then(|table| {
Expand All @@ -324,21 +325,28 @@ impl DataStore {
});

// Otherwise, let's see what's in the timeless table, and then..:
match (cells, cells_timeless) {
match (results, cells_timeless) {
// nothing in the timeless table: return those partial cells we got.
(Some(cells), None) => return Some(cells),
(results @ Some(_), None) => {
return results.map(|(data_time, row_id, cells)| (Some(data_time), row_id, cells))
}

// no temporal cells, but some timeless ones: return those as-is.
(None, Some(cells_timeless)) => return Some(cells_timeless),
(None, results @ Some(_)) => {
return results.map(|(row_id, cells)| (None, row_id, cells))
}

// we have both temporal & timeless cells: let's merge the two when it makes sense
// and return the end result.
(Some((row_id, mut cells)), Some((_, cells_timeless))) => {
(Some((data_time, row_id, mut cells)), Some((_, cells_timeless))) => {
for (i, row_idx) in cells_timeless.into_iter().enumerate() {
if cells[i].is_none() {
cells[i] = row_idx;
}
}
return Some((row_id, cells));
return Some((Some(data_time), row_id, cells));
}

// no cells at all.
(None, None) => {}
}
Expand Down Expand Up @@ -519,14 +527,14 @@ impl IndexedTable {
/// Queries the table for the cells of the specified `components`, as seen from the point
/// of view of the so-called `primary` component.
///
/// Returns an array of [`DataCell`]s on success, or `None` iff no cell could be found for
/// the `primary` component.
/// Returns an array of [`DataCell`]s (as well as the associated _data_ time and `RowId`) on
/// success, or `None` iff no cell could be found for the `primary` component.
pub fn latest_at<const N: usize>(
&self,
time: TimeInt,
query_time: TimeInt,
primary: ComponentName,
components: &[ComponentName; N],
) -> Option<(RowId, [Option<DataCell>; N])> {
) -> Option<(TimeInt, RowId, [Option<DataCell>; N])> {
// Early-exit if this entire table is unaware of this component.
if !self.all_components.contains(&primary) {
return None;
Expand All @@ -542,22 +550,22 @@ impl IndexedTable {
// multiple indexed buckets within the same table!

let buckets = self
.range_buckets_rev(..=time)
.range_buckets_rev(..=query_time)
.map(|(_, bucket)| bucket)
.enumerate();
for (attempt, bucket) in buckets {
trace!(
kind = "latest_at",
timeline = %timeline.name(),
time = timeline.typ().format_utc(time),
time = timeline.typ().format_utc(query_time),
%primary,
?components,
attempt,
bucket_time_range = timeline.typ().format_range_utc(bucket.inner.read().time_range),
"found candidate bucket"
);
if let cells @ Some(_) = bucket.latest_at(time, primary, components) {
return cells; // found at least the primary component!
if let ret @ Some(_) = bucket.latest_at(query_time, primary, components) {
return ret; // found at least the primary component!
}
}

Expand Down Expand Up @@ -717,14 +725,14 @@ impl IndexedBucket {
/// Queries the bucket for the cells of the specified `components`, as seen from the point
/// of view of the so-called `primary` component.
///
/// Returns an array of [`DataCell`]s on success, or `None` iff no cell could be found for
/// the `primary` component.
/// Returns an array of [`DataCell`]s (as well as the associated _data_ time and `RowId`) on
/// success, or `None` iff no cell could be found for the `primary` component.
pub fn latest_at<const N: usize>(
&self,
time: TimeInt,
query_time: TimeInt,
primary: ComponentName,
components: &[ComponentName; N],
) -> Option<(RowId, [Option<DataCell>; N])> {
) -> Option<(TimeInt, RowId, [Option<DataCell>; N])> {
self.sort_indices_if_needed();

let IndexedBucketInner {
Expand All @@ -748,11 +756,12 @@ impl IndexedBucket {
%primary,
?components,
timeline = %self.timeline.name(),
time = self.timeline.typ().format_utc(time),
query_time = self.timeline.typ().format_utc(query_time),
"searching for primary & secondary cells…"
);

let time_row_nr = col_time.partition_point(|t| *t <= time.as_i64()) as i64;
let time_row_nr =
col_time.partition_point(|data_time| *data_time <= query_time.as_i64()) as i64;

// The partition point is always _beyond_ the index that we're looking for.
// A partition point of 0 thus means that we're trying to query for data that lives
Expand All @@ -769,7 +778,7 @@ impl IndexedBucket {
%primary,
?components,
timeline = %self.timeline.name(),
time = self.timeline.typ().format_utc(time),
query_time = self.timeline.typ().format_utc(query_time),
%primary_row_nr,
"found primary row number",
);
Expand All @@ -783,7 +792,7 @@ impl IndexedBucket {
%primary,
?components,
timeline = %self.timeline.name(),
time = self.timeline.typ().format_utc(time),
query_time = self.timeline.typ().format_utc(query_time),
%primary_row_nr,
"no secondary row number found",
);
Expand All @@ -797,7 +806,7 @@ impl IndexedBucket {
%primary,
?components,
timeline = %self.timeline.name(),
time = self.timeline.typ().format_utc(time),
query_time = self.timeline.typ().format_utc(query_time),
%primary_row_nr, %secondary_row_nr,
"found secondary row number",
);
Expand All @@ -812,7 +821,7 @@ impl IndexedBucket {
%primary,
%component,
timeline = %self.timeline.name(),
time = self.timeline.typ().format_utc(time),
query_time = self.timeline.typ().format_utc(query_time),
%primary_row_nr, %secondary_row_nr,
"found cell",
);
Expand All @@ -821,7 +830,11 @@ impl IndexedBucket {
}
}

Some((col_row_id[secondary_row_nr as usize], cells))
Some((
col_time[secondary_row_nr as usize].into(),
col_row_id[secondary_row_nr as usize],
cells,
))
}

/// Iterates the bucket in order to return the cells of the specified `components`,
Expand Down
6 changes: 4 additions & 2 deletions crates/re_data_store/tests/correctness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ fn row_id_ordering_semantics() -> anyhow::Result<()> {

let got_point = store
.query_latest_component::<MyPoint>(&entity_path, &query)
.map(|(_, data)| data)
.unwrap()
.value;
similar_asserts::assert_eq!(point2, got_point);
Expand Down Expand Up @@ -141,6 +142,7 @@ fn row_id_ordering_semantics() -> anyhow::Result<()> {

let got_point = store
.query_latest_component::<MyPoint>(&entity_path, &query)
.map(|(_, data)| data)
.unwrap()
.value;
similar_asserts::assert_eq!(point1, got_point);
Expand Down Expand Up @@ -359,7 +361,7 @@ fn latest_at_emptiness_edge_cases_impl(store: &mut DataStore) {
// bunch of non-existing components
{
let components = &["they".into(), "dont".into(), "exist".into()];
let (_, cells) = store
let (_, _, cells) = store
.latest_at(
&LatestAtQuery::new(timeline_frame_nr, frame40),
&ent_path,
Expand All @@ -372,7 +374,7 @@ fn latest_at_emptiness_edge_cases_impl(store: &mut DataStore) {

// empty component list
{
let (_, cells) = store
let (_, _, cells) = store
.latest_at(
&LatestAtQuery::new(timeline_frame_nr, frame40),
&ent_path,
Expand Down
3 changes: 2 additions & 1 deletion crates/re_data_ui/src/annotation_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ fn annotation_info(
let class_id = ctx
.entity_db
.store()
.query_latest_component::<re_types::components::ClassId>(entity_path, query)?;
.query_latest_component::<re_types::components::ClassId>(entity_path, query)
.map(|(_, data)| data)?;
let annotations = crate::annotations(ctx, query, entity_path);
let class = annotations.resolved_class_description(Some(*class_id));
class.keypoint_map?.get(&keypoint_id).cloned()
Expand Down
Loading

0 comments on commit 0b7e1dd

Please sign in to comment.