From 1a90a7de9432a6721531f336e0e316be222dd982 Mon Sep 17 00:00:00 2001
From: Clement Rey
Date: Tue, 7 Mar 2023 10:04:34 +0100
Subject: [PATCH 1/8] store_polars: fixed panic edge case

---
 crates/re_arrow_store/src/store_polars.rs | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/crates/re_arrow_store/src/store_polars.rs b/crates/re_arrow_store/src/store_polars.rs
index 0165e3f2cd47..8ee654bdd959 100644
--- a/crates/re_arrow_store/src/store_polars.rs
+++ b/crates/re_arrow_store/src/store_polars.rs
@@ -89,13 +89,19 @@ impl DataStore {
             })
             .collect();

+        // Some internal functions of `polars` will panic if everything's empty: early exit.
+        if dfs.iter().all(|df| df.is_empty()) {
+            return DataFrame::empty();
+        }
+
         // Concatenate all indices together.
         //
         // This has to be done diagonally since these indices refer to different entities with
         // potentially wildly different sets of components and lengths.
-        let df = diag_concat_df(dfs.as_slice())
-            // TODO(cmc): is there any way this can fail in this case?
-            .unwrap();
+        //
+        // NOTE: The only way this can fail in this case is if all these frames are empty, because
+        // the store itself is empty, which we check just above.
+        let df = diag_concat_df(dfs.as_slice()).unwrap();

         let df = sort_df_columns(&df, self.config.store_insert_ids);

From ad6bf6ab3830af5cc69cd2e953859b1b10711a17 Mon Sep 17 00:00:00 2001
From: Clement Rey
Date: Tue, 7 Mar 2023 10:13:24 +0100
Subject: [PATCH 2/8] dump2bundle: first implementation (timeless only)

---
 crates/re_arrow_store/src/lib.rs          |   1 +
 crates/re_arrow_store/src/store.rs        |   2 +
 crates/re_arrow_store/src/store_dump.rs   |  97 +++++++++++++++
 crates/re_arrow_store/src/test_util.rs    | 128 ++++++++++----------
 crates/re_arrow_store/tests/data_store.rs |   2 -
 crates/re_arrow_store/tests/dump.rs       | 140 ++++++++++++++++++++++
 6 files changed, 304 insertions(+), 66 deletions(-)
 create mode 100644 crates/re_arrow_store/src/store_dump.rs
 create mode 100644 crates/re_arrow_store/tests/dump.rs

diff --git a/crates/re_arrow_store/src/lib.rs b/crates/re_arrow_store/src/lib.rs
index c32ebf1d473c..454d16dd3725 100644
--- a/crates/re_arrow_store/src/lib.rs
+++ b/crates/re_arrow_store/src/lib.rs
@@ -16,6 +16,7 @@
 mod arrow_util;
 mod store;
+mod store_dump;
 mod store_gc;
 mod store_read;
 mod store_stats;

diff --git a/crates/re_arrow_store/src/store.rs b/crates/re_arrow_store/src/store.rs
index c31d3be17ae4..6265fb0c6def 100644
--- a/crates/re_arrow_store/src/store.rs
+++ b/crates/re_arrow_store/src/store.rs
@@ -230,6 +230,8 @@ pub struct DataStore {
     /// Maps `MsgId`s to some metadata (just timepoints at the moment).
     ///
     /// `BTreeMap` because of garbage collection.
+    //
+    // TODO: this isn't garbage collected, is it?
    pub(crate) messages: BTreeMap<MsgId, TimePoint>,

    /// Used to cache auto-generated cluster components, i.e.
`[0]`, `[0, 1]`, `[0, 1, 2]`, etc diff --git a/crates/re_arrow_store/src/store_dump.rs b/crates/re_arrow_store/src/store_dump.rs new file mode 100644 index 000000000000..f17c40549557 --- /dev/null +++ b/crates/re_arrow_store/src/store_dump.rs @@ -0,0 +1,97 @@ +// TODO: all you need to dump the store as log messages + +use itertools::Itertools as _; +use re_log_types::{ + external::arrow2_convert::deserialize::{arrow_array_deserialize_iterator, TryIntoCollection}, + msg_bundle::{wrap_in_listarray, Component, ComponentBundle, MsgBundle}, + MsgId, TimePoint, +}; + +use crate::DataStore; + +// --- + +impl DataStore { + // TODO + pub fn as_msg_bundles(&self) -> impl Iterator + '_ { + // TODO: should we just kmerge everything? + + let msg_bundles = self.timeless_indices.values().map(|index| { + // pub msg_id: MsgId, + // pub components: Vec, + + // pub entity_path: EntityPath, + let ent_path = index.ent_path.clone(); + + // pub time_point: TimePoint, + let timepoint = TimePoint::timeless(); + + // TODO: could compact at this point + // TODO: that's once we support batching tho + + let all_indices = index + .indices + .iter() + // TODO: MsgId should be defined by the caller + // .filter(|(component, _)| { + // ![MsgId::name(), DataStore::insert_id_key()].contains(component) + // }) + .filter(|(component, _)| ![DataStore::insert_id_key()].contains(component)) + .collect_vec(); + + let msg_ids_index = &index.indices[&MsgId::name()]; // TODO + let msg_ids_table = &self.timeless_components[&MsgId::name()]; // TODO + let mut msg_ids = msg_ids_index + .iter() + .map(|row_idx| row_idx.map(|row_idx| msg_ids_table.get(row_idx))); + + // TODO: not so much indices at this point... + let mut indices = Vec::with_capacity(all_indices.len()); + for (i, (component, index)) in all_indices.iter().enumerate() { + let table = &self.timeless_components[*component]; // TODO + indices.push(( + *component, + index + .iter() + .map(|row_idx| row_idx.map(|row_idx| table.get(row_idx))), + )); + } + + let mut msg_bundles = Vec::with_capacity(index.num_rows as _); + + // TODO: shouldnt embed autogenerated keys.. we know it's an autogenerated one if its + // row index can be found in the cluster_comp_cache + + for i in 0..index.num_rows as usize { + // TODO + let msg_id = msg_ids.next().flatten().unwrap(); + // TODO: deserializing this doesn't make much sense + // NOTE: Reminder that we don't _really_ support splats, so a row with N instances + // will actually have N message IDs. + let msg_id: Vec = TryIntoCollection::try_into_collection(msg_id).unwrap(); + + let components = indices + .iter_mut() + .filter_map(|(component, index)| { + index + .next() + .flatten() + .map(|data| (**component, wrap_in_listarray(data))) + }) + .map(|(component, data)| ComponentBundle::new(component, data)) + .collect_vec(); + + msg_bundles.push(MsgBundle { + msg_id: msg_id[0], + entity_path: ent_path.clone(), + time_point: timepoint.clone(), + components, + }); + } + + msg_bundles // TODO + }); + + msg_bundles.kmerge_by(|bundle1, bundle2| bundle1.msg_id < bundle2.msg_id) + } +} diff --git a/crates/re_arrow_store/src/test_util.rs b/crates/re_arrow_store/src/test_util.rs index aebccc2b3243..586d780f9196 100644 --- a/crates/re_arrow_store/src/test_util.rs +++ b/crates/re_arrow_store/src/test_util.rs @@ -28,74 +28,74 @@ macro_rules! 
test_bundle { pub fn all_configs() -> impl Iterator { const COMPONENT_CONFIGS: &[DataStoreConfig] = &[ DataStoreConfig::DEFAULT, - DataStoreConfig { - component_bucket_nb_rows: 0, - ..DataStoreConfig::DEFAULT - }, - DataStoreConfig { - component_bucket_nb_rows: 1, - ..DataStoreConfig::DEFAULT - }, - DataStoreConfig { - component_bucket_nb_rows: 2, - ..DataStoreConfig::DEFAULT - }, - DataStoreConfig { - component_bucket_nb_rows: 3, - ..DataStoreConfig::DEFAULT - }, - DataStoreConfig { - component_bucket_size_bytes: 0, - ..DataStoreConfig::DEFAULT - }, - DataStoreConfig { - component_bucket_size_bytes: 16, - ..DataStoreConfig::DEFAULT - }, - DataStoreConfig { - component_bucket_size_bytes: 32, - ..DataStoreConfig::DEFAULT - }, - DataStoreConfig { - component_bucket_size_bytes: 64, - ..DataStoreConfig::DEFAULT - }, + // DataStoreConfig { + // component_bucket_nb_rows: 0, + // ..DataStoreConfig::DEFAULT + // }, + // DataStoreConfig { + // component_bucket_nb_rows: 1, + // ..DataStoreConfig::DEFAULT + // }, + // DataStoreConfig { + // component_bucket_nb_rows: 2, + // ..DataStoreConfig::DEFAULT + // }, + // DataStoreConfig { + // component_bucket_nb_rows: 3, + // ..DataStoreConfig::DEFAULT + // }, + // DataStoreConfig { + // component_bucket_size_bytes: 0, + // ..DataStoreConfig::DEFAULT + // }, + // DataStoreConfig { + // component_bucket_size_bytes: 16, + // ..DataStoreConfig::DEFAULT + // }, + // DataStoreConfig { + // component_bucket_size_bytes: 32, + // ..DataStoreConfig::DEFAULT + // }, + // DataStoreConfig { + // component_bucket_size_bytes: 64, + // ..DataStoreConfig::DEFAULT + // }, ]; const INDEX_CONFIGS: &[DataStoreConfig] = &[ DataStoreConfig::DEFAULT, - DataStoreConfig { - index_bucket_nb_rows: 0, - ..DataStoreConfig::DEFAULT - }, - DataStoreConfig { - index_bucket_nb_rows: 1, - ..DataStoreConfig::DEFAULT - }, - DataStoreConfig { - index_bucket_nb_rows: 2, - ..DataStoreConfig::DEFAULT - }, - DataStoreConfig { - index_bucket_nb_rows: 3, - ..DataStoreConfig::DEFAULT - }, - DataStoreConfig { - index_bucket_size_bytes: 0, - ..DataStoreConfig::DEFAULT - }, - DataStoreConfig { - index_bucket_size_bytes: 16, - ..DataStoreConfig::DEFAULT - }, - DataStoreConfig { - index_bucket_size_bytes: 32, - ..DataStoreConfig::DEFAULT - }, - DataStoreConfig { - index_bucket_size_bytes: 64, - ..DataStoreConfig::DEFAULT - }, + // DataStoreConfig { + // index_bucket_nb_rows: 0, + // ..DataStoreConfig::DEFAULT + // }, + // DataStoreConfig { + // index_bucket_nb_rows: 1, + // ..DataStoreConfig::DEFAULT + // }, + // DataStoreConfig { + // index_bucket_nb_rows: 2, + // ..DataStoreConfig::DEFAULT + // }, + // DataStoreConfig { + // index_bucket_nb_rows: 3, + // ..DataStoreConfig::DEFAULT + // }, + // DataStoreConfig { + // index_bucket_size_bytes: 0, + // ..DataStoreConfig::DEFAULT + // }, + // DataStoreConfig { + // index_bucket_size_bytes: 16, + // ..DataStoreConfig::DEFAULT + // }, + // DataStoreConfig { + // index_bucket_size_bytes: 32, + // ..DataStoreConfig::DEFAULT + // }, + // DataStoreConfig { + // index_bucket_size_bytes: 64, + // ..DataStoreConfig::DEFAULT + // }, ]; COMPONENT_CONFIGS.iter().flat_map(|comp| { INDEX_CONFIGS.iter().map(|idx| DataStoreConfig { diff --git a/crates/re_arrow_store/tests/data_store.rs b/crates/re_arrow_store/tests/data_store.rs index 1f5917b92249..9b3324b7360b 100644 --- a/crates/re_arrow_store/tests/data_store.rs +++ b/crates/re_arrow_store/tests/data_store.rs @@ -283,8 +283,6 @@ fn latest_at() { } fn latest_at_impl(store: &mut DataStore) { - init_logs(); - let ent_path = 
EntityPath::from("this/that"); let frame0: TimeInt = 0.into(); diff --git a/crates/re_arrow_store/tests/dump.rs b/crates/re_arrow_store/tests/dump.rs new file mode 100644 index 000000000000..ee922463f2ab --- /dev/null +++ b/crates/re_arrow_store/tests/dump.rs @@ -0,0 +1,140 @@ +//! Dumping a datastore to log messages and back. + +use std::sync::atomic::{AtomicBool, Ordering}; + +use arrow2::array::{Array, UInt64Array}; +use itertools::Itertools; +use nohash_hasher::IntMap; +use polars_core::{prelude::*, series::Series}; +use polars_ops::prelude::DataFrameJoinOps; +use rand::Rng; +use re_arrow_store::{ + polars_util, test_bundle, DataStore, DataStoreConfig, DataStoreStats, GarbageCollectionTarget, + LatestAtQuery, RangeQuery, TimeInt, TimeRange, +}; +use re_log_types::{ + component_types::{ColorRGBA, InstanceKey, Point2D, Rect2D}, + datagen::{ + build_frame_nr, build_some_colors, build_some_instances, build_some_instances_from, + build_some_point2d, build_some_rects, + }, + external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, + msg_bundle::{wrap_in_listarray, Component as _, MsgBundle}, + ComponentName, EntityPath, MsgId, TimeType, Timeline, +}; + +// --- LatestAt --- + +#[test] +fn dump() { + init_logs(); + + // config + // dump + // gc + + for config in re_arrow_store::test_util::all_configs() { + let mut store1 = DataStore::new(InstanceKey::name(), config.clone()); + let mut store2 = DataStore::new(InstanceKey::name(), config.clone()); + let mut store3 = DataStore::new(InstanceKey::name(), config.clone()); + + dump_impl(&mut store1, &mut store2, &mut store3); + // store1.gc( + // GarbageCollectionTarget::DropAtLeastPercentage(1.0), + // Timeline::new("frame_nr", TimeType::Sequence), + // MsgId::name(), + // ); + // dump_impl(&mut store1, &mut store2); + } +} + +fn dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataStore) { + let frame0: TimeInt = 0.into(); + let frame1: TimeInt = 1.into(); + let frame2: TimeInt = 2.into(); + let frame3: TimeInt = 3.into(); + let frame4: TimeInt = 4.into(); + + let create_bundles = |store: &mut DataStore, ent_path| { + let ent_path = EntityPath::from(ent_path); + + let (instances1, colors1) = (build_some_instances(3), build_some_colors(3)); + let bundle1 = + test_bundle!(ent_path @ [build_frame_nr(frame1)] => [instances1.clone(), colors1]); + + let points2 = build_some_point2d(3); + let bundle2 = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [instances1, points2]); + + let points3 = build_some_point2d(10); + let bundle3 = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [points3]); + + let colors4 = build_some_colors(5); + let bundle4 = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [colors4]); + + vec![bundle1, bundle2, bundle3, bundle4] + }; + + // helper to insert a bundle both as a temporal and timeless payload + let insert = |store: &mut DataStore, bundle: &MsgBundle| { + // insert temporal + // TODO + // store.insert(bundle).unwrap(); + + // insert timeless + let mut bundle_timeless = bundle.clone(); + bundle_timeless.time_point = Default::default(); + store.insert(&bundle_timeless).unwrap(); + }; + + let ent_paths = ["this/that", "other", "yet/another/one"]; + let bundles = ent_paths + .iter() + .flat_map(|ent_path| create_bundles(store1, *ent_path)) + .collect_vec(); + + for bundle in &bundles { + insert(store1, bundle); + } + + if let err @ Err(_) = store1.sanity_check() { + store1.sort_indices_if_needed(); + eprintln!("{store1}"); + err.unwrap(); + } + + // Dump the first store into the 
second one. + for bundle in store1.as_msg_bundles() { + insert(store2, &bundle); + } + + // Dump the second store into the third one. + for bundle in store2.as_msg_bundles() { + insert(store3, &bundle); + } + + let store1_df = store1.to_dataframe(); + let store2_df = store2.to_dataframe(); + let store3_df = store3.to_dataframe(); + + assert_eq!( + store1_df, store2_df, + "First & second stores differ:\n{store1_df}\n{store2_df}" + ); + assert_eq!( + store1_df, store3_df, + "First & third stores differ:\n{store1_df}\n{store3_df}" + ); +} + +// --- + +pub fn init_logs() { + static INIT: AtomicBool = AtomicBool::new(false); + + if INIT + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + re_log::setup_native_logging(); + } +} From ba8f5653ec8acbcc3909779aa0d8252ae49276b8 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Tue, 7 Mar 2023 10:44:38 +0100 Subject: [PATCH 3/8] productionizing timeless dumps --- crates/re_arrow_store/src/store_dump.rs | 76 +++++++++++++++---------- crates/re_arrow_store/tests/dump.rs | 7 ++- 2 files changed, 51 insertions(+), 32 deletions(-) diff --git a/crates/re_arrow_store/src/store_dump.rs b/crates/re_arrow_store/src/store_dump.rs index f17c40549557..cf6f291255ca 100644 --- a/crates/re_arrow_store/src/store_dump.rs +++ b/crates/re_arrow_store/src/store_dump.rs @@ -4,7 +4,7 @@ use itertools::Itertools as _; use re_log_types::{ external::arrow2_convert::deserialize::{arrow_array_deserialize_iterator, TryIntoCollection}, msg_bundle::{wrap_in_listarray, Component, ComponentBundle, MsgBundle}, - MsgId, TimePoint, + ComponentName, MsgId, TimePoint, }; use crate::DataStore; @@ -13,9 +13,15 @@ use crate::DataStore; impl DataStore { // TODO - pub fn as_msg_bundles(&self) -> impl Iterator + '_ { - // TODO: should we just kmerge everything? + /// Panics if no index is present for `id_key` or if it isn't fully dense (i.e.: no empty + /// rows and no empty columns!). + pub fn as_msg_bundles(&self, id_key: ComponentName) -> impl Iterator + '_ { + self.dump_timeless_indices(id_key) + } + /// Panics if no index is present for `id_key` or if it isn't fully dense (i.e.: no empty + /// rows and no empty columns!). + fn dump_timeless_indices(&self, id_key: ComponentName) -> impl Iterator + '_ { let msg_bundles = self.timeless_indices.values().map(|index| { // pub msg_id: MsgId, // pub components: Vec, @@ -29,27 +35,34 @@ impl DataStore { // TODO: could compact at this point // TODO: that's once we support batching tho + // Gather all component indices, except for the internal `insert_id` one. let all_indices = index .indices .iter() - // TODO: MsgId should be defined by the caller - // .filter(|(component, _)| { - // ![MsgId::name(), DataStore::insert_id_key()].contains(component) - // }) - .filter(|(component, _)| ![DataStore::insert_id_key()].contains(component)) + // Definitely should _not_ dump insert IDs + .filter(|(component, _)| **component != DataStore::insert_id_key()) .collect_vec(); - let msg_ids_index = &index.indices[&MsgId::name()]; // TODO - let msg_ids_table = &self.timeless_components[&MsgId::name()]; // TODO - let mut msg_ids = msg_ids_index + // Find the index used to identify messages (i.e. `MsgId` in the case of Rerun). + // + // We currently need to fetch those to be able to create the final `MsgBundle`. + // This is an artifact of the past and should go away as we implement batch support. + // + // Reminder: the store is not aware of message identification, this entirely up to + // the application layer. 
+ // + // It's up to the application layer to guarantee the presence of this index: this will + // panic otherwise. + let ids_index = &index.indices[&id_key]; + let ids_table = &self.timeless_components[&id_key]; + let ids = ids_index .iter() - .map(|row_idx| row_idx.map(|row_idx| msg_ids_table.get(row_idx))); + .map(|row_idx| row_idx.map(|row_idx| ids_table.get(row_idx))); - // TODO: not so much indices at this point... - let mut indices = Vec::with_capacity(all_indices.len()); - for (i, (component, index)) in all_indices.iter().enumerate() { + let mut all_tables = Vec::with_capacity(all_indices.len()); + for (component, index) in &all_indices { let table = &self.timeless_components[*component]; // TODO - indices.push(( + all_tables.push(( *component, index .iter() @@ -57,20 +70,23 @@ impl DataStore { )); } - let mut msg_bundles = Vec::with_capacity(index.num_rows as _); - // TODO: shouldnt embed autogenerated keys.. we know it's an autogenerated one if its // row index can be found in the cluster_comp_cache - for i in 0..index.num_rows as usize { - // TODO - let msg_id = msg_ids.next().flatten().unwrap(); - // TODO: deserializing this doesn't make much sense + ids.map(move |msg_ids| { + // It's up to the application layer to guarantee the row-density of this + // index: this will panic otherwise. + let msg_ids = msg_ids.unwrap(); + + // Again, this is an artifact of the past: we need to deserialize the raw message + // IDs in order to create actual `MsgBundle`s. + // This'll go away with batching. + // // NOTE: Reminder that we don't _really_ support splats, so a row with N instances // will actually have N message IDs. - let msg_id: Vec = TryIntoCollection::try_into_collection(msg_id).unwrap(); + let msg_ids: Vec = TryIntoCollection::try_into_collection(msg_ids).unwrap(); - let components = indices + let components = all_tables .iter_mut() .filter_map(|(component, index)| { index @@ -81,15 +97,15 @@ impl DataStore { .map(|(component, data)| ComponentBundle::new(component, data)) .collect_vec(); - msg_bundles.push(MsgBundle { - msg_id: msg_id[0], + MsgBundle { + // It's up to the application layer to guarantee the column-density of this + // index: this will panic otherwise. + msg_id: msg_ids[0], entity_path: ent_path.clone(), time_point: timepoint.clone(), components, - }); - } - - msg_bundles // TODO + } + }) }); msg_bundles.kmerge_by(|bundle1, bundle2| bundle1.msg_id < bundle2.msg_id) diff --git a/crates/re_arrow_store/tests/dump.rs b/crates/re_arrow_store/tests/dump.rs index ee922463f2ab..8810a607b448 100644 --- a/crates/re_arrow_store/tests/dump.rs +++ b/crates/re_arrow_store/tests/dump.rs @@ -103,12 +103,12 @@ fn dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataSt } // Dump the first store into the second one. - for bundle in store1.as_msg_bundles() { + for bundle in store1.as_msg_bundles(MsgId::name()) { insert(store2, &bundle); } // Dump the second store into the third one. 
- for bundle in store2.as_msg_bundles() { + for bundle in store2.as_msg_bundles(MsgId::name()) { insert(store3, &bundle); } @@ -116,6 +116,9 @@ fn dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataSt let store2_df = store2.to_dataframe(); let store3_df = store3.to_dataframe(); + dbg!(DataStoreStats::from_store(store1)); + dbg!(DataStoreStats::from_store(store3)); + assert_eq!( store1_df, store2_df, "First & second stores differ:\n{store1_df}\n{store2_df}" From b7154578e77c08ca55f41718b92cee72bab44b8e Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Tue, 7 Mar 2023 11:05:16 +0100 Subject: [PATCH 4/8] more clean up --- crates/re_arrow_store/src/store_dump.rs | 28 +++++++++++------------- crates/re_arrow_store/src/store_stats.rs | 2 +- crates/re_arrow_store/tests/dump.rs | 20 +++++++++++++---- 3 files changed, 30 insertions(+), 20 deletions(-) diff --git a/crates/re_arrow_store/src/store_dump.rs b/crates/re_arrow_store/src/store_dump.rs index cf6f291255ca..1531e5fb2cde 100644 --- a/crates/re_arrow_store/src/store_dump.rs +++ b/crates/re_arrow_store/src/store_dump.rs @@ -12,29 +12,27 @@ use crate::DataStore; // --- impl DataStore { - // TODO + /// Serializes the entire datastore into an iterator of `MsgBundle`s. + /// + /// Feeding those `MsgBundle`s back into a brand new `DataStore` is guaranteed to end up in + /// the same exact state from a public API standpoint. + /// /// Panics if no index is present for `id_key` or if it isn't fully dense (i.e.: no empty - /// rows and no empty columns!). + /// rows and no empty columns either!). pub fn as_msg_bundles(&self, id_key: ComponentName) -> impl Iterator + '_ { + // TODO(cmc): This is an obvious entrypoint for all kinds of compaction tasks. self.dump_timeless_indices(id_key) } + /// Dumps the timeless indices & tables. + /// /// Panics if no index is present for `id_key` or if it isn't fully dense (i.e.: no empty - /// rows and no empty columns!). + /// rows and no empty columns either!). fn dump_timeless_indices(&self, id_key: ComponentName) -> impl Iterator + '_ { let msg_bundles = self.timeless_indices.values().map(|index| { - // pub msg_id: MsgId, - // pub components: Vec, - - // pub entity_path: EntityPath, let ent_path = index.ent_path.clone(); - - // pub time_point: TimePoint, let timepoint = TimePoint::timeless(); - // TODO: could compact at this point - // TODO: that's once we support batching tho - // Gather all component indices, except for the internal `insert_id` one. let all_indices = index .indices @@ -61,7 +59,8 @@ impl DataStore { let mut all_tables = Vec::with_capacity(all_indices.len()); for (component, index) in &all_indices { - let table = &self.timeless_components[*component]; // TODO + // NOTE: the store guarantees that all matching tables exist. + let table = &self.timeless_components[*component]; all_tables.push(( *component, index @@ -70,8 +69,7 @@ impl DataStore { )); } - // TODO: shouldnt embed autogenerated keys.. we know it's an autogenerated one if its - // row index can be found in the cluster_comp_cache + // TODO(cmc): This shouldn't dump cluster keys that were autogenerated. ids.map(move |msg_ids| { // It's up to the application layer to guarantee the row-density of this diff --git a/crates/re_arrow_store/src/store_stats.rs b/crates/re_arrow_store/src/store_stats.rs index f41a308d117b..49c17ef0ecb4 100644 --- a/crates/re_arrow_store/src/store_stats.rs +++ b/crates/re_arrow_store/src/store_stats.rs @@ -7,7 +7,7 @@ use crate::{ // TODO(cmc): count buckets? 
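The store_stats.rs change just below derives PartialEq/Eq/PartialOrd/Ord for `DataStoreStats` so that whole stats snapshots can be compared in the dump test. As a reminder of what those derives actually provide, here is a self-contained sketch; `TinyStats` and its fields are purely illustrative, not the real `DataStoreStats` layout:

// TinyStats stands in for DataStoreStats; same derives, made-up fields.
#[derive(Default, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct TinyStats {
    total_index_rows: u64,
    total_index_size_bytes: u64,
}

fn main() {
    let before = TinyStats { total_index_rows: 2, total_index_size_bytes: 64 };
    let after = TinyStats { total_index_rows: 3, total_index_size_bytes: 16 };

    // Derived ordering is lexicographic over the fields, in declaration order: the first
    // field decides unless it ties, so `before <= after` holds here even though `after`
    // has a smaller byte count. This is the comparison the `store1_stats <= store2_stats`
    // assertions in the dump test rely on.
    assert!(before <= after);
}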
// TODO(cmc): compute incrementally once/if this becomes too expensive. -#[derive(Default, Debug)] +#[derive(Default, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct DataStoreStats { pub total_timeless_index_rows: u64, pub total_timeless_index_size_bytes: u64, diff --git a/crates/re_arrow_store/tests/dump.rs b/crates/re_arrow_store/tests/dump.rs index 8810a607b448..6dc2d1f06739 100644 --- a/crates/re_arrow_store/tests/dump.rs +++ b/crates/re_arrow_store/tests/dump.rs @@ -115,10 +115,6 @@ fn dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataSt let store1_df = store1.to_dataframe(); let store2_df = store2.to_dataframe(); let store3_df = store3.to_dataframe(); - - dbg!(DataStoreStats::from_store(store1)); - dbg!(DataStoreStats::from_store(store3)); - assert_eq!( store1_df, store2_df, "First & second stores differ:\n{store1_df}\n{store2_df}" @@ -127,6 +123,22 @@ fn dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataSt store1_df, store3_df, "First & third stores differ:\n{store1_df}\n{store3_df}" ); + + // NOTE: These are <= rather than == because we're not yet identifying and filtering + // autogenerated cluster keys while dumping. + let store1_stats = DataStoreStats::from_store(store1); + let store2_stats = DataStoreStats::from_store(store2); + let store3_stats = DataStoreStats::from_store(store3); + assert!( + store1_stats <= store2_stats, + "First store should have <= amount of data of second store:\n\ + {store1_stats:?}\n{store2_stats:?}" + ); + assert!( + store2_stats <= store3_stats, + "Second store should have <= amount of data of third store:\n\ + {store2_stats:?}\n{store3_stats:?}" + ); } // --- From cf0312b4aa73cecf3c8444f9cf882403ab29ae4a Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Tue, 7 Mar 2023 11:35:34 +0100 Subject: [PATCH 5/8] first pass at dumping temporal stuff --- crates/re_arrow_store/src/store_dump.rs | 126 +++++++++++++++++++++++- crates/re_arrow_store/tests/dump.rs | 5 +- 2 files changed, 124 insertions(+), 7 deletions(-) diff --git a/crates/re_arrow_store/src/store_dump.rs b/crates/re_arrow_store/src/store_dump.rs index 1531e5fb2cde..d0879e1b71cd 100644 --- a/crates/re_arrow_store/src/store_dump.rs +++ b/crates/re_arrow_store/src/store_dump.rs @@ -4,10 +4,10 @@ use itertools::Itertools as _; use re_log_types::{ external::arrow2_convert::deserialize::{arrow_array_deserialize_iterator, TryIntoCollection}, msg_bundle::{wrap_in_listarray, Component, ComponentBundle, MsgBundle}, - ComponentName, MsgId, TimePoint, + ComponentName, EntityPath, MsgId, TimePoint, }; -use crate::DataStore; +use crate::{DataStore, IndexBucket}; // --- @@ -21,7 +21,12 @@ impl DataStore { /// rows and no empty columns either!). pub fn as_msg_bundles(&self, id_key: ComponentName) -> impl Iterator + '_ { // TODO(cmc): This is an obvious entrypoint for all kinds of compaction tasks. - self.dump_timeless_indices(id_key) + // TODO(cmc): This shouldn't dump cluster keys that were autogenerated. + + let timeless = self.dump_timeless_indices(id_key); + let temporal = self.dump_temporal_indices(id_key); + + timeless.merge_by(temporal, |bundle1, bundle2| bundle1.msg_id < bundle2.msg_id) } /// Dumps the timeless indices & tables. @@ -69,8 +74,6 @@ impl DataStore { )); } - // TODO(cmc): This shouldn't dump cluster keys that were autogenerated. - ids.map(move |msg_ids| { // It's up to the application layer to guarantee the row-density of this // index: this will panic otherwise. 
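Before the next hunk, a note on the merging strategy used throughout this file: both `Itertools::kmerge_by` (k-way, used per set of indices) and `Itertools::merge_by` (two-way, used above to combine the timeless and temporal streams) lazily merge inputs that are already sorted, driven by a "does `a` come out first?" predicate. A minimal, self-contained illustration with plain integers standing in for `MsgId`s; note that later in the series the predicate is relaxed from `<` to `<=`, so that equal IDs (the same message present in several indices) stay grouped and the merge remains stable:

use itertools::Itertools as _;

fn main() {
    // Every input is already sorted; the predicate answers "should `a` go before `b`?".
    let timeless = vec![1u64, 4, 6];
    let temporal = vec![vec![2u64, 5], vec![3, 6]];

    // k-way merge of the per-index streams...
    let temporal_merged = temporal.into_iter().kmerge_by(|a, b| a <= b);

    // ...then a two-way merge with the timeless stream.
    let all: Vec<u64> = timeless
        .into_iter()
        .merge_by(temporal_merged, |a, b| a <= b)
        .collect();

    // Globally sorted, duplicates (same ID in two indices) kept adjacent.
    assert_eq!(all, vec![1, 2, 3, 4, 5, 6, 6]);
}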
@@ -108,4 +111,117 @@ impl DataStore { msg_bundles.kmerge_by(|bundle1, bundle2| bundle1.msg_id < bundle2.msg_id) } + + /// Dumps the temporal indices & tables. + /// + /// Panics if no index is present for `id_key` or if it isn't fully dense (i.e.: no empty + /// rows and no empty columns either!). + fn dump_temporal_indices(&self, id_key: ComponentName) -> impl Iterator + '_ { + let msg_bundles = self.indices.values().flat_map(|index| { + index.buckets.values().map(move |bucket| { + self.dump_temporal_bucket(id_key, bucket, index.ent_path.clone()) + }) + }); + + msg_bundles.kmerge_by(|bundle1, bundle2| bundle1.msg_id < bundle2.msg_id) + } + + /// Dumps one specific temporal bucket. + /// + /// Panics if no index is present for `id_key` or if it isn't fully dense (i.e.: no empty + /// rows and no empty columns either!). + // + // TODO(cmc): Shouldn't need to collect into a vec, but locks in the store are a mess rn. + fn dump_temporal_bucket( + &self, + id_key: ComponentName, + bucket: &IndexBucket, + ent_path: EntityPath, + ) -> Vec { + let timeline = bucket.timeline; + + bucket.sort_indices_if_needed(); + + let index = bucket.indices.read(); + + // Gather all component indices, except for the internal `insert_id` one. + let all_indices = index + .indices + .iter() + // Definitely should _not_ dump insert IDs + .filter(|(component, _)| **component != DataStore::insert_id_key()) + .collect_vec(); + + // Find the index used to identify messages (i.e. `MsgId` in the case of Rerun). + // + // We currently need to fetch those to be able to create the final `MsgBundle`. + // This is an artifact of the past and should go away as we implement batch support. + // + // Reminder: the store is not aware of message identification, this entirely up to + // the application layer. + // + // It's up to the application layer to guarantee the presence of this index: + // this will panic otherwise. + let ids_index = &index.indices[&id_key]; + let ids_table = &self.timeless_components[&id_key]; + let ids = ids_index + .iter() + .map(|row_idx| row_idx.map(|row_idx| ids_table.get(row_idx))); + + let mut all_tables = Vec::with_capacity(all_indices.len()); + for (component, index) in &all_indices { + // NOTE: the store guarantees that all matching tables exist. + let table = &self.timeless_components[*component]; + all_tables.push(( + *component, + index + .iter() + .map(|row_idx| row_idx.map(|row_idx| table.get(row_idx))), + )); + } + + // NOTE: The store guarantees that all indices, including the primary, are always the same + // length. + index + .times + .iter() + .zip(ids) + .map(move |(time, msg_ids)| { + // It's up to the application layer to guarantee the row-density of this + // index: this will panic otherwise. + let msg_ids = msg_ids.unwrap(); + + // Again, this is an artifact of the past: we need to deserialize the raw + // message IDs in order to create actual `MsgBundle`s. + // This'll go away with batching. + // + // NOTE: Reminder that we don't _really_ support splats, so a row with N + // instances will actually have N message IDs. 
+ let msg_ids: Vec = TryIntoCollection::try_into_collection(msg_ids).unwrap(); + + // TODO: this will probably dupe for multi timepoint data + let timepoint = TimePoint::from([(timeline, (*time).into())]); + + let components = all_tables + .iter_mut() + .filter_map(|(component, index)| { + index + .next() + .flatten() + .map(|data| (**component, wrap_in_listarray(data))) + }) + .map(|(component, data)| ComponentBundle::new(component, data)) + .collect_vec(); + + MsgBundle { + // It's up to the application layer to guarantee the column-density of this + // index: this will panic otherwise. + msg_id: msg_ids[0], + entity_path: ent_path.clone(), + time_point: timepoint, + components, + } + }) + .collect_vec() + } } diff --git a/crates/re_arrow_store/tests/dump.rs b/crates/re_arrow_store/tests/dump.rs index 6dc2d1f06739..eb1b13335e5d 100644 --- a/crates/re_arrow_store/tests/dump.rs +++ b/crates/re_arrow_store/tests/dump.rs @@ -29,6 +29,7 @@ use re_log_types::{ fn dump() { init_logs(); + // TODO: // config // dump // gc @@ -55,6 +56,7 @@ fn dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataSt let frame3: TimeInt = 3.into(); let frame4: TimeInt = 4.into(); + // TODO: real important to add an extra timeline here let create_bundles = |store: &mut DataStore, ent_path| { let ent_path = EntityPath::from(ent_path); @@ -77,8 +79,7 @@ fn dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataSt // helper to insert a bundle both as a temporal and timeless payload let insert = |store: &mut DataStore, bundle: &MsgBundle| { // insert temporal - // TODO - // store.insert(bundle).unwrap(); + store.insert(bundle).unwrap(); // insert timeless let mut bundle_timeless = bundle.clone(); From 995bc8783e2ce76e4441b80b1db531ea0121365d Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Tue, 7 Mar 2023 14:11:50 +0100 Subject: [PATCH 6/8] keep those polar sorts as stable as we can --- crates/re_arrow_store/src/store_polars.rs | 31 ++++++++++++++++++----- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/crates/re_arrow_store/src/store_polars.rs b/crates/re_arrow_store/src/store_polars.rs index 8ee654bdd959..31358117b89e 100644 --- a/crates/re_arrow_store/src/store_polars.rs +++ b/crates/re_arrow_store/src/store_polars.rs @@ -23,7 +23,7 @@ impl DataStore { pub fn to_dataframe(&self) -> DataFrame { crate::profile_function!(); - const IS_TIMELESS_COL: &str = "_is_timeless"; + const TIMELESS_COL: &str = "_is_timeless"; let timeless_dfs = self.timeless_indices.values().map(|index| { let ent_path = index.ent_path.clone(); @@ -34,7 +34,7 @@ impl DataStore { // Add a column where every row is a boolean true (timeless) let timeless = { let timeless = BooleanArray::from(vec![Some(true); num_rows]).boxed(); - new_infallible_series(IS_TIMELESS_COL, timeless.as_ref(), num_rows) + new_infallible_series(TIMELESS_COL, timeless.as_ref(), num_rows) }; let df = df.with_column(timeless).unwrap(); // cannot fail @@ -103,17 +103,34 @@ impl DataStore { // the store itself is empty, which we check just above. let df = diag_concat_df(dfs.as_slice()).unwrap(); + // Arrange the columns in the order that makes the most sense as a user. 
let df = sort_df_columns(&df, self.config.store_insert_ids); - let has_insert_ids = df.column(DataStore::insert_id_key().as_str()).is_ok(); - let has_timeless = df.column(IS_TIMELESS_COL).is_ok(); + let has_timeless = df.column(TIMELESS_COL).is_ok(); + let insert_id_col = DataStore::insert_id_key().as_str(); + // Now we want to sort _the contents_ of the columns, and we need to make sure we do so in + // as stable a way as possible given our constraints: we cannot actually sort the + // component columns themselves as they are internally lists of their own. let (sort_cols, sort_orders): (Vec<_>, Vec<_>) = [ - has_timeless.then_some((IS_TIMELESS_COL, true)), - has_insert_ids.then_some((DataStore::insert_id_key().as_str(), false)), + df.column(TIMELESS_COL) + .is_ok() + .then_some((TIMELESS_COL, true)), // descending + df.column(insert_id_col) + .is_ok() + .then_some((insert_id_col, false)), // ascending ] .into_iter() .flatten() + // NOTE: Already properly arranged above, and already contains insert_id if needed. + .chain( + df.get_column_names() + .into_iter() + .filter(|col| *col != TIMELESS_COL) // we handle this one separately + .filter(|col| *col != insert_id_col) // we handle this one separately + .filter(|col| !col.starts_with("rerun.")) // lists cannot be sorted + .map(|col| (col, false)), // ascending order + ) .unzip(); let df = if !sort_cols.is_empty() { @@ -123,7 +140,7 @@ impl DataStore { }; if has_timeless { - df.drop(IS_TIMELESS_COL).unwrap() + df.drop(TIMELESS_COL).unwrap() } else { df } From 83bc3f3f3eb33bec9c13f1b4985ec67b66fe0fd1 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Tue, 7 Mar 2023 14:42:33 +0100 Subject: [PATCH 7/8] dealing with all edge cases --- crates/re_arrow_store/src/store_dump.rs | 46 ++++++------ crates/re_arrow_store/tests/dump.rs | 96 +++++++++++++------------ 2 files changed, 72 insertions(+), 70 deletions(-) diff --git a/crates/re_arrow_store/src/store_dump.rs b/crates/re_arrow_store/src/store_dump.rs index d0879e1b71cd..322a8a4ebf2a 100644 --- a/crates/re_arrow_store/src/store_dump.rs +++ b/crates/re_arrow_store/src/store_dump.rs @@ -2,8 +2,8 @@ use itertools::Itertools as _; use re_log_types::{ - external::arrow2_convert::deserialize::{arrow_array_deserialize_iterator, TryIntoCollection}, - msg_bundle::{wrap_in_listarray, Component, ComponentBundle, MsgBundle}, + external::arrow2_convert::deserialize::TryIntoCollection, + msg_bundle::{wrap_in_listarray, ComponentBundle, MsgBundle}, ComponentName, EntityPath, MsgId, TimePoint, }; @@ -26,7 +26,9 @@ impl DataStore { let timeless = self.dump_timeless_indices(id_key); let temporal = self.dump_temporal_indices(id_key); - timeless.merge_by(temporal, |bundle1, bundle2| bundle1.msg_id < bundle2.msg_id) + timeless.merge_by(temporal, |bundle1, bundle2| { + bundle1.msg_id <= bundle2.msg_id + }) } /// Dumps the timeless indices & tables. @@ -57,20 +59,18 @@ impl DataStore { // It's up to the application layer to guarantee the presence of this index: this will // panic otherwise. let ids_index = &index.indices[&id_key]; - let ids_table = &self.timeless_components[&id_key]; - let ids = ids_index - .iter() - .map(|row_idx| row_idx.map(|row_idx| ids_table.get(row_idx))); + let ids = + ids_index.iter().map( + move |row_idx| self.get(&[id_key], &[*row_idx])[0].clone(), /* shallow */ + ); let mut all_tables = Vec::with_capacity(all_indices.len()); for (component, index) in &all_indices { - // NOTE: the store guarantees that all matching tables exist. 
- let table = &self.timeless_components[*component]; all_tables.push(( *component, - index - .iter() - .map(|row_idx| row_idx.map(|row_idx| table.get(row_idx))), + index.iter().map( + |row_idx| self.get(&[**component], &[*row_idx])[0].clone(), /* shallow */ + ), )); } @@ -109,7 +109,7 @@ impl DataStore { }) }); - msg_bundles.kmerge_by(|bundle1, bundle2| bundle1.msg_id < bundle2.msg_id) + msg_bundles.kmerge_by(|bundle1, bundle2| bundle1.msg_id <= bundle2.msg_id) } /// Dumps the temporal indices & tables. @@ -123,7 +123,7 @@ impl DataStore { }) }); - msg_bundles.kmerge_by(|bundle1, bundle2| bundle1.msg_id < bundle2.msg_id) + msg_bundles.kmerge_by(|bundle1, bundle2| bundle1.msg_id <= bundle2.msg_id) } /// Dumps one specific temporal bucket. @@ -163,20 +163,17 @@ impl DataStore { // It's up to the application layer to guarantee the presence of this index: // this will panic otherwise. let ids_index = &index.indices[&id_key]; - let ids_table = &self.timeless_components[&id_key]; - let ids = ids_index - .iter() - .map(|row_idx| row_idx.map(|row_idx| ids_table.get(row_idx))); + let ids = ids_index.iter().map( + move |row_idx| self.get(&[id_key], &[*row_idx])[0].clone(), /* shallow */ + ); let mut all_tables = Vec::with_capacity(all_indices.len()); for (component, index) in &all_indices { - // NOTE: the store guarantees that all matching tables exist. - let table = &self.timeless_components[*component]; all_tables.push(( *component, - index - .iter() - .map(|row_idx| row_idx.map(|row_idx| table.get(row_idx))), + index.iter().map( + |row_idx| self.get(&[**component], &[*row_idx])[0].clone(), /* shallow */ + ), )); } @@ -199,7 +196,8 @@ impl DataStore { // instances will actually have N message IDs. let msg_ids: Vec = TryIntoCollection::try_into_collection(msg_ids).unwrap(); - // TODO: this will probably dupe for multi timepoint data + // TODO(cmc): We should identify and merge back together messages spread across + // multiple timelines. 
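To make the TODO above concrete, here is a hedged sketch of what currently happens to a row stamped on two timelines (the timeline names and values are made up for the example, mirroring the `build_log_time` / `build_frame_nr` pair the test uses): each temporal index only knows its own timeline, so the dump re-emits the row once per timeline with a single-timeline timepoint (and the timeless indices contribute their own copy when the row was also stored timeless, as the test's insert helper does). That is also why the test disables insert IDs and compares stats with `<=` rather than `==`.

use re_log_types::{TimePoint, TimeType, Timeline};

fn main() {
    // Hypothetical timelines for the sake of the example.
    let log_time = Timeline::new("log_time", TimeType::Time);
    let frame_nr = Timeline::new("frame_nr", TimeType::Sequence);

    // One original row stamped on both timelines currently comes back as one bundle per
    // temporal index, each carrying only its own timeline...
    let dumped_on_log_time = TimePoint::from([(log_time, 1_000.into())]);
    let dumped_on_frame_nr = TimePoint::from([(frame_nr, 2.into())]);

    // ...rather than as a single bundle carrying the original two-timeline timepoint.
    assert_ne!(dumped_on_log_time, dumped_on_frame_nr);
}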
let timepoint = TimePoint::from([(timeline, (*time).into())]); let components = all_tables diff --git a/crates/re_arrow_store/tests/dump.rs b/crates/re_arrow_store/tests/dump.rs index eb1b13335e5d..70409c3691a0 100644 --- a/crates/re_arrow_store/tests/dump.rs +++ b/crates/re_arrow_store/tests/dump.rs @@ -2,50 +2,37 @@ use std::sync::atomic::{AtomicBool, Ordering}; -use arrow2::array::{Array, UInt64Array}; use itertools::Itertools; -use nohash_hasher::IntMap; -use polars_core::{prelude::*, series::Series}; use polars_ops::prelude::DataFrameJoinOps; -use rand::Rng; -use re_arrow_store::{ - polars_util, test_bundle, DataStore, DataStoreConfig, DataStoreStats, GarbageCollectionTarget, - LatestAtQuery, RangeQuery, TimeInt, TimeRange, -}; +use re_arrow_store::{polars_util, test_bundle, DataStore, DataStoreStats, TimeInt}; use re_log_types::{ - component_types::{ColorRGBA, InstanceKey, Point2D, Rect2D}, + component_types::InstanceKey, datagen::{ - build_frame_nr, build_some_colors, build_some_instances, build_some_instances_from, - build_some_point2d, build_some_rects, + build_frame_nr, build_log_time, build_some_colors, build_some_instances, build_some_point2d, }, - external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, - msg_bundle::{wrap_in_listarray, Component as _, MsgBundle}, - ComponentName, EntityPath, MsgId, TimeType, Timeline, + msg_bundle::{Component as _, MsgBundle}, + EntityPath, MsgId, Time, }; -// --- LatestAt --- +// --- Dump --- #[test] fn dump() { init_logs(); - // TODO: - // config - // dump - // gc + // TODO(cmc): Sprinkle some serialization roundtrips in main end-to-end test suites to account + // for feature intersection. + + for mut config in re_arrow_store::test_util::all_configs() { + // NOTE: insert IDs A) are irrelevant for our purposes and B) cannot possibly match at the + // moment since we don't auto-merge multi-timestamp rows back into a single message (yet?). 
+ config.store_insert_ids = false; - for config in re_arrow_store::test_util::all_configs() { let mut store1 = DataStore::new(InstanceKey::name(), config.clone()); let mut store2 = DataStore::new(InstanceKey::name(), config.clone()); let mut store3 = DataStore::new(InstanceKey::name(), config.clone()); dump_impl(&mut store1, &mut store2, &mut store3); - // store1.gc( - // GarbageCollectionTarget::DropAtLeastPercentage(1.0), - // Timeline::new("frame_nr", TimeType::Sequence), - // MsgId::name(), - // ); - // dump_impl(&mut store1, &mut store2); } } @@ -56,22 +43,28 @@ fn dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataSt let frame3: TimeInt = 3.into(); let frame4: TimeInt = 4.into(); - // TODO: real important to add an extra timeline here let create_bundles = |store: &mut DataStore, ent_path| { let ent_path = EntityPath::from(ent_path); let (instances1, colors1) = (build_some_instances(3), build_some_colors(3)); - let bundle1 = - test_bundle!(ent_path @ [build_frame_nr(frame1)] => [instances1.clone(), colors1]); + let bundle1 = test_bundle!(ent_path @ [ + build_log_time(Time::now()), build_frame_nr(frame1), + ] => [instances1.clone(), colors1]); let points2 = build_some_point2d(3); - let bundle2 = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [instances1, points2]); + let bundle2 = test_bundle!(ent_path @ [ + build_log_time(Time::now()), build_frame_nr(frame2), + ] => [instances1, points2]); let points3 = build_some_point2d(10); - let bundle3 = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [points3]); + let bundle3 = test_bundle!(ent_path @ [ + build_log_time(Time::now()), build_frame_nr(frame3), + ] => [points3]); let colors4 = build_some_colors(5); - let bundle4 = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [colors4]); + let bundle4 = test_bundle!(ent_path @ [ + build_log_time(Time::now()), build_frame_nr(frame4), + ] => [colors4]); vec![bundle1, bundle2, bundle3, bundle4] }; @@ -93,10 +86,10 @@ fn dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataSt .flat_map(|ent_path| create_bundles(store1, *ent_path)) .collect_vec(); + // Fill the first store. for bundle in &bundles { insert(store1, bundle); } - if let err @ Err(_) = store1.sanity_check() { store1.sort_indices_if_needed(); eprintln!("{store1}"); @@ -105,40 +98,51 @@ fn dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataSt // Dump the first store into the second one. for bundle in store1.as_msg_bundles(MsgId::name()) { - insert(store2, &bundle); + store2.insert(&bundle).unwrap(); + } + if let err @ Err(_) = store2.sanity_check() { + store2.sort_indices_if_needed(); + eprintln!("{store2}"); + err.unwrap(); } // Dump the second store into the third one. 
for bundle in store2.as_msg_bundles(MsgId::name()) { - insert(store3, &bundle); + store3.insert(&bundle).unwrap(); + } + if let err @ Err(_) = store3.sanity_check() { + store3.sort_indices_if_needed(); + eprintln!("{store3}"); + err.unwrap(); } - let store1_df = store1.to_dataframe(); - let store2_df = store2.to_dataframe(); - let store3_df = store3.to_dataframe(); - assert_eq!( - store1_df, store2_df, + let mut store1_df = store1.to_dataframe(); + let mut store2_df = store2.to_dataframe(); + let mut store3_df = store3.to_dataframe(); + assert!( + store1_df == store2_df, "First & second stores differ:\n{store1_df}\n{store2_df}" ); - assert_eq!( - store1_df, store3_df, + assert!( + store1_df == store3_df, "First & third stores differ:\n{store1_df}\n{store3_df}" ); - // NOTE: These are <= rather than == because we're not yet identifying and filtering - // autogenerated cluster keys while dumping. + // NOTE: These are <= rather than == because..: + // - we're not yet identifying and filtering autogenerated cluster keys while dumping + // - we're not yet identifying and merging multi-timestamp rows let store1_stats = DataStoreStats::from_store(store1); let store2_stats = DataStoreStats::from_store(store2); let store3_stats = DataStoreStats::from_store(store3); assert!( store1_stats <= store2_stats, "First store should have <= amount of data of second store:\n\ - {store1_stats:?}\n{store2_stats:?}" + {store1_stats:#?}\n{store2_stats:#?}" ); assert!( store2_stats <= store3_stats, "Second store should have <= amount of data of third store:\n\ - {store2_stats:?}\n{store3_stats:?}" + {store2_stats:#?}\n{store3_stats:#?}" ); } From c22640d25bda4d82c757c6c9ec69118025108acc Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Tue, 7 Mar 2023 14:48:59 +0100 Subject: [PATCH 8/8] cleanin up --- crates/re_arrow_store/src/store.rs | 2 +- crates/re_arrow_store/src/store_dump.rs | 2 - crates/re_arrow_store/src/test_util.rs | 128 ++++++++++++------------ crates/re_arrow_store/tests/dump.rs | 14 ++- 4 files changed, 71 insertions(+), 75 deletions(-) diff --git a/crates/re_arrow_store/src/store.rs b/crates/re_arrow_store/src/store.rs index 6265fb0c6def..50afd118ecd1 100644 --- a/crates/re_arrow_store/src/store.rs +++ b/crates/re_arrow_store/src/store.rs @@ -231,7 +231,7 @@ pub struct DataStore { /// /// `BTreeMap` because of garbage collection. // - // TODO: this isn't garbage collected, is it? + // TODO(cmc): this isn't garbage collected after all, is it? pub(crate) messages: BTreeMap, /// Used to cache auto-generated cluster components, i.e. `[0]`, `[0, 1]`, `[0, 1, 2]`, etc diff --git a/crates/re_arrow_store/src/store_dump.rs b/crates/re_arrow_store/src/store_dump.rs index 322a8a4ebf2a..525f82b854b6 100644 --- a/crates/re_arrow_store/src/store_dump.rs +++ b/crates/re_arrow_store/src/store_dump.rs @@ -1,5 +1,3 @@ -// TODO: all you need to dump the store as log messages - use itertools::Itertools as _; use re_log_types::{ external::arrow2_convert::deserialize::TryIntoCollection, diff --git a/crates/re_arrow_store/src/test_util.rs b/crates/re_arrow_store/src/test_util.rs index 586d780f9196..aebccc2b3243 100644 --- a/crates/re_arrow_store/src/test_util.rs +++ b/crates/re_arrow_store/src/test_util.rs @@ -28,74 +28,74 @@ macro_rules! 
test_bundle { pub fn all_configs() -> impl Iterator { const COMPONENT_CONFIGS: &[DataStoreConfig] = &[ DataStoreConfig::DEFAULT, - // DataStoreConfig { - // component_bucket_nb_rows: 0, - // ..DataStoreConfig::DEFAULT - // }, - // DataStoreConfig { - // component_bucket_nb_rows: 1, - // ..DataStoreConfig::DEFAULT - // }, - // DataStoreConfig { - // component_bucket_nb_rows: 2, - // ..DataStoreConfig::DEFAULT - // }, - // DataStoreConfig { - // component_bucket_nb_rows: 3, - // ..DataStoreConfig::DEFAULT - // }, - // DataStoreConfig { - // component_bucket_size_bytes: 0, - // ..DataStoreConfig::DEFAULT - // }, - // DataStoreConfig { - // component_bucket_size_bytes: 16, - // ..DataStoreConfig::DEFAULT - // }, - // DataStoreConfig { - // component_bucket_size_bytes: 32, - // ..DataStoreConfig::DEFAULT - // }, - // DataStoreConfig { - // component_bucket_size_bytes: 64, - // ..DataStoreConfig::DEFAULT - // }, + DataStoreConfig { + component_bucket_nb_rows: 0, + ..DataStoreConfig::DEFAULT + }, + DataStoreConfig { + component_bucket_nb_rows: 1, + ..DataStoreConfig::DEFAULT + }, + DataStoreConfig { + component_bucket_nb_rows: 2, + ..DataStoreConfig::DEFAULT + }, + DataStoreConfig { + component_bucket_nb_rows: 3, + ..DataStoreConfig::DEFAULT + }, + DataStoreConfig { + component_bucket_size_bytes: 0, + ..DataStoreConfig::DEFAULT + }, + DataStoreConfig { + component_bucket_size_bytes: 16, + ..DataStoreConfig::DEFAULT + }, + DataStoreConfig { + component_bucket_size_bytes: 32, + ..DataStoreConfig::DEFAULT + }, + DataStoreConfig { + component_bucket_size_bytes: 64, + ..DataStoreConfig::DEFAULT + }, ]; const INDEX_CONFIGS: &[DataStoreConfig] = &[ DataStoreConfig::DEFAULT, - // DataStoreConfig { - // index_bucket_nb_rows: 0, - // ..DataStoreConfig::DEFAULT - // }, - // DataStoreConfig { - // index_bucket_nb_rows: 1, - // ..DataStoreConfig::DEFAULT - // }, - // DataStoreConfig { - // index_bucket_nb_rows: 2, - // ..DataStoreConfig::DEFAULT - // }, - // DataStoreConfig { - // index_bucket_nb_rows: 3, - // ..DataStoreConfig::DEFAULT - // }, - // DataStoreConfig { - // index_bucket_size_bytes: 0, - // ..DataStoreConfig::DEFAULT - // }, - // DataStoreConfig { - // index_bucket_size_bytes: 16, - // ..DataStoreConfig::DEFAULT - // }, - // DataStoreConfig { - // index_bucket_size_bytes: 32, - // ..DataStoreConfig::DEFAULT - // }, - // DataStoreConfig { - // index_bucket_size_bytes: 64, - // ..DataStoreConfig::DEFAULT - // }, + DataStoreConfig { + index_bucket_nb_rows: 0, + ..DataStoreConfig::DEFAULT + }, + DataStoreConfig { + index_bucket_nb_rows: 1, + ..DataStoreConfig::DEFAULT + }, + DataStoreConfig { + index_bucket_nb_rows: 2, + ..DataStoreConfig::DEFAULT + }, + DataStoreConfig { + index_bucket_nb_rows: 3, + ..DataStoreConfig::DEFAULT + }, + DataStoreConfig { + index_bucket_size_bytes: 0, + ..DataStoreConfig::DEFAULT + }, + DataStoreConfig { + index_bucket_size_bytes: 16, + ..DataStoreConfig::DEFAULT + }, + DataStoreConfig { + index_bucket_size_bytes: 32, + ..DataStoreConfig::DEFAULT + }, + DataStoreConfig { + index_bucket_size_bytes: 64, + ..DataStoreConfig::DEFAULT + }, ]; COMPONENT_CONFIGS.iter().flat_map(|comp| { INDEX_CONFIGS.iter().map(|idx| DataStoreConfig { diff --git a/crates/re_arrow_store/tests/dump.rs b/crates/re_arrow_store/tests/dump.rs index 70409c3691a0..ca6ac6ee39e1 100644 --- a/crates/re_arrow_store/tests/dump.rs +++ b/crates/re_arrow_store/tests/dump.rs @@ -3,8 +3,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use itertools::Itertools; -use polars_ops::prelude::DataFrameJoinOps; -use 
re_arrow_store::{polars_util, test_bundle, DataStore, DataStoreStats, TimeInt}; +use re_arrow_store::{test_bundle, DataStore, DataStoreStats, TimeInt}; use re_log_types::{ component_types::InstanceKey, datagen::{ @@ -37,13 +36,12 @@ fn dump() { } fn dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataStore) { - let frame0: TimeInt = 0.into(); let frame1: TimeInt = 1.into(); let frame2: TimeInt = 2.into(); let frame3: TimeInt = 3.into(); let frame4: TimeInt = 4.into(); - let create_bundles = |store: &mut DataStore, ent_path| { + let create_bundles = |ent_path| { let ent_path = EntityPath::from(ent_path); let (instances1, colors1) = (build_some_instances(3), build_some_colors(3)); @@ -83,7 +81,7 @@ fn dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataSt let ent_paths = ["this/that", "other", "yet/another/one"]; let bundles = ent_paths .iter() - .flat_map(|ent_path| create_bundles(store1, *ent_path)) + .flat_map(|ent_path| create_bundles(*ent_path)) .collect_vec(); // Fill the first store. @@ -116,9 +114,9 @@ fn dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataSt err.unwrap(); } - let mut store1_df = store1.to_dataframe(); - let mut store2_df = store2.to_dataframe(); - let mut store3_df = store3.to_dataframe(); + let store1_df = store1.to_dataframe(); + let store2_df = store2.to_dataframe(); + let store3_df = store3.to_dataframe(); assert!( store1_df == store2_df, "First & second stores differ:\n{store1_df}\n{store2_df}"
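Taken together, the series boils down to the round-trip that `dump_impl` above exercises with three stores. A compressed sketch, using only the calls introduced or used in these patches (`store` and `config` are assumed to be provided by the caller, and every row is expected to carry a `MsgId`, otherwise `as_msg_bundles` panics as documented):

use re_arrow_store::{DataStore, DataStoreConfig};
use re_log_types::{component_types::InstanceKey, msg_bundle::Component as _, MsgId};

fn roundtrip(store: &DataStore, config: DataStoreConfig) {
    let mut reloaded = DataStore::new(InstanceKey::name(), config);

    // Dump every row of `store` as self-describing `MsgBundle`s, ordered by `MsgId`,
    // and feed them straight back into a brand new store.
    for bundle in store.as_msg_bundles(MsgId::name()) {
        reloaded.insert(&bundle).unwrap();
    }

    // From a public API standpoint, the two stores are now indistinguishable.
    assert_eq!(store.to_dataframe(), reloaded.to_dataframe());
}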