Skip to content

Commit

Permalink
Introduce ChunkStore::drop_entity_path (#6588)
Browse files Browse the repository at this point in the history
Introduces a new API to drop an entity path in its entirety,
unconditionally.

- Part of #6552
- Part of #1329
  • Loading branch information
teh-cmc authored Jul 4, 2024
1 parent e2d1037 commit 84fc7f1
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 68 deletions.
54 changes: 12 additions & 42 deletions crates/re_chunk_store/src/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use std::{
};

use ahash::{HashMap, HashSet};
use nohash_hasher::{IntMap, IntSet};
use re_chunk::{Chunk, ChunkId};
use nohash_hasher::IntMap;
use web_time::Instant;

use re_chunk::{Chunk, ChunkId};
use re_log_types::{EntityPath, TimeInt, Timeline};
use re_types_core::{ComponentName, SizeBytes};

Expand Down Expand Up @@ -51,18 +51,6 @@ pub struct GarbageCollectionOptions {

/// How many component revisions to preserve on each timeline.
pub protect_latest: usize,

/// Components which should not be protected from GC when using
/// [`GarbageCollectionOptions::protect_latest`].
//
// TODO(#6552): this should be removed in favor of a dedicated `remove_entity_path` API.
pub dont_protect_components: IntSet<ComponentName>,

/// Timelines which should not be protected from GC when using
/// [`GarbageCollectionOptions::protect_latest`].
//
// TODO(#6552): this should be removed in favor of a dedicated `remove_entity_path` API.
pub dont_protect_timelines: IntSet<Timeline>,
}

impl GarbageCollectionOptions {
Expand All @@ -71,8 +59,6 @@ impl GarbageCollectionOptions {
target: GarbageCollectionTarget::Everything,
time_budget: std::time::Duration::MAX,
protect_latest: 0,
dont_protect_components: Default::default(),
dont_protect_timelines: Default::default(),
}
}
}
Expand Down Expand Up @@ -126,11 +112,7 @@ impl ChunkStore {
let total_num_chunks_before = stats_before.total().num_chunks;
let total_num_rows_before = stats_before.total().total_num_rows;

let protected_chunk_ids = self.find_all_protected_chunk_ids(
options.protect_latest,
&options.dont_protect_components,
&options.dont_protect_timelines,
);
let protected_chunk_ids = self.find_all_protected_chunk_ids(options.protect_latest);

let diffs = match options.target {
GarbageCollectionTarget::DropAtLeastFraction(p) => {
Expand Down Expand Up @@ -215,12 +197,7 @@ impl ChunkStore {
//
// TODO(jleibs): More complex functionality might require expanding this to also
// *ignore* specific entities, components, timelines, etc. for this protection.
fn find_all_protected_chunk_ids(
&self,
target_count: usize,
dont_protect_components: &IntSet<ComponentName>,
dont_protect_timelines: &IntSet<Timeline>,
) -> BTreeSet<ChunkId> {
fn find_all_protected_chunk_ids(&self, target_count: usize) -> BTreeSet<ChunkId> {
re_tracing::profile_function!();

if target_count == 0 {
Expand All @@ -230,19 +207,10 @@ impl ChunkStore {
self.temporal_chunk_ids_per_entity
.values()
.flat_map(|temporal_chunk_ids_per_timeline| {
temporal_chunk_ids_per_timeline
.iter()
.filter_map(|(timeline, temporal_chunk_ids_per_component)| {
(!dont_protect_timelines.contains(timeline))
.then_some(temporal_chunk_ids_per_component)
})
.flat_map(|temporal_chunk_ids_per_component| {
temporal_chunk_ids_per_component
.iter()
.filter(|(component_name, _)| {
!dont_protect_components.contains(component_name)
})
.flat_map(|(_, temporal_chunk_ids_per_time)| {
temporal_chunk_ids_per_timeline.iter().flat_map(
|(_timeline, temporal_chunk_ids_per_component)| {
temporal_chunk_ids_per_component.iter().flat_map(
|(_, temporal_chunk_ids_per_time)| {
temporal_chunk_ids_per_time
.per_start_time
.last_key_value()
Expand All @@ -261,8 +229,10 @@ impl ChunkStore {
.into_iter()
.rev()
.take(target_count)
})
})
},
)
},
)
})
.collect()
}
Expand Down
108 changes: 104 additions & 4 deletions crates/re_chunk_store/src/writes.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::sync::Arc;
use std::{collections::BTreeSet, sync::Arc};

use arrow2::array::{Array as _, ListArray as ArrowListArray};
use itertools::Itertools as _;

use re_chunk::{Chunk, RowId};
use re_chunk::{Chunk, EntityPath, RowId};

use crate::{
ChunkStore, ChunkStoreChunkStats, ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreError,
ChunkStoreEvent, ChunkStoreResult,
store::ChunkIdSetPerTime, ChunkStore, ChunkStoreChunkStats, ChunkStoreDiff, ChunkStoreDiffKind,
ChunkStoreError, ChunkStoreEvent, ChunkStoreResult,
};

// Used all over in docstrings.
Expand Down Expand Up @@ -173,4 +174,103 @@ impl ChunkStore {

Ok(Some(event))
}

/// Unconditionally drops all the data for a given `entity_path`.
///
/// Returns the list of `Chunk`s that were dropped from the store in the form of [`ChunkStoreEvent`]s.
///
/// This is _not_ recursive. The store is unaware of the entity hierarchy.
pub fn drop_entity_path(&mut self, entity_path: &EntityPath) -> Vec<ChunkStoreEvent> {
    re_tracing::profile_function!(entity_path.to_string());

    // Dropping an entire entity is semantically a (targeted) garbage collection,
    // so we bump the GC counter rather than introducing a dedicated one.
    self.gc_id += 1; // close enough

    // Snapshot the store generation before mutating, so every emitted event
    // carries the generation this drop was computed against.
    let generation = self.generation();

    // Exhaustive destructuring: adding a new field to `ChunkStore` will fail to
    // compile here, forcing this method to be updated to account for it.
    let Self {
        id,
        config: _,
        type_registry: _,
        chunks_per_chunk_id,
        chunk_ids_per_min_row_id,
        temporal_chunk_ids_per_entity,
        temporal_chunks_stats,
        static_chunk_ids_per_entity,
        static_chunks_stats,
        insert_id: _,
        query_id: _,
        gc_id: _,
        event_id,
    } = self;

    // Phase 1a: remove the entity's *static* chunk IDs from the per-entity index,
    // and scrub them from the row-id index so no dangling references remain.
    let dropped_static_chunks = {
        let dropped_static_chunk_ids: BTreeSet<_> = static_chunk_ids_per_entity
            .remove(entity_path)
            .unwrap_or_default()
            .into_values()
            .collect();

        // Keep only row-id entries that still point at surviving chunks.
        chunk_ids_per_min_row_id.retain(|_row_id, chunk_ids| {
            chunk_ids.retain(|chunk_id| !dropped_static_chunk_ids.contains(chunk_id));
            !chunk_ids.is_empty()
        });

        dropped_static_chunk_ids.into_iter()
    };

    // Phase 1b: same treatment for the entity's *temporal* chunk IDs, flattening
    // the per-timeline / per-component / per-time index down to a flat ID set.
    let dropped_temporal_chunks = {
        let dropped_temporal_chunk_ids: BTreeSet<_> = temporal_chunk_ids_per_entity
            .remove(entity_path)
            .unwrap_or_default()
            .into_values()
            .flat_map(|temporal_chunk_ids_per_component| {
                temporal_chunk_ids_per_component.into_values()
            })
            .flat_map(|temporal_chunk_ids_per_time| {
                let ChunkIdSetPerTime {
                    max_interval_length: _,
                    per_start_time,
                    per_end_time: _, // same chunk IDs as above
                } = temporal_chunk_ids_per_time;

                // Iterating `per_start_time` alone is sufficient: `per_end_time`
                // indexes the exact same chunk IDs (see note above).
                per_start_time
                    .into_values()
                    .flat_map(|chunk_ids| chunk_ids.into_iter())
            })
            .collect();

        chunk_ids_per_min_row_id.retain(|_row_id, chunk_ids| {
            chunk_ids.retain(|chunk_id| !dropped_temporal_chunk_ids.contains(chunk_id));
            !chunk_ids.is_empty()
        });

        dropped_temporal_chunk_ids.into_iter()
    };

    // Phase 2: remove the actual chunks from primary storage and keep the
    // store-wide size statistics in sync as we go.
    let dropped_static_chunks = dropped_static_chunks
        .filter_map(|chunk_id| chunks_per_chunk_id.remove(&chunk_id))
        .inspect(|chunk| {
            *static_chunks_stats -= ChunkStoreChunkStats::from_chunk(chunk);
        })
        // NOTE: gotta collect to release the mut ref on `chunks_per_chunk_id`.
        .collect_vec();

    // This one can stay lazy: it is the only remaining borrower of
    // `chunks_per_chunk_id` and is consumed by the final chain below.
    let dropped_temporal_chunks = dropped_temporal_chunks
        .filter_map(|chunk_id| chunks_per_chunk_id.remove(&chunk_id))
        .inspect(|chunk| {
            *temporal_chunks_stats -= ChunkStoreChunkStats::from_chunk(chunk);
        });

    // Phase 3: turn every dropped chunk into a deletion event, allocating each
    // event a unique, monotonically increasing ID.
    dropped_static_chunks
        .into_iter()
        .chain(dropped_temporal_chunks)
        .map(ChunkStoreDiff::deletion)
        .map(|diff| ChunkStoreEvent {
            store_id: id.clone(),
            store_generation: generation.clone(),
            event_id: event_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
            diff,
        })
        .collect()
}
}
Loading

0 comments on commit 84fc7f1

Please sign in to comment.