Skip to content

Commit

Permalink
Query-time clears (latest-at only) (#6586)
Browse files Browse the repository at this point in the history
Resolves pending clears at query time, rather than through write
cascades.

Only works for latest-at queries for now. We'll take care of range
queries once the deserialization cache has been removed, it's just not
worth dealing with all the extra complexity at the moment.
The only visible change for users is that the "scalar plot hole" feature
doesn't work anymore.

- Fixes #6454
- Fixes #4220
- Fixes #3287
  • Loading branch information
teh-cmc authored Jul 4, 2024
1 parent b99ae0a commit 176b937
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 462 deletions.
109 changes: 7 additions & 102 deletions crates/re_entity_db/src/entity_db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::collections::BTreeMap;

use itertools::Itertools;
use nohash_hasher::IntMap;
use parking_lot::Mutex;
Expand All @@ -8,13 +6,13 @@ use re_data_store::{
DataStore, DataStoreConfig, GarbageCollectionOptions, StoreEvent, StoreSubscriber,
};
use re_log_types::{
ApplicationId, ComponentPath, DataCell, DataRow, DataTable, DataTableResult, EntityPath,
EntityPathHash, LogMsg, ResolvedTimeRange, ResolvedTimeRangeF, RowId, SetStoreInfo, StoreId,
StoreInfo, StoreKind, TimePoint, Timeline,
ApplicationId, ComponentPath, DataRow, DataTable, DataTableResult, EntityPath, EntityPathHash,
LogMsg, ResolvedTimeRange, ResolvedTimeRangeF, RowId, SetStoreInfo, StoreId, StoreInfo,
StoreKind, Timeline,
};
use re_types_core::{Archetype, Loggable};

use crate::{ClearCascade, CompactedStoreEvents, Error, TimesPerTimeline};
use crate::{Error, TimesPerTimeline};

// ----------------------------------------------------------------------------

Expand Down Expand Up @@ -59,8 +57,6 @@ fn insert_row_with_retries(
// In the future a row-id clash should probably either be considered an error (with a loud warning)
// or an ignored idempotent operation (with the assumption that if the RowId is the same, so is the data).
// In any case, we cannot log loudly here.
// We also get here because of `ClearCascade`, but that could be solved by adding a random increment
// in `on_clear_cascade` (see https://github.com/rerun-io/rerun/issues/4469).
re_log::trace!(
"Found duplicated RowId ({}) during insert. Incrementing it by random offset (retry {}/{})…",
row.row_id,
Expand Down Expand Up @@ -432,8 +428,6 @@ impl EntityDb {
}

/// Inserts a [`DataRow`] into the database.
///
/// Updates the [`crate::EntityTree`] and applies [`ClearCascade`]s as needed.
pub fn add_data_row(&mut self, row: DataRow) -> Result<(), Error> {
re_tracing::profile_function!(format!("num_cells={}", row.num_cells()));

Expand Down Expand Up @@ -464,106 +458,18 @@ impl EntityDb {
DEFAULT_INSERT_ROW_STEP_SIZE,
)?;

// First-pass: update our internal views by notifying them of resulting [`StoreEvent`]s.
//
// This might result in a [`ClearCascade`] if the events trigger one or more immediate
// and/or pending clears.
// Update our internal views by notifying them of resulting [`StoreEvent`]s.
let original_store_events = &[store_event];
self.times_per_timeline.on_events(original_store_events);
self.query_caches.on_events(original_store_events);
let clear_cascade = self.tree.on_store_additions(original_store_events);

// Second-pass: update the [`DataStore`] by applying the [`ClearCascade`].
//
// This will in turn generate new [`StoreEvent`]s that our internal views need to be
// notified of, again!
let new_store_events = self.on_clear_cascade(clear_cascade);
self.times_per_timeline.on_events(&new_store_events);
self.query_caches.on_events(&new_store_events);
let clear_cascade = self.tree.on_store_additions(&new_store_events);

// Clears don't affect `Clear` components themselves, therefore we cannot have recursive
// cascades, thus this whole process must stabilize after one iteration.
if !clear_cascade.is_empty() {
re_log::debug!(
"recursive clear cascade detected -- might just have been logged this way"
);
}
self.tree.on_store_additions(original_store_events);

// We inform the stats last, since it measures e2e latency.
self.stats.on_events(original_store_events);

Ok(())
}

fn on_clear_cascade(&mut self, clear_cascade: ClearCascade) -> Vec<StoreEvent> {
let mut store_events = Vec::new();

// Create the empty cells to be inserted.
//
// Reminder: these are the [`RowId`]s of the `Clear` components that triggered the
// cascade, they are not unique and may be shared across many entity paths.
let mut to_be_inserted =
BTreeMap::<RowId, BTreeMap<EntityPath, (TimePoint, Vec<DataCell>)>>::default();
for (row_id, per_entity) in clear_cascade.to_be_cleared {
for (entity_path, (timepoint, component_paths)) in per_entity {
let per_entity = to_be_inserted.entry(row_id).or_default();
let (cur_timepoint, cells) = per_entity.entry(entity_path).or_default();

*cur_timepoint = timepoint.union_max(cur_timepoint);
for component_path in component_paths {
if let Some(data_type) = self
.data_store
.lookup_datatype(&component_path.component_name)
{
cells.push(DataCell::from_arrow_empty(
component_path.component_name,
data_type.clone(),
));
}
}
}
}

for (row_id, per_entity) in to_be_inserted {
let mut row_id = row_id;
for (entity_path, (timepoint, cells)) in per_entity {
// NOTE: It is important we insert all those empty components using a single row (id)!
// 1. It'll be much more efficient when querying that data back.
// 2. Otherwise we will end up with a flaky row ordering, as we have no way to tie-break
// these rows! This flaky ordering will in turn leak through the public
// API (e.g. range queries)!
match DataRow::from_cells(row_id, timepoint.clone(), entity_path, cells) {
Ok(row) => {
let res = insert_row_with_retries(
&mut self.data_store,
row,
MAX_INSERT_ROW_ATTEMPTS,
DEFAULT_INSERT_ROW_STEP_SIZE,
);

match res {
Ok(store_event) => {
row_id = store_event.row_id.next();
store_events.push(store_event);
}
Err(err) => {
re_log::error_once!(
"Failed to propagate EntityTree cascade: {err}"
);
}
}
}
Err(err) => {
re_log::error_once!("Failed to propagate EntityTree cascade: {err}");
}
}
}
}

store_events
}

fn register_entity_path(&mut self, entity_path: &EntityPath) {
self.entity_path_from_hash
.entry(entity_path.hash())
Expand Down Expand Up @@ -648,8 +554,7 @@ impl EntityDb {
query_caches.on_events(store_events);

let store_events = store_events.iter().collect_vec();
let compacted = CompactedStoreEvents::new(&store_events);
tree.on_store_deletions(&store_events, &compacted);
tree.on_store_deletions(&store_events);
}

/// Key used for sorting recordings in the UI.
Expand Down
Loading

0 comments on commit 176b937

Please sign in to comment.