Datastore: revamp bench suite (#1733)
* revamp bench suite

* i kinda forgot to, you know, put the code in

* make sure we don't get surprised by size limits

* tired copy pasting is dangerous copy pasting

* bruh
teh-cmc authored Mar 30, 2023
1 parent 6d219e6 commit 680f791
Showing 1 changed file with 195 additions and 86 deletions: crates/re_arrow_store/benches/data_store.rs
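The "size limits" bullet in the commit message refers to the bucket sweeps below: every bench is re-run against stores whose buckets split after a fixed row count, with the size-based limits pushed to u64::MAX so they can never trigger first. A minimal sketch of that config pattern, lifted from the diff (the helper name and the u64 parameter type are assumptions, not code from the commit):

    use re_arrow_store::DataStoreConfig;

    // Emulate more or fewer buckets purely by row count: size-based splitting
    // is disabled by maxing out the byte limits, so only `nb_rows` matters.
    fn bucket_config(num_rows_per_bucket: u64) -> DataStoreConfig {
        DataStoreConfig {
            index_bucket_nb_rows: num_rows_per_bucket,
            component_bucket_nb_rows: num_rows_per_bucket,
            index_bucket_size_bytes: u64::MAX,
            component_bucket_size_bytes: u64::MAX,
            ..Default::default()
        }
    }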
@@ -8,46 +8,77 @@ use re_arrow_store::{DataStore, DataStoreConfig, LatestAtQuery, RangeQuery, Time
use re_log_types::{
component_types::{InstanceKey, Rect2D},
datagen::{build_frame_nr, build_some_instances, build_some_rects},
Component as _, ComponentName, DataRow, EntityPath, MsgId, TimeType, Timeline,
Component as _, ComponentName, DataRow, DataTable, EntityPath, MsgId, TimeType, Timeline,
};

criterion_group!(benches, insert, latest_at, latest_at_missing, range);
criterion_main!(benches);

// ---

#[cfg(not(debug_assertions))]
const NUM_FRAMES: i64 = 100;
const NUM_ROWS: i64 = 1_000;
#[cfg(not(debug_assertions))]
const NUM_RECTS: i64 = 100;
const NUM_INSTANCES: i64 = 1_000;

// `cargo test` also runs the benchmark setup code, so make sure they run quickly:
#[cfg(debug_assertions)]
const NUM_FRAMES: i64 = 1;
const NUM_ROWS: i64 = 1;
#[cfg(debug_assertions)]
const NUM_RECTS: i64 = 1;
const NUM_INSTANCES: i64 = 1;
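
(In practice: `cargo test` compiles this file with debug_assertions enabled and so runs the 1-element setup, while `cargo bench -p re_arrow_store --bench data_store` builds in release mode and gets the full 1_000-element workloads; the package and bench names here are assumed from the file path.)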

// --- Benchmarks ---

// TODO(cmc): need additional benches for full tables

fn insert(c: &mut Criterion) {
{
let rows = build_rows(NUM_RECTS as usize);
let mut group = c.benchmark_group("datastore/insert/batch/rects");
for packed in [false, true] {
let mut group = c.benchmark_group(format!(
"datastore/num_rows={NUM_ROWS}/num_instances={NUM_INSTANCES}/packed={packed}/insert"
));
group.throughput(criterion::Throughput::Elements(
(NUM_RECTS * NUM_FRAMES) as _,
(NUM_INSTANCES * NUM_ROWS) as _,
));
group.bench_function("insert", |b| {
b.iter(|| insert_rows(Default::default(), InstanceKey::name(), rows.iter()));

let table = build_table(NUM_INSTANCES as usize, packed);

// Default config
group.bench_function("default", |b| {
b.iter(|| insert_table(Default::default(), InstanceKey::name(), &table));
});

// Emulate more or less buckets
let num_rows_per_bucket = [0, 2, 32, 2048];
for num_rows_per_bucket in num_rows_per_bucket {
group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
b.iter(|| {
insert_table(
DataStoreConfig {
index_bucket_nb_rows: num_rows_per_bucket,
component_bucket_nb_rows: num_rows_per_bucket,
index_bucket_size_bytes: u64::MAX,
component_bucket_size_bytes: u64::MAX,
..Default::default()
},
InstanceKey::name(),
&table,
)
});
});
}
}
}

fn latest_at_batch(c: &mut Criterion) {
{
let rows = build_rows(NUM_RECTS as usize);
let store = insert_rows(Default::default(), InstanceKey::name(), rows.iter());
let mut group = c.benchmark_group("datastore/latest_at/batch/rects");
group.throughput(criterion::Throughput::Elements(NUM_RECTS as _));
group.bench_function("query", |b| {
fn latest_at(c: &mut Criterion) {
for packed in [false, true] {
let mut group = c.benchmark_group(format!(
"datastore/num_rows={NUM_ROWS}/num_instances={NUM_INSTANCES}/packed={packed}/latest_at"
));
group.throughput(criterion::Throughput::Elements(NUM_INSTANCES as _));

let table = build_table(NUM_INSTANCES as usize, packed);

// Default config
group.bench_function("default", |b| {
let store = insert_table(Default::default(), InstanceKey::name(), &table);
b.iter(|| {
let results = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]);
let rects = results[0]
@@ -56,40 +56,59 @@ fn latest_at_batch(c: &mut Criterion) {
.as_any()
.downcast_ref::<UnionArray>()
.unwrap();
assert_eq!(NUM_RECTS as usize, rects.len());
assert_eq!(NUM_INSTANCES as usize, rects.len());
});
});

// Emulate more or less buckets
let num_rows_per_bucket = [0, 2, 32, 2048];
for num_rows_per_bucket in num_rows_per_bucket {
let store = insert_table(
DataStoreConfig {
index_bucket_nb_rows: num_rows_per_bucket,
component_bucket_nb_rows: num_rows_per_bucket,
index_bucket_size_bytes: u64::MAX,
component_bucket_size_bytes: u64::MAX,
..Default::default()
},
InstanceKey::name(),
&table,
);
group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
b.iter(|| {
let results = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]);
let rects = results[0]
.as_ref()
.unwrap()
.as_any()
.downcast_ref::<UnionArray>()
.unwrap();
assert_eq!(NUM_INSTANCES as usize, rects.len());
});
});
}
}
}

fn latest_at_missing_components(c: &mut Criterion) {
// Simulate the worst possible case: many many buckets.
let config = DataStoreConfig {
index_bucket_size_bytes: 0,
index_bucket_nb_rows: 0,
..Default::default()
};

{
let msgs = build_rows(NUM_RECTS as usize);
let store = insert_rows(config.clone(), InstanceKey::name(), msgs.iter());
let mut group = c.benchmark_group("datastore/latest_at/missing_components");
group.throughput(criterion::Throughput::Elements(NUM_RECTS as _));
group.bench_function("primary", |b| {
fn latest_at_missing(c: &mut Criterion) {
for packed in [false, true] {
let mut group = c.benchmark_group(format!(
"datastore/num_rows={NUM_ROWS}/num_instances={NUM_INSTANCES}/packed={packed}/latest_at_missing"
));
group.throughput(criterion::Throughput::Elements(NUM_INSTANCES as _));

let table = build_table(NUM_INSTANCES as usize, packed);

// Default config
let store = insert_table(Default::default(), InstanceKey::name(), &table);
group.bench_function("primary/default", |b| {
b.iter(|| {
let results =
latest_data_at(&store, "non_existing_component".into(), &[Rect2D::name()]);
assert!(results[0].is_none());
});
});
}

{
let msgs = build_rows(NUM_RECTS as usize);
let store = insert_rows(config, InstanceKey::name(), msgs.iter());
let mut group = c.benchmark_group("datastore/latest_at/missing_components");
group.throughput(criterion::Throughput::Elements(NUM_RECTS as _));
group.bench_function("secondaries", |b| {
group.bench_function("secondaries/default", |b| {
b.iter(|| {
let results = latest_data_at(
&store,
@@ -105,69 +105,131 @@ fn latest_at_missing_components(c: &mut Criterion) {
assert!(results[2].is_none());
});
});

// Emulate more or less buckets
let num_rows_per_bucket = [0, 2, 32, 2048];
for num_rows_per_bucket in num_rows_per_bucket {
let store = insert_table(
DataStoreConfig {
index_bucket_nb_rows: num_rows_per_bucket,
component_bucket_nb_rows: num_rows_per_bucket,
index_bucket_size_bytes: u64::MAX,
component_bucket_size_bytes: u64::MAX,
..Default::default()
},
InstanceKey::name(),
&table,
);
group.bench_function(format!("primary/bucketsz={num_rows_per_bucket}"), |b| {
b.iter(|| {
let results =
latest_data_at(&store, "non_existing_component".into(), &[Rect2D::name()]);
assert!(results[0].is_none());
});
});
group.bench_function(format!("secondaries/bucketsz={num_rows_per_bucket}"), |b| {
b.iter(|| {
let results = latest_data_at(
&store,
Rect2D::name(),
&[
"non_existing_component1".into(),
"non_existing_component2".into(),
"non_existing_component3".into(),
],
);
assert!(results[0].is_none());
assert!(results[1].is_none());
assert!(results[2].is_none());
});
});
}
}
}

fn range_batch(c: &mut Criterion) {
{
let msgs = build_rows(NUM_RECTS as usize);
let store = insert_rows(Default::default(), InstanceKey::name(), msgs.iter());
let mut group = c.benchmark_group("datastore/range/batch/rects");
fn range(c: &mut Criterion) {
for packed in [false, true] {
let mut group = c.benchmark_group(format!(
"datastore/num_rows={NUM_ROWS}/num_instances={NUM_INSTANCES}/packed={packed}/range"
));
group.throughput(criterion::Throughput::Elements(
(NUM_RECTS * NUM_FRAMES) as _,
(NUM_INSTANCES * NUM_ROWS) as _,
));
group.bench_function("query", |b| {
b.iter(|| {
let msgs = range_data(&store, [Rect2D::name()]);
for (cur_time, (time, results)) in msgs.enumerate() {
let time = time.unwrap();
assert_eq!(cur_time as i64, time.as_i64());

let rects = results[0]
.as_ref()
.unwrap()
.as_any()
.downcast_ref::<UnionArray>()
.unwrap();
assert_eq!(NUM_RECTS as usize, rects.len());
}
});
let table = build_table(NUM_INSTANCES as usize, packed);

// Default config
group.bench_function("default", |b| {
b.iter(|| insert_table(Default::default(), InstanceKey::name(), &table));
});

// Emulate more or less buckets
let num_rows_per_bucket = [0, 2, 32, 2048];
for num_rows_per_bucket in num_rows_per_bucket {
let store = insert_table(
DataStoreConfig {
index_bucket_nb_rows: num_rows_per_bucket,
component_bucket_nb_rows: num_rows_per_bucket,
index_bucket_size_bytes: u64::MAX,
component_bucket_size_bytes: u64::MAX,
..Default::default()
},
InstanceKey::name(),
&table,
);
group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
b.iter(|| {
let msgs = range_data(&store, [Rect2D::name()]);
for (cur_time, (time, results)) in msgs.enumerate() {
let time = time.unwrap();
assert_eq!(cur_time as i64, time.as_i64());

let rects = results[0]
.as_ref()
.unwrap()
.as_any()
.downcast_ref::<UnionArray>()
.unwrap();
assert_eq!(NUM_INSTANCES as usize, rects.len());
}
});
});
}
}
}

criterion_group!(
benches,
insert,
latest_at_batch,
latest_at_missing_components,
range_batch,
);
criterion_main!(benches);

// --- Helpers ---

fn build_rows(n: usize) -> Vec<DataRow> {
(0..NUM_FRAMES)
.map(move |frame_idx| {
fn build_table(n: usize, packed: bool) -> DataTable {
let mut table = DataTable::from_rows(
MsgId::ZERO,
(0..NUM_ROWS).map(move |frame_idx| {
DataRow::from_cells2(
MsgId::random(),
"rects",
[build_frame_nr(frame_idx.into())],
n as _,
(build_some_instances(n), build_some_rects(n)),
)
})
.collect()
}),
);

// Do a serialization roundtrip to pack everything in contiguous memory.
if packed {
let (schema, columns) = table.serialize().unwrap();
table = DataTable::deserialize(MsgId::ZERO, &schema, &columns).unwrap();
}

table
}

fn insert_rows<'a>(
fn insert_table(
config: DataStoreConfig,
cluster_key: ComponentName,
rows: impl Iterator<Item = &'a DataRow>,
table: &DataTable,
) -> DataStore {
let mut store = DataStore::new(cluster_key, config);
rows.for_each(|row| store.insert_row(row).unwrap());
store.insert_table(table).unwrap();
store
}
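
Taken together, build_table and insert_table give each bench its one-line setup. A hypothetical end-to-end use (assembled from calls that appear in this file, not itself part of the diff), together with the latest_data_at helper defined next:

    // Build a packed table, insert it under the default config, then run a
    // latest-at query for Rect2D and sanity-check the result length: the same
    // assertion the benches above make.
    let table = build_table(NUM_INSTANCES as usize, /*packed=*/ true);
    let store = insert_table(Default::default(), InstanceKey::name(), &table);
    let results = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]);
    let rects = results[0]
        .as_ref()
        .unwrap()
        .as_any()
        .downcast_ref::<UnionArray>()
        .unwrap();
    assert_eq!(NUM_INSTANCES as usize, rects.len());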

@@ -177,7 +177,7 @@ fn latest_data_at<const N: usize>(
secondaries: &[ComponentName; N],
) -> [Option<Box<dyn Array>>; N] {
let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let timeline_query = LatestAtQuery::new(timeline_frame_nr, (NUM_FRAMES / 2).into());
let timeline_query = LatestAtQuery::new(timeline_frame_nr, (NUM_ROWS / 2).into());
let ent_path = EntityPath::from("rects");

let row_indices = store
@@ -191,10 +191,7 @@ fn range_data<const N: usize>(
components: [ComponentName; N],
) -> impl Iterator<Item = (Option<TimeInt>, [Option<Box<dyn Array>>; N])> + '_ {
let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let query = RangeQuery::new(
timeline_frame_nr,
TimeRange::new(0.into(), NUM_FRAMES.into()),
);
let query = RangeQuery::new(timeline_frame_nr, TimeRange::new(0.into(), NUM_ROWS.into()));
let ent_path = EntityPath::from("rects");

store
