GC improvements 1: fix timeless tables not being RowId-ordered #4395

Merged
merged 5 commits on Dec 2, 2023
3 changes: 3 additions & 0 deletions crates/re_arrow_store/src/lib.rs
@@ -52,6 +52,9 @@ pub(crate) use self::store::{
     MetadataRegistry, PersistentIndexedTable,
 };
 
+#[allow(unused_imports)] // only used with some sets of feature flags atm
+pub(crate) use self::store::PersistentIndexedTableInner;
+
 // Re-exports
 #[doc(no_inline)]
 pub use arrow2::io::ipc::read::{StreamReader, StreamState};
53 changes: 45 additions & 8 deletions crates/re_arrow_store/src/store.rs
@@ -561,9 +561,7 @@ impl Default for IndexedBucketInner {
 /// ```text
 /// cargo test -p re_arrow_store -- --nocapture datastore_internal_repr
 /// ```
-//
-// TODO(#1807): timeless should be row-id ordered too then
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 pub struct PersistentIndexedTable {
     /// The entity this table is related to, for debugging purposes.
     pub ent_path: EntityPath,
@@ -572,6 +570,32 @@ pub struct PersistentIndexedTable {
     /// place.
     pub cluster_key: ComponentName,
 
+    // To simplify interior mutability.
+    pub inner: RwLock<PersistentIndexedTableInner>,
+}
+
+impl Clone for PersistentIndexedTable {
+    fn clone(&self) -> Self {
+        Self {
+            ent_path: self.ent_path.clone(),
+            cluster_key: self.cluster_key,
+            inner: RwLock::new(self.inner.read().clone()),
+        }
+    }
+}
+
+impl PersistentIndexedTable {
+    pub fn new(cluster_key: ComponentName, ent_path: EntityPath) -> Self {
+        Self {
+            cluster_key,
+            ent_path,
+            inner: RwLock::new(PersistentIndexedTableInner::default()),
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct PersistentIndexedTableInner {
     /// The entire column of insertion IDs, if enabled in [`DataStoreConfig`].
     ///
     /// Keeps track of insertion order from the point-of-view of the [`DataStore`].
@@ -592,21 +616,34 @@ pub struct PersistentIndexedTable {
     /// The cells are optional since not all rows will have data for every single component
     /// (i.e. the table is sparse).
     pub columns: IntMap<ComponentName, DataCellColumn>,
+
+    /// Are the rows in this table sorted?
+    ///
+    /// Querying a [`PersistentIndexedTable`] will always trigger a sort if the rows within
+    /// aren't already sorted.
+    pub is_sorted: bool,
 }
 
-impl PersistentIndexedTable {
-    pub fn new(cluster_key: ComponentName, ent_path: EntityPath) -> Self {
+impl Default for PersistentIndexedTableInner {
+    fn default() -> Self {
         Self {
-            cluster_key,
-            ent_path,
             col_insert_id: Default::default(),
             col_row_id: Default::default(),
             col_num_instances: Default::default(),
             columns: Default::default(),
+            is_sorted: true,
         }
     }
+}
+
+impl PersistentIndexedTableInner {
     #[inline]
     pub fn is_empty(&self) -> bool {
-        self.col_num_instances.is_empty()
+        self.num_rows() == 0
     }
+
+    #[inline]
+    pub fn num_rows(&self) -> u64 {
+        self.col_row_id.len() as u64
+    }
 }
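A note on the `Clone` change above, since it is easy to miss why it became a hand-written impl: `RwLock<T>` does not implement `Clone` even when `T: Clone`, so once the mutable table state moves behind `RwLock<PersistentIndexedTableInner>`, `#[derive(Clone)]` no longer compiles and the impl must lock, clone the guarded data, and wrap the clone in a fresh lock. Below is a minimal standalone sketch of the pattern, assuming a parking_lot-style `RwLock` whose `read()` returns a guard directly (matching the bare `inner.read()` calls in the diff); `Table` and `Inner` are placeholder names, not code from this PR.

use parking_lot::RwLock;

#[derive(Debug, Clone, Default)]
struct Inner {
    rows: Vec<u64>,
    is_sorted: bool,
}

#[derive(Debug, Default)]
struct Table {
    inner: RwLock<Inner>,
}

impl Clone for Table {
    fn clone(&self) -> Self {
        // Hold the read lock just long enough to clone the guarded data,
        // then give the clone its own independent lock.
        Self {
            inner: RwLock::new(self.inner.read().clone()),
        }
    }
}

fn main() {
    let table = Table::default();
    table.inner.write().rows.push(42);
    let copy = table.clone();
    assert_eq!(copy.inner.read().rows, vec![42]);
}

Cloning under a short-lived read lock leaves concurrent readers unaffected, and the clone starts out with a completely independent lock.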
11 changes: 9 additions & 2 deletions crates/re_arrow_store/src/store_arrow.rs
@@ -5,7 +5,9 @@ use nohash_hasher::IntMap;
 use re_log_types::{DataCellColumn, DataTable, DataTableResult, NumInstances, RowId, Timeline};
 use re_types_core::ComponentName;
 
-use crate::store::{IndexedBucket, IndexedBucketInner, PersistentIndexedTable};
+use crate::store::{
+    IndexedBucket, IndexedBucketInner, PersistentIndexedTable, PersistentIndexedTableInner,
+};
 
 // ---
 
@@ -66,11 +68,16 @@ impl PersistentIndexedTable {
         let Self {
             ent_path: _,
             cluster_key,
+            inner,
+        } = self;
+
+        let PersistentIndexedTableInner {
             col_insert_id,
             col_row_id,
             col_num_instances,
             columns,
-        } = self;
+            is_sorted: _,
+        } = &*inner.read();
 
         serialize(
             cluster_key,
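Every hunk in this PR also touches a `let Self { … } = self;` pattern, and that is deliberate: exhaustive destructuring with no `..` catch-all makes the compiler flag every site that must be updated when a field such as `is_sorted` is added, which is how the new field gets threaded through serialization, dumping, formatting, and GC without missing a spot. A tiny illustration of the idiom with made-up types (`Example` is not from the PR):

struct Example {
    a: u32,
    b: String,
}

impl Example {
    fn dump(&self) {
        // No `..` here: adding a field to `Example` turns this line into a
        // compile error until the new field is bound or ignored with `_`.
        let Self { a, b } = self;
        println!("{a} {b}");
    }
}

fn main() {
    Example { a: 1, b: "row".to_owned() }.dump();
}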
13 changes: 10 additions & 3 deletions crates/re_arrow_store/src/store_dump.rs
@@ -6,7 +6,7 @@ use re_log_types::{
 };
 
 use crate::{
-    store::{IndexedBucketInner, PersistentIndexedTable},
+    store::{IndexedBucketInner, PersistentIndexedTable, PersistentIndexedTableInner},
     DataStore, IndexedBucket,
 };
 
@@ -71,18 +71,25 @@ impl DataStore {
             let PersistentIndexedTable {
                 ent_path,
                 cluster_key: _,
+                inner,
+            } = table;
+
+            let inner = &*inner.read();
+            let PersistentIndexedTableInner {
                 col_insert_id: _,
                 col_row_id,
                 col_num_instances,
                 columns,
-            } = table;
+                is_sorted,
+            } = inner;
+            debug_assert!(is_sorted);
 
             DataTable {
                 table_id: TableId::random(),
                 col_row_id: col_row_id.clone(),
                 col_timelines: Default::default(),
                 col_entity_path: std::iter::repeat_with(|| ent_path.clone())
-                    .take(table.num_rows() as _)
+                    .take(inner.num_rows() as _)
                     .collect(),
                 col_num_instances: col_num_instances.clone(),
                 columns: columns.clone().into_iter().collect(), // shallow
13 changes: 2 additions & 11 deletions crates/re_arrow_store/src/store_format.rs
@@ -156,21 +156,12 @@ impl std::fmt::Display for IndexedBucket {
 impl std::fmt::Display for PersistentIndexedTable {
     #[allow(clippy::string_add)]
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let Self {
-            ent_path,
-            cluster_key: _,
-            col_insert_id: _,
-            col_row_id: _,
-            col_num_instances: _,
-            columns: _,
-        } = self;
-
-        f.write_fmt(format_args!("entity: {ent_path}\n"))?;
+        f.write_fmt(format_args!("entity: {}\n", self.ent_path))?;
 
         f.write_fmt(format_args!(
             "size: {} across {} rows\n",
             format_bytes(self.total_size_bytes() as _),
-            format_number(self.num_rows() as _),
+            format_number(self.inner.read().num_rows() as _),
         ))?;
 
         let (schema, columns) = self.serialize().map_err(|err| {
51 changes: 27 additions & 24 deletions crates/re_arrow_store/src/store_gc.rs
@@ -6,7 +6,10 @@ use re_log_types::{EntityPath, RowId, TimeInt, TimeRange, Timeline};
 use re_types_core::{ComponentName, SizeBytes as _};
 
 use crate::{
-    store::{ClusterCellCache, IndexedBucketInner, IndexedTable, PersistentIndexedTable},
+    store::{
+        ClusterCellCache, IndexedBucketInner, IndexedTable, PersistentIndexedTable,
+        PersistentIndexedTableInner,
+    },
     DataStore, DataStoreStats, StoreDiff, StoreDiffKind, StoreEvent,
 };
 
@@ -297,10 +300,6 @@
     }
 
     /// For each `EntityPath`, `Timeline`, `Component` find the N latest [`RowId`]s.
-    ///
-    /// These are the rows that must be protected so as not to impact a latest-at query.
-    /// Note that latest for Timeless is currently based on insertion-order rather than
-    /// tuid. [See: #1807](https://github.com/rerun-io/rerun/issues/1807)
     //
     // TODO(jleibs): More complex functionality might required expanding this to also
     // *ignore* specific entities, components, timelines, etc. for this protection.
@@ -363,12 +362,13 @@
         }
 
         // Find all protected rows in timeless tables
-        // TODO(#1807): this is still based on insertion order.
         for table in self.timeless_tables.values() {
+            let cluster_key = table.cluster_key;
+            let table = table.inner.read();
             let mut components_to_find: HashMap<ComponentName, usize> = table
                 .columns
                 .keys()
-                .filter(|c| **c != table.cluster_key)
+                .filter(|c| **c != cluster_key)
                 .filter(|c| !dont_protect.contains(*c))
                 .map(|c| (*c, target_count))
                 .collect();
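For context on the loop above: once timeless rows are RowId-ordered, "protect the N latest rows per component" reduces to a newest-first scan that decrements a per-component budget. The sketch below is illustrative only; `protected_rows` and its simplified column type are hypothetical stand-ins, and the real pass additionally skips the cluster key and the `dont_protect` set, as the diff shows.

use std::collections::{HashMap, HashSet};

fn protected_rows(
    col_row_id: &[u64],                                 // sorted ascending by RowId
    columns: &HashMap<&'static str, Vec<Option<u32>>>,  // component -> sparse cells
    target_count: usize,
) -> HashSet<u64> {
    // Every component may protect up to `target_count` rows.
    let mut budgets: HashMap<&str, usize> =
        columns.keys().map(|c| (*c, target_count)).collect();
    let mut protected = HashSet::new();

    // Newest rows come last in a sorted column, so walk in reverse.
    for (i, row_id) in col_row_id.iter().enumerate().rev() {
        for (component, cells) in columns {
            let budget = budgets.get_mut(component).unwrap();
            if *budget > 0 && cells[i].is_some() {
                *budget -= 1;
                protected.insert(*row_id);
            }
        }
    }
    protected
}

fn main() {
    let mut columns = HashMap::new();
    columns.insert("color", vec![Some(1), None, Some(3)]);
    columns.insert("position", vec![Some(1), Some(2), None]);
    // Protects row 12 (newest "color") and row 11 (newest "position").
    println!("{:?}", protected_rows(&[10, 11, 12], &columns, 1));
}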
@@ -409,6 +409,9 @@
 
         // Drop any empty timeless tables
         self.timeless_tables.retain(|_, table| {
+            let entity_path = &table.ent_path;
+            let mut table = table.inner.write();
+
             // If any column is non-empty, we need to keep this table…
             for num in &table.col_num_instances {
                 if num.get() != 0 {
@@ -418,7 +421,7 @@
 
             // …otherwise we can drop it.
 
-            let entity_path = table.ent_path.clone();
+            let entity_path = entity_path.clone();
 
             for i in 0..table.col_row_id.len() {
                 let row_id = table.col_row_id[i];
@@ -683,41 +686,41 @@ impl PersistentIndexedTable {
         let PersistentIndexedTable {
             ent_path,
             cluster_key: _,
+            inner,
+        } = self;
+
+        let inner = &mut *inner.write();
+        inner.sort();
+
+        let PersistentIndexedTableInner {
             col_insert_id,
             col_row_id,
             col_num_instances,
             columns,
-        } = self;
+            is_sorted,
+        } = inner;
 
         let mut diff: Option<StoreDiff> = None;
 
-        // TODO(#1807): Timeless data isn't sorted, so we need to do a full scan here.
-        // Speed this up when we implement #1807.
-        if let Some(row_index) = col_row_id
-            .iter()
-            .enumerate()
-            .find(|(_, r)| **r == row_id)
-            .map(|(index, _)| index)
-        {
+        if let Ok(row_index) = col_row_id.binary_search(&row_id) {
+            *is_sorted = row_index.saturating_add(1) == col_row_id.len();
+
             // col_row_id
-            // TODO(jleibs) Use swap_remove once we have a notion of sorted
-            let removed_row_id = col_row_id.remove(row_index);
+            let removed_row_id = col_row_id.swap_remove(row_index);
             debug_assert_eq!(row_id, removed_row_id);
             dropped_num_bytes += removed_row_id.total_size_bytes();
 
             // col_insert_id (if present)
             if !col_insert_id.is_empty() {
-                // TODO(jleibs) Use swap_remove once we have a notion of sorted
-                dropped_num_bytes += col_insert_id.remove(row_index).total_size_bytes();
+                dropped_num_bytes += col_insert_id.swap_remove(row_index).total_size_bytes();
             }
 
             // col_num_instances
-            // TODO(jleibs) Use swap_remove once we have a notion of sorted
-            dropped_num_bytes += col_num_instances.remove(row_index).total_size_bytes();
+            dropped_num_bytes += col_num_instances.swap_remove(row_index).total_size_bytes();
 
             // each data column
             for column in columns.values_mut() {
-                let cell = column.0.remove(row_index);
+                let cell = column.0.swap_remove(row_index);
 
                 // TODO(#1809): once datatype deduplication is in, we should really not count
                 // autogenerated keys as part of the memory stats (same on write path).
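The deletion hunk above is the heart of the PR: because `inner.sort()` restores RowId order up front, the old linear `find` scan becomes a `binary_search`, and `Vec::remove` (which shifts every later element) becomes O(1) `swap_remove`. The swap moves the last element into the hole, so the column stays sorted only when the removed row was the last one; that is exactly what `*is_sorted = row_index.saturating_add(1) == col_row_id.len()` records, and the next reader re-sorts lazily. A standalone sketch of the trick (a hypothetical `delete_row` helper over plain `u64` row ids, not the PR's code):

fn delete_row(col_row_id: &mut Vec<u64>, is_sorted: &mut bool, row_id: u64) -> bool {
    // Precondition for binary_search: the column is sorted ascending.
    debug_assert!(col_row_id.windows(2).all(|w| w[0] <= w[1]));

    if let Ok(row_index) = col_row_id.binary_search(&row_id) {
        // Sorted order survives the swap only if we removed the final element.
        *is_sorted = row_index.saturating_add(1) == col_row_id.len();
        let removed = col_row_id.swap_remove(row_index);
        debug_assert_eq!(row_id, removed);
        true
    } else {
        false
    }
}

fn main() {
    let mut rows = vec![1, 2, 3, 4];
    let mut is_sorted = true;
    delete_row(&mut rows, &mut is_sorted, 2);
    assert_eq!(rows, vec![1, 4, 3]); // 4 was swapped into the freed slot
    assert!(!is_sorted); // a later query must re-sort (cf. `inner.sort()` above)
}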
12 changes: 9 additions & 3 deletions crates/re_arrow_store/src/store_polars.rs
@@ -14,7 +14,7 @@ use re_types_core::ComponentName;
 
 use crate::{
     store::InsertIdVec, ArrayExt, DataStore, DataStoreConfig, IndexedBucket, IndexedBucketInner,
-    PersistentIndexedTable,
+    PersistentIndexedTable, PersistentIndexedTableInner,
 };
 
 // TODO(#1692): all of this stuff should be defined by Data{Cell,Row,Table}, not the store.
@@ -173,13 +173,19 @@ impl PersistentIndexedTable {
         let Self {
             ent_path: _,
             cluster_key: _,
+            inner,
+        } = self;
+
+        let inner = &*inner.read();
+        let PersistentIndexedTableInner {
             col_insert_id,
             col_row_id,
             col_num_instances,
             columns,
-        } = self;
+            is_sorted,
+        } = inner;
 
-        let num_rows = self.num_rows() as usize;
+        let num_rows = inner.num_rows() as usize;
 
         let insert_ids = config
             .store_insert_ids