From ca44717cf106988dfdd3b6f476c67ed19761e880 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Thu, 26 Sep 2024 17:19:16 +0200 Subject: [PATCH] Automatic removal of unreachable static chunks (#7518) Unreachable static chunks are dead weight: there exists no query that can access their data (at least when using Rerun as a Visualizer). By automatically removing dangling chunks, we make it possible for user to use the Rerun Viewer as a soft-realtime telemetry system (provided we properly invalidate our caches too, which is the subject of an upcoming PR). This raises the question of what should happen when using Rerun as a database: should this data be kept and made accessible? If so, this behavior should probably be made configurable (e.g. when instantiating a ChunkStore in the SDK). * Part of #7404 --- Test: ```python from pathlib import Path import rerun as rr image_file_path = Path(__file__).parent / "ferris.png" rr.init("rerun_example_encoded_image", spawn=True) for _ in range(0, 10): rr.log("image", rr.EncodedImage(path=image_file_path), static=True) ``` Command: ``` RERUN_FLUSH_NUM_ROWS=0 python test.py ``` Before: ![image](https://github.com/user-attachments/assets/a9823055-5247-4d63-9295-fc310aa4923f) After: ![image](https://github.com/user-attachments/assets/c095fda2-02ac-4456-bf3f-e251b24dc75d) --- crates/store/re_chunk_store/src/writes.rs | 188 +++++++++++++++++++-- crates/store/re_entity_db/src/entity_db.rs | 5 + 2 files changed, 178 insertions(+), 15 deletions(-) diff --git a/crates/store/re_chunk_store/src/writes.rs b/crates/store/re_chunk_store/src/writes.rs index 1d3dd2a9fe19..acf20ea04d6a 100644 --- a/crates/store/re_chunk_store/src/writes.rs +++ b/crates/store/re_chunk_store/src/writes.rs @@ -57,6 +57,8 @@ impl ChunkStore { let row_id_range_per_component = chunk.row_id_range_per_component(); + let mut overwritten_chunk_ids = HashMap::default(); + for (&component_name, list_array) in chunk.components() { let is_empty = list_array .validity() @@ -79,16 +81,25 @@ impl ChunkStore { // NOTE: When attempting to overwrite static data, the chunk with the most // recent data within -- according to RowId -- wins. - let cur_row_id_max = self.chunks_per_chunk_id.get(cur_chunk_id).map_or( - RowId::ZERO, - |chunk| { + let (cur_row_id_min, cur_row_id_max) = self + .chunks_per_chunk_id + .get(cur_chunk_id) + .map_or((RowId::ZERO, RowId::ZERO), |chunk| { chunk .row_id_range_per_component() .get(&component_name) - .map_or(RowId::ZERO, |(_, row_id_max)| *row_id_max) - }, - ); + .map_or( + (RowId::ZERO, RowId::ZERO), + |(row_id_min, row_id_max)| (*row_id_min, *row_id_max), + ) + }); if *row_id_max > cur_row_id_max { + // We are about to overwrite the existing chunk with the new one, at + // least for this one specific component. + // Keep track of the overwritten ChunkId: we'll need it further down in + // order to check whether that chunk is now dangling. + overwritten_chunk_ids.insert(*cur_chunk_id, cur_row_id_min); + *cur_chunk_id = chunk.id(); } }) @@ -97,13 +108,55 @@ impl ChunkStore { self.static_chunks_stats += ChunkStoreChunkStats::from_chunk(chunk); - ( - Arc::clone(chunk), - vec![ChunkStoreDiff::addition( - non_compacted_chunk, /* added */ - None, /* compacted */ - )], - ) + let mut diffs = vec![ChunkStoreDiff::addition( + non_compacted_chunk, /* added */ + None, /* compacted */ + )]; + + // NOTE(1): Our chunks can only cover a single entity path at a time, therefore we know we + // only have to check that one entity for complete overwrite. + // NOTE(2): This condition cannot fail, we just want to avoid unwrapping. + if let Some(per_component) = self.static_chunk_ids_per_entity.get(chunk.entity_path()) { + re_tracing::profile_scope!("static dangling checks"); + + // At this point, we are in possession of a list of ChunkIds that were at least + // _partially_ overwritten (i.e. some, but not necessarily all, of the components + // that they used to provide the data for are now provided by another, newer chunk). + // + // To determine whether any of these chunks are actually fully overwritten, and + // therefore dangling, we need to make sure there are no components left + // referencing these ChunkIds whatsoever. + // + // Because our storage model guarantees that a single chunk cannot cover more than + // one entity, this is actually pretty cheap to do, since we only have to loop over + // all the components of a single entity. + + for (chunk_id, chunk_row_id_min) in overwritten_chunk_ids { + let has_been_fully_overwritten = !per_component + .values() + .any(|cur_chunk_id| *cur_chunk_id == chunk_id); + + if has_been_fully_overwritten { + // The chunk is now dangling: remove it from all relevant indices, update + // the stats, and fire deletion events. + + let chunk_id_removed = + self.chunk_ids_per_min_row_id.remove(&chunk_row_id_min); + debug_assert!(chunk_id_removed.is_some()); + + let chunk_removed = self.chunks_per_chunk_id.remove(&chunk_id); + debug_assert!(chunk_removed.is_some()); + + if let Some(chunk_removed) = chunk_removed { + self.static_chunks_stats -= + ChunkStoreChunkStats::from_chunk(&chunk_removed); + diffs.push(ChunkStoreDiff::deletion(chunk_removed)); + } + } + } + } + + (Arc::clone(chunk), diffs) } else { // Temporal data: just index the chunk on every dimension of interest. re_tracing::profile_scope!("temporal"); @@ -525,8 +578,11 @@ impl ChunkStore { #[cfg(test)] mod tests { - use re_chunk::Timeline; - use re_log_types::example_components::MyPoint; + use re_chunk::{TimePoint, Timeline}; + use re_log_types::example_components::{MyColor, MyLabel, MyPoint}; + use similar_asserts::assert_eq; + + use crate::ChunkStoreDiffKind; use super::*; @@ -644,4 +700,106 @@ mod tests { Ok(()) } + + #[test] + fn static_overwrites() -> anyhow::Result<()> { + re_log::setup_logging(); + + let mut store = ChunkStore::new( + re_log_types::StoreId::random(re_log_types::StoreKind::Recording), + Default::default(), + ); + + let entity_path = EntityPath::from("this/that"); + + let row_id1_1 = RowId::new(); + let row_id2_1 = RowId::new(); + let row_id2_2 = RowId::new(); + + let timepoint_static = TimePoint::default(); + + let points1 = &[MyPoint::new(1.0, 1.0)]; + let colors1 = &[MyColor::from_rgb(1, 1, 1)]; + let labels1 = &[MyLabel("111".to_owned())]; + + let points2 = &[MyPoint::new(2.0, 2.0)]; + let colors2 = &[MyColor::from_rgb(2, 2, 2)]; + let labels2 = &[MyLabel("222".to_owned())]; + + let chunk1 = Chunk::builder(entity_path.clone()) + .with_component_batches( + row_id1_1, + timepoint_static.clone(), + [points1 as _, colors1 as _, labels1 as _], + ) + .build()?; + let chunk2 = Chunk::builder(entity_path.clone()) + .with_component_batches( + row_id2_1, + timepoint_static.clone(), + [points2 as _, colors2 as _], + ) + .build()?; + let chunk3 = Chunk::builder(entity_path.clone()) + .with_component_batches(row_id2_2, timepoint_static, [labels2 as _]) + .build()?; + + let chunk1 = Arc::new(chunk1); + let chunk2 = Arc::new(chunk2); + let chunk3 = Arc::new(chunk3); + + let events = store.insert_chunk(&chunk1)?; + assert!( + events.len() == 1 + && events[0].chunk.id() == chunk1.id() + && events[0].kind == ChunkStoreDiffKind::Addition, + "the first write should result in the addition of chunk1 and nothing else" + ); + + let events = store.insert_chunk(&chunk2)?; + assert!( + events.len() == 1 + && events[0].chunk.id() == chunk2.id() + && events[0].kind == ChunkStoreDiffKind::Addition, + "the second write should result in the addition of chunk2 and nothing else" + ); + + let stats_before = store.stats(); + { + let ChunkStoreChunkStats { + num_chunks, + total_size_bytes: _, + num_rows, + num_events, + } = stats_before.static_chunks; + assert_eq!(2, num_chunks); + assert_eq!(2, num_rows); + assert_eq!(5, num_events); + } + + let events = store.insert_chunk(&chunk3)?; + assert!( + events.len() == 2 + && events[0].chunk.id() == chunk3.id() + && events[0].kind == ChunkStoreDiffKind::Addition + && events[1].chunk.id() == chunk1.id() + && events[1].kind == ChunkStoreDiffKind::Deletion, + "the final write should result in the addition of chunk3 _and_ the deletion of the now fully overwritten chunk1" + ); + + let stats_after = store.stats(); + { + let ChunkStoreChunkStats { + num_chunks, + total_size_bytes: _, + num_rows, + num_events, + } = stats_after.static_chunks; + assert_eq!(2, num_chunks); + assert_eq!(2, num_rows); + assert_eq!(3, num_events); + } + + Ok(()) + } } diff --git a/crates/store/re_entity_db/src/entity_db.rs b/crates/store/re_entity_db/src/entity_db.rs index 7c6aeb4a548f..16444a228eec 100644 --- a/crates/store/re_entity_db/src/entity_db.rs +++ b/crates/store/re_entity_db/src/entity_db.rs @@ -361,6 +361,11 @@ impl EntityDb { self.query_caches.on_events(&store_events); self.tree.on_store_additions(&store_events); + // It is possible for writes to trigger deletions: specifically in the case of + // overwritten static data leading to dangling chunks. + self.tree + .on_store_deletions(&self.data_store, &store_events); + // We inform the stats last, since it measures e2e latency. self.stats.on_events(&store_events); }