From be872f720f14e9c2497f051baff95fd44426f470 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Fri, 5 Jan 2024 16:15:14 +0100 Subject: [PATCH 1/3] implement latest-at cache deduplication --- crates/re_query_cache/src/cache.rs | 50 +++++++----------- crates/re_query_cache/src/lib.rs | 2 +- crates/re_query_cache/src/query.rs | 81 ++++++++++++++++++++---------- 3 files changed, 75 insertions(+), 58 deletions(-) diff --git a/crates/re_query_cache/src/cache.rs b/crates/re_query_cache/src/cache.rs index c36e3e285a75..614520b579db 100644 --- a/crates/re_query_cache/src/cache.rs +++ b/crates/re_query_cache/src/cache.rs @@ -149,10 +149,10 @@ impl CacheKey { /// of data. #[derive(Default)] pub struct CacheBucket { - /// The timestamps and [`RowId`]s of all cached rows. + /// The _data_ timestamps and [`RowId`]s of all cached rows. /// /// Reminder: within a single timestamp, rows are sorted according to their [`RowId`]s. - pub(crate) pov_times: VecDeque<(TimeInt, RowId)>, + pub(crate) pov_data_times: VecDeque<(TimeInt, RowId)>, /// The [`InstanceKey`]s of the point-of-view components. pub(crate) pov_instance_keys: FlatVecDeque, @@ -170,8 +170,8 @@ pub struct CacheBucket { impl CacheBucket { /// Iterate over the timestamps of the point-of-view components. #[inline] - pub fn iter_pov_times(&self) -> impl Iterator { - self.pov_times.iter() + pub fn iter_pov_data_times(&self) -> impl Iterator { + self.pov_data_times.iter() } /// Iterate over the [`InstanceKey`] batches of the point-of-view components. @@ -207,7 +207,7 @@ impl CacheBucket { /// How many timestamps' worth of data is stored in this bucket? #[inline] pub fn num_entries(&self) -> usize { - self.pov_times.len() + self.pov_data_times.len() } #[inline] @@ -236,15 +236,15 @@ macro_rules! impl_insert { re_tracing::profile_scope!("CacheBucket::insert", format!("arch={} pov={} comp={}", A::name(), $N, $M)); let Self { - pov_times, + pov_data_times, pov_instance_keys, components: _, } = self; let pov_row_id = arch_view.primary_row_id(); - let index = pov_times.partition_point(|t| t < &(query_time, pov_row_id)); + let index = pov_data_times.partition_point(|t| t < &(query_time, pov_row_id)); - pov_times.insert(index, (query_time, pov_row_id)); + pov_data_times.insert(index, (query_time, pov_row_id)); pov_instance_keys.insert(index, arch_view.iter_instance_keys()); $(self.insert_component::(index, arch_view)?;)+ $(self.insert_component_opt::(index, arch_view)?;)* @@ -333,29 +333,17 @@ impl CacheBucket { // For this reason we move as much of the code as possible into the already existing macros in `query.rs`. /// Caches the results of `LatestAt` queries. -/// -/// The `TimeInt` in the index corresponds to the timestamp of the query, _not_ the timestamp of -/// the resulting data! -// -// TODO(cmc): we need an extra indirection layer so that cached entries can be shared across -// queries with different query timestamps but identical data timestamps. -// This requires keeping track of all `RowId`s in `ArchetypeView`, not just the `RowId` of the -// point-of-view component. #[derive(Default)] -pub struct LatestAtCache(BTreeMap); - -impl std::ops::Deref for LatestAtCache { - type Target = BTreeMap; - - #[inline] - fn deref(&self) -> &Self::Target { - &self.0 - } -} +pub struct LatestAtCache { + /// Organized by _query_ time. + /// + /// If the data you're looking for isn't in here, try partially running the query and check + /// if there is any data available for the resulting _data_ time in [`Self::per_data_time`]. + pub per_query_time: BTreeMap>>, -impl std::ops::DerefMut for LatestAtCache { - #[inline] - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } + /// Organized by _data_ time. + /// + /// Due to how our latest-at semantics work, any number of queries at time `T+n` where `n >= 0` + /// can result in a data time of `T`. + pub per_data_time: BTreeMap>>, } diff --git a/crates/re_query_cache/src/lib.rs b/crates/re_query_cache/src/lib.rs index 2c6c69a9787d..57c3855b63ad 100644 --- a/crates/re_query_cache/src/lib.rs +++ b/crates/re_query_cache/src/lib.rs @@ -10,7 +10,7 @@ pub use self::query::{ query_archetype_pov1, query_archetype_with_history_pov1, MaybeCachedComponentData, }; -pub(crate) use self::cache::LatestAtCache; +pub(crate) use self::cache::{CacheBucket, LatestAtCache}; pub use re_query::{QueryError, Result}; // convenience diff --git a/crates/re_query_cache/src/query.rs b/crates/re_query_cache/src/query.rs index f83e2d2e7227..14da25c1c3ff 100644 --- a/crates/re_query_cache/src/query.rs +++ b/crates/re_query_cache/src/query.rs @@ -106,35 +106,12 @@ macro_rules! impl_query_archetype { format!("cached={cached} arch={} pov={} comp={}", A::name(), $N, $M) ); - let mut latest_at_callback = |query: &LatestAtQuery, cache: &mut crate::LatestAtCache| { - re_tracing::profile_scope!("latest_at", format!("{query:?}")); - - let bucket = cache.entry(query.at).or_default(); - // NOTE: Implicitly dropping the write guard here: the LatestAtCache is free once again! - - if bucket.is_empty() { - re_tracing::profile_scope!("fill"); - - let now = web_time::Instant::now(); - // TODO(cmc): cache deduplication. - let arch_view = query_archetype::(store, &query, entity_path)?; - - bucket.[]::(query.at, &arch_view)?; - - let elapsed = now.elapsed(); - ::re_log::trace!( - store_id=%store.id(), - %entity_path, - archetype=%A::name(), - "cached new entry in {elapsed:?} ({:0.3} entries/s)", - 1f64 / elapsed.as_secs_f64() - ); - } + let mut iter_results = |bucket: &crate::CacheBucket| -> crate::Result<()> { re_tracing::profile_scope!("iter"); let it = itertools::izip!( - bucket.iter_pov_times(), + bucket.iter_pov_data_times(), bucket.iter_pov_instance_keys(), $(bucket.iter_component::<$pov>() .ok_or_else(|| re_query::ComponentNotFoundError(<$pov>::name()))?,)+ @@ -156,6 +133,58 @@ macro_rules! impl_query_archetype { Ok(()) }; + let mut latest_at_callback = |query: &LatestAtQuery, latest_at_cache: &mut crate::LatestAtCache| { + re_tracing::profile_scope!("latest_at", format!("{query:?}")); + + let crate::LatestAtCache { per_query_time, per_data_time } = latest_at_cache; + + // Fastest path: we have an entry for this exact query, no need to look + // any further. + if let Some(query_time_bucket) = per_query_time.get(&query.at) { + return iter_results(&query_time_bucket.read()); + } + + let arch_view = query_archetype::(store, &query, entity_path)?; + // TODO(cmc): actual timeless caching support. + let data_time = arch_view.data_time().unwrap_or(TimeInt::MIN); + + // Fast path: we've run the query and realized that we already have the data for the resulting + // _data_ time, so let's use that. + if let Some(data_time_bucket) = per_data_time.get(&data_time) { + // We now know for a fact that a query at that data time would yield the same + // results: copy the bucket accordingly so that the next cache hit ends up taking the fastest path. + *per_query_time.entry(data_time).or_default() = std::sync::Arc::clone(&data_time_bucket); + return iter_results(&data_time_bucket.read()); + } + + let query_time_bucket = per_query_time.entry(query.at).or_default(); + + // Slowest path: this is a complete cache miss. + { + re_tracing::profile_scope!("fill"); + + let now = web_time::Instant::now(); + + let mut query_time_bucket = query_time_bucket.write(); + query_time_bucket.[]::(query.at, &arch_view)?; + + let elapsed = now.elapsed(); + ::re_log::trace!( + store_id=%store.id(), + %entity_path, + archetype=%A::name(), + "cached new entry in {elapsed:?} ({:0.3} entries/s)", + 1f64 / elapsed.as_secs_f64() + ); + } + + + *per_data_time.entry(data_time).or_default() = std::sync::Arc::clone(&query_time_bucket); + + iter_results(&query_time_bucket.read()) + }; + + match &query { // TODO(cmc): cached range support AnyQuery::Range(query) => { @@ -203,7 +232,7 @@ macro_rules! impl_query_archetype { store.id().clone(), entity_path.clone(), query, - |cache| latest_at_callback(query, cache), + |latest_at_cache| latest_at_callback(query, latest_at_cache), ) }, } From 3252a8cfea0f0e924e205949d5e6d5c9f06a8174 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Mon, 8 Jan 2024 19:22:18 +0100 Subject: [PATCH 2/3] self-review --- crates/re_query_cache/tests/latest_at.rs | 67 ++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/crates/re_query_cache/tests/latest_at.rs b/crates/re_query_cache/tests/latest_at.rs index 3fa32519f6c5..d3b10e4b3890 100644 --- a/crates/re_query_cache/tests/latest_at.rs +++ b/crates/re_query_cache/tests/latest_at.rs @@ -209,6 +209,73 @@ fn invalidation() { query_and_compare(&store, &query, &ent_path.into()); } +// Test the following scenario: +// ```py +// rr.set_time(0) +// rr.log("points", rr.Points3D([1, 2, 3])) +// +// # Do first query here: LatestAt(+inf) +// # Expected: points=[[1,2,3]] colors=[] +// +// rr.set_time(1) +// rr.log_components("points", rr.components.Color(0xFF0000)) +// +// # Do second query here: LatestAt(+inf) +// # Expected: points=[[1,2,3]] colors=[0xFF0000] +// +// rr.set_time(2) +// rr.log_components("points", rr.components.Color(0x0000FF)) +// +// # Do third query here: LatestAt(+inf) +// # Expected: points=[[1,2,3]] colors=[0x0000FF] +// ``` +// +// TODO(cmc): this needs proper invalidation to pass +#[should_panic(expected = "assertion failed: `(left == right)`")] +#[test] +fn invalidation_of_future_optionals() { + let mut store = DataStore::new( + re_log_types::StoreId::random(re_log_types::StoreKind::Recording), + InstanceKey::name(), + Default::default(), + ); + + let ent_path = "points"; + + let frame1 = [build_frame_nr(1.into())]; + let frame2 = [build_frame_nr(2.into())]; + let frame3 = [build_frame_nr(3.into())]; + + let query_time = [build_frame_nr(9999.into())]; + + let positions = vec![Position2D::new(1.0, 2.0), Position2D::new(3.0, 4.0)]; + let row = DataRow::from_cells1_sized(RowId::new(), ent_path, frame1, 2, positions).unwrap(); + store.insert_row(&row).unwrap(); + + let query = re_data_store::LatestAtQuery::new(query_time[0].0, query_time[0].1); + query_and_compare(&store, &query, &ent_path.into()); + + let color_instances = vec![InstanceKey::SPLAT]; + let colors = vec![Color::from_rgb(255, 0, 0)]; + let row = + DataRow::from_cells2_sized(RowId::new(), ent_path, frame2, 1, (color_instances, colors)) + .unwrap(); + store.insert_row(&row).unwrap(); + + let query = re_data_store::LatestAtQuery::new(query_time[0].0, query_time[0].1); + query_and_compare(&store, &query, &ent_path.into()); + + let color_instances = vec![InstanceKey::SPLAT]; + let colors = vec![Color::from_rgb(0, 0, 255)]; + let row = + DataRow::from_cells2_sized(RowId::new(), ent_path, frame3, 1, (color_instances, colors)) + .unwrap(); + store.insert_row(&row).unwrap(); + + let query = re_data_store::LatestAtQuery::new(query_time[0].0, query_time[0].1); + query_and_compare(&store, &query, &ent_path.into()); +} + // --- fn query_and_compare(store: &DataStore, query: &LatestAtQuery, ent_path: &EntityPath) { From 95fdb2edd4494ea82d330e0efc8462b66a763e43 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Mon, 15 Jan 2024 12:01:12 +0100 Subject: [PATCH 3/3] review --- crates/re_query_cache/src/cache.rs | 11 ++++-- crates/re_query_cache/src/query.rs | 63 ++++++++++++++++++------------ 2 files changed, 46 insertions(+), 28 deletions(-) diff --git a/crates/re_query_cache/src/cache.rs b/crates/re_query_cache/src/cache.rs index 614520b579db..cefaf20e362d 100644 --- a/crates/re_query_cache/src/cache.rs +++ b/crates/re_query_cache/src/cache.rs @@ -332,13 +332,18 @@ impl CacheBucket { // which is notoriously painful in Rust (i.e., macros). // For this reason we move as much of the code as possible into the already existing macros in `query.rs`. -/// Caches the results of `LatestAt` queries. +/// Caches the results of `LatestAt` archetype queries (`ArchetypeView`). +/// +/// There is one `LatestAtCache` for each unique [`CacheKey`]. +/// +/// All query steps are cached: index search, cluster key joins and deserialization. #[derive(Default)] pub struct LatestAtCache { /// Organized by _query_ time. /// - /// If the data you're looking for isn't in here, try partially running the query and check - /// if there is any data available for the resulting _data_ time in [`Self::per_data_time`]. + /// If the data you're looking for isn't in here, try partially running the query (i.e. run the + /// index search in order to find a data time, but don't actually deserialize and join the data) + /// and check if there is any data available for the resulting _data_ time in [`Self::per_data_time`]. pub per_query_time: BTreeMap>>, /// Organized by _data_ time. diff --git a/crates/re_query_cache/src/query.rs b/crates/re_query_cache/src/query.rs index 14da25c1c3ff..077e81f381aa 100644 --- a/crates/re_query_cache/src/query.rs +++ b/crates/re_query_cache/src/query.rs @@ -138,50 +138,63 @@ macro_rules! impl_query_archetype { let crate::LatestAtCache { per_query_time, per_data_time } = latest_at_cache; - // Fastest path: we have an entry for this exact query, no need to look - // any further. - if let Some(query_time_bucket) = per_query_time.get(&query.at) { - return iter_results(&query_time_bucket.read()); - } + let query_time_bucket_at_query_time = match per_query_time.entry(query.at) { + std::collections::btree_map::Entry::Occupied(query_time_bucket_at_query_time) => { + // Fastest path: we have an entry for this exact query time, no need to look any + // further. + return iter_results(&query_time_bucket_at_query_time.get().read()); + } + entry @ std::collections::btree_map::Entry::Vacant(_) => entry, + }; let arch_view = query_archetype::(store, &query, entity_path)?; // TODO(cmc): actual timeless caching support. let data_time = arch_view.data_time().unwrap_or(TimeInt::MIN); // Fast path: we've run the query and realized that we already have the data for the resulting - // _data_ time, so let's use that. - if let Some(data_time_bucket) = per_data_time.get(&data_time) { + // _data_ time, so let's use that to avoid join & deserialization costs. + if let Some(data_time_bucket_at_data_time) = per_data_time.get(&data_time) { + *query_time_bucket_at_query_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time); + // We now know for a fact that a query at that data time would yield the same - // results: copy the bucket accordingly so that the next cache hit ends up taking the fastest path. - *per_query_time.entry(data_time).or_default() = std::sync::Arc::clone(&data_time_bucket); - return iter_results(&data_time_bucket.read()); + // results: copy the bucket accordingly so that the next cache hit for that query + // time ends up taking the fastest path. + let query_time_bucket_at_data_time = per_query_time.entry(data_time); + *query_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time); + + return iter_results(&data_time_bucket_at_data_time.read()); } - let query_time_bucket = per_query_time.entry(query.at).or_default(); + let query_time_bucket_at_query_time = query_time_bucket_at_query_time.or_default(); // Slowest path: this is a complete cache miss. { re_tracing::profile_scope!("fill"); + // Grabbing the current time is quite costly on web. + #[cfg(not(target_arch = "wasm32"))] let now = web_time::Instant::now(); - let mut query_time_bucket = query_time_bucket.write(); - query_time_bucket.[]::(query.at, &arch_view)?; - - let elapsed = now.elapsed(); - ::re_log::trace!( - store_id=%store.id(), - %entity_path, - archetype=%A::name(), - "cached new entry in {elapsed:?} ({:0.3} entries/s)", - 1f64 / elapsed.as_secs_f64() - ); + let mut query_time_bucket_at_query_time = query_time_bucket_at_query_time.write(); + query_time_bucket_at_query_time.[]::(query.at, &arch_view)?; + + #[cfg(not(target_arch = "wasm32"))] + { + let elapsed = now.elapsed(); + ::re_log::trace!( + store_id=%store.id(), + %entity_path, + archetype=%A::name(), + "cached new entry in {elapsed:?} ({:0.3} entries/s)", + 1f64 / elapsed.as_secs_f64() + ); + } } + let data_time_bucket_at_data_time = per_data_time.entry(data_time); + *data_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&query_time_bucket_at_query_time); - *per_data_time.entry(data_time).or_default() = std::sync::Arc::clone(&query_time_bucket); - - iter_results(&query_time_bucket.read()) + iter_results(&query_time_bucket_at_query_time.read()) };