Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatic removal of unreachable static chunks #7518

Merged
merged 2 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 173 additions & 15 deletions crates/store/re_chunk_store/src/writes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ impl ChunkStore {

let row_id_range_per_component = chunk.row_id_range_per_component();

let mut overwritten_chunk_ids = HashMap::default();

for (&component_name, list_array) in chunk.components() {
let is_empty = list_array
.validity()
Expand All @@ -79,16 +81,25 @@ impl ChunkStore {
// NOTE: When attempting to overwrite static data, the chunk with the most
// recent data within -- according to RowId -- wins.

let cur_row_id_max = self.chunks_per_chunk_id.get(cur_chunk_id).map_or(
RowId::ZERO,
|chunk| {
let (cur_row_id_min, cur_row_id_max) = self
.chunks_per_chunk_id
.get(cur_chunk_id)
.map_or((RowId::ZERO, RowId::ZERO), |chunk| {
chunk
.row_id_range_per_component()
.get(&component_name)
.map_or(RowId::ZERO, |(_, row_id_max)| *row_id_max)
},
);
.map_or(
(RowId::ZERO, RowId::ZERO),
|(row_id_min, row_id_max)| (*row_id_min, *row_id_max),
)
});
if *row_id_max > cur_row_id_max {
// We are about to overwrite the existing chunk with the new one, at
// least for this one specific component.
// Keep track of the overwritten ChunkId: we'll need it further down in
// order to check whether that chunk is now dangling.
overwritten_chunk_ids.insert(*cur_chunk_id, cur_row_id_min);

*cur_chunk_id = chunk.id();
}
})
Expand All @@ -97,13 +108,55 @@ impl ChunkStore {

self.static_chunks_stats += ChunkStoreChunkStats::from_chunk(chunk);

(
Arc::clone(chunk),
vec![ChunkStoreDiff::addition(
non_compacted_chunk, /* added */
None, /* compacted */
)],
)
let mut diffs = vec![ChunkStoreDiff::addition(
non_compacted_chunk, /* added */
None, /* compacted */
)];

// NOTE(1): Our chunks can only cover a single entity path at a time, therefore we know we
// only have to check that one entity for complete overwrite.
// NOTE(2): This condition cannot fail, we just want to avoid unwrapping.
if let Some(per_component) = self.static_chunk_ids_per_entity.get(chunk.entity_path()) {
re_tracing::profile_scope!("static dangling checks");

// At this point, we are in possession of a list of ChunkIds that were at least
// _partially_ overwritten (i.e. some, but not necessarily all, of the components
// that they used to provide the data for are now provided by another, newer chunk).
//
// To determine whether any of these chunks are actually fully overwritten, and
// therefore dangling, we need to make sure there are no components left
// referencing these ChunkIds whatsoever.
//
// Because our storage model guarantees that a single chunk cannot cover more than
// one entity, this is actually pretty cheap to do, since we only have to loop over
// all the components of a single entity.

for (chunk_id, chunk_row_id_min) in overwritten_chunk_ids {
let has_been_fully_overwritten = !per_component
.values()
.any(|cur_chunk_id| *cur_chunk_id == chunk_id);

if has_been_fully_overwritten {
// The chunk is now dangling: remove it from all relevant indices, update
// the stats, and fire deletion events.

let chunk_id_removed =
self.chunk_ids_per_min_row_id.remove(&chunk_row_id_min);
debug_assert!(chunk_id_removed.is_some());

let chunk_removed = self.chunks_per_chunk_id.remove(&chunk_id);
debug_assert!(chunk_removed.is_some());

if let Some(chunk_removed) = chunk_removed {
self.static_chunks_stats -=
ChunkStoreChunkStats::from_chunk(&chunk_removed);
diffs.push(ChunkStoreDiff::deletion(chunk_removed));
}
}
}
}

(Arc::clone(chunk), diffs)
} else {
// Temporal data: just index the chunk on every dimension of interest.
re_tracing::profile_scope!("temporal");
Expand Down Expand Up @@ -525,8 +578,11 @@ impl ChunkStore {

#[cfg(test)]
mod tests {
use re_chunk::Timeline;
use re_log_types::example_components::MyPoint;
use re_chunk::{TimePoint, Timeline};
use re_log_types::example_components::{MyColor, MyLabel, MyPoint};
use similar_asserts::assert_eq;

use crate::ChunkStoreDiffKind;

use super::*;

Expand Down Expand Up @@ -644,4 +700,106 @@ mod tests {

Ok(())
}

#[test]
fn static_overwrites() -> anyhow::Result<()> {
re_log::setup_logging();

let mut store = ChunkStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
Default::default(),
);

let entity_path = EntityPath::from("this/that");

let row_id1_1 = RowId::new();
let row_id2_1 = RowId::new();
let row_id2_2 = RowId::new();

let timepoint_static = TimePoint::default();

let points1 = &[MyPoint::new(1.0, 1.0)];
let colors1 = &[MyColor::from_rgb(1, 1, 1)];
let labels1 = &[MyLabel("111".to_owned())];

let points2 = &[MyPoint::new(2.0, 2.0)];
let colors2 = &[MyColor::from_rgb(2, 2, 2)];
let labels2 = &[MyLabel("222".to_owned())];

let chunk1 = Chunk::builder(entity_path.clone())
.with_component_batches(
row_id1_1,
timepoint_static.clone(),
[points1 as _, colors1 as _, labels1 as _],
)
.build()?;
let chunk2 = Chunk::builder(entity_path.clone())
.with_component_batches(
row_id2_1,
timepoint_static.clone(),
[points2 as _, colors2 as _],
)
.build()?;
let chunk3 = Chunk::builder(entity_path.clone())
.with_component_batches(row_id2_2, timepoint_static, [labels2 as _])
.build()?;

let chunk1 = Arc::new(chunk1);
let chunk2 = Arc::new(chunk2);
let chunk3 = Arc::new(chunk3);

let events = store.insert_chunk(&chunk1)?;
assert!(
events.len() == 1
&& events[0].chunk.id() == chunk1.id()
&& events[0].kind == ChunkStoreDiffKind::Addition,
"the first write should result in the addition of chunk1 and nothing else"
);

let events = store.insert_chunk(&chunk2)?;
assert!(
events.len() == 1
&& events[0].chunk.id() == chunk2.id()
&& events[0].kind == ChunkStoreDiffKind::Addition,
"the second write should result in the addition of chunk2 and nothing else"
);

let stats_before = store.stats();
{
let ChunkStoreChunkStats {
num_chunks,
total_size_bytes: _,
num_rows,
num_events,
} = stats_before.static_chunks;
assert_eq!(2, num_chunks);
assert_eq!(2, num_rows);
assert_eq!(5, num_events);
}

let events = store.insert_chunk(&chunk3)?;
assert!(
events.len() == 2
&& events[0].chunk.id() == chunk3.id()
&& events[0].kind == ChunkStoreDiffKind::Addition
&& events[1].chunk.id() == chunk1.id()
&& events[1].kind == ChunkStoreDiffKind::Deletion,
"the final write should result in the addition of chunk3 _and_ the deletion of the now fully overwritten chunk1"
);

let stats_after = store.stats();
{
let ChunkStoreChunkStats {
num_chunks,
total_size_bytes: _,
num_rows,
num_events,
} = stats_after.static_chunks;
assert_eq!(2, num_chunks);
assert_eq!(2, num_rows);
assert_eq!(3, num_events);
}

Ok(())
}
}
5 changes: 5 additions & 0 deletions crates/store/re_entity_db/src/entity_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,11 @@ impl EntityDb {
self.query_caches.on_events(&store_events);
self.tree.on_store_additions(&store_events);

// It is possible for writes to trigger deletions: specifically in the case of
// overwritten static data leading to dangling chunks.
self.tree
.on_store_deletions(&self.data_store, &store_events);

// We inform the stats last, since it measures e2e latency.
self.stats.on_events(&store_events);
}
Expand Down
Loading