Skip to content

Commit

Permalink
GC improvements 1: fix timeless tables not being RowId-ordered (#4395)
Browse files Browse the repository at this point in the history
Fixes a long-standing bug: timeless tables not being sorted by `RowId`,
which means they effectively always return incorrect results for
out-of-order data (yes, that is a thing even in a timeless context).

This _worsens_  GC performance for timeless tables, but:
1. The performance of incorrect code hardly matters to begin with, and
2. this is ground work for turning timeless tables in ringbuffers in an
upcoming PR, which will massively improve performance.

- Fixes #1807 

### Benchmarks

Hint: it's even worse!

```
group                                                     gc_improvements_0                       gc_improvements_1
-----                                                     -----------------                       -----------------
.../plotting_dashboard/drop_at_least=0.3/bucketsz=1024    1.00   1084.0±4.47ms 54.1 KElem/sec     1.03   1117.2±9.07ms 52.4 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=2048    1.00       2.1±0.02s 27.6 KElem/sec     1.01       2.1±0.01s 27.3 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=256     1.00    465.8±2.50ms 125.8 KElem/sec    1.01    471.5±4.76ms 124.3 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=512     1.00    655.3±2.61ms 89.4 KElem/sec     1.02    666.7±6.64ms 87.9 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/default          1.00    652.8±4.12ms 89.8 KElem/sec     1.02    665.6±4.67ms 88.0 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=1024         1.00       2.4±0.05s 24.2 KElem/sec     3.35       8.1±0.10s  7.2 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=2048         1.00       2.4±0.03s 24.1 KElem/sec     3.30       8.0±0.09s  7.3 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=256          1.00       2.5±0.08s 23.5 KElem/sec     3.23       8.1±0.11s  7.3 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=512          1.00       2.4±0.02s 24.5 KElem/sec     3.38       8.1±0.11s  7.3 KElem/sec
.../timeless_logs/drop_at_least=0.3/default               1.00       2.4±0.03s 24.4 KElem/sec     3.35       8.1±0.07s  7.3 KElem/sec

```

---

Part of the GC improvements series:
- #4394
- #4395
- #4396
- #4397
- #4398
- #4399
- #4400
- #4401
  • Loading branch information
teh-cmc authored Dec 2, 2023
1 parent 9a9d990 commit 75c232b
Show file tree
Hide file tree
Showing 12 changed files with 313 additions and 90 deletions.
3 changes: 3 additions & 0 deletions crates/re_arrow_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ pub(crate) use self::store::{
MetadataRegistry, PersistentIndexedTable,
};

#[allow(unused_imports)] // only used with some sets of feature flags atm
pub(crate) use self::store::PersistentIndexedTableInner;

// Re-exports
#[doc(no_inline)]
pub use arrow2::io::ipc::read::{StreamReader, StreamState};
Expand Down
53 changes: 45 additions & 8 deletions crates/re_arrow_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,9 +561,7 @@ impl Default for IndexedBucketInner {
/// ```text
/// cargo test -p re_arrow_store -- --nocapture datastore_internal_repr
/// ```
//
// TODO(#1807): timeless should be row-id ordered too then
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct PersistentIndexedTable {
/// The entity this table is related to, for debugging purposes.
pub ent_path: EntityPath,
Expand All @@ -572,6 +570,32 @@ pub struct PersistentIndexedTable {
/// place.
pub cluster_key: ComponentName,

// To simplify interior mutability.
pub inner: RwLock<PersistentIndexedTableInner>,
}

impl Clone for PersistentIndexedTable {
fn clone(&self) -> Self {
Self {
ent_path: self.ent_path.clone(),
cluster_key: self.cluster_key,
inner: RwLock::new(self.inner.read().clone()),
}
}
}

impl PersistentIndexedTable {
pub fn new(cluster_key: ComponentName, ent_path: EntityPath) -> Self {
Self {
cluster_key,
ent_path,
inner: RwLock::new(PersistentIndexedTableInner::default()),
}
}
}

#[derive(Debug, Clone)]
pub struct PersistentIndexedTableInner {
/// The entire column of insertion IDs, if enabled in [`DataStoreConfig`].
///
/// Keeps track of insertion order from the point-of-view of the [`DataStore`].
Expand All @@ -592,21 +616,34 @@ pub struct PersistentIndexedTable {
/// The cells are optional since not all rows will have data for every single component
/// (i.e. the table is sparse).
pub columns: IntMap<ComponentName, DataCellColumn>,

/// Are the rows in this table sorted?
///
/// Querying a [`PersistentIndexedTable`] will always trigger a sort if the rows within
/// aren't already sorted.
pub is_sorted: bool,
}

impl PersistentIndexedTable {
pub fn new(cluster_key: ComponentName, ent_path: EntityPath) -> Self {
impl Default for PersistentIndexedTableInner {
fn default() -> Self {
Self {
cluster_key,
ent_path,
col_insert_id: Default::default(),
col_row_id: Default::default(),
col_num_instances: Default::default(),
columns: Default::default(),
is_sorted: true,
}
}
}

impl PersistentIndexedTableInner {
#[inline]
pub fn is_empty(&self) -> bool {
self.col_num_instances.is_empty()
self.num_rows() == 0
}

#[inline]
pub fn num_rows(&self) -> u64 {
self.col_row_id.len() as u64
}
}
11 changes: 9 additions & 2 deletions crates/re_arrow_store/src/store_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use nohash_hasher::IntMap;
use re_log_types::{DataCellColumn, DataTable, DataTableResult, NumInstances, RowId, Timeline};
use re_types_core::ComponentName;

use crate::store::{IndexedBucket, IndexedBucketInner, PersistentIndexedTable};
use crate::store::{
IndexedBucket, IndexedBucketInner, PersistentIndexedTable, PersistentIndexedTableInner,
};

// ---

Expand Down Expand Up @@ -66,11 +68,16 @@ impl PersistentIndexedTable {
let Self {
ent_path: _,
cluster_key,
inner,
} = self;

let PersistentIndexedTableInner {
col_insert_id,
col_row_id,
col_num_instances,
columns,
} = self;
is_sorted: _,
} = &*inner.read();

serialize(
cluster_key,
Expand Down
13 changes: 10 additions & 3 deletions crates/re_arrow_store/src/store_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use re_log_types::{
};

use crate::{
store::{IndexedBucketInner, PersistentIndexedTable},
store::{IndexedBucketInner, PersistentIndexedTable, PersistentIndexedTableInner},
DataStore, IndexedBucket,
};

Expand Down Expand Up @@ -71,18 +71,25 @@ impl DataStore {
let PersistentIndexedTable {
ent_path,
cluster_key: _,
inner,
} = table;

let inner = &*inner.read();
let PersistentIndexedTableInner {
col_insert_id: _,
col_row_id,
col_num_instances,
columns,
} = table;
is_sorted,
} = inner;
debug_assert!(is_sorted);

DataTable {
table_id: TableId::random(),
col_row_id: col_row_id.clone(),
col_timelines: Default::default(),
col_entity_path: std::iter::repeat_with(|| ent_path.clone())
.take(table.num_rows() as _)
.take(inner.num_rows() as _)
.collect(),
col_num_instances: col_num_instances.clone(),
columns: columns.clone().into_iter().collect(), // shallow
Expand Down
13 changes: 2 additions & 11 deletions crates/re_arrow_store/src/store_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,21 +156,12 @@ impl std::fmt::Display for IndexedBucket {
impl std::fmt::Display for PersistentIndexedTable {
#[allow(clippy::string_add)]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
ent_path,
cluster_key: _,
col_insert_id: _,
col_row_id: _,
col_num_instances: _,
columns: _,
} = self;

f.write_fmt(format_args!("entity: {ent_path}\n"))?;
f.write_fmt(format_args!("entity: {}\n", self.ent_path))?;

f.write_fmt(format_args!(
"size: {} across {} rows\n",
format_bytes(self.total_size_bytes() as _),
format_number(self.num_rows() as _),
format_number(self.inner.read().num_rows() as _),
))?;

let (schema, columns) = self.serialize().map_err(|err| {
Expand Down
51 changes: 27 additions & 24 deletions crates/re_arrow_store/src/store_gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use re_log_types::{EntityPath, RowId, TimeInt, TimeRange, Timeline};
use re_types_core::{ComponentName, SizeBytes as _};

use crate::{
store::{ClusterCellCache, IndexedBucketInner, IndexedTable, PersistentIndexedTable},
store::{
ClusterCellCache, IndexedBucketInner, IndexedTable, PersistentIndexedTable,
PersistentIndexedTableInner,
},
DataStore, DataStoreStats, StoreDiff, StoreDiffKind, StoreEvent,
};

Expand Down Expand Up @@ -297,10 +300,6 @@ impl DataStore {
}

/// For each `EntityPath`, `Timeline`, `Component` find the N latest [`RowId`]s.
///
/// These are the rows that must be protected so as not to impact a latest-at query.
/// Note that latest for Timeless is currently based on insertion-order rather than
/// tuid. [See: #1807](https://github.com/rerun-io/rerun/issues/1807)
//
// TODO(jleibs): More complex functionality might required expanding this to also
// *ignore* specific entities, components, timelines, etc. for this protection.
Expand Down Expand Up @@ -363,12 +362,13 @@ impl DataStore {
}

// Find all protected rows in timeless tables
// TODO(#1807): this is still based on insertion order.
for table in self.timeless_tables.values() {
let cluster_key = table.cluster_key;
let table = table.inner.read();
let mut components_to_find: HashMap<ComponentName, usize> = table
.columns
.keys()
.filter(|c| **c != table.cluster_key)
.filter(|c| **c != cluster_key)
.filter(|c| !dont_protect.contains(*c))
.map(|c| (*c, target_count))
.collect();
Expand Down Expand Up @@ -409,6 +409,9 @@ impl DataStore {

// Drop any empty timeless tables
self.timeless_tables.retain(|_, table| {
let entity_path = &table.ent_path;
let mut table = table.inner.write();

// If any column is non-empty, we need to keep this table…
for num in &table.col_num_instances {
if num.get() != 0 {
Expand All @@ -418,7 +421,7 @@ impl DataStore {

// …otherwise we can drop it.

let entity_path = table.ent_path.clone();
let entity_path = entity_path.clone();

for i in 0..table.col_row_id.len() {
let row_id = table.col_row_id[i];
Expand Down Expand Up @@ -683,41 +686,41 @@ impl PersistentIndexedTable {
let PersistentIndexedTable {
ent_path,
cluster_key: _,
inner,
} = self;

let inner = &mut *inner.write();
inner.sort();

let PersistentIndexedTableInner {
col_insert_id,
col_row_id,
col_num_instances,
columns,
} = self;
is_sorted,
} = inner;

let mut diff: Option<StoreDiff> = None;

// TODO(#1807): Timeless data isn't sorted, so we need to do a full scan here.
// Speed this up when we implement #1807.
if let Some(row_index) = col_row_id
.iter()
.enumerate()
.find(|(_, r)| **r == row_id)
.map(|(index, _)| index)
{
if let Ok(row_index) = col_row_id.binary_search(&row_id) {
*is_sorted = row_index.saturating_add(1) == col_row_id.len();

// col_row_id
// TODO(jleibs) Use swap_remove once we have a notion of sorted
let removed_row_id = col_row_id.remove(row_index);
let removed_row_id = col_row_id.swap_remove(row_index);
debug_assert_eq!(row_id, removed_row_id);
dropped_num_bytes += removed_row_id.total_size_bytes();

// col_insert_id (if present)
if !col_insert_id.is_empty() {
// TODO(jleibs) Use swap_remove once we have a notion of sorted
dropped_num_bytes += col_insert_id.remove(row_index).total_size_bytes();
dropped_num_bytes += col_insert_id.swap_remove(row_index).total_size_bytes();
}

// col_num_instances
// TODO(jleibs) Use swap_remove once we have a notion of sorted
dropped_num_bytes += col_num_instances.remove(row_index).total_size_bytes();
dropped_num_bytes += col_num_instances.swap_remove(row_index).total_size_bytes();

// each data column
for column in columns.values_mut() {
let cell = column.0.remove(row_index);
let cell = column.0.swap_remove(row_index);

// TODO(#1809): once datatype deduplication is in, we should really not count
// autogenerated keys as part of the memory stats (same on write path).
Expand Down
12 changes: 9 additions & 3 deletions crates/re_arrow_store/src/store_polars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use re_types_core::ComponentName;

use crate::{
store::InsertIdVec, ArrayExt, DataStore, DataStoreConfig, IndexedBucket, IndexedBucketInner,
PersistentIndexedTable,
PersistentIndexedTable, PersistentIndexedTableInner,
};

// TODO(#1692): all of this stuff should be defined by Data{Cell,Row,Table}, not the store.
Expand Down Expand Up @@ -173,13 +173,19 @@ impl PersistentIndexedTable {
let Self {
ent_path: _,
cluster_key: _,
inner,
} = self;

let inner = &*inner.read();
let PersistentIndexedTableInner {
col_insert_id,
col_row_id,
col_num_instances,
columns,
} = self;
is_sorted,
} = inner;

let num_rows = self.num_rows() as usize;
let num_rows = inner.num_rows() as usize;

let insert_ids = config
.store_insert_ids
Expand Down
Loading

0 comments on commit 75c232b

Please sign in to comment.