Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Primary caching 12: bare-bone range support #4784

Merged
merged 11 commits
Jan 15, 2024
6 changes: 3 additions & 3 deletions crates/re_data_store/benches/arrow2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

use std::sync::Arc;

use arrow2::array::{Array, PrimitiveArray, StructArray, UnionArray};
use arrow2::array::{Array, FixedSizeListArray, PrimitiveArray, StructArray};
use criterion::Criterion;
use itertools::Itertools;

Expand Down Expand Up @@ -277,8 +277,8 @@ fn estimated_size_bytes(c: &mut Criterion) {
ArrayKind::Primitive => {
bench_downcast_first::<PrimitiveArray<u64>>(&mut group, kind);
}
ArrayKind::Struct => bench_downcast_first::<StructArray>(&mut group, kind),
ArrayKind::StructLarge => bench_downcast_first::<UnionArray>(&mut group, kind),
ArrayKind::Struct => bench_downcast_first::<FixedSizeListArray>(&mut group, kind),
ArrayKind::StructLarge => bench_downcast_first::<StructArray>(&mut group, kind),
}

fn bench_downcast_first<T: arrow2::array::Array + Clone>(
Expand Down
25 changes: 22 additions & 3 deletions crates/re_log_types/src/example_components.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,35 @@
//! Example components to be used for tests and docs
use re_types_core::{Loggable, SizeBytes};
use re_types_core::{components::InstanceKey, Loggable, SizeBytes};

// ----------------------------------------------------------------------------

#[derive(Debug)]
pub struct MyPoints;

impl MyPoints {
// Total number of components this example archetype is expected to carry:
// the required `MyPoint` plus the indicator, `InstanceKey`, `MyColor` and
// `MyLabel` listed in `required_components`/`recommended_components`.
// NOTE(review): presumably must stay in sync with those lists — confirm
// whenever a component is added or removed.
pub const NUM_COMPONENTS: usize = 5;
}

impl re_types_core::Archetype for MyPoints {
type Indicator = re_types_core::GenericIndicatorComponent<Self>;

fn name() -> re_types_core::ArchetypeName {
"test.MyPoints".into()
"example.MyPoints".into()
}

fn required_components() -> ::std::borrow::Cow<'static, [re_types_core::ComponentName]> {
vec![MyPoint::name()].into()
}

fn recommended_components() -> std::borrow::Cow<'static, [re_types_core::ComponentName]> {
vec![MyColor::name(), MyLabel::name()].into()
vec![
re_types_core::LoggableBatch::name(&Self::Indicator::default()),
InstanceKey::name(),
MyColor::name(),
MyLabel::name(),
]
.into()
}
}

Expand All @@ -32,6 +42,7 @@ pub struct MyPoint {
}

impl MyPoint {
#[inline]
pub fn new(x: f32, y: f32) -> Self {
Self { x, y }
}
Expand Down Expand Up @@ -121,7 +132,15 @@ impl Loggable for MyPoint {
#[repr(transparent)]
pub struct MyColor(pub u32);

impl MyColor {
    /// Builds a fully-opaque color (alpha forced to `255`) from its RGB channels.
    ///
    /// Channels are packed little-endian into the inner `u32`, i.e. `r` ends up
    /// in the least-significant byte and the alpha in the most-significant one.
    #[inline]
    pub fn from_rgb(r: u8, g: u8, b: u8) -> Self {
        let rgba = [r, g, b, 255];
        Self(u32::from_le_bytes(rgba))
    }
}

impl From<u32> for MyColor {
#[inline]
fn from(value: u32) -> Self {
Self(value)
}
Expand Down
103 changes: 63 additions & 40 deletions crates/re_query_cache/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use re_types_core::{
components::InstanceKey, Archetype, ArchetypeName, Component, ComponentName, SizeBytes as _,
};

use crate::{ErasedFlatVecDeque, FlatVecDeque};
use crate::{ErasedFlatVecDeque, FlatVecDeque, LatestAtCache, RangeCache};

// ---

Expand Down Expand Up @@ -67,6 +67,17 @@ pub struct CachesPerArchetype {
// than an `ArchetypeName`: the query system doesn't care about archetypes.
pub(crate) latest_at_per_archetype: RwLock<HashMap<ArchetypeName, Arc<RwLock<LatestAtCache>>>>,

/// Which [`Archetype`] are we querying for?
///
/// This is very important because of our data model: we not only query for components, but we
/// query for components from a specific point-of-view (the so-called primary component).
/// Different archetypes have different point-of-views, and therefore can end up with different
/// results, even from the same raw data.
//
// TODO(cmc): At some point we should probably just store the PoV and optional components rather
// than an `ArchetypeName`: the query system doesn't care about archetypes.
pub(crate) range_per_archetype: RwLock<HashMap<ArchetypeName, Arc<RwLock<RangeCache>>>>,

/// Everything greater than or equal to this timestamp has been asynchronously invalidated.
///
/// The next time this cache gets queried, it must remove any entry matching this criteria.
Expand Down Expand Up @@ -134,6 +145,42 @@ impl Caches {
f(&mut cache)
}

/// Gives write access to the appropriate `RangeCache` according to the specified
/// query parameters.
///
/// The cache is keyed by `(store, entity, timeline)` and, within that entry, by the
/// archetype `A` being queried. Both levels are created on first use, and any pending
/// invalidation for the key is handled before the cache is handed to `f`.
#[inline]
pub fn with_range<A, F, R>(
store_id: StoreId,
entity_path: EntityPath,
query: &RangeQuery,
mut f: F,
) -> R
where
A: Archetype,
F: FnMut(&mut RangeCache) -> R,
{
// One top-level cache entry per (store, entity, timeline) triplet.
let key = CacheKey::new(store_id, entity_path, query.timeline);

// Fetch (or lazily create) the per-archetype cache under the global locks, but
// only the `Arc` handle escapes this closure — `f` itself runs after they drop.
let cache =
re_data_store::DataStore::with_subscriber_once(*CACHES, move |caches: &Caches| {
let mut caches = caches.0.write();

let caches_per_archetype = caches.entry(key.clone()).or_default();
// Apply any asynchronously-scheduled invalidation before handing out the cache.
caches_per_archetype.handle_pending_invalidation(&key);

let mut range_per_archetype = caches_per_archetype.range_per_archetype.write();
let range_cache = range_per_archetype.entry(A::name()).or_default();

Arc::clone(range_cache)

// Implicitly releasing all intermediary locks.
})
// NOTE: downcasting cannot fail, this is our own private handle.
.unwrap();

// Only the per-archetype `RangeCache` itself stays locked while `f` runs.
let mut cache = cache.write();
f(&mut cache)
}

#[inline]
pub(crate) fn with<F: FnMut(&Caches) -> R, R>(f: F) -> R {
// NOTE: downcasting cannot fail, this is our own private handle.
Expand Down Expand Up @@ -347,6 +394,9 @@ pub struct CacheBucket {
///
/// This corresponds to the data time and `RowId` returned by `re_query::query_archetype`.
///
/// This is guaranteed to always be sorted and dense (i.e. there cannot be a hole in the cached
/// data, unless the raw data itself in the store has a hole at that particular point in time).
///
/// Reminder: within a single timestamp, rows are sorted according to their [`RowId`]s.
pub(crate) data_times: VecDeque<(TimeInt, RowId)>,

Expand Down Expand Up @@ -375,6 +425,18 @@ impl CacheBucket {
self.data_times.iter()
}

#[inline]
pub fn contains_data_time(&self, data_time: TimeInt) -> bool {
let first_time = self.data_times.front().map_or(&TimeInt::MAX, |(t, _)| t);
let last_time = self.data_times.back().map_or(&TimeInt::MIN, |(t, _)| t);
*first_time <= data_time && data_time <= *last_time
}

/// Is this exact `(data_time, row_id)` pair already cached?
#[inline]
pub fn contains_data_row(&self, data_time: TimeInt, row_id: RowId) -> bool {
    // `data_times` is kept sorted by (time, row-id), so an exact-match
    // binary search suffices.
    let needle = (data_time, row_id);
    self.data_times.binary_search(&needle).is_ok()
}

/// Iterate over the [`InstanceKey`] batches of the point-of-view components.
#[inline]
pub fn iter_pov_instance_keys(&self) -> impl Iterator<Item = &[InstanceKey]> {
Expand Down Expand Up @@ -554,42 +616,3 @@ impl CacheBucket {
Ok(added_size_bytes)
}
}

// ---

// NOTE: Because we're working with deserialized data, everything has to be done with metaprogramming,
// which is notoriously painful in Rust (i.e., macros).
// For this reason we move as much of the code as possible into the already existing macros in `query.rs`.

/// Caches the results of `LatestAt` archetype queries (`ArchetypeView`).
///
/// There is one `LatestAtCache` for each unique [`CacheKey`].
///
/// All query steps are cached: index search, cluster key joins and deserialization.
#[derive(Default)]
pub struct LatestAtCache {
/// Organized by _query_ time.
///
/// If the data you're looking for isn't in here, try partially running the query (i.e. run the
/// index search in order to find a data time, but don't actually deserialize and join the data)
/// and check if there is any data available for the resulting _data_ time in [`Self::per_data_time`].
pub per_query_time: BTreeMap<TimeInt, Arc<RwLock<CacheBucket>>>,

/// Organized by _data_ time.
///
/// Due to how our latest-at semantics work, any number of queries at time `T+n` where `n >= 0`
/// can result in a data time of `T`.
//
// NOTE(review): buckets appear to be shared (via `Arc`) with [`Self::per_query_time`] —
// confirm before mutating through only one of the two maps.
pub per_data_time: BTreeMap<TimeInt, Arc<RwLock<CacheBucket>>>,

/// Dedicated bucket for timeless data, if any.
///
/// Query time and data time are one and the same in the timeless case, therefore we only need
/// this one bucket.
//
// NOTE: Lives separately so we don't pay the extra `Option` cost in the much more common
// timeful case.
pub timeless: Option<CacheBucket>,

/// Total size of the data stored in this cache in bytes.
//
// NOTE(review): presumably kept in sync on every insertion/eviction — confirm at call sites.
pub total_size_bytes: u64,
}
Loading
Loading