Invalidate hub-wide caches on deletions and overwrites #7525

Merged · 8 commits · Sep 27, 2024
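Summary of the change, as reflected in the diffs below: the store-mutating entry points on `EntityDb` (`add`, `add_chunk`, `gc`, `purge_fraction_of_ram`, …) now return the resulting `Vec<ChunkStoreEvent>` instead of `()`/`bool`, a new `Cache::on_store_events` hook lets hub-wide caches react to that changelog, and the viewer forwards the events so caches such as `MeshCache` can evict entries whose rows were deleted or overwritten. Below is a minimal sketch of the flow, using hypothetical simplified types (`Store`, `HubCaches`, `StoreEvent`) rather than the real Rerun APIs, which are shown in the diffs that follow.

```rust
// Hypothetical sketch of the flow this PR introduces (simplified stand-in
// types; the real ones are EntityDb, Caches and ChunkStoreEvent).

struct StoreEvent;

struct Store;

impl Store {
    /// Mutations now report what changed instead of returning `()`.
    fn add(&mut self, _msg: &str) -> Vec<StoreEvent> {
        vec![StoreEvent]
    }
}

struct HubCaches;

impl HubCaches {
    /// Fans the changelog out to every hub-wide cache so each one can
    /// invalidate entries that just became unreachable.
    fn on_store_events(&mut self, events: &[StoreEvent]) {
        println!("invalidating caches for {} store event(s)", events.len());
    }
}

fn main() {
    let mut store = Store;
    let mut caches = HubCaches;

    // Ingest a message, collect the resulting changelog, forward it.
    let events = store.add("some log message");
    caches.on_store_events(&events);
}
```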
42 changes: 26 additions & 16 deletions crates/store/re_entity_db/src/entity_db.rs
@@ -321,31 +321,35 @@ impl EntityDb {
self.entity_path_from_hash.contains_key(&entity_path.hash())
}

-pub fn add(&mut self, msg: &LogMsg) -> Result<(), Error> {
+pub fn add(&mut self, msg: &LogMsg) -> Result<Vec<ChunkStoreEvent>, Error> {
re_tracing::profile_function!();

debug_assert_eq!(msg.store_id(), self.store_id());

-match &msg {
-LogMsg::SetStoreInfo(msg) => self.set_store_info(msg.clone()),
+let store_events = match &msg {
+LogMsg::SetStoreInfo(msg) => {
+self.set_store_info(msg.clone());
+vec![]
+}

LogMsg::ArrowMsg(_, arrow_msg) => {
self.last_modified_at = web_time::Instant::now();

let mut chunk = re_chunk::Chunk::from_arrow_msg(arrow_msg)?;
chunk.sort_if_unsorted();
-self.add_chunk(&Arc::new(chunk))?;
+self.add_chunk(&Arc::new(chunk))?
}

LogMsg::BlueprintActivationCommand(_) => {
// Not for us to handle
+vec![]
}
-}
+};

-Ok(())
+Ok(store_events)
}

-pub fn add_chunk(&mut self, chunk: &Arc<Chunk>) -> Result<(), Error> {
+pub fn add_chunk(&mut self, chunk: &Arc<Chunk>) -> Result<Vec<ChunkStoreEvent>, Error> {
let store_events = self.data_store.insert_chunk(chunk)?;

self.register_entity_path(chunk.entity_path());
@@ -370,7 +374,7 @@ impl EntityDb {
self.stats.on_events(&store_events);
}

-Ok(())
+Ok(store_events)
}

fn register_entity_path(&mut self, entity_path: &EntityPath) {
@@ -383,36 +387,42 @@
self.set_store_info = Some(store_info);
}

-pub fn gc_everything_but_the_latest_row_on_non_default_timelines(&mut self) {
+pub fn gc_everything_but_the_latest_row_on_non_default_timelines(
+&mut self,
+) -> Vec<ChunkStoreEvent> {
re_tracing::profile_function!();

self.gc(&GarbageCollectionOptions {
target: GarbageCollectionTarget::Everything,
protect_latest: 1, // TODO(jleibs): Bump this after we have an undo buffer
time_budget: DEFAULT_GC_TIME_BUDGET,
-});
+})
}

/// Free up some RAM by forgetting the older parts of all timelines.
-pub fn purge_fraction_of_ram(&mut self, fraction_to_purge: f32) {
+pub fn purge_fraction_of_ram(&mut self, fraction_to_purge: f32) -> Vec<ChunkStoreEvent> {
re_tracing::profile_function!();

assert!((0.0..=1.0).contains(&fraction_to_purge));
-if !self.gc(&GarbageCollectionOptions {
+
+let store_events = self.gc(&GarbageCollectionOptions {
target: GarbageCollectionTarget::DropAtLeastFraction(fraction_to_purge as _),
protect_latest: 1,
time_budget: DEFAULT_GC_TIME_BUDGET,
-}) {
+});
+
+if store_events.is_empty() {
// If we weren't able to collect any data, then we need to GC the cache itself in order
// to regain some space.
// See <https://github.com/rerun-io/rerun/issues/7369#issuecomment-2335164098> for the
// complete rationale.
self.query_caches.purge_fraction_of_ram(fraction_to_purge);
}

+store_events
}

-/// Returns `true` if anything at all was actually GC'd.
-pub fn gc(&mut self, gc_options: &GarbageCollectionOptions) -> bool {
+fn gc(&mut self, gc_options: &GarbageCollectionOptions) -> Vec<ChunkStoreEvent> {
re_tracing::profile_function!();

let (store_events, stats_diff) = self.data_store.gc(gc_options);
Expand All @@ -425,7 +435,7 @@ impl EntityDb {

self.on_store_deletions(&store_events);

-!store_events.is_empty()
+store_events
}

/// Unconditionally drops all the data for a given [`EntityPath`].
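Note on the `EntityDb` changes above: `gc` no longer answers "did anything get collected?" with a `bool`; callers now receive the full `Vec<ChunkStoreEvent>`, which they can both test for emptiness and forward. A hedged caller-side sketch follows (the wrapper function, variable names, and crate paths are assumptions for illustration; the two method signatures are the ones introduced in this PR).

```rust
// Sketch only: the `re_entity_db::EntityDb` / `re_viewer_context::Caches`
// paths are assumed; the method signatures match the diff above.
fn purge_and_invalidate(
    entity_db: &mut re_entity_db::EntityDb,
    caches: &re_viewer_context::Caches,
) {
    // Previously this returned `()`; now the GC changelog flows back to the caller…
    let store_events = entity_db.purge_fraction_of_ram(0.3);

    // …so "did we collect anything?" becomes an `is_empty()` check, and the
    // events can be handed to the hub-wide caches for invalidation.
    if !store_events.is_empty() {
        caches.on_store_events(&store_events);
    }
}
```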
44 changes: 42 additions & 2 deletions crates/viewer/re_space_view_spatial/src/mesh_cache.rs
@@ -1,9 +1,13 @@
use std::sync::Arc;

+use ahash::{HashMap, HashSet};
+
+use itertools::Either;
+use re_chunk_store::{ChunkStoreEvent, RowId};
use re_entity_db::VersionedInstancePathHash;
use re_log_types::hash::Hash64;
use re_renderer::RenderContext;
-use re_types::components::MediaType;
+use re_types::{components::MediaType, Loggable as _};
use re_viewer_context::Cache;

use crate::mesh_loader::LoadedMesh;
@@ -15,6 +19,7 @@ use crate::mesh_loader::LoadedMesh;
/// Note that this is more complex than most other caches,
/// since the cache key is not only used for mesh file blobs,
/// but also for manually logged meshes.
+//
// TODO(andreas): Maybe these should be different concerns?
// Blobs need costly unpacking/reading/parsing, regular meshes don't.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
@@ -26,7 +31,7 @@ pub struct MeshCacheKey {

/// Caches meshes based on their [`MeshCacheKey`].
#[derive(Default)]
-pub struct MeshCache(ahash::HashMap<MeshCacheKey, Option<Arc<LoadedMesh>>>);
+pub struct MeshCache(HashMap<RowId, HashMap<MeshCacheKey, Option<Arc<LoadedMesh>>>>);

/// Either a [`re_types::archetypes::Asset3D`] or [`re_types::archetypes::Mesh3D`] to be cached.
#[derive(Debug, Clone, Copy)]
@@ -52,6 +57,8 @@ impl MeshCache {
re_tracing::profile_function!();

self.0
+.entry(key.versioned_instance_path_hash.row_id)
+.or_default()
.entry(key)
.or_insert_with(|| {
re_log::debug!("Loading CPU mesh {name:?}…");
@@ -75,6 +82,39 @@ impl Cache for MeshCache {
self.0.clear();
}

+fn on_store_events(&mut self, events: &[ChunkStoreEvent]) {
+re_tracing::profile_function!();
+
+let row_ids_removed: HashSet<RowId> = events
+.iter()
+.flat_map(|event| {
+let is_deletion = || event.kind == re_chunk_store::ChunkStoreDiffKind::Deletion;
+let contains_mesh_data = || {
+let contains_asset_blob = event
+.chunk
+.components()
+.contains_key(&re_types::components::Blob::name());
+
+let contains_vertex_positions = event
+.chunk
+.components()
+.contains_key(&re_types::components::Position3D::name());
+
+contains_asset_blob || contains_vertex_positions
+};
+
+if is_deletion() && contains_mesh_data() {
+Either::Left(event.chunk.row_ids())
+} else {
+Either::Right(std::iter::empty())
+}
+})
+.collect();
+
+self.0
+.retain(|row_id, _per_key| !row_ids_removed.contains(row_id));
+}

fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}
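The key change in `MeshCache` above is the extra outer level of keying: entries are grouped by the `RowId` that produced them, so `on_store_events` can evict everything belonging to a deleted row with a single `retain` over the outer map (only deletion events whose chunks carry `Blob` or `Position3D` components, i.e. mesh data, are considered). Here is a simplified sketch of that two-level layout with stand-in types (`RowId`, `CacheKey`, and `String` values are placeholders, not the real `MeshCacheKey`/`LoadedMesh`).

```rust
// Simplified sketch of the two-level keying pattern used by MeshCache above.
use std::collections::{HashMap, HashSet};

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct RowId(u64);

#[derive(Clone, PartialEq, Eq, Hash)]
struct CacheKey {
    row_id: RowId,
    media_type: Option<String>,
}

#[derive(Default)]
struct PerRowCache(HashMap<RowId, HashMap<CacheKey, String>>);

impl PerRowCache {
    fn entry(&mut self, key: CacheKey) -> &mut String {
        self.0
            .entry(key.row_id) // outer level: the row that produced the data
            .or_default()
            .entry(key) // inner level: the full cache key
            .or_insert_with(|| "expensive load".to_owned())
    }

    /// Dropping everything for a set of rows is a single `retain` over the outer map.
    fn remove_rows(&mut self, removed: &HashSet<RowId>) {
        self.0.retain(|row_id, _per_key| !removed.contains(row_id));
    }
}
```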
29 changes: 22 additions & 7 deletions crates/viewer/re_viewer/src/app.rs
@@ -509,7 +509,7 @@ impl App {
let blueprint_db = store_hub.entity_db_mut(&blueprint_id);
for chunk in updates {
match blueprint_db.add_chunk(&Arc::new(chunk)) {
-Ok(()) => {}
+Ok(_store_events) => {}
Err(err) => {
re_log::warn_once!("Failed to store blueprint delta: {err}");
}
@@ -1086,15 +1086,30 @@ impl App {
re_log::warn_once!("Loading a blueprint {store_id} that is active. See https://github.com/rerun-io/rerun/issues/5514 for details.");
}

-let entity_db = store_hub.entity_db_mut(store_id);
+// TODO(cmc): we have to keep grabbing and releasing entity_db because everything references
+// everything and some of it is mutable and some not… it's really not pretty, but it
+// does the job for now.

-if entity_db.data_source.is_none() {
-entity_db.data_source = Some((*channel_source).clone());
+{
+let entity_db = store_hub.entity_db_mut(store_id);
+if entity_db.data_source.is_none() {
+entity_db.data_source = Some((*channel_source).clone());
+}
}

-if let Err(err) = entity_db.add(&msg) {
-re_log::error_once!("Failed to add incoming msg: {err}");
-};
+match store_hub.entity_db_mut(store_id).add(&msg) {
+Ok(store_events) => {
+if let Some(caches) = store_hub.active_caches() {
+caches.on_store_events(&store_events);
+}
+}
+
+Err(err) => {
+re_log::error_once!("Failed to add incoming msg: {err}");
+}
+}

+let entity_db = store_hub.entity_db_mut(store_id);

match &msg {
LogMsg::SetStoreInfo(_) => {
19 changes: 19 additions & 0 deletions crates/viewer/re_viewer_context/src/cache/caches.rs
@@ -2,6 +2,7 @@ use std::any::{Any, TypeId};

use ahash::HashMap;
use parking_lot::Mutex;
+use re_chunk_store::ChunkStoreEvent;

/// Does memoization of different objects for the immediate mode UI.
#[derive(Default)]
@@ -26,6 +27,17 @@ impl Caches {
}
}

+/// React to the chunk store's changelog, if needed.
+///
+/// Useful to e.g. invalidate unreachable data.
+pub fn on_store_events(&self, events: &[ChunkStoreEvent]) {
+re_tracing::profile_function!();
+
+for cache in self.0.lock().values_mut() {
+cache.on_store_events(events);
+}
+}

/// Accesses a cache for reading and writing.
///
/// Adds the cache lazily if it wasn't already there.
@@ -52,6 +64,13 @@ pub trait Cache: std::any::Any + Send + Sync {
/// Attempt to free up memory.
fn purge_memory(&mut self);

+/// React to the chunk store's changelog, if needed.
+///
+/// Useful to e.g. invalidate unreachable data.
+fn on_store_events(&mut self, events: &[ChunkStoreEvent]) {
+_ = events;
+}

// TODO(andreas): Track bytes used for each cache and show in the memory panel!
//fn bytes_used(&self) -> usize;

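Finally, `Cache::on_store_events` is added as a defaulted trait method: existing `Cache` implementations keep compiling and simply ignore the changelog, while caches that can go stale override it, and `Caches::on_store_events` fans the events out to every registered cache. The following is a hypothetical sketch of that opt-in pattern; the two cache types and the unit `ChunkStoreEvent` below are invented for illustration and are not real Rerun types.

```rust
// Hypothetical sketch of the defaulted-hook pattern introduced above.
struct ChunkStoreEvent;

trait Cache {
    fn purge_memory(&mut self);

    /// Default is a no-op: only caches that can hold stale data need to opt in.
    fn on_store_events(&mut self, _events: &[ChunkStoreEvent]) {}
}

/// Never mirrors store contents, so the default no-op hook is fine.
struct StaticTableCache;

impl Cache for StaticTableCache {
    fn purge_memory(&mut self) {}
}

/// Caches per-row results, so it overrides the hook to invalidate itself.
struct PerRowResultCache {
    dirty: bool,
}

impl Cache for PerRowResultCache {
    fn purge_memory(&mut self) {
        self.dirty = true;
    }

    fn on_store_events(&mut self, events: &[ChunkStoreEvent]) {
        // A real implementation would inspect the events and evict precisely;
        // this sketch just marks the cache as needing a refresh.
        self.dirty |= !events.is_empty();
    }
}
```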