Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Primary caching 8: implement latest-at data-time cache entry deduplication #4712

Merged
merged 3 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 24 additions & 31 deletions crates/re_query_cache/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,10 @@ impl CacheKey {
/// of data.
#[derive(Default)]
pub struct CacheBucket {
/// The timestamps and [`RowId`]s of all cached rows.
/// The _data_ timestamps and [`RowId`]s of all cached rows.
///
/// Reminder: within a single timestamp, rows are sorted according to their [`RowId`]s.
pub(crate) pov_times: VecDeque<(TimeInt, RowId)>,
pub(crate) pov_data_times: VecDeque<(TimeInt, RowId)>,

/// The [`InstanceKey`]s of the point-of-view components.
pub(crate) pov_instance_keys: FlatVecDeque<InstanceKey>,
Expand All @@ -170,8 +170,8 @@ pub struct CacheBucket {
impl CacheBucket {
/// Iterate over the timestamps of the point-of-view components.
#[inline]
pub fn iter_pov_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
self.pov_times.iter()
pub fn iter_pov_data_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
self.pov_data_times.iter()
}

/// Iterate over the [`InstanceKey`] batches of the point-of-view components.
Expand Down Expand Up @@ -207,7 +207,7 @@ impl CacheBucket {
/// How many timestamps' worth of data is stored in this bucket?
#[inline]
pub fn num_entries(&self) -> usize {
self.pov_times.len()
self.pov_data_times.len()
}

#[inline]
Expand Down Expand Up @@ -236,15 +236,15 @@ macro_rules! impl_insert {
re_tracing::profile_scope!("CacheBucket::insert", format!("arch={} pov={} comp={}", A::name(), $N, $M));

let Self {
pov_times,
pov_data_times,
pov_instance_keys,
components: _,
} = self;

let pov_row_id = arch_view.primary_row_id();
let index = pov_times.partition_point(|t| t < &(query_time, pov_row_id));
let index = pov_data_times.partition_point(|t| t < &(query_time, pov_row_id));

pov_times.insert(index, (query_time, pov_row_id));
pov_data_times.insert(index, (query_time, pov_row_id));
pov_instance_keys.insert(index, arch_view.iter_instance_keys());
$(self.insert_component::<A, $pov>(index, arch_view)?;)+
$(self.insert_component_opt::<A, $comp>(index, arch_view)?;)*
Expand Down Expand Up @@ -332,30 +332,23 @@ impl CacheBucket {
// which is notoriously painful in Rust (i.e., macros).
// For this reason we move as much of the code as possible into the already existing macros in `query.rs`.

/// Caches the results of `LatestAt` queries.
/// Caches the results of `LatestAt` archetype queries (`ArchetypeView`).
///
/// The `TimeInt` in the index corresponds to the timestamp of the query, _not_ the timestamp of
/// the resulting data!
//
// TODO(cmc): we need an extra indirection layer so that cached entries can be shared across
// queries with different query timestamps but identical data timestamps.
// This requires keeping track of all `RowId`s in `ArchetypeView`, not just the `RowId` of the
// point-of-view component.
/// There is one `LatestAtCache` for each unique [`CacheKey`].
///
/// All query steps are cached: index search, cluster key joins and deserialization.
#[derive(Default)]
pub struct LatestAtCache(BTreeMap<TimeInt, CacheBucket>);

impl std::ops::Deref for LatestAtCache {
type Target = BTreeMap<TimeInt, CacheBucket>;

#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}
pub struct LatestAtCache {
/// Organized by _query_ time.
///
/// If the data you're looking for isn't in here, try partially running the query (i.e. run the
/// index search in order to find a data time, but don't actually deserialize and join the data)
/// and check if there is any data available for the resulting _data_ time in [`Self::per_data_time`].
pub per_query_time: BTreeMap<TimeInt, Arc<RwLock<CacheBucket>>>,

impl std::ops::DerefMut for LatestAtCache {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
/// Organized by _data_ time.
///
/// Due to how our latest-at semantics work, any number of queries at time `T+n` where `n >= 0`
/// can result in a data time of `T`.
pub per_data_time: BTreeMap<TimeInt, Arc<RwLock<CacheBucket>>>,
}
2 changes: 1 addition & 1 deletion crates/re_query_cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub use self::query::{
query_archetype_pov1, query_archetype_with_history_pov1, MaybeCachedComponentData,
};

pub(crate) use self::cache::LatestAtCache;
pub(crate) use self::cache::{CacheBucket, LatestAtCache};

pub use re_query::{QueryError, Result}; // convenience

Expand Down
94 changes: 68 additions & 26 deletions crates/re_query_cache/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,35 +106,12 @@ macro_rules! impl_query_archetype {
format!("cached={cached} arch={} pov={} comp={}", A::name(), $N, $M)
);

let mut latest_at_callback = |query: &LatestAtQuery, cache: &mut crate::LatestAtCache| {
re_tracing::profile_scope!("latest_at", format!("{query:?}"));

let bucket = cache.entry(query.at).or_default();
// NOTE: Implicitly dropping the write guard here: the LatestAtCache is free once again!

if bucket.is_empty() {
re_tracing::profile_scope!("fill");

let now = web_time::Instant::now();
// TODO(cmc): cache deduplication.
let arch_view = query_archetype::<A>(store, &query, entity_path)?;

bucket.[<insert_pov $N _comp$M>]::<A, $($pov,)+ $($comp,)*>(query.at, &arch_view)?;

let elapsed = now.elapsed();
::re_log::trace!(
store_id=%store.id(),
%entity_path,
archetype=%A::name(),
"cached new entry in {elapsed:?} ({:0.3} entries/s)",
1f64 / elapsed.as_secs_f64()
);
}

let mut iter_results = |bucket: &crate::CacheBucket| -> crate::Result<()> {
re_tracing::profile_scope!("iter");

let it = itertools::izip!(
bucket.iter_pov_times(),
bucket.iter_pov_data_times(),
bucket.iter_pov_instance_keys(),
$(bucket.iter_component::<$pov>()
.ok_or_else(|| re_query::ComponentNotFoundError(<$pov>::name()))?,)+
Expand All @@ -156,6 +133,71 @@ macro_rules! impl_query_archetype {
Ok(())
};

let mut latest_at_callback = |query: &LatestAtQuery, latest_at_cache: &mut crate::LatestAtCache| {
re_tracing::profile_scope!("latest_at", format!("{query:?}"));

let crate::LatestAtCache { per_query_time, per_data_time } = latest_at_cache;

let query_time_bucket_at_query_time = match per_query_time.entry(query.at) {
std::collections::btree_map::Entry::Occupied(query_time_bucket_at_query_time) => {
// Fastest path: we have an entry for this exact query time, no need to look any
// further.
return iter_results(&query_time_bucket_at_query_time.get().read());
}
entry @ std::collections::btree_map::Entry::Vacant(_) => entry,
};

let arch_view = query_archetype::<A>(store, &query, entity_path)?;
// TODO(cmc): actual timeless caching support.
let data_time = arch_view.data_time().unwrap_or(TimeInt::MIN);

// Fast path: we've run the query and realized that we already have the data for the resulting
// _data_ time, so let's use that to avoid join & deserialization costs.
if let Some(data_time_bucket_at_data_time) = per_data_time.get(&data_time) {
*query_time_bucket_at_query_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time);

// We now know for a fact that a query at that data time would yield the same
// results: copy the bucket accordingly so that the next cache hit for that query
// time ends up taking the fastest path.
let query_time_bucket_at_data_time = per_query_time.entry(data_time);
*query_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time);

return iter_results(&data_time_bucket_at_data_time.read());
}

let query_time_bucket_at_query_time = query_time_bucket_at_query_time.or_default();

// Slowest path: this is a complete cache miss.
{
re_tracing::profile_scope!("fill");

// Grabbing the current time is quite costly on web.
#[cfg(not(target_arch = "wasm32"))]
let now = web_time::Instant::now();
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved

let mut query_time_bucket_at_query_time = query_time_bucket_at_query_time.write();
query_time_bucket_at_query_time.[<insert_pov$N _comp$M>]::<A, $($pov,)+ $($comp,)*>(query.at, &arch_view)?;

#[cfg(not(target_arch = "wasm32"))]
{
let elapsed = now.elapsed();
::re_log::trace!(
store_id=%store.id(),
%entity_path,
archetype=%A::name(),
"cached new entry in {elapsed:?} ({:0.3} entries/s)",
1f64 / elapsed.as_secs_f64()
);
}
}

let data_time_bucket_at_data_time = per_data_time.entry(data_time);
*data_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&query_time_bucket_at_query_time);

iter_results(&query_time_bucket_at_query_time.read())
};


match &query {
// TODO(cmc): cached range support
AnyQuery::Range(query) => {
Expand Down Expand Up @@ -203,7 +245,7 @@ macro_rules! impl_query_archetype {
store.id().clone(),
entity_path.clone(),
query,
|cache| latest_at_callback(query, cache),
|latest_at_cache| latest_at_callback(query, latest_at_cache),
)
},
}
Expand Down
67 changes: 67 additions & 0 deletions crates/re_query_cache/tests/latest_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,73 @@ fn invalidation() {
query_and_compare(&store, &query, &ent_path.into());
}

// Scenario being exercised (pseudo-code):
// ```py
// rr.set_time(0)
// rr.log("points", rr.Points3D([1, 2, 3]))
//
// # First query: LatestAt(+inf) -> expected points=[[1,2,3]] colors=[]
//
// rr.set_time(1)
// rr.log_components("points", rr.components.Color(0xFF0000))
//
// # Second query: LatestAt(+inf) -> expected points=[[1,2,3]] colors=[0xFF0000]
//
// rr.set_time(2)
// rr.log_components("points", rr.components.Color(0x0000FF))
//
// # Third query: LatestAt(+inf) -> expected points=[[1,2,3]] colors=[0x0000FF]
// ```
//
// TODO(cmc): this needs proper invalidation to pass
#[should_panic(expected = "assertion failed: `(left == right)`")]
#[test]
fn invalidation_of_future_optionals() {
    let mut store = DataStore::new(
        re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
        InstanceKey::name(),
        Default::default(),
    );

    let ent_path = "points";

    // Data lands on frames 1, 2 and 3; every query happens far in the future.
    let frame1 = [build_frame_nr(1.into())];
    let frame2 = [build_frame_nr(2.into())];
    let frame3 = [build_frame_nr(3.into())];
    let query_time = [build_frame_nr(9999.into())];

    // Log the point positions at frame 1, then query.
    let positions = vec![Position2D::new(1.0, 2.0), Position2D::new(3.0, 4.0)];
    let pos_row =
        DataRow::from_cells1_sized(RowId::new(), ent_path, frame1, 2, positions).unwrap();
    store.insert_row(&pos_row).unwrap();

    let query = re_data_store::LatestAtQuery::new(query_time[0].0, query_time[0].1);
    query_and_compare(&store, &query, &ent_path.into());

    // Log a splatted red color at frame 2, then re-query at the same query time.
    let red_row = DataRow::from_cells2_sized(
        RowId::new(),
        ent_path,
        frame2,
        1,
        (vec![InstanceKey::SPLAT], vec![Color::from_rgb(255, 0, 0)]),
    )
    .unwrap();
    store.insert_row(&red_row).unwrap();

    let query = re_data_store::LatestAtQuery::new(query_time[0].0, query_time[0].1);
    query_and_compare(&store, &query, &ent_path.into());

    // Log a splatted blue color at frame 3, then re-query once more.
    let blue_row = DataRow::from_cells2_sized(
        RowId::new(),
        ent_path,
        frame3,
        1,
        (vec![InstanceKey::SPLAT], vec![Color::from_rgb(0, 0, 255)]),
    )
    .unwrap();
    store.insert_row(&blue_row).unwrap();

    let query = re_data_store::LatestAtQuery::new(query_time[0].0, query_time[0].1);
    query_and_compare(&store, &query, &ent_path.into());
}

// ---

fn query_and_compare(store: &DataStore, query: &LatestAtQuery, ent_path: &EntityPath) {
Expand Down
Loading