Skip to content

Commit

Permalink
Primary caching 17: timeless range (#4852)
Browse files Browse the repository at this point in the history
Simply add a timeless path for the range cache, and actually only
iterate over the range the user asked for (we were still blindly
iterating over everything until now).

Also some very minimal cleanup related to #4832, but we have a long way
to go...
- #4832

---

- Fixes #4821 

---

Part of the primary caching series of PRs (index search, joins,
deserialization):
- #4592
- #4593
- #4659
- #4680 
- #4681
- #4698
- #4711
- #4712
- #4721 
- #4726 
- #4773
- #4784
- #4785
- #4793
- #4800
- #4851
- #4852
- #4853
- #4856
  • Loading branch information
teh-cmc authored Jan 23, 2024
1 parent 20c768f commit 5800d1a
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 61 deletions.
3 changes: 2 additions & 1 deletion crates/re_data_store/src/store_format.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use re_format::{format_bytes, format_number};
use re_log_types::TimeInt;
use re_types_core::SizeBytes as _;

use crate::{DataStore, IndexedBucket, IndexedTable, PersistentIndexedTable};
Expand Down Expand Up @@ -129,7 +130,7 @@ impl std::fmt::Display for IndexedBucket {

let time_range = {
let time_range = &self.inner.read().time_range;
if time_range.min.as_i64() != i64::MAX && time_range.max.as_i64() != i64::MIN {
if time_range.min != TimeInt::MAX && time_range.max != TimeInt::MIN {
format!(
" - {}: {}",
self.timeline.name(),
Expand Down
4 changes: 2 additions & 2 deletions crates/re_data_store/src/store_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl std::fmt::Debug for RangeQuery {
self.timeline.typ().format_utc(self.range.min),
self.timeline.typ().format_utc(self.range.max),
self.timeline.name(),
if self.range.min == TimeInt::MIN {
if self.range.min <= TimeInt::MIN {
"including"
} else {
"excluding"
Expand Down Expand Up @@ -494,7 +494,7 @@ impl DataStore {
.flatten()
.map(|(time, row_id, cells)| (Some(time), row_id, cells));

if query.range.min == TimeInt::MIN {
if query.range.min <= TimeInt::MIN {
let timeless = self
.timeless_tables
.get(&ent_path_hash)
Expand Down
16 changes: 13 additions & 3 deletions crates/re_data_store/src/store_sanity.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use re_log_types::{DataCellColumn, NumInstances, RowId, TimeRange, VecDequeSortingExt as _};
use re_log_types::{
DataCellColumn, NumInstances, RowId, TimeInt, TimeRange, VecDequeSortingExt as _,
};
use re_types_core::{ComponentName, Loggable, SizeBytes as _};

use crate::{
Expand Down Expand Up @@ -179,8 +181,16 @@ impl IndexedBucket {
let mut times = col_time.clone();
times.sort();

let expected_min = times.front().copied().unwrap_or(i64::MAX).into();
let expected_max = times.back().copied().unwrap_or(i64::MIN).into();
let expected_min = times
.front()
.copied()
.unwrap_or(TimeInt::MAX.as_i64())
.into();
let expected_max = times
.back()
.copied()
.unwrap_or(TimeInt::MIN.as_i64())
.into();
let expected_time_range = TimeRange::new(expected_min, expected_max);

if expected_time_range != *time_range {
Expand Down
2 changes: 1 addition & 1 deletion crates/re_data_store/tests/correctness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ fn range_join_across_single_row_impl(store: &mut DataStore) {
let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let query = re_data_store::RangeQuery::new(
timeline_frame_nr,
re_data_store::TimeRange::new(i64::MIN.into(), i64::MAX.into()),
re_data_store::TimeRange::new(TimeInt::MIN, TimeInt::MAX),
);
let components = [InstanceKey::name(), Position2D::name(), Color::name()];
let dfs = re_data_store::polars_util::range_components(
Expand Down
1 change: 1 addition & 0 deletions crates/re_log_types/src/data_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ impl std::fmt::Display for RowId {

impl RowId {
pub const ZERO: Self = Self(re_tuid::Tuid::ZERO);
pub const MAX: Self = Self(re_tuid::Tuid::MAX);

/// Create a new unique [`RowId`] based on the current time.
#[allow(clippy::new_without_default)]
Expand Down
1 change: 1 addition & 0 deletions crates/re_log_types/src/time_point/time_int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ impl TimeInt {
// a bit of leeway.
pub const BEGINNING: Self = Self(i64::MIN / 2);

// TODO(#4832): `TimeInt::BEGINNING` vs. `TimeInt::MIN` vs. `Option<TimeInt>`…
pub const MIN: Self = Self(i64::MIN);
pub const MAX: Self = Self(i64::MAX);

Expand Down
96 changes: 85 additions & 11 deletions crates/re_query_cache/src/cache.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
collections::{BTreeMap, VecDeque},
ops::Range,
sync::Arc,
};

Expand Down Expand Up @@ -327,6 +328,8 @@ impl CachesPerArchetype {

re_tracing::profile_function!();

// TODO(cmc): range invalidation

for latest_at_cache in self.latest_at_per_archetype.read().values() {
let mut latest_at_cache = latest_at_cache.write();

Expand Down Expand Up @@ -419,12 +422,6 @@ pub struct CacheBucket {
}

impl CacheBucket {
/// Iterate over the timestamps of the point-of-view components.
#[inline]
pub fn iter_data_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
self.data_times.iter()
}

#[inline]
pub fn time_range(&self) -> Option<TimeRange> {
let first_time = self.data_times.front().map(|(t, _)| *t)?;
Expand All @@ -444,6 +441,25 @@ impl CacheBucket {
self.data_times.binary_search(&(data_time, row_id)).is_ok()
}

/// Number of timestamps' worth of data stored in this bucket (i.e. the number of rows).
#[inline]
pub fn num_entries(&self) -> usize {
    self.data_times.len()
}

/// Does this bucket hold no data at all?
#[inline]
pub fn is_empty(&self) -> bool {
    self.data_times.is_empty()
}

// ---

/// Iterate over the timestamps of the point-of-view components.
///
/// Yields one `(data_time, row_id)` pair per cached entry.
/// NOTE(review): callers such as `entry_range` binary-search this deque, so it is
/// presumably kept sorted ascending — confirm against the insertion path.
#[inline]
pub fn iter_data_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
    self.data_times.iter()
}

/// Iterate over the [`InstanceKey`] batches of the point-of-view components.
#[inline]
pub fn iter_pov_instance_keys(&self) -> impl Iterator<Item = &[InstanceKey]> {
Expand Down Expand Up @@ -474,15 +490,73 @@ impl CacheBucket {
Some(data.iter())
}

/// How many timestamps' worth of data is stored in this bucket?
// ---

/// Returns the index range that corresponds to the specified `time_range`.
///
/// Use the returned range with one of the range iteration methods:
/// - [`Self::range_data_times`]
/// - [`Self::range_pov_instance_keys`]
/// - [`Self::range_component`]
/// - [`Self::range_component_opt`]
///
/// Make sure that the bucket hasn't been modified in-between!
///
/// This is `O(2*log(n))`, so make sure to clone the returned range rather than calling this
/// multiple times.
#[inline]
pub fn num_entries(&self) -> usize {
self.data_times.len()
/// Computes the index range of entries whose data time falls within `time_range`
/// (both endpoints inclusive), using two binary searches (`partition_point`).
pub fn entry_range(&self, time_range: TimeRange) -> Range<usize> {
    // First entry at or after `time_range.min`.
    let first = self
        .data_times
        .partition_point(|(data_time, _)| *data_time < time_range.min);
    // One past the last entry at or before `time_range.max`.
    let one_past_last = self
        .data_times
        .partition_point(|(data_time, _)| *data_time <= time_range.max);
    first..one_past_last
}

/// Range over the timestamps of the point-of-view components.
#[inline]
pub fn is_empty(&self) -> bool {
self.num_entries() == 0
/// Iterates over the `(data_time, row_id)` pairs within the given index range.
///
/// Compute `entry_range` with [`Self::entry_range`] first; the bucket must not be
/// modified in between, since the indices would then be stale.
pub fn range_data_times(
    &self,
    entry_range: Range<usize>,
) -> impl Iterator<Item = &(TimeInt, RowId)> {
    self.data_times.range(entry_range)
}

/// Range over the [`InstanceKey`] batches of the point-of-view components.
///
/// Yields one `&[InstanceKey]` batch per entry in `entry_range`; compute the range
/// with [`Self::entry_range`] first and don't modify the bucket in between.
#[inline]
pub fn range_pov_instance_keys(
    &self,
    entry_range: Range<usize>,
) -> impl Iterator<Item = &[InstanceKey]> {
    self.pov_instance_keys.range(entry_range)
}

/// Range over the batches of the specified non-optional component.
///
/// Returns `None` if this bucket holds no data for `C`, or if the type-erased
/// storage fails to downcast to `FlatVecDeque<C>`.
#[inline]
pub fn range_component<C: Component + Send + Sync + 'static>(
    &self,
    entry_range: Range<usize>,
) -> Option<impl Iterator<Item = &[C]>> {
    let erased = self.components.get(&C::name())?;
    let data = erased.as_any().downcast_ref::<FlatVecDeque<C>>()?;
    Some(data.range(entry_range))
}

/// Range over the batches of the specified optional component.
///
/// Returns `None` if this bucket holds no data for `C`, or if the type-erased
/// storage fails to downcast to `FlatVecDeque<Option<C>>`.
#[inline]
pub fn range_component_opt<C: Component + Send + Sync + 'static>(
    &self,
    entry_range: Range<usize>,
) -> Option<impl Iterator<Item = &[Option<C>]>> {
    let erased = self.components.get(&C::name())?;
    let data = erased.as_any().downcast_ref::<FlatVecDeque<Option<C>>>()?;
    Some(data.range(entry_range))
}
}

Expand Down
47 changes: 23 additions & 24 deletions crates/re_query_cache/src/cache_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::BTreeMap;
use re_log_types::{EntityPath, TimeRange, Timeline};
use re_types_core::ComponentName;

use crate::{Caches, LatestAtCache, RangeCache};
use crate::{cache::CacheBucket, Caches, LatestAtCache, RangeCache};

// ---

Expand Down Expand Up @@ -71,6 +71,18 @@ impl Caches {
pub fn stats(detailed_stats: bool) -> CachesStats {
re_tracing::profile_function!();

// Folds one cache bucket's per-component row/instance counts into the
// running stats map, creating default entries for unseen components.
fn upsert_bucket_stats(
    per_component: &mut BTreeMap<ComponentName, CachedComponentStats>,
    bucket: &CacheBucket,
) {
    for (component_name, data) in &bucket.components {
        let stats: &mut CachedComponentStats =
            per_component.entry(*component_name).or_default();
        stats.total_rows += data.dyn_num_entries() as u64;
        stats.total_instances += data.dyn_num_values() as u64;
    }
}

Self::with(|caches| {
let latest_at = caches
.0
Expand Down Expand Up @@ -98,22 +110,12 @@ impl Caches {
if let Some(per_component) = per_component.as_mut() {
re_tracing::profile_scope!("detailed");

for bucket in per_data_time.values() {
for (component_name, data) in &bucket.read().components {
let stats: &mut CachedComponentStats =
per_component.entry(*component_name).or_default();
stats.total_rows += data.dyn_num_entries() as u64;
stats.total_instances += data.dyn_num_values() as u64;
}
if let Some(bucket) = &timeless {
upsert_bucket_stats(per_component, bucket);
}

if let Some(bucket) = &timeless {
for (component_name, data) in &bucket.components {
let stats: &mut CachedComponentStats =
per_component.entry(*component_name).or_default();
stats.total_rows += data.dyn_num_entries() as u64;
stats.total_instances += data.dyn_num_values() as u64;
}
for bucket in per_data_time.values() {
upsert_bucket_stats(per_component, &bucket.read());
}
}
}
Expand All @@ -140,27 +142,24 @@ impl Caches {
.values()
.map(|range_cache| {
let RangeCache {
bucket,
per_data_time,
timeless,
total_size_bytes,
} = &*range_cache.read();

let total_rows = bucket.data_times.len() as u64;
let total_rows = per_data_time.data_times.len() as u64;

let mut per_component = detailed_stats.then(BTreeMap::default);
if let Some(per_component) = per_component.as_mut() {
re_tracing::profile_scope!("detailed");

for (component_name, data) in &bucket.components {
let stats: &mut CachedComponentStats =
per_component.entry(*component_name).or_default();
stats.total_rows += data.dyn_num_entries() as u64;
stats.total_instances += data.dyn_num_values() as u64;
}
upsert_bucket_stats(per_component, timeless);
upsert_bucket_stats(per_component, per_data_time);
}

(
key.timeline,
bucket.time_range().unwrap_or(TimeRange::EMPTY),
per_data_time.time_range().unwrap_or(TimeRange::EMPTY),
CachedEntityStats {
total_size_bytes: *total_size_bytes,
total_rows,
Expand Down
1 change: 0 additions & 1 deletion crates/re_query_cache/src/latest_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ macro_rules! impl_query_archetype_latest_at {
f(data);
}


Ok(())
};

Expand Down
Loading

0 comments on commit 5800d1a

Please sign in to comment.