Re-use ComponentName and subsequent cleanup
jleibs committed Jul 19, 2023
1 parent e9da2d8 commit a6905db
Showing 79 changed files with 471 additions and 583 deletions.
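
Every hunk in this commit follows one theme: `ComponentName` stops being passed as `&ComponentName` and `.clone()`d, and the `ahash` containers keyed on it become `nohash_hasher` ones. That only works if `ComponentName` is a small `Copy` value, such as an interned string. Here is a minimal, hypothetical sketch of that kind of key type (the `POOL` static and `intern` function are illustrative assumptions, not the actual `re_types` implementation):

```rust
use std::collections::HashSet;
use std::sync::Mutex;

/// A Copy-able interned string: the struct is one pointer wide, so passing
/// it by value is as cheap as passing a reference.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct ComponentName(&'static str);

impl ComponentName {
    pub fn as_str(&self) -> &'static str {
        self.0
    }
}

/// Global intern pool (illustrative): each distinct string is leaked once,
/// so all equal `ComponentName`s share the same `'static` pointer.
static POOL: Mutex<Option<HashSet<&'static str>>> = Mutex::new(None);

pub fn intern(s: &str) -> ComponentName {
    let mut guard = POOL.lock().unwrap();
    let pool = guard.get_or_insert_with(HashSet::new);
    if let Some(&existing) = pool.get(s) {
        return ComponentName(existing);
    }
    let leaked: &'static str = Box::leak(s.to_owned().into_boxed_str());
    pool.insert(leaked);
    ComponentName(leaked)
}
```

With a key of this shape, `fn f(name: ComponentName)` costs the same as passing a pointer, which is why the `&ComponentName` parameters and `cluster_key.clone()` calls removed below were pure noise.
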
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

35 changes: 14 additions & 21 deletions crates/re_arrow_store/benches/data_store.rs
@@ -104,7 +104,7 @@ fn latest_at(c: &mut Criterion) {
     group.bench_function("default", |b| {
         let store = insert_table(Default::default(), InstanceKey::name(), &table);
         b.iter(|| {
-            let cells = latest_data_at(&store, &Rect2D::name(), &[Rect2D::name()]);
+            let cells = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]);
             let rects = cells[0]
                 .as_ref()
                 .unwrap()
@@ -128,7 +128,7 @@ fn latest_at(c: &mut Criterion) {
         );
         group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
             b.iter(|| {
-                let cells = latest_data_at(&store, &Rect2D::name(), &[Rect2D::name()]);
+                let cells = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]);
                 let rects = cells[0]
                     .as_ref()
                     .unwrap()
@@ -156,19 +156,16 @@ fn latest_at_missing(c: &mut Criterion) {
     let store = insert_table(Default::default(), InstanceKey::name(), &table);
     group.bench_function("primary/default", |b| {
         b.iter(|| {
-            let results = latest_data_at(
-                &store,
-                &("non_existing_component".into()),
-                &[Rect2D::name()],
-            );
+            let results =
+                latest_data_at(&store, "non_existing_component".into(), &[Rect2D::name()]);
             assert!(results[0].is_none());
         });
     });
     group.bench_function("secondaries/default", |b| {
         b.iter(|| {
             let results = latest_data_at(
                 &store,
-                &Rect2D::name(),
+                Rect2D::name(),
                 &[
                     "non_existing_component1".into(),
                     "non_existing_component2".into(),
@@ -193,19 +190,16 @@ fn latest_at_missing(c: &mut Criterion) {
         );
         group.bench_function(format!("primary/bucketsz={num_rows_per_bucket}"), |b| {
             b.iter(|| {
-                let results = latest_data_at(
-                    &store,
-                    &("non_existing_component".into()),
-                    &[Rect2D::name()],
-                );
+                let results =
+                    latest_data_at(&store, "non_existing_component".into(), &[Rect2D::name()]);
                 assert!(results[0].is_none());
             });
         });
         group.bench_function(format!("secondaries/bucketsz={num_rows_per_bucket}"), |b| {
             b.iter(|| {
                 let results = latest_data_at(
                     &store,
-                    &Rect2D::name(),
+                    Rect2D::name(),
                     &[
                         "non_existing_component1".into(),
                         "non_existing_component2".into(),
@@ -247,10 +241,9 @@ fn range(c: &mut Criterion) {
             InstanceKey::name(),
             &table,
         );
-        let components = [Rect2D::name()];
         group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
             b.iter(|| {
-                let rows = range_data(&store, &components);
+                let rows = range_data(&store, [Rect2D::name()]);
                 for (cur_time, (time, cells)) in rows.enumerate() {
                     let time = time.unwrap();
                     assert_eq!(cur_time as i64, time.as_i64());
@@ -354,7 +347,7 @@ fn insert_table(
 
 fn latest_data_at<const N: usize>(
     store: &DataStore,
-    primary: &ComponentName,
+    primary: ComponentName,
     secondaries: &[ComponentName; N],
 ) -> [Option<DataCell>; N] {
     let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
@@ -366,10 +359,10 @@ fn latest_data_at<const N: usize>(
         .map_or_else(|| [(); N].map(|_| None), |(_, cells)| cells)
 }
 
-fn range_data<'a, const N: usize>(
-    store: &'a DataStore,
-    components: &'a [ComponentName; N],
-) -> impl Iterator<Item = (Option<TimeInt>, [Option<DataCell>; N])> + 'a {
+fn range_data<const N: usize>(
+    store: &DataStore,
+    components: [ComponentName; N],
+) -> impl Iterator<Item = (Option<TimeInt>, [Option<DataCell>; N])> + '_ {
     let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
     let query = RangeQuery::new(timeline_frame_nr, TimeRange::new(0.into(), NUM_ROWS.into()));
     let ent_path = EntityPath::from("rects");

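The `range_data` rewrite above is a standard trick for dropping a named lifetime: since the array elements are `Copy`, the `[ComponentName; N]` argument can be taken by value and moved into the returned iterator, leaving `store` as the only thing borrowed (hence `+ '_`). A self-contained sketch of the same pattern, using stand-in types rather than the bench's:

```rust
// With Copy elements, a fixed-size array is moved into the closure, so the
// returned iterator only borrows `store`.
fn range_data<const N: usize>(
    store: &[u64],
    components: [u32; N],
) -> impl Iterator<Item = (u64, [u32; N])> + '_ {
    store.iter().map(move |&row| (row, components))
}

fn main() {
    let store = vec![1, 2, 3];
    // The array can now be built inline at the call site; no separate
    // `let components = ...;` binding has to outlive the iterator, which is
    // exactly what the benchmark cleanup above exploits.
    for (row, comps) in range_data(&store, [10, 20]) {
        println!("{row}: {comps:?}");
    }
}
```
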
4 changes: 2 additions & 2 deletions crates/re_arrow_store/examples/latest_component.rs
@@ -34,7 +34,7 @@ fn main() {
         &store,
         &LatestAtQuery::new(timeline_frame_nr, 10.into()),
         &ent_path,
-        &Rect2D::name(),
+        Rect2D::name(),
     )
     .unwrap();
     println!(
@@ -48,7 +48,7 @@ fn main() {
         &store,
         &LatestAtQuery::new(timeline_frame_nr, 10.into()),
         &ent_path,
-        &Point2D::name(),
+        Point2D::name(),
     )
     .unwrap();
     println!("Query results from {:?}'s PoV:\n{df}", Point2D::name());

4 changes: 2 additions & 2 deletions crates/re_arrow_store/examples/range_components.rs
@@ -55,7 +55,7 @@ fn main() {
         &store,
         &query,
         &ent_path,
-        &Rect2D::name(),
+        Rect2D::name(),
         [InstanceKey::name(), Rect2D::name(), Point2D::name()],
         &JoinType::Outer,
     );
@@ -77,7 +77,7 @@ fn main() {
         &store,
         &query,
         &ent_path,
-        &Point2D::name(),
+        Point2D::name(),
         [InstanceKey::name(), Rect2D::name(), Point2D::name()],
         &JoinType::Outer,
     );

28 changes: 14 additions & 14 deletions crates/re_arrow_store/src/polars_util.rs
@@ -33,13 +33,13 @@ pub fn latest_component(
     store: &DataStore,
     query: &LatestAtQuery,
     ent_path: &EntityPath,
-    primary: &ComponentName,
+    primary: ComponentName,
 ) -> SharedResult<DataFrame> {
     let cluster_key = store.cluster_key();
 
-    let components = [cluster_key.clone(), primary.clone()];
+    let components = &[cluster_key, primary];
     let (_, cells) = store
-        .latest_at(query, ent_path, primary, &components)
+        .latest_at(query, ent_path, primary, components)
         .unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
 
     dataframe_from_cells(&cells)
@@ -72,9 +72,9 @@ pub fn latest_components(
     let dfs = primaries
         .iter()
         .filter(|primary| **primary != cluster_key)
-        .map(|primary| latest_component(store, query, ent_path, primary));
+        .map(|primary| latest_component(store, query, ent_path, *primary));
 
-    join_dataframes(cluster_key.clone(), join_type, dfs)
+    join_dataframes(cluster_key, join_type, dfs)
 }
 
 // --- Range ---
@@ -104,7 +104,7 @@ pub fn range_components<'a, const N: usize>(
     store: &'a DataStore,
     query: &'a RangeQuery,
     ent_path: &'a EntityPath,
-    primary: &ComponentName,
+    primary: ComponentName,
     components: [ComponentName; N],
     join_type: &'a JoinType,
 ) -> impl Iterator<Item = SharedResult<(Option<TimeInt>, DataFrame)>> + 'a {
@@ -119,7 +119,7 @@ pub fn range_components<'a, const N: usize>(
     // would be to drop the constant sizes all the way down, which would be way more painful to
     // deal with.
     assert!(components.contains(&cluster_key));
-    assert!(components.contains(primary));
+    assert!(components.contains(&primary));
 
     let mut state = None;
 
@@ -148,7 +148,7 @@ pub fn range_components<'a, const N: usize>(
 
     let primary_col = components
         .iter()
-        .find_position(|component| *component == primary)
+        .find_position(|component| **component == primary)
         .map(|(col, _)| col)
         .unwrap(); // asserted on entry
 
@@ -159,7 +159,7 @@ pub fn range_components<'a, const N: usize>(
         // followed by the range
         .chain(
             store
-                .range(query, ent_path, &components)
+                .range(query, ent_path, components)
                 .map(move |(time, _, cells)| {
                     (
                         time,
@@ -170,7 +170,7 @@ pub fn range_components<'a, const N: usize>(
         )
         .filter_map(move |(time, is_primary, df)| {
            state = Some(join_dataframes(
-                cluster_key.clone(),
+                cluster_key,
                join_type,
                // The order matters here: the newly yielded dataframe goes to the right so that it
                // overwrites the data in the state if their column overlaps!
@@ -248,8 +248,8 @@ pub fn join_dataframes(
 
        left.join(
            &right,
-            [cluster_key.clone()],
-            [cluster_key.clone()],
+            [cluster_key],
+            [cluster_key],
            join_type.clone(),
            None,
        )
@@ -258,7 +258,7 @@ pub fn join_dataframes(
        })
        .unwrap_or_else(|| Ok(DataFrame::default()))?;
 
-    Ok(df.sort([cluster_key], false).unwrap_or(df))
+    Ok(df.sort([cluster_key.as_str()], false).unwrap_or(df))
 }
 
 /// Returns a new `DataFrame` where all rows that only contain null values (ignoring the cluster
@@ -267,7 +267,7 @@ pub fn drop_all_nulls(df: &DataFrame, cluster_key: &ComponentName) -> PolarsResu
     let cols = df
         .get_column_names()
         .into_iter()
-        .filter(|col| *col != cluster_key);
+        .filter(|col| *col != cluster_key.as_str());
 
     let mut iter = df.select_series(cols)?.into_iter();
 

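The two `.as_str()` calls at the end of this file are the flip side of the cleanup: polars addresses columns by `&str`, so once the cluster key is a dedicated key type rather than a plain string, the conversion must be spelled out at that boundary. A toy sketch, assuming the interned newtype shape from the top of this diff:

```rust
/// Stand-in for the interned key type (hypothetical shape).
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct ComponentName(&'static str);

impl ComponentName {
    fn as_str(&self) -> &'static str {
        self.0
    }
}

fn main() {
    let cluster_key = ComponentName("rerun.instance_key");
    // String-keyed APIs such as polars' column selection and sort now need
    // the explicit `&str` view of the key:
    let sort_cols = [cluster_key.as_str()];
    assert_eq!(sort_cols, ["rerun.instance_key"]);
}
```
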
30 changes: 10 additions & 20 deletions crates/re_arrow_store/src/store.rs
@@ -3,7 +3,7 @@ use std::sync::atomic::AtomicU64;
 
 use ahash::HashMap;
 use arrow2::datatypes::DataType;
-use nohash_hasher::IntMap;
+use nohash_hasher::{IntMap, IntSet};
 use parking_lot::RwLock;
 use re_types::ComponentName;
 use smallvec::SmallVec;
@@ -79,14 +79,10 @@ pub type InsertIdVec = SmallVec<[u64; 4]>;
 ///
 /// See also [`DataStore::lookup_datatype`].
 #[derive(Debug, Default, Clone)]
-// TODO(jleibs): Can we use IntMap
-//pub struct DataTypeRegistry(pub IntMap<ComponentName, DataType>);
-pub struct DataTypeRegistry(pub ahash::HashMap<ComponentName, DataType>);
+pub struct DataTypeRegistry(pub IntMap<ComponentName, DataType>);
 
 impl std::ops::Deref for DataTypeRegistry {
-    // TODO(jleibs): Can we use IntMap
-    //type Target = IntMap<ComponentName, DataType>;
-    type Target = ahash::HashMap<ComponentName, DataType>;
+    type Target = IntMap<ComponentName, DataType>;
 
     #[inline]
     fn deref(&self) -> &Self::Target {
@@ -231,7 +227,7 @@ pub struct DataStore {
 impl Clone for DataStore {
     fn clone(&self) -> Self {
         Self {
-            cluster_key: self.cluster_key.clone(),
+            cluster_key: self.cluster_key,
             config: self.config.clone(),
             type_registry: self.type_registry.clone(),
             metadata_registry: self.metadata_registry.clone(),
@@ -281,7 +277,7 @@ impl DataStore {
 
     /// See [`Self::cluster_key`] for more information about the cluster key.
     pub fn cluster_key(&self) -> ComponentName {
-        self.cluster_key.clone()
+        self.cluster_key
     }
 
     /// See [`DataStoreConfig`] for more information about configuration.
@@ -405,9 +401,7 @@ pub struct IndexedTable {
     /// Note that this set will never be purged and will continue to return components that may
     /// have been set in the past even if all instances of that component have since been purged
     /// to free up space.
-    // TODO(jleibs): Can we use IntSet?
-    //pub all_components: IntSet<ComponentName>,
-    pub all_components: ahash::HashSet<ComponentName>,
+    pub all_components: IntSet<ComponentName>,
 
     /// The number of rows stored in this table, across all of its buckets.
     pub buckets_num_rows: u64,
@@ -422,7 +416,7 @@ pub struct IndexedTable {
 
 impl IndexedTable {
     pub fn new(cluster_key: ComponentName, timeline: Timeline, ent_path: EntityPath) -> Self {
-        let bucket = IndexedBucket::new(cluster_key.clone(), timeline);
+        let bucket = IndexedBucket::new(cluster_key, timeline);
         let buckets_size_bytes = bucket.total_size_bytes();
         Self {
             timeline,
@@ -455,7 +449,7 @@ impl Clone for IndexedBucket {
     fn clone(&self) -> Self {
         Self {
             timeline: self.timeline,
-            cluster_key: self.cluster_key.clone(),
+            cluster_key: self.cluster_key,
             inner: RwLock::new(self.inner.read().clone()),
         }
     }
@@ -507,9 +501,7 @@ pub struct IndexedBucketInner {
     ///
     /// The cells are optional since not all rows will have data for every single component
     /// (i.e. the table is sparse).
-    // TODO(jleibs): Can we use HashMap
-    //pub columns: IntMap<ComponentName, DataCellColumn>,
-    pub columns: ahash::HashMap<ComponentName, DataCellColumn>,
+    pub columns: IntMap<ComponentName, DataCellColumn>,
 
     /// The size of both the control & component data stored in this bucket, heap and stack
     /// included, in bytes.
@@ -578,9 +570,7 @@ pub struct PersistentIndexedTable {
     ///
     /// The cells are optional since not all rows will have data for every single component
     /// (i.e. the table is sparse).
-    // TODO(jleibs): Can we use IntMapI
-    //pub columns: IntMap<ComponentName, DataCellColumn>,
-    pub columns: ahash::HashMap<ComponentName, DataCellColumn>,
+    pub columns: IntMap<ComponentName, DataCellColumn>,
 }
 
 impl PersistentIndexedTable {

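The `IntMap`/`IntSet` swap resolves the deleted `TODO(jleibs)` comments: `nohash-hasher`'s containers skip hashing and use a single integer supplied by the key itself. The contract is that the key's `Hash` impl must emit exactly one integer, and that the type opts in via the `IsEnabled` marker. A standalone sketch, assuming `ComponentName` carries a precomputed 64-bit hash (the field layout is hypothetical; the real type lives in `re_types`):

```rust
use std::hash::{Hash, Hasher};

use nohash_hasher::{IntMap, IsEnabled};

#[derive(Clone, Copy, PartialEq, Eq)]
struct ComponentName {
    hash: u64, // assumed to be precomputed at interning time
    name: &'static str,
}

// nohash-hasher's contract: `hash` must make exactly one `write_uXX` call;
// NoHashHasher then uses that value verbatim as the hash.
impl Hash for ComponentName {
    fn hash<H: Hasher>(&self, state: &mut H) {
        state.write_u64(self.hash);
    }
}

// Opt-in marker asserting that the Hash impl above is well-behaved.
impl IsEnabled for ComponentName {}

fn main() {
    let mut datatypes: IntMap<ComponentName, &'static str> = IntMap::default();
    let key = ComponentName { hash: 0xDEAD_BEEF, name: "rerun.rect2d" };
    datatypes.insert(key, "Struct");
    println!("{} -> {:?}", key.name, datatypes.get(&key));
}
```
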
5 changes: 3 additions & 2 deletions crates/re_arrow_store/src/store_arrow.rs
@@ -1,6 +1,7 @@
 use std::collections::BTreeMap;
 
 use arrow2::{array::Array, chunk::Chunk, datatypes::Schema};
+use nohash_hasher::IntMap;
 use re_log_types::{
     DataCellColumn, DataTable, DataTableResult, RowId, Timeline, COLUMN_INSERT_ID,
     COLUMN_NUM_INSTANCES, COLUMN_ROW_ID,
@@ -93,7 +94,7 @@ fn serialize(
     col_insert_id: &[u64],
     col_row_id: &[RowId],
     col_num_instances: &[u32],
-    table: &ahash::HashMap<ComponentName, DataCellColumn>,
+    table: &IntMap<ComponentName, DataCellColumn>,
 ) -> DataTableResult<(Schema, Chunk<Box<dyn Array>>)> {
     re_tracing::profile_function!();
 
@@ -173,7 +174,7 @@ fn serialize_control_columns(
 
 fn serialize_data_columns(
     cluster_key: &ComponentName,
-    table: &ahash::HashMap<ComponentName, DataCellColumn>,
+    table: &IntMap<ComponentName, DataCellColumn>,
 ) -> DataTableResult<(Schema, Vec<Box<dyn Array>>)> {
     re_tracing::profile_function!();
 

2 changes: 1 addition & 1 deletion crates/re_arrow_store/src/store_dump.rs
@@ -198,7 +198,7 @@ impl DataStore {
             let mut columns2 = BTreeMap::default();
             for (component, column) in columns {
                 let column = filter_column(col_time, column.iter(), time_filter).collect();
-                columns2.insert(component.clone(), DataCellColumn(column));
+                columns2.insert(*component, DataCellColumn(column));
             }
 
             Some(DataTable {

(Diff truncated: the remaining changed files are not shown.)
