From 72a90cf8b7f70fea5a54a21522df39dc51a900e4 Mon Sep 17 00:00:00 2001 From: Clement Rey <cr.rey.clement@gmail.com> Date: Thu, 26 Sep 2024 18:11:25 +0200 Subject: [PATCH 1/8] forward store events to hub-wide caches --- crates/store/re_entity_db/src/entity_db.rs | 42 ++++++++++++------- crates/viewer/re_viewer/src/app.rs | 29 +++++++++---- .../re_viewer_context/src/cache/caches.rs | 19 +++++++++ .../re_viewer_context/src/image_info.rs | 2 +- .../viewer/re_viewer_context/src/store_hub.rs | 12 +++++- 5 files changed, 78 insertions(+), 26 deletions(-) diff --git a/crates/store/re_entity_db/src/entity_db.rs b/crates/store/re_entity_db/src/entity_db.rs index 16444a228eec..76e3636b7c99 100644 --- a/crates/store/re_entity_db/src/entity_db.rs +++ b/crates/store/re_entity_db/src/entity_db.rs @@ -321,31 +321,35 @@ impl EntityDb { self.entity_path_from_hash.contains_key(&entity_path.hash()) } - pub fn add(&mut self, msg: &LogMsg) -> Result<(), Error> { + pub fn add(&mut self, msg: &LogMsg) -> Result<Vec<ChunkStoreEvent>, Error> { re_tracing::profile_function!(); debug_assert_eq!(msg.store_id(), self.store_id()); - match &msg { - LogMsg::SetStoreInfo(msg) => self.set_store_info(msg.clone()), + let store_events = match &msg { + LogMsg::SetStoreInfo(msg) => { + self.set_store_info(msg.clone()); + vec![] + } LogMsg::ArrowMsg(_, arrow_msg) => { self.last_modified_at = web_time::Instant::now(); let mut chunk = re_chunk::Chunk::from_arrow_msg(arrow_msg)?; chunk.sort_if_unsorted(); - self.add_chunk(&Arc::new(chunk))?; + self.add_chunk(&Arc::new(chunk))? } LogMsg::BlueprintActivationCommand(_) => { // Not for us to handle + vec![] } - } + }; - Ok(()) + Ok(store_events) } - pub fn add_chunk(&mut self, chunk: &Arc<Chunk>) -> Result<(), Error> { + pub fn add_chunk(&mut self, chunk: &Arc<Chunk>) -> Result<Vec<ChunkStoreEvent>, Error> { let store_events = self.data_store.insert_chunk(chunk)?; self.register_entity_path(chunk.entity_path()); @@ -370,7 +374,7 @@ impl EntityDb { self.stats.on_events(&store_events); } - Ok(()) + Ok(store_events) } fn register_entity_path(&mut self, entity_path: &EntityPath) { @@ -383,36 +387,42 @@ impl EntityDb { self.set_store_info = Some(store_info); } - pub fn gc_everything_but_the_latest_row_on_non_default_timelines(&mut self) { + pub fn gc_everything_but_the_latest_row_on_non_default_timelines( + &mut self, + ) -> Vec<ChunkStoreEvent> { re_tracing::profile_function!(); self.gc(&GarbageCollectionOptions { target: GarbageCollectionTarget::Everything, protect_latest: 1, // TODO(jleibs): Bump this after we have an undo buffer time_budget: DEFAULT_GC_TIME_BUDGET, - }); + }) } /// Free up some RAM by forgetting the older parts of all timelines. - pub fn purge_fraction_of_ram(&mut self, fraction_to_purge: f32) { + pub fn purge_fraction_of_ram(&mut self, fraction_to_purge: f32) -> Vec<ChunkStoreEvent> { re_tracing::profile_function!(); assert!((0.0..=1.0).contains(&fraction_to_purge)); - if !self.gc(&GarbageCollectionOptions { + + let store_events = self.gc(&GarbageCollectionOptions { target: GarbageCollectionTarget::DropAtLeastFraction(fraction_to_purge as _), protect_latest: 1, time_budget: DEFAULT_GC_TIME_BUDGET, - }) { + }); + + if store_events.is_empty() { // If we weren't able to collect any data, then we need to GC the cache itself in order // to regain some space. // See <https://github.com/rerun-io/rerun/issues/7369#issuecomment-2335164098> for the // complete rationale. self.query_caches.purge_fraction_of_ram(fraction_to_purge); } + + store_events } - /// Returns `true` if anything at all was actually GC'd. - pub fn gc(&mut self, gc_options: &GarbageCollectionOptions) -> bool { + fn gc(&mut self, gc_options: &GarbageCollectionOptions) -> Vec<ChunkStoreEvent> { re_tracing::profile_function!(); let (store_events, stats_diff) = self.data_store.gc(gc_options); @@ -425,7 +435,7 @@ impl EntityDb { self.on_store_deletions(&store_events); - !store_events.is_empty() + store_events } /// Unconditionally drops all the data for a given [`EntityPath`] . diff --git a/crates/viewer/re_viewer/src/app.rs b/crates/viewer/re_viewer/src/app.rs index ac4c5e6e5044..3d5468fdb944 100644 --- a/crates/viewer/re_viewer/src/app.rs +++ b/crates/viewer/re_viewer/src/app.rs @@ -509,7 +509,7 @@ impl App { let blueprint_db = store_hub.entity_db_mut(&blueprint_id); for chunk in updates { match blueprint_db.add_chunk(&Arc::new(chunk)) { - Ok(()) => {} + Ok(_store_events) => {} Err(err) => { re_log::warn_once!("Failed to store blueprint delta: {err}"); } @@ -1086,15 +1086,30 @@ impl App { re_log::warn_once!("Loading a blueprint {store_id} that is active. See https://github.com/rerun-io/rerun/issues/5514 for details."); } - let entity_db = store_hub.entity_db_mut(store_id); + // TODO(cmc): we have to keep grabbing and releasing entity_db because everything references + // everything and some of it is mutable and some not… it's really not pretty, but it + // does the job for now. - if entity_db.data_source.is_none() { - entity_db.data_source = Some((*channel_source).clone()); + { + let entity_db = store_hub.entity_db_mut(store_id); + if entity_db.data_source.is_none() { + entity_db.data_source = Some((*channel_source).clone()); + } } - if let Err(err) = entity_db.add(&msg) { - re_log::error_once!("Failed to add incoming msg: {err}"); - }; + match store_hub.entity_db_mut(store_id).add(&msg) { + Ok(store_events) => { + if let Some(caches) = store_hub.active_caches() { + caches.on_store_events(&store_events); + } + } + + Err(err) => { + re_log::error_once!("Failed to add incoming msg: {err}"); + } + } + + let entity_db = store_hub.entity_db_mut(store_id); match &msg { LogMsg::SetStoreInfo(_) => { diff --git a/crates/viewer/re_viewer_context/src/cache/caches.rs b/crates/viewer/re_viewer_context/src/cache/caches.rs index 2858df38547c..fcd45c9e0530 100644 --- a/crates/viewer/re_viewer_context/src/cache/caches.rs +++ b/crates/viewer/re_viewer_context/src/cache/caches.rs @@ -2,6 +2,7 @@ use std::any::{Any, TypeId}; use ahash::HashMap; use parking_lot::Mutex; +use re_chunk_store::ChunkStoreEvent; /// Does memoization of different objects for the immediate mode UI. #[derive(Default)] @@ -26,6 +27,17 @@ impl Caches { } } + /// React to the chunk store's changelog, if needed. + /// + /// Useful to e.g. invalidate unreachable data. + pub fn on_store_events(&self, events: &[ChunkStoreEvent]) { + re_tracing::profile_function!(); + + for cache in self.0.lock().values_mut() { + cache.on_store_events(events); + } + } + /// Accesses a cache for reading and writing. /// /// Adds the cache lazily if it wasn't already there. @@ -52,6 +64,13 @@ pub trait Cache: std::any::Any + Send + Sync { /// Attempt to free up memory. fn purge_memory(&mut self); + /// React to the chunk store's changelog, if needed. + /// + /// Useful to e.g. invalidate unreachable data. + fn on_store_events(&mut self, events: &[ChunkStoreEvent]) { + _ = events; + } + // TODO(andreas): Track bytes used for each cache and show in the memory panel! //fn bytes_used(&self) -> usize; diff --git a/crates/viewer/re_viewer_context/src/image_info.rs b/crates/viewer/re_viewer_context/src/image_info.rs index 8359934a9100..b144cf50473e 100644 --- a/crates/viewer/re_viewer_context/src/image_info.rs +++ b/crates/viewer/re_viewer_context/src/image_info.rs @@ -13,7 +13,7 @@ use re_types::{ /// It has enough information to render the image on the screen. #[derive(Clone)] pub struct ImageInfo { - /// The row id that contaoned the blob. + /// The row id that contained the blob. /// /// Can be used instead of hashing [`Self::buffer`]. pub buffer_row_id: RowId, diff --git a/crates/viewer/re_viewer_context/src/store_hub.rs b/crates/viewer/re_viewer_context/src/store_hub.rs index 25ed6d38ac20..00079d04f5df 100644 --- a/crates/viewer/re_viewer_context/src/store_hub.rs +++ b/crates/viewer/re_viewer_context/src/store_hub.rs @@ -617,9 +617,13 @@ impl StoreHub { }; let store_size_before = entity_db.store().stats().total().total_size_bytes; - entity_db.purge_fraction_of_ram(fraction_to_purge); + let store_events = entity_db.purge_fraction_of_ram(fraction_to_purge); let store_size_after = entity_db.store().stats().total().total_size_bytes; + if let Some(caches) = self.caches_per_recording.get_mut(&store_id) { + caches.on_store_events(&store_events); + } + // No point keeping an empty recording around. if entity_db.is_empty() { self.remove(&store_id); @@ -684,7 +688,11 @@ impl StoreHub { // TODO(jleibs): Decide a better tuning for this. Would like to save a // reasonable amount of history, or incremental snapshots. - blueprint.gc_everything_but_the_latest_row_on_non_default_timelines(); + let store_events = + blueprint.gc_everything_but_the_latest_row_on_non_default_timelines(); + if let Some(caches) = self.caches_per_recording.get_mut(blueprint_id) { + caches.on_store_events(&store_events); + } self.blueprint_last_gc .insert(blueprint_id.clone(), blueprint.generation()); From 1b79ec1e2a8d8ec7f4468c3f094b32ce2c0f09fc Mon Sep 17 00:00:00 2001 From: Clement Rey <cr.rey.clement@gmail.com> Date: Thu, 26 Sep 2024 18:12:10 +0200 Subject: [PATCH 2/8] invalidate image caches on deletions and overwrites --- .../src/cache/image_decode_cache.rs | 84 ++++++++++++++----- .../src/cache/image_stats_cache.rs | 42 +++++++++- 2 files changed, 100 insertions(+), 26 deletions(-) diff --git a/crates/viewer/re_viewer_context/src/cache/image_decode_cache.rs b/crates/viewer/re_viewer_context/src/cache/image_decode_cache.rs index 0698923ec0f8..efbcc865548f 100644 --- a/crates/viewer/re_viewer_context/src/cache/image_decode_cache.rs +++ b/crates/viewer/re_viewer_context/src/cache/image_decode_cache.rs @@ -1,8 +1,13 @@ +use ahash::{HashMap, HashSet}; + +use itertools::Either; use re_chunk::RowId; +use re_chunk_store::ChunkStoreEvent; use re_log_types::hash::Hash64; use re_types::{ archetypes::Image, image::{ImageKind, ImageLoadError}, + Loggable as _, }; use crate::{Cache, ImageInfo}; @@ -21,7 +26,7 @@ struct DecodedImageResult { /// Caches the results of decoding [`re_types::archetypes::EncodedImage`]. #[derive(Default)] pub struct ImageDecodeCache { - cache: ahash::HashMap<Hash64, DecodedImageResult>, + cache: HashMap<RowId, HashMap<Hash64, DecodedImageResult>>, memory_used: u64, generation: u64, } @@ -35,31 +40,36 @@ impl ImageDecodeCache { /// so we don't need the instance id here. pub fn entry( &mut self, - row_id: RowId, + blob_row_id: RowId, image_bytes: &[u8], media_type: Option<&str>, ) -> Result<ImageInfo, ImageLoadError> { re_tracing::profile_function!(); - let key = Hash64::hash((row_id, media_type)); - - let lookup = self.cache.entry(key).or_insert_with(|| { - let result = decode_image(row_id, image_bytes, media_type); - let memory_used = result.as_ref().map_or(0, |image| image.buffer.len() as u64); - self.memory_used += memory_used; - DecodedImageResult { - result, - memory_used, - last_use_generation: 0, - } - }); + let inner_key = Hash64::hash(media_type); + + let lookup = self + .cache + .entry(blob_row_id) + .or_default() + .entry(inner_key) + .or_insert_with(|| { + let result = decode_image(blob_row_id, image_bytes, media_type); + let memory_used = result.as_ref().map_or(0, |image| image.buffer.len() as u64); + self.memory_used += memory_used; + DecodedImageResult { + result, + memory_used, + last_use_generation: 0, + } + }); lookup.last_use_generation = self.generation; lookup.result.clone() } } fn decode_image( - row_id: RowId, + blob_row_id: RowId, image_bytes: &[u8], media_type: Option<&str>, ) -> Result<ImageInfo, ImageLoadError> { @@ -89,7 +99,7 @@ fn decode_image( let Image { buffer, format, .. } = image_arch; Ok(ImageInfo { - buffer_row_id: row_id, + buffer_row_id: blob_row_id, buffer: buffer.0, format: format.0, kind: ImageKind::Color, @@ -122,12 +132,16 @@ impl Cache for ImageDecodeCache { let before = self.memory_used; - self.cache.retain(|_, ci| { - let retain = ci.last_use_generation == self.generation; - if !retain { - self.memory_used -= ci.memory_used; - } - retain + self.cache.retain(|_row_id, per_key| { + per_key.retain(|_, ci| { + let retain = ci.last_use_generation == self.generation; + if !retain { + self.memory_used -= ci.memory_used; + } + retain + }); + + !per_key.is_empty() }); re_log::trace!( @@ -137,6 +151,32 @@ impl Cache for ImageDecodeCache { ); } + fn on_store_events(&mut self, events: &[ChunkStoreEvent]) { + re_tracing::profile_function!(); + + let row_ids_removed: HashSet<RowId> = events + .iter() + .flat_map(|event| { + let is_deletion = || event.kind == re_chunk_store::ChunkStoreDiffKind::Deletion; + let contains_image_blob = || { + event + .chunk + .components() + .contains_key(&re_types::components::Blob::name()) + }; + + if is_deletion() && contains_image_blob() { + Either::Left(event.chunk.row_ids()) + } else { + Either::Right(std::iter::empty()) + } + }) + .collect(); + + self.cache + .retain(|row_id, _per_key| !row_ids_removed.contains(row_id)); + } + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { self } diff --git a/crates/viewer/re_viewer_context/src/cache/image_stats_cache.rs b/crates/viewer/re_viewer_context/src/cache/image_stats_cache.rs index cbd882cd9fad..0ff2ce3ce927 100644 --- a/crates/viewer/re_viewer_context/src/cache/image_stats_cache.rs +++ b/crates/viewer/re_viewer_context/src/cache/image_stats_cache.rs @@ -1,17 +1,25 @@ -use egui::util::hash; +use ahash::{HashMap, HashSet}; +use itertools::Either; + +use re_chunk::RowId; +use re_chunk_store::ChunkStoreEvent; +use re_log_types::hash::Hash64; +use re_types::Loggable as _; use crate::{Cache, ImageInfo, ImageStats}; // Caches image stats using a [`RowId`] #[derive(Default)] -pub struct ImageStatsCache(ahash::HashMap<u64, ImageStats>); +pub struct ImageStatsCache(HashMap<RowId, HashMap<Hash64, ImageStats>>); impl ImageStatsCache { pub fn entry(&mut self, image: &ImageInfo) -> ImageStats { - let key = hash((image.buffer_row_id, image.format)); + let inner_key = Hash64::hash(image.format); *self .0 - .entry(key) + .entry(image.buffer_row_id) + .or_default() + .entry(inner_key) .or_insert_with(|| ImageStats::from_image(image)) } } @@ -21,6 +29,32 @@ impl Cache for ImageStatsCache { // Purging the image stats is not worth it - these are very small objects! } + fn on_store_events(&mut self, events: &[ChunkStoreEvent]) { + re_tracing::profile_function!(); + + let row_ids_removed: HashSet<RowId> = events + .iter() + .flat_map(|event| { + let is_deletion = || event.kind == re_chunk_store::ChunkStoreDiffKind::Deletion; + let contains_image_blob = || { + event + .chunk + .components() + .contains_key(&re_types::components::Blob::name()) + }; + + if is_deletion() && contains_image_blob() { + Either::Left(event.chunk.row_ids()) + } else { + Either::Right(std::iter::empty()) + } + }) + .collect(); + + self.0 + .retain(|row_id, _per_key| !row_ids_removed.contains(row_id)); + } + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { self } From 27e4d4b2f036debb7726f0307357bebe50831594 Mon Sep 17 00:00:00 2001 From: Clement Rey <cr.rey.clement@gmail.com> Date: Thu, 26 Sep 2024 18:12:24 +0200 Subject: [PATCH 3/8] invalidate mesh cache on deletions and overwrites --- .../re_space_view_spatial/src/mesh_cache.rs | 44 ++++++++++++++++++- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/crates/viewer/re_space_view_spatial/src/mesh_cache.rs b/crates/viewer/re_space_view_spatial/src/mesh_cache.rs index 50532fe48f60..3aeb3d10d957 100644 --- a/crates/viewer/re_space_view_spatial/src/mesh_cache.rs +++ b/crates/viewer/re_space_view_spatial/src/mesh_cache.rs @@ -1,9 +1,13 @@ use std::sync::Arc; +use ahash::{HashMap, HashSet}; + +use itertools::Either; +use re_chunk_store::{ChunkStoreEvent, RowId}; use re_entity_db::VersionedInstancePathHash; use re_log_types::hash::Hash64; use re_renderer::RenderContext; -use re_types::components::MediaType; +use re_types::{components::MediaType, Loggable as _}; use re_viewer_context::Cache; use crate::mesh_loader::LoadedMesh; @@ -15,6 +19,7 @@ use crate::mesh_loader::LoadedMesh; /// Note that this is more complex than most other caches, /// since the cache key is not only used for mesh file blobs, /// but also for manually logged meshes. +// // TODO(andreas): Maybe these should be different concerns? // Blobs need costly unpacking/reading/parsing, regular meshes don't. #[derive(Debug, PartialEq, Eq, Hash, Clone)] @@ -26,7 +31,7 @@ pub struct MeshCacheKey { /// Caches meshes based on their [`MeshCacheKey`]. #[derive(Default)] -pub struct MeshCache(ahash::HashMap<MeshCacheKey, Option<Arc<LoadedMesh>>>); +pub struct MeshCache(HashMap<RowId, HashMap<MeshCacheKey, Option<Arc<LoadedMesh>>>>); /// Either a [`re_types::archetypes::Asset3D`] or [`re_types::archetypes::Mesh3D`] to be cached. #[derive(Debug, Clone, Copy)] @@ -52,6 +57,8 @@ impl MeshCache { re_tracing::profile_function!(); self.0 + .entry(key.versioned_instance_path_hash.row_id) + .or_default() .entry(key) .or_insert_with(|| { re_log::debug!("Loading CPU mesh {name:?}…"); @@ -75,6 +82,39 @@ impl Cache for MeshCache { self.0.clear(); } + fn on_store_events(&mut self, events: &[ChunkStoreEvent]) { + re_tracing::profile_function!(); + + let row_ids_removed: HashSet<RowId> = events + .iter() + .flat_map(|event| { + let is_deletion = || event.kind == re_chunk_store::ChunkStoreDiffKind::Deletion; + let contains_mesh_data = || { + let contains_asset_blob = event + .chunk + .components() + .contains_key(&re_types::components::Blob::name()); + + let contains_vertex_positions = event + .chunk + .components() + .contains_key(&re_types::components::Position3D::name()); + + contains_asset_blob || contains_vertex_positions + }; + + if is_deletion() && contains_mesh_data() { + Either::Left(event.chunk.row_ids()) + } else { + Either::Right(std::iter::empty()) + } + }) + .collect(); + + self.0 + .retain(|row_id, _per_key| !row_ids_removed.contains(row_id)); + } + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { self } From 07341382bb015f18cc2756d5513743f92db01951 Mon Sep 17 00:00:00 2001 From: Clement Rey <cr.rey.clement@gmail.com> Date: Thu, 26 Sep 2024 18:12:35 +0200 Subject: [PATCH 4/8] invalidate tensor cache on deletions and overwrites --- .../src/cache/tensor_stats_cache.rs | 41 ++++++++++++++++--- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/crates/viewer/re_viewer_context/src/cache/tensor_stats_cache.rs b/crates/viewer/re_viewer_context/src/cache/tensor_stats_cache.rs index fdba62d03365..bbc7ed5e30ae 100644 --- a/crates/viewer/re_viewer_context/src/cache/tensor_stats_cache.rs +++ b/crates/viewer/re_viewer_context/src/cache/tensor_stats_cache.rs @@ -1,22 +1,25 @@ +use ahash::{HashMap, HashSet}; +use itertools::Either; + use re_chunk::RowId; -use re_types::datatypes::TensorData; +use re_chunk_store::ChunkStoreEvent; +use re_types::{datatypes::TensorData, Loggable as _}; -use crate::Cache; -use crate::TensorStats; +use crate::{Cache, TensorStats}; /// Caches tensor stats using a [`RowId`], i.e. a specific instance of /// a `TensorData` component #[derive(Default)] -pub struct TensorStatsCache(ahash::HashMap<RowId, TensorStats>); +pub struct TensorStatsCache(HashMap<RowId, TensorStats>); impl TensorStatsCache { /// The key should be the `RowId` of the `TensorData`. /// NOTE: `TensorData` is never batched (they are mono-components), /// so we don't need the instance id here. - pub fn entry(&mut self, key: RowId, tensor: &TensorData) -> TensorStats { + pub fn entry(&mut self, tensor_data_row_id: RowId, tensor: &TensorData) -> TensorStats { *self .0 - .entry(key) + .entry(tensor_data_row_id) .or_insert_with(|| TensorStats::from_tensor(tensor)) } } @@ -26,6 +29,32 @@ impl Cache for TensorStatsCache { // Purging the tensor stats is not worth it - these are very small objects! } + fn on_store_events(&mut self, events: &[ChunkStoreEvent]) { + re_tracing::profile_function!(); + + let row_ids_removed: HashSet<RowId> = events + .iter() + .flat_map(|event| { + let is_deletion = || event.kind == re_chunk_store::ChunkStoreDiffKind::Deletion; + let contains_tensor_data = || { + event + .chunk + .components() + .contains_key(&re_types::components::TensorData::name()) + }; + + if is_deletion() && contains_tensor_data() { + Either::Left(event.chunk.row_ids()) + } else { + Either::Right(std::iter::empty()) + } + }) + .collect(); + + self.0 + .retain(|row_id, _per_key| !row_ids_removed.contains(row_id)); + } + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { self } From 718409b70ef92e6856d9c3a4500022cc48cb8a48 Mon Sep 17 00:00:00 2001 From: Clement Rey <cr.rey.clement@gmail.com> Date: Thu, 26 Sep 2024 18:12:52 +0200 Subject: [PATCH 5/8] invalidate video cache on deletions and overwrites --- .../src/cache/video_cache.rs | 81 ++++++++++++++----- 1 file changed, 61 insertions(+), 20 deletions(-) diff --git a/crates/viewer/re_viewer_context/src/cache/video_cache.rs b/crates/viewer/re_viewer_context/src/cache/video_cache.rs index e15e88254a7e..8faa31a76a0f 100644 --- a/crates/viewer/re_viewer_context/src/cache/video_cache.rs +++ b/crates/viewer/re_viewer_context/src/cache/video_cache.rs @@ -1,13 +1,19 @@ -use crate::Cache; -use re_chunk::RowId; -use re_log_types::hash::Hash64; -use re_renderer::{external::re_video::VideoLoadError, video::Video}; - use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, }; +use ahash::{HashMap, HashSet}; + +use itertools::Either; +use re_chunk::RowId; +use re_chunk_store::ChunkStoreEvent; +use re_log_types::hash::Hash64; +use re_renderer::{external::re_video::VideoLoadError, video::Video}; +use re_types::Loggable as _; + +use crate::Cache; + // ---------------------------------------------------------------------------- struct Entry { @@ -19,7 +25,7 @@ struct Entry { /// Caches meshes based on media type & row id. #[derive(Default)] -pub struct VideoCache(ahash::HashMap<Hash64, Entry>); +pub struct VideoCache(HashMap<RowId, HashMap<Hash64, Entry>>); impl VideoCache { /// Read in some video data and cache the result. @@ -29,21 +35,26 @@ impl VideoCache { /// so we don't need the instance id here. pub fn entry( &mut self, - row_id: RowId, + blob_row_id: RowId, video_data: &re_types::datatypes::Blob, media_type: Option<&str>, ) -> Arc<Result<Video, VideoLoadError>> { re_tracing::profile_function!(); - let key = Hash64::hash((row_id, media_type)); + let inner_key = Hash64::hash(media_type); - let entry = self.0.entry(key).or_insert_with(|| { - let video = Video::load(video_data, media_type); - Entry { - used_this_frame: AtomicBool::new(true), - video: Arc::new(video), - } - }); + let entry = self + .0 + .entry(blob_row_id) + .or_default() + .entry(inner_key) + .or_insert_with(|| { + let video = Video::load(video_data, media_type); + Entry { + used_this_frame: AtomicBool::new(true), + video: Arc::new(video), + } + }); // Using acquire/release here to be on the safe side and for semantical soundness: // Whatever thread is acquiring the fact that this was used, should also see/acquire @@ -55,17 +66,47 @@ impl VideoCache { impl Cache for VideoCache { fn begin_frame(&mut self, renderer_active_frame_idx: u64) { - for v in self.0.values() { - v.used_this_frame.store(false, Ordering::Release); - if let Ok(video) = v.video.as_ref() { - video.purge_unused_decoders(renderer_active_frame_idx); + for per_key in self.0.values() { + for v in per_key.values() { + v.used_this_frame.store(false, Ordering::Release); + if let Ok(video) = v.video.as_ref() { + video.purge_unused_decoders(renderer_active_frame_idx); + } } } } fn purge_memory(&mut self) { + self.0.retain(|_row_id, per_key| { + per_key.retain(|_, v| v.used_this_frame.load(Ordering::Acquire)); + !per_key.is_empty() + }); + } + + fn on_store_events(&mut self, events: &[ChunkStoreEvent]) { + re_tracing::profile_function!(); + + let row_ids_removed: HashSet<RowId> = events + .iter() + .flat_map(|event| { + let is_deletion = || event.kind == re_chunk_store::ChunkStoreDiffKind::Deletion; + let contains_video_blob = || { + event + .chunk + .components() + .contains_key(&re_types::components::Blob::name()) + }; + + if is_deletion() && contains_video_blob() { + Either::Left(event.chunk.row_ids()) + } else { + Either::Right(std::iter::empty()) + } + }) + .collect(); + self.0 - .retain(|_, v| v.used_this_frame.load(Ordering::Acquire)); + .retain(|row_id, _per_key| !row_ids_removed.contains(row_id)); } fn as_any_mut(&mut self) -> &mut dyn std::any::Any { From 33f53a2e8e1d65b7b4e19087a179045cc7bcce8f Mon Sep 17 00:00:00 2001 From: Clement Rey <cr.rey.clement@gmail.com> Date: Thu, 26 Sep 2024 18:21:23 +0200 Subject: [PATCH 6/8] add --static flag to face_tracking example for future reference --- .../python/face_tracking/face_tracking.py | 68 +++++++++++++++---- 1 file changed, 54 insertions(+), 14 deletions(-) diff --git a/examples/python/face_tracking/face_tracking.py b/examples/python/face_tracking/face_tracking.py index 3981c23e1d9b..7c389014104a 100755 --- a/examples/python/face_tracking/face_tracking.py +++ b/examples/python/face_tracking/face_tracking.py @@ -20,6 +20,10 @@ import tqdm from mediapipe.tasks.python import vision +# If set, log everything as static +global ALL_STATIC +ALL_STATIC: bool = False + EXAMPLE_DIR: Final = Path(os.path.dirname(__file__)) DATASET_DIR: Final = EXAMPLE_DIR / "dataset" MODEL_DIR: Final = EXAMPLE_DIR / "model" @@ -118,7 +122,8 @@ def __init__(self, video_mode: bool = False): rr.log( "video/detector", rr.ClassDescription( - info=rr.AnnotationInfo(id=0), keypoint_connections=[(0, 1), (1, 2), (2, 0), (2, 3), (0, 4), (1, 5)] + info=rr.AnnotationInfo(id=0), + keypoint_connections=[(0, 1), (1, 2), (2, 0), (2, 3), (0, 4), (1, 5)], ), static=True, ) @@ -132,7 +137,7 @@ def detect_and_log(self, image: cv2.typing.MatLike, frame_time_nano: int) -> Non if self._video_mode else self._detector.detect(image) ) - rr.log("video/detector/faces", rr.Clear(recursive=True)) + rr.log("video/detector/faces", rr.Clear(recursive=True), static=ALL_STATIC) for i, detection in enumerate(detection_result.detections): # log bounding box bbox = detection.bounding_box @@ -142,16 +147,22 @@ def detect_and_log(self, image: cv2.typing.MatLike, frame_time_nano: int) -> Non rr.log( f"video/detector/faces/{i}/bbox", rr.Boxes2D( - array=[bbox.origin_x, bbox.origin_y, bbox.width, bbox.height], array_format=rr.Box2DFormat.XYWH + array=[bbox.origin_x, bbox.origin_y, bbox.width, bbox.height], + array_format=rr.Box2DFormat.XYWH, ), rr.AnyValues(index=index, score=score), + static=ALL_STATIC, ) # MediaPipe's keypoints are normalized to [0, 1], so we need to scale them to get pixel coordinates. pts = [ (math.floor(keypoint.x * width), math.floor(keypoint.y * height)) for keypoint in detection.keypoints ] - rr.log(f"video/detector/faces/{i}/keypoints", rr.Points2D(pts, radii=3, keypoint_ids=list(range(6)))) + rr.log( + f"video/detector/faces/{i}/keypoints", + rr.Points2D(pts, radii=3, keypoint_ids=list(range(6))), + static=ALL_STATIC, + ) class FaceLandmarkerLogger: @@ -247,17 +258,25 @@ def is_empty(i): # type: ignore[no-untyped-def] return True if is_empty(zip(detection_result.face_landmarks, detection_result.face_blendshapes)): - rr.log("video/landmarker/faces", rr.Clear(recursive=True)) - rr.log("reconstruction/faces", rr.Clear(recursive=True)) - rr.log("blendshapes", rr.Clear(recursive=True)) + rr.log("video/landmarker/faces", rr.Clear(recursive=True), static=ALL_STATIC) + rr.log("reconstruction/faces", rr.Clear(recursive=True), static=ALL_STATIC) + rr.log("blendshapes", rr.Clear(recursive=True), static=ALL_STATIC) for i, (landmark, blendshapes) in enumerate( zip(detection_result.face_landmarks, detection_result.face_blendshapes) ): if len(landmark) == 0 or len(blendshapes) == 0: - rr.log(f"video/landmarker/faces/{i}/landmarks", rr.Clear(recursive=True)) - rr.log(f"reconstruction/faces/{i}", rr.Clear(recursive=True)) - rr.log(f"blendshapes/{i}", rr.Clear(recursive=True)) + rr.log( + f"video/landmarker/faces/{i}/landmarks", + rr.Clear(recursive=True), + static=ALL_STATIC, + ) + rr.log( + f"reconstruction/faces/{i}", + rr.Clear(recursive=True), + static=ALL_STATIC, + ) + rr.log(f"blendshapes/{i}", rr.Clear(recursive=True), static=ALL_STATIC) continue # MediaPipe's keypoints are normalized to [0, 1], so we need to scale them to get pixel coordinates. @@ -266,6 +285,7 @@ def is_empty(i): # type: ignore[no-untyped-def] rr.log( f"video/landmarker/faces/{i}/landmarks", rr.Points2D(pts, radii=3, keypoint_ids=keypoint_ids, class_ids=self._class_ids), + static=ALL_STATIC, ) rr.log( @@ -275,11 +295,16 @@ def is_empty(i): # type: ignore[no-untyped-def] keypoint_ids=keypoint_ids, class_ids=self._class_ids, ), + static=ALL_STATIC, ) for blendshape in blendshapes: if blendshape.category_name in BLENDSHAPES_CATEGORIES: - rr.log(f"blendshapes/{i}/{blendshape.category_name}", rr.Scalar(blendshape.score)) + # NOTE(cmc): That one we still log as temporal, otherwise it's really meh. + rr.log( + f"blendshapes/{i}/{blendshape.category_name}", + rr.Scalar(blendshape.score), + ) # ======================================================================================== @@ -362,7 +387,11 @@ def run_from_video_capture(vid: int | str, max_dim: int | None, max_frame_count: rr.set_time_nanos("frame_time", frame_time_nano) detector.detect_and_log(frame, frame_time_nano) landmarker.detect_and_log(frame, frame_time_nano) - rr.log("video/image", rr.Image(frame, color_model="BGR")) + rr.log( + "video/image", + rr.Image(frame, color_model="BGR"), + static=ALL_STATIC, + ) except KeyboardInterrupt: pass @@ -380,7 +409,11 @@ def run_from_sample_image(path: Path, max_dim: int | None, num_faces: int) -> No landmarker = FaceLandmarkerLogger(video_mode=False, num_faces=num_faces) logger.detect_and_log(image, 0) landmarker.detect_and_log(image, 0) - rr.log("video/image", rr.Image(image, color_model="BGR")) + rr.log( + "video/image", + rr.Image(image, color_model="BGR"), + static=ALL_STATIC, + ) def main() -> None: @@ -400,7 +433,10 @@ def main() -> None: ) parser.add_argument("--video", type=Path, help="Run on the provided video file.") parser.add_argument( - "--camera", type=int, default=0, help="Run from the camera stream (parameter is the camera ID, usually 0)" + "--camera", + type=int, + default=0, + help="Run from the camera stream (parameter is the camera ID, usually 0)", ) parser.add_argument( "--max-frame", @@ -421,6 +457,7 @@ def main() -> None: "(temporal smoothing is applied only for a value of 1)." ), ) + parser.add_argument("--static", action="store_true", help="If set, logs everything as static") rr.script_add_args(parser) @@ -450,6 +487,9 @@ def main() -> None: ), ) + global ALL_STATIC + ALL_STATIC = args.static + if args.demo_image: if not SAMPLE_IMAGE_PATH.exists(): download_file(SAMPLE_IMAGE_URL, SAMPLE_IMAGE_PATH) From 09ad22aaf82bbe19c2f17c1d397201e24698a4e8 Mon Sep 17 00:00:00 2001 From: Clement Rey <cr.rey.clement@gmail.com> Date: Fri, 27 Sep 2024 12:01:04 +0200 Subject: [PATCH 7/8] Update examples/python/face_tracking/face_tracking.py Co-authored-by: Andreas Reich <andreas@rerun.io> --- examples/python/face_tracking/face_tracking.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/examples/python/face_tracking/face_tracking.py b/examples/python/face_tracking/face_tracking.py index 7c389014104a..a6edeedb361d 100755 --- a/examples/python/face_tracking/face_tracking.py +++ b/examples/python/face_tracking/face_tracking.py @@ -20,7 +20,14 @@ import tqdm from mediapipe.tasks.python import vision -# If set, log everything as static +# If set, log everything as static. +# +# Generally, the Viewer accumulates data until its set memory budget at which point it will +# remove the oldest data from the recording (see https://rerun.io/docs/howto/limit-ram) +# By instead logging data as static, no data will be accumulated over time since previous +# data is overwritten. +# Naturally, the drawback of this is that there's no history of previous data sent to the viewer, +# as well as no timestamps, making the Viewer's timeline effectively inactive. global ALL_STATIC ALL_STATIC: bool = False From 7501d082cefe54c7595ee044591babe1d206f4da Mon Sep 17 00:00:00 2001 From: Clement Rey <cr.rey.clement@gmail.com> Date: Fri, 27 Sep 2024 12:05:30 +0200 Subject: [PATCH 8/8] lint --- .../python/face_tracking/face_tracking.py | 61 ++++++++++++++----- 1 file changed, 45 insertions(+), 16 deletions(-) diff --git a/examples/python/face_tracking/face_tracking.py b/examples/python/face_tracking/face_tracking.py index a6edeedb361d..c5729fde2d36 100755 --- a/examples/python/face_tracking/face_tracking.py +++ b/examples/python/face_tracking/face_tracking.py @@ -27,7 +27,7 @@ # By instead logging data as static, no data will be accumulated over time since previous # data is overwritten. # Naturally, the drawback of this is that there's no history of previous data sent to the viewer, -# as well as no timestamps, making the Viewer's timeline effectively inactive. +# as well as no timestamps, making the Viewer's timeline effectively inactive. global ALL_STATIC ALL_STATIC: bool = False @@ -121,7 +121,9 @@ def __init__(self, video_mode: bool = False): ) self._options = vision.FaceDetectorOptions( base_options=self._base_options, - running_mode=mp.tasks.vision.RunningMode.VIDEO if self._video_mode else mp.tasks.vision.RunningMode.IMAGE, + running_mode=mp.tasks.vision.RunningMode.VIDEO + if self._video_mode + else mp.tasks.vision.RunningMode.IMAGE, ) self._detector = vision.FaceDetector.create_from_options(self._options) @@ -163,7 +165,8 @@ def detect_and_log(self, image: cv2.typing.MatLike, frame_time_nano: int) -> Non # MediaPipe's keypoints are normalized to [0, 1], so we need to scale them to get pixel coordinates. pts = [ - (math.floor(keypoint.x * width), math.floor(keypoint.y * height)) for keypoint in detection.keypoints + (math.floor(keypoint.x * width), math.floor(keypoint.y * height)) + for keypoint in detection.keypoints ] rr.log( f"video/detector/faces/{i}/keypoints", @@ -199,7 +202,9 @@ def __init__(self, video_mode: bool = False, num_faces: int = 1): base_options=self._base_options, output_face_blendshapes=True, num_faces=num_faces, - running_mode=mp.tasks.vision.RunningMode.VIDEO if self._video_mode else mp.tasks.vision.RunningMode.IMAGE, + running_mode=mp.tasks.vision.RunningMode.VIDEO + if self._video_mode + else mp.tasks.vision.RunningMode.IMAGE, ) self._detector = vision.FaceLandmarker.create_from_options(self._options) @@ -221,7 +226,9 @@ def __init__(self, video_mode: bool = False, num_faces: int = 1): mp.solutions.face_mesh.FACEMESH_NOSE, ] - self._class_ids = [0] * mp.solutions.face_mesh.FACEMESH_NUM_LANDMARKS_WITH_IRISES + self._class_ids = [ + 0 + ] * mp.solutions.face_mesh.FACEMESH_NUM_LANDMARKS_WITH_IRISES class_descriptions = [] for i, klass in enumerate(classes): # MediaPipe only provides connections for class, not actual class per keypoint. So we have to extract the @@ -241,7 +248,9 @@ def __init__(self, video_mode: bool = False, num_faces: int = 1): ) ) - rr.log("video/landmarker", rr.AnnotationContext(class_descriptions), static=True) + rr.log( + "video/landmarker", rr.AnnotationContext(class_descriptions), static=True + ) rr.log("reconstruction", rr.AnnotationContext(class_descriptions), static=True) # properly align the 3D face in the viewer @@ -264,8 +273,12 @@ def is_empty(i): # type: ignore[no-untyped-def] except StopIteration: return True - if is_empty(zip(detection_result.face_landmarks, detection_result.face_blendshapes)): - rr.log("video/landmarker/faces", rr.Clear(recursive=True), static=ALL_STATIC) + if is_empty( + zip(detection_result.face_landmarks, detection_result.face_blendshapes) + ): + rr.log( + "video/landmarker/faces", rr.Clear(recursive=True), static=ALL_STATIC + ) rr.log("reconstruction/faces", rr.Clear(recursive=True), static=ALL_STATIC) rr.log("blendshapes", rr.Clear(recursive=True), static=ALL_STATIC) @@ -287,11 +300,15 @@ def is_empty(i): # type: ignore[no-untyped-def] continue # MediaPipe's keypoints are normalized to [0, 1], so we need to scale them to get pixel coordinates. - pts = [(math.floor(lm.x * width), math.floor(lm.y * height)) for lm in landmark] + pts = [ + (math.floor(lm.x * width), math.floor(lm.y * height)) for lm in landmark + ] keypoint_ids = list(range(len(landmark))) rr.log( f"video/landmarker/faces/{i}/landmarks", - rr.Points2D(pts, radii=3, keypoint_ids=keypoint_ids, class_ids=self._class_ids), + rr.Points2D( + pts, radii=3, keypoint_ids=keypoint_ids, class_ids=self._class_ids + ), static=ALL_STATIC, ) @@ -344,7 +361,9 @@ def resize_image(image: cv2.typing.MatLike, max_dim: int | None) -> cv2.typing.M return image -def run_from_video_capture(vid: int | str, max_dim: int | None, max_frame_count: int | None, num_faces: int) -> None: +def run_from_video_capture( + vid: int | str, max_dim: int | None, max_frame_count: int | None, num_faces: int +) -> None: """ Run the face detector on a video stream. @@ -369,7 +388,9 @@ def run_from_video_capture(vid: int | str, max_dim: int | None, max_frame_count: print("Capturing video stream. Press ctrl-c to stop.") try: - it: Iterable[int] = itertools.count() if max_frame_count is None else range(max_frame_count) + it: Iterable[int] = ( + itertools.count() if max_frame_count is None else range(max_frame_count) + ) for frame_idx in tqdm.tqdm(it, desc="Processing frames"): # Capture frame-by-frame @@ -427,7 +448,9 @@ def main() -> None: logging.getLogger().addHandler(logging.StreamHandler()) logging.getLogger().setLevel("INFO") - parser = argparse.ArgumentParser(description="Uses the MediaPipe Face Detection to track a human pose in video.") + parser = argparse.ArgumentParser( + description="Uses the MediaPipe Face Detection to track a human pose in video." + ) parser.add_argument( "--demo-image", action="store_true", @@ -464,7 +487,9 @@ def main() -> None: "(temporal smoothing is applied only for a value of 1)." ), ) - parser.add_argument("--static", action="store_true", help="If set, logs everything as static") + parser.add_argument( + "--static", action="store_true", help="If set, logs everything as static" + ) rr.script_add_args(parser) @@ -505,9 +530,13 @@ def main() -> None: elif args.image is not None: run_from_sample_image(args.image, args.max_dim, args.num_faces) elif args.video is not None: - run_from_video_capture(str(args.video), args.max_dim, args.max_frame, args.num_faces) + run_from_video_capture( + str(args.video), args.max_dim, args.max_frame, args.num_faces + ) else: - run_from_video_capture(args.camera, args.max_dim, args.max_frame, args.num_faces) + run_from_video_capture( + args.camera, args.max_dim, args.max_frame, args.num_faces + ) rr.script_teardown(args)