diff --git a/crates/re_arrow_store/src/lib.rs b/crates/re_arrow_store/src/lib.rs index c32ebf1d473c..454d16dd3725 100644 --- a/crates/re_arrow_store/src/lib.rs +++ b/crates/re_arrow_store/src/lib.rs @@ -16,6 +16,7 @@ mod arrow_util; mod store; +mod store_dump; mod store_gc; mod store_read; mod store_stats; diff --git a/crates/re_arrow_store/src/store.rs b/crates/re_arrow_store/src/store.rs index c31d3be17ae4..50afd118ecd1 100644 --- a/crates/re_arrow_store/src/store.rs +++ b/crates/re_arrow_store/src/store.rs @@ -230,6 +230,8 @@ pub struct DataStore { /// Maps `MsgId`s to some metadata (just timepoints at the moment). /// /// `BTreeMap` because of garbage collection. + // + // TODO(cmc): this isn't garbage collected after all, is it? pub(crate) messages: BTreeMap, /// Used to cache auto-generated cluster components, i.e. `[0]`, `[0, 1]`, `[0, 1, 2]`, etc diff --git a/crates/re_arrow_store/src/store_dump.rs b/crates/re_arrow_store/src/store_dump.rs new file mode 100644 index 000000000000..525f82b854b6 --- /dev/null +++ b/crates/re_arrow_store/src/store_dump.rs @@ -0,0 +1,223 @@ +use itertools::Itertools as _; +use re_log_types::{ + external::arrow2_convert::deserialize::TryIntoCollection, + msg_bundle::{wrap_in_listarray, ComponentBundle, MsgBundle}, + ComponentName, EntityPath, MsgId, TimePoint, +}; + +use crate::{DataStore, IndexBucket}; + +// --- + +impl DataStore { + /// Serializes the entire datastore into an iterator of `MsgBundle`s. + /// + /// Feeding those `MsgBundle`s back into a brand new `DataStore` is guaranteed to end up in + /// the same exact state from a public API standpoint. + /// + /// Panics if no index is present for `id_key` or if it isn't fully dense (i.e.: no empty + /// rows and no empty columns either!). + pub fn as_msg_bundles(&self, id_key: ComponentName) -> impl Iterator + '_ { + // TODO(cmc): This is an obvious entrypoint for all kinds of compaction tasks. + // TODO(cmc): This shouldn't dump cluster keys that were autogenerated. + + let timeless = self.dump_timeless_indices(id_key); + let temporal = self.dump_temporal_indices(id_key); + + timeless.merge_by(temporal, |bundle1, bundle2| { + bundle1.msg_id <= bundle2.msg_id + }) + } + + /// Dumps the timeless indices & tables. + /// + /// Panics if no index is present for `id_key` or if it isn't fully dense (i.e.: no empty + /// rows and no empty columns either!). + fn dump_timeless_indices(&self, id_key: ComponentName) -> impl Iterator + '_ { + let msg_bundles = self.timeless_indices.values().map(|index| { + let ent_path = index.ent_path.clone(); + let timepoint = TimePoint::timeless(); + + // Gather all component indices, except for the internal `insert_id` one. + let all_indices = index + .indices + .iter() + // Definitely should _not_ dump insert IDs + .filter(|(component, _)| **component != DataStore::insert_id_key()) + .collect_vec(); + + // Find the index used to identify messages (i.e. `MsgId` in the case of Rerun). + // + // We currently need to fetch those to be able to create the final `MsgBundle`. + // This is an artifact of the past and should go away as we implement batch support. + // + // Reminder: the store is not aware of message identification, this entirely up to + // the application layer. + // + // It's up to the application layer to guarantee the presence of this index: this will + // panic otherwise. + let ids_index = &index.indices[&id_key]; + let ids = + ids_index.iter().map( + move |row_idx| self.get(&[id_key], &[*row_idx])[0].clone(), /* shallow */ + ); + + let mut all_tables = Vec::with_capacity(all_indices.len()); + for (component, index) in &all_indices { + all_tables.push(( + *component, + index.iter().map( + |row_idx| self.get(&[**component], &[*row_idx])[0].clone(), /* shallow */ + ), + )); + } + + ids.map(move |msg_ids| { + // It's up to the application layer to guarantee the row-density of this + // index: this will panic otherwise. + let msg_ids = msg_ids.unwrap(); + + // Again, this is an artifact of the past: we need to deserialize the raw message + // IDs in order to create actual `MsgBundle`s. + // This'll go away with batching. + // + // NOTE: Reminder that we don't _really_ support splats, so a row with N instances + // will actually have N message IDs. + let msg_ids: Vec = TryIntoCollection::try_into_collection(msg_ids).unwrap(); + + let components = all_tables + .iter_mut() + .filter_map(|(component, index)| { + index + .next() + .flatten() + .map(|data| (**component, wrap_in_listarray(data))) + }) + .map(|(component, data)| ComponentBundle::new(component, data)) + .collect_vec(); + + MsgBundle { + // It's up to the application layer to guarantee the column-density of this + // index: this will panic otherwise. + msg_id: msg_ids[0], + entity_path: ent_path.clone(), + time_point: timepoint.clone(), + components, + } + }) + }); + + msg_bundles.kmerge_by(|bundle1, bundle2| bundle1.msg_id <= bundle2.msg_id) + } + + /// Dumps the temporal indices & tables. + /// + /// Panics if no index is present for `id_key` or if it isn't fully dense (i.e.: no empty + /// rows and no empty columns either!). + fn dump_temporal_indices(&self, id_key: ComponentName) -> impl Iterator + '_ { + let msg_bundles = self.indices.values().flat_map(|index| { + index.buckets.values().map(move |bucket| { + self.dump_temporal_bucket(id_key, bucket, index.ent_path.clone()) + }) + }); + + msg_bundles.kmerge_by(|bundle1, bundle2| bundle1.msg_id <= bundle2.msg_id) + } + + /// Dumps one specific temporal bucket. + /// + /// Panics if no index is present for `id_key` or if it isn't fully dense (i.e.: no empty + /// rows and no empty columns either!). + // + // TODO(cmc): Shouldn't need to collect into a vec, but locks in the store are a mess rn. + fn dump_temporal_bucket( + &self, + id_key: ComponentName, + bucket: &IndexBucket, + ent_path: EntityPath, + ) -> Vec { + let timeline = bucket.timeline; + + bucket.sort_indices_if_needed(); + + let index = bucket.indices.read(); + + // Gather all component indices, except for the internal `insert_id` one. + let all_indices = index + .indices + .iter() + // Definitely should _not_ dump insert IDs + .filter(|(component, _)| **component != DataStore::insert_id_key()) + .collect_vec(); + + // Find the index used to identify messages (i.e. `MsgId` in the case of Rerun). + // + // We currently need to fetch those to be able to create the final `MsgBundle`. + // This is an artifact of the past and should go away as we implement batch support. + // + // Reminder: the store is not aware of message identification, this entirely up to + // the application layer. + // + // It's up to the application layer to guarantee the presence of this index: + // this will panic otherwise. + let ids_index = &index.indices[&id_key]; + let ids = ids_index.iter().map( + move |row_idx| self.get(&[id_key], &[*row_idx])[0].clone(), /* shallow */ + ); + + let mut all_tables = Vec::with_capacity(all_indices.len()); + for (component, index) in &all_indices { + all_tables.push(( + *component, + index.iter().map( + |row_idx| self.get(&[**component], &[*row_idx])[0].clone(), /* shallow */ + ), + )); + } + + // NOTE: The store guarantees that all indices, including the primary, are always the same + // length. + index + .times + .iter() + .zip(ids) + .map(move |(time, msg_ids)| { + // It's up to the application layer to guarantee the row-density of this + // index: this will panic otherwise. + let msg_ids = msg_ids.unwrap(); + + // Again, this is an artifact of the past: we need to deserialize the raw + // message IDs in order to create actual `MsgBundle`s. + // This'll go away with batching. + // + // NOTE: Reminder that we don't _really_ support splats, so a row with N + // instances will actually have N message IDs. + let msg_ids: Vec = TryIntoCollection::try_into_collection(msg_ids).unwrap(); + + // TODO(cmc): We should identify and merge back together messages spread across + // multiple timelines. + let timepoint = TimePoint::from([(timeline, (*time).into())]); + + let components = all_tables + .iter_mut() + .filter_map(|(component, index)| { + index + .next() + .flatten() + .map(|data| (**component, wrap_in_listarray(data))) + }) + .map(|(component, data)| ComponentBundle::new(component, data)) + .collect_vec(); + + MsgBundle { + // It's up to the application layer to guarantee the column-density of this + // index: this will panic otherwise. + msg_id: msg_ids[0], + entity_path: ent_path.clone(), + time_point: timepoint, + components, + } + }) + .collect_vec() + } +} diff --git a/crates/re_arrow_store/src/store_polars.rs b/crates/re_arrow_store/src/store_polars.rs index 0165e3f2cd47..31358117b89e 100644 --- a/crates/re_arrow_store/src/store_polars.rs +++ b/crates/re_arrow_store/src/store_polars.rs @@ -23,7 +23,7 @@ impl DataStore { pub fn to_dataframe(&self) -> DataFrame { crate::profile_function!(); - const IS_TIMELESS_COL: &str = "_is_timeless"; + const TIMELESS_COL: &str = "_is_timeless"; let timeless_dfs = self.timeless_indices.values().map(|index| { let ent_path = index.ent_path.clone(); @@ -34,7 +34,7 @@ impl DataStore { // Add a column where every row is a boolean true (timeless) let timeless = { let timeless = BooleanArray::from(vec![Some(true); num_rows]).boxed(); - new_infallible_series(IS_TIMELESS_COL, timeless.as_ref(), num_rows) + new_infallible_series(TIMELESS_COL, timeless.as_ref(), num_rows) }; let df = df.with_column(timeless).unwrap(); // cannot fail @@ -89,25 +89,48 @@ impl DataStore { }) .collect(); + // Some internal functions of `polars` will panic if everything's empty: early exit. + if dfs.iter().all(|df| df.is_empty()) { + return DataFrame::empty(); + } + // Concatenate all indices together. // // This has to be done diagonally since these indices refer to different entities with // potentially wildly different sets of components and lengths. - let df = diag_concat_df(dfs.as_slice()) - // TODO(cmc): is there any way this can fail in this case? - .unwrap(); + // + // NOTE: The only way this can fail in this case is if all these frames are empty, because + // the store itself is empty, which we check just above. + let df = diag_concat_df(dfs.as_slice()).unwrap(); + // Arrange the columns in the order that makes the most sense as a user. let df = sort_df_columns(&df, self.config.store_insert_ids); - let has_insert_ids = df.column(DataStore::insert_id_key().as_str()).is_ok(); - let has_timeless = df.column(IS_TIMELESS_COL).is_ok(); + let has_timeless = df.column(TIMELESS_COL).is_ok(); + let insert_id_col = DataStore::insert_id_key().as_str(); + // Now we want to sort _the contents_ of the columns, and we need to make sure we do so in + // as stable a way as possible given our constraints: we cannot actually sort the + // component columns themselves as they are internally lists of their own. let (sort_cols, sort_orders): (Vec<_>, Vec<_>) = [ - has_timeless.then_some((IS_TIMELESS_COL, true)), - has_insert_ids.then_some((DataStore::insert_id_key().as_str(), false)), + df.column(TIMELESS_COL) + .is_ok() + .then_some((TIMELESS_COL, true)), // descending + df.column(insert_id_col) + .is_ok() + .then_some((insert_id_col, false)), // ascending ] .into_iter() .flatten() + // NOTE: Already properly arranged above, and already contains insert_id if needed. + .chain( + df.get_column_names() + .into_iter() + .filter(|col| *col != TIMELESS_COL) // we handle this one separately + .filter(|col| *col != insert_id_col) // we handle this one separately + .filter(|col| !col.starts_with("rerun.")) // lists cannot be sorted + .map(|col| (col, false)), // ascending order + ) .unzip(); let df = if !sort_cols.is_empty() { @@ -117,7 +140,7 @@ impl DataStore { }; if has_timeless { - df.drop(IS_TIMELESS_COL).unwrap() + df.drop(TIMELESS_COL).unwrap() } else { df } diff --git a/crates/re_arrow_store/src/store_stats.rs b/crates/re_arrow_store/src/store_stats.rs index f41a308d117b..49c17ef0ecb4 100644 --- a/crates/re_arrow_store/src/store_stats.rs +++ b/crates/re_arrow_store/src/store_stats.rs @@ -7,7 +7,7 @@ use crate::{ // TODO(cmc): count buckets? // TODO(cmc): compute incrementally once/if this becomes too expensive. -#[derive(Default, Debug)] +#[derive(Default, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct DataStoreStats { pub total_timeless_index_rows: u64, pub total_timeless_index_size_bytes: u64, diff --git a/crates/re_arrow_store/tests/data_store.rs b/crates/re_arrow_store/tests/data_store.rs index 1f5917b92249..9b3324b7360b 100644 --- a/crates/re_arrow_store/tests/data_store.rs +++ b/crates/re_arrow_store/tests/data_store.rs @@ -283,8 +283,6 @@ fn latest_at() { } fn latest_at_impl(store: &mut DataStore) { - init_logs(); - let ent_path = EntityPath::from("this/that"); let frame0: TimeInt = 0.into(); diff --git a/crates/re_arrow_store/tests/dump.rs b/crates/re_arrow_store/tests/dump.rs new file mode 100644 index 000000000000..ca6ac6ee39e1 --- /dev/null +++ b/crates/re_arrow_store/tests/dump.rs @@ -0,0 +1,158 @@ +//! Dumping a datastore to log messages and back. + +use std::sync::atomic::{AtomicBool, Ordering}; + +use itertools::Itertools; +use re_arrow_store::{test_bundle, DataStore, DataStoreStats, TimeInt}; +use re_log_types::{ + component_types::InstanceKey, + datagen::{ + build_frame_nr, build_log_time, build_some_colors, build_some_instances, build_some_point2d, + }, + msg_bundle::{Component as _, MsgBundle}, + EntityPath, MsgId, Time, +}; + +// --- Dump --- + +#[test] +fn dump() { + init_logs(); + + // TODO(cmc): Sprinkle some serialization roundtrips in main end-to-end test suites to account + // for feature intersection. + + for mut config in re_arrow_store::test_util::all_configs() { + // NOTE: insert IDs A) are irrelevant for our purposes and B) cannot possibly match at the + // moment since we don't auto-merge multi-timestamp rows back into a single message (yet?). + config.store_insert_ids = false; + + let mut store1 = DataStore::new(InstanceKey::name(), config.clone()); + let mut store2 = DataStore::new(InstanceKey::name(), config.clone()); + let mut store3 = DataStore::new(InstanceKey::name(), config.clone()); + + dump_impl(&mut store1, &mut store2, &mut store3); + } +} + +fn dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataStore) { + let frame1: TimeInt = 1.into(); + let frame2: TimeInt = 2.into(); + let frame3: TimeInt = 3.into(); + let frame4: TimeInt = 4.into(); + + let create_bundles = |ent_path| { + let ent_path = EntityPath::from(ent_path); + + let (instances1, colors1) = (build_some_instances(3), build_some_colors(3)); + let bundle1 = test_bundle!(ent_path @ [ + build_log_time(Time::now()), build_frame_nr(frame1), + ] => [instances1.clone(), colors1]); + + let points2 = build_some_point2d(3); + let bundle2 = test_bundle!(ent_path @ [ + build_log_time(Time::now()), build_frame_nr(frame2), + ] => [instances1, points2]); + + let points3 = build_some_point2d(10); + let bundle3 = test_bundle!(ent_path @ [ + build_log_time(Time::now()), build_frame_nr(frame3), + ] => [points3]); + + let colors4 = build_some_colors(5); + let bundle4 = test_bundle!(ent_path @ [ + build_log_time(Time::now()), build_frame_nr(frame4), + ] => [colors4]); + + vec![bundle1, bundle2, bundle3, bundle4] + }; + + // helper to insert a bundle both as a temporal and timeless payload + let insert = |store: &mut DataStore, bundle: &MsgBundle| { + // insert temporal + store.insert(bundle).unwrap(); + + // insert timeless + let mut bundle_timeless = bundle.clone(); + bundle_timeless.time_point = Default::default(); + store.insert(&bundle_timeless).unwrap(); + }; + + let ent_paths = ["this/that", "other", "yet/another/one"]; + let bundles = ent_paths + .iter() + .flat_map(|ent_path| create_bundles(*ent_path)) + .collect_vec(); + + // Fill the first store. + for bundle in &bundles { + insert(store1, bundle); + } + if let err @ Err(_) = store1.sanity_check() { + store1.sort_indices_if_needed(); + eprintln!("{store1}"); + err.unwrap(); + } + + // Dump the first store into the second one. + for bundle in store1.as_msg_bundles(MsgId::name()) { + store2.insert(&bundle).unwrap(); + } + if let err @ Err(_) = store2.sanity_check() { + store2.sort_indices_if_needed(); + eprintln!("{store2}"); + err.unwrap(); + } + + // Dump the second store into the third one. + for bundle in store2.as_msg_bundles(MsgId::name()) { + store3.insert(&bundle).unwrap(); + } + if let err @ Err(_) = store3.sanity_check() { + store3.sort_indices_if_needed(); + eprintln!("{store3}"); + err.unwrap(); + } + + let store1_df = store1.to_dataframe(); + let store2_df = store2.to_dataframe(); + let store3_df = store3.to_dataframe(); + assert!( + store1_df == store2_df, + "First & second stores differ:\n{store1_df}\n{store2_df}" + ); + assert!( + store1_df == store3_df, + "First & third stores differ:\n{store1_df}\n{store3_df}" + ); + + // NOTE: These are <= rather than == because..: + // - we're not yet identifying and filtering autogenerated cluster keys while dumping + // - we're not yet identifying and merging multi-timestamp rows + let store1_stats = DataStoreStats::from_store(store1); + let store2_stats = DataStoreStats::from_store(store2); + let store3_stats = DataStoreStats::from_store(store3); + assert!( + store1_stats <= store2_stats, + "First store should have <= amount of data of second store:\n\ + {store1_stats:#?}\n{store2_stats:#?}" + ); + assert!( + store2_stats <= store3_stats, + "Second store should have <= amount of data of third store:\n\ + {store2_stats:#?}\n{store3_stats:#?}" + ); +} + +// --- + +pub fn init_logs() { + static INIT: AtomicBool = AtomicBool::new(false); + + if INIT + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + re_log::setup_native_logging(); + } +}