
Commit

Primary caching 12: barebone range support (#4784)

**Prefer reviewing on a per-commit basis, stuff has moved around.**

Range queries are back!... in the most primitive form possible.

No invalidation, no bucketing, no optimization, no nothing. Just putting
everything in place.


https://github.com/rerun-io/rerun/assets/2910679/a65281e4-9843-4598-9547-ce7e45197995

---

Part of the primary caching series of PRs (index search, joins,
deserialization):
- #4592
- #4593
- #4659
- #4680 
- #4681
- #4698
- #4711
- #4712
- #4721 
- #4726 
- #4773
- #4784
- #4785
- #4793
- #4800
teh-cmc authored Jan 15, 2024
1 parent cd1b6c2 commit 886014a
Showing 16 changed files with 977 additions and 272 deletions.
6 changes: 3 additions & 3 deletions crates/re_data_store/benches/arrow2.rs
@@ -5,7 +5,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 use std::sync::Arc;
 
-use arrow2::array::{Array, PrimitiveArray, StructArray, UnionArray};
+use arrow2::array::{Array, FixedSizeListArray, PrimitiveArray, StructArray};
 use criterion::Criterion;
 use itertools::Itertools;
 
@@ -277,8 +277,8 @@ fn estimated_size_bytes(c: &mut Criterion) {
         ArrayKind::Primitive => {
             bench_downcast_first::<PrimitiveArray<u64>>(&mut group, kind);
         }
-        ArrayKind::Struct => bench_downcast_first::<StructArray>(&mut group, kind),
-        ArrayKind::StructLarge => bench_downcast_first::<UnionArray>(&mut group, kind),
+        ArrayKind::Struct => bench_downcast_first::<FixedSizeListArray>(&mut group, kind),
+        ArrayKind::StructLarge => bench_downcast_first::<StructArray>(&mut group, kind),
     }
 
 fn bench_downcast_first<T: arrow2::array::Array + Clone>(
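
For context, here is a minimal hedged sketch (not part of this commit) of the pattern `bench_downcast_first` exercises: arrow2 arrays are passed around as `dyn Array` trait objects and must be downcast to a concrete type before their contents can be read. The helper name is hypothetical.

```rust
use arrow2::array::{Array, PrimitiveArray};

// Hypothetical helper (not in the repo): fetch the first value of a u64 array.
// This is the downcast the benchmark above measures the cost of.
fn first_u64(array: &dyn Array) -> Option<u64> {
    array
        .as_any()
        .downcast_ref::<PrimitiveArray<u64>>()? // None if the type doesn't match
        .iter() // yields Option<&u64> (None for null slots)
        .next()
        .flatten()
        .copied()
}
```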
25 changes: 22 additions & 3 deletions crates/re_log_types/src/example_components.rs
@@ -1,25 +1,35 @@
 //! Example components to be used for tests and docs
 
-use re_types_core::{Loggable, SizeBytes};
+use re_types_core::{components::InstanceKey, Loggable, SizeBytes};
 
 // ----------------------------------------------------------------------------
 
 #[derive(Debug)]
 pub struct MyPoints;
 
+impl MyPoints {
+    pub const NUM_COMPONENTS: usize = 5;
+}
+
 impl re_types_core::Archetype for MyPoints {
     type Indicator = re_types_core::GenericIndicatorComponent<Self>;
 
     fn name() -> re_types_core::ArchetypeName {
-        "test.MyPoints".into()
+        "example.MyPoints".into()
     }
 
     fn required_components() -> ::std::borrow::Cow<'static, [re_types_core::ComponentName]> {
         vec![MyPoint::name()].into()
     }
 
     fn recommended_components() -> std::borrow::Cow<'static, [re_types_core::ComponentName]> {
-        vec![MyColor::name(), MyLabel::name()].into()
+        vec![
+            re_types_core::LoggableBatch::name(&Self::Indicator::default()),
+            InstanceKey::name(),
+            MyColor::name(),
+            MyLabel::name(),
+        ]
+        .into()
     }
 }
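
To make the updated archetype contract concrete, here is a small hedged sketch (not part of this commit); the `example_components` module path and the trait re-exports used below are assumptions:

```rust
use re_log_types::example_components::{MyPoint, MyPoints};
use re_types_core::{Archetype as _, Loggable as _};

fn main() {
    // The sole required component is still the point itself…
    assert_eq!(
        MyPoints::required_components().as_ref(),
        [MyPoint::name()].as_slice(),
    );
    // …while the recommended set now has 4 entries (indicator, instance keys,
    // colors, labels), for NUM_COMPONENTS = 1 + 4 = 5 in total.
    assert_eq!(
        1 + MyPoints::recommended_components().len(),
        MyPoints::NUM_COMPONENTS,
    );
}
```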

@@ -32,6 +42,7 @@ pub struct MyPoint {
 }
 
 impl MyPoint {
+    #[inline]
     pub fn new(x: f32, y: f32) -> Self {
         Self { x, y }
     }
@@ -121,7 +132,15 @@ impl Loggable for MyPoint {
 #[repr(transparent)]
 pub struct MyColor(pub u32);
 
+impl MyColor {
+    #[inline]
+    pub fn from_rgb(r: u8, g: u8, b: u8) -> Self {
+        Self(u32::from_le_bytes([r, g, b, 255]))
+    }
+}
+
 impl From<u32> for MyColor {
+    #[inline]
     fn from(value: u32) -> Self {
         Self(value)
     }
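
A quick illustrative check (not in this commit) of the byte layout `from_rgb` produces, with alpha forced to 255:

```rust
use re_log_types::example_components::MyColor;

fn main() {
    // Little-endian RGBA packing: r in the low byte, alpha in the high byte.
    let color = MyColor::from_rgb(0xAA, 0xBB, 0xCC);
    assert_eq!(color.0, u32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xFF]));
}
```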
103 changes: 63 additions & 40 deletions crates/re_query_cache/src/cache.rs
@@ -18,7 +18,7 @@ use re_types_core::{
     components::InstanceKey, Archetype, ArchetypeName, Component, ComponentName, SizeBytes as _,
 };
 
-use crate::{ErasedFlatVecDeque, FlatVecDeque};
+use crate::{ErasedFlatVecDeque, FlatVecDeque, LatestAtCache, RangeCache};
 
 // ---

@@ -67,6 +67,17 @@ pub struct CachesPerArchetype {
     // than an `ArchetypeName`: the query system doesn't care about archetypes.
     pub(crate) latest_at_per_archetype: RwLock<HashMap<ArchetypeName, Arc<RwLock<LatestAtCache>>>>,
 
+    /// Which [`Archetype`] are we querying for?
+    ///
+    /// This is very important because of our data model: we not only query for components, but we
+    /// query for components from a specific point-of-view (the so-called primary component).
+    /// Different archetypes have different point-of-views, and therefore can end up with different
+    /// results, even from the same raw data.
+    //
+    // TODO(cmc): At some point we should probably just store the PoV and optional components rather
+    // than an `ArchetypeName`: the query system doesn't care about archetypes.
+    pub(crate) range_per_archetype: RwLock<HashMap<ArchetypeName, Arc<RwLock<RangeCache>>>>,
+
     /// Everything greater than or equal to this timestamp has been asynchronously invalidated.
     ///
     /// The next time this cache gets queried, it must remove any entry matching this criteria.
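
The point-of-view comment above is the reason caches are keyed per archetype. A toy, self-contained illustration (not from the repo) of how the same raw rows can resolve to different anchor times depending on which component is the primary:

```rust
fn main() {
    // Times at which each component was logged for the same entity.
    let positions = [10_i64]; // primary for a points-like archetype
    let colors = [20_i64]; // primary for a hypothetical color-centric archetype

    // Latest-at resolves to the greatest data time <= the query time,
    // anchored on the primary component.
    let latest_at =
        |primary: &[i64], query: i64| primary.iter().copied().filter(|&t| t <= query).max();

    // Same store, same query time, different primaries => different results.
    assert_eq!(latest_at(&positions, 25), Some(10));
    assert_eq!(latest_at(&colors, 25), Some(20));
}
```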
@@ -134,6 +145,42 @@ impl Caches {
         f(&mut cache)
     }
 
+    /// Gives write access to the appropriate `RangeCache` according to the specified
+    /// query parameters.
+    #[inline]
+    pub fn with_range<A, F, R>(
+        store_id: StoreId,
+        entity_path: EntityPath,
+        query: &RangeQuery,
+        mut f: F,
+    ) -> R
+    where
+        A: Archetype,
+        F: FnMut(&mut RangeCache) -> R,
+    {
+        let key = CacheKey::new(store_id, entity_path, query.timeline);
+
+        let cache =
+            re_data_store::DataStore::with_subscriber_once(*CACHES, move |caches: &Caches| {
+                let mut caches = caches.0.write();
+
+                let caches_per_archetype = caches.entry(key.clone()).or_default();
+                caches_per_archetype.handle_pending_invalidation(&key);
+
+                let mut range_per_archetype = caches_per_archetype.range_per_archetype.write();
+                let range_cache = range_per_archetype.entry(A::name()).or_default();
+
+                Arc::clone(range_cache)
+
+                // Implicitly releasing all intermediary locks.
+            })
+            // NOTE: downcasting cannot fail, this is our own private handle.
+            .unwrap();
+
+        let mut cache = cache.write();
+        f(&mut cache)
+    }
+
     #[inline]
     pub(crate) fn with<F: FnMut(&Caches) -> R, R>(f: F) -> R {
         // NOTE: downcasting cannot fail, this is our own private handle.
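
A hedged usage sketch (not part of this commit) of the new entry point: everything funnels through a closure that holds the write lock on the per-archetype `RangeCache` for the duration of the call. The crate-root re-exports and the `RangeQuery` path are assumptions based on the imports above.

```rust
use re_data_store::RangeQuery;
use re_log_types::{example_components::MyPoints, EntityPath, StoreId};
use re_query_cache::{Caches, RangeCache};

// Hypothetical caller: grab (or lazily create) the range cache for `MyPoints`
// on this entity/timeline and do something with it under the lock.
fn with_my_points_range_cache(store_id: StoreId, entity_path: EntityPath, query: &RangeQuery) {
    Caches::with_range::<MyPoints, _, _>(store_id, entity_path, query, |cache: &mut RangeCache| {
        // Illustrative only: this is where a query implementation would
        // extend and read back the cached buckets for the queried range.
        let _ = cache;
    });
}
```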
@@ -347,6 +394,9 @@ pub struct CacheBucket {
     ///
     /// This corresponds to the data time and `RowId` returned by `re_query::query_archetype`.
     ///
+    /// This is guaranteed to always be sorted and dense (i.e. there cannot be a hole in the cached
+    /// data, unless the raw data itself in the store has a hole at that particular point in time).
+    ///
     /// Reminder: within a single timestamp, rows are sorted according to their [`RowId`]s.
     pub(crate) data_times: VecDeque<(TimeInt, RowId)>,
@@ -375,6 +425,18 @@ impl CacheBucket {
         self.data_times.iter()
     }
 
+    #[inline]
+    pub fn contains_data_time(&self, data_time: TimeInt) -> bool {
+        let first_time = self.data_times.front().map_or(&TimeInt::MAX, |(t, _)| t);
+        let last_time = self.data_times.back().map_or(&TimeInt::MIN, |(t, _)| t);
+        *first_time <= data_time && data_time <= *last_time
+    }
+
+    #[inline]
+    pub fn contains_data_row(&self, data_time: TimeInt, row_id: RowId) -> bool {
+        self.data_times.binary_search(&(data_time, row_id)).is_ok()
+    }
+
     /// Iterate over the [`InstanceKey`] batches of the point-of-view components.
     #[inline]
     pub fn iter_pov_instance_keys(&self) -> impl Iterator<Item = &[InstanceKey]> {
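
Both helpers lean on the sorted-and-dense invariant documented on `data_times`. A standalone illustration (not from the repo) of why `binary_search` suffices for membership, with plain tuples standing in for `(TimeInt, RowId)`:

```rust
use std::collections::VecDeque;

fn main() {
    // Stand-in for `data_times`: kept sorted by (time, row_id).
    let data_times: VecDeque<(i64, u64)> = [(10, 1), (10, 2), (20, 7)].into_iter().collect();

    // Membership is O(log n) because the deque is sorted…
    assert!(data_times.binary_search(&(10, 2)).is_ok());
    // …and a miss reports where the entry would have to be inserted.
    assert_eq!(data_times.binary_search(&(15, 0)), Err(2));
}
```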
@@ -554,42 +616,3 @@ impl CacheBucket {
         Ok(added_size_bytes)
     }
 }
-
-// ---
-
-// NOTE: Because we're working with deserialized data, everything has to be done with metaprogramming,
-// which is notoriously painful in Rust (i.e., macros).
-// For this reason we move as much of the code as possible into the already existing macros in `query.rs`.
-
-/// Caches the results of `LatestAt` archetype queries (`ArchetypeView`).
-///
-/// There is one `LatestAtCache` for each unique [`CacheKey`].
-///
-/// All query steps are cached: index search, cluster key joins and deserialization.
-#[derive(Default)]
-pub struct LatestAtCache {
-    /// Organized by _query_ time.
-    ///
-    /// If the data you're looking for isn't in here, try partially running the query (i.e. run the
-    /// index search in order to find a data time, but don't actually deserialize and join the data)
-    /// and check if there is any data available for the resulting _data_ time in [`Self::per_data_time`].
-    pub per_query_time: BTreeMap<TimeInt, Arc<RwLock<CacheBucket>>>,
-
-    /// Organized by _data_ time.
-    ///
-    /// Due to how our latest-at semantics work, any number of queries at time `T+n` where `n >= 0`
-    /// can result in a data time of `T`.
-    pub per_data_time: BTreeMap<TimeInt, Arc<RwLock<CacheBucket>>>,
-
-    /// Dedicated bucket for timeless data, if any.
-    ///
-    /// Query time and data time are one and the same in the timeless case, therefore we only need
-    /// this one bucket.
-    //
-    // NOTE: Lives separately so we don't pay the extra `Option` cost in the much more common
-    // timeful case.
-    pub timeless: Option<CacheBucket>,
-
-    /// Total size of the data stored in this cache in bytes.
-    pub total_size_bytes: u64,
-}
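
The `LatestAtCache` definition leaves `cache.rs` in this commit; its docs describe a two-level index in which many query times can resolve to a single data time. A toy, self-contained illustration (not from the repo) of that query-time vs. data-time distinction:

```rust
use std::collections::BTreeMap;

fn main() {
    // Data was logged at times 10 and 20.
    let data_times = [10_i64, 20];
    // Latest-at: a query at time `q` resolves to the greatest data time <= q.
    let latest_at = |q: i64| data_times.iter().copied().filter(|&t| t <= q).max();

    // Many query times share one data time, hence the two maps:
    // `per_query_time` memoizes the index search per query time…
    let per_query_time: BTreeMap<i64, Option<i64>> =
        [10, 12, 19, 20, 25].into_iter().map(|q| (q, latest_at(q))).collect();
    // …while the deduplicated result buckets live under `per_data_time`.
    assert_eq!(per_query_time[&12], Some(10));
    assert_eq!(per_query_time[&19], Some(10));
    assert_eq!(per_query_time[&25], Some(20));
}
```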