re_datastore: component chunks & streamlining batches #584

Merged · 28 commits · Dec 19, 2022

Commits

940364b - get is supposed to return a row, not a [row] (teh-cmc, Dec 18, 2022)
fcf6d5a - unwrap note (teh-cmc, Dec 18, 2022)
1a86bee - the bench too (teh-cmc, Dec 18, 2022)
9e22ac1 - self review (teh-cmc, Dec 18, 2022)
2fac6d2 - doc test also (teh-cmc, Dec 18, 2022)
8682229 - and re_query ofc! (teh-cmc, Dec 18, 2022)
b7e5fd5 - slicing is _very_ slow, don't do it if you don't have to (teh-cmc, Dec 18, 2022)
da816d1 - no more col_arrays in re_query (teh-cmc, Dec 18, 2022)
b22eecc - there's actually no need for concatenating at all (teh-cmc, Dec 16, 2022)
4a7b7ef - incrementally compute and cache bucket sizes (teh-cmc, Dec 17, 2022)
f88f248 - cleaning up and documenting existing limitations (teh-cmc, Dec 17, 2022)
2bcd47e - introducing bucket retirement (teh-cmc, Dec 17, 2022)
c8b40b6 - issue ref (teh-cmc, Dec 17, 2022)
36ce4db - some more doc stuff (teh-cmc, Dec 17, 2022)
751c2e8 - self-review (teh-cmc, Dec 17, 2022)
6776365 - polars/fmt should always be there for tests (teh-cmc, Dec 18, 2022)
9652013 - streamlining batch support (teh-cmc, Dec 18, 2022)
bce700e - take list header into account (teh-cmc, Dec 18, 2022)
5c6fec8 - it's fine (teh-cmc, Dec 18, 2022)
e97eab6 - self-review (teh-cmc, Dec 18, 2022)
37cd9b2 - just something i want to keep around for later (teh-cmc, Dec 18, 2022)
4682c60 - (un)wrapping lists is a bit slow... and slicing them is _extremely_ s… (teh-cmc, Dec 18, 2022)
5abfffe - merge cmc/datastore/get_a_single_row (#590) (teh-cmc, Dec 18, 2022)
02170b9 - no more col_arrays in re_query (teh-cmc, Dec 18, 2022)
d3a33cf - addressing PR comments, I hope (teh-cmc, Dec 18, 2022)
9091e29 - Merge remote-tracking branch 'origin/main' into cmc/datastore/get_rid… (teh-cmc, Dec 18, 2022)
af046b5 - missed a couple (teh-cmc, Dec 18, 2022)
5d19e5f - addressed PR comments (teh-cmc, Dec 19, 2022)
93 changes: 90 additions & 3 deletions Cargo.lock


6 changes: 6 additions & 0 deletions crates/re_arrow_store/Cargo.toml
@@ -39,6 +39,12 @@ thiserror.workspace = true
 criterion = "0.4"
 itertools = "0.10"
 mimalloc = "0.1"
+polars-core = { workspace = true, features = [
+    "dtype-date",
+    "dtype-time",
+    "dtype-struct",
+    "fmt",
+] }
 tracing-subscriber = "0.3"

 [lib]
6 changes: 2 additions & 4 deletions crates/re_arrow_store/benches/data_store.rs
@@ -1,7 +1,7 @@
 #[global_allocator]
 static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

-use arrow2::array::{Array, ListArray, StructArray};
+use arrow2::array::{Array, StructArray};
 use criterion::{criterion_group, criterion_main, Criterion};

 use re_arrow_store::{DataStore, TimeQuery, TimelineQuery};
@@ -89,9 +89,7 @@ fn query_messages(store: &mut DataStore) -> Box<dyn Array> {
     let mut results = store.get(&[component], &row_indices);

     let row = std::mem::take(&mut results[0]).unwrap();
-    let list = row.as_any().downcast_ref::<ListArray<i32>>().unwrap();
-    let rects = list.value(0);
-    let rects = rects.as_any().downcast_ref::<StructArray>().unwrap();
+    let rects = row.as_any().downcast_ref::<StructArray>().unwrap();
     assert_eq!(NUM_RECTS as usize, rects.len());

     row
55 changes: 49 additions & 6 deletions crates/re_arrow_store/src/store.rs
@@ -836,14 +836,50 @@ pub struct ComponentBucket {
     /// The offset of this bucket in the global table.
     pub(crate) row_offset: RowIndex,

+    /// Has this bucket been retired yet?
+    ///
+    /// At any given moment, all buckets except the currently active one have to be retired.
+    pub(crate) retired: bool,
Review comment (Member): This needs a description of what "retired" actually means.

From the PR description it sounds like it means that it is full, or that it is full and has also been compacted.

If so, perhaps `full`, `compacted` or `!active` is a better name? "Retired" makes me think it is no longer in use. Or is this common DB vernacular?

teh-cmc (Member, Author): "Retired" in this case means that it's been archived and is now read-only, so it'll only be used for the read path from now on.

Being retired implies a bunch of things, currently:

  • the bucket is now read-only,
  • the bucket has been compacted (its chunks have been concatenated),
  • the bucket is inactive as far as the write path is concerned,
  • and probably more in the future.

I definitely need to improve the doc there. As for the name... maybe `archived` then?

teh-cmc (Member, Author): Added something along those lines in the doc-comment 🤞
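
For illustration, a minimal sketch of what retirement could look like under that definition. The `retire` helper below is hypothetical (the PR does this inside the store itself), but it relies on the same `arrow2::compute::concatenate::concatenate` call that appears in the `Display` impl further down:

```rust
use arrow2::{array::Array, compute::concatenate::concatenate, error::Result};

/// Hypothetical sketch: compact a retired bucket's chunks into one
/// contiguous array and flip it to read-only.
fn retire(chunks: &mut Vec<Box<dyn Array>>, retired: &mut bool) -> Result<()> {
    // Borrow each boxed chunk as a `&dyn Array` for `concatenate`.
    let refs: Vec<&dyn Array> = chunks.iter().map(|chunk| &**chunk).collect();
    // All chunks of a bucket store the same component, hence the same datatype.
    let compacted = concatenate(&refs)?;
    *chunks = vec![compacted];
    *retired = true; // from now on, the bucket only serves the read path
    Ok(())
}
```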


     /// The time ranges (plural!) covered by this bucket.
     /// Buckets are never sorted over time, so these time ranges can grow arbitrarily large.
     ///
     /// These are only used for garbage collection.
     pub(crate) time_ranges: HashMap<Timeline, TimeRange>,

-    /// All the data for this bucket. This is a single column!
-    pub(crate) data: Box<dyn Array>,
+    /// All the data for this bucket: many rows of a single column.
+    ///
+    /// Each chunk is a list of list of components, i.e. `ListArray<ListArray<StructArray>>`:
Review comment (Member): Suggested change:

-    /// Each chunk is a list of list of components, i.e. `ListArray<ListArray<StructArray>>`:
+    /// Each chunk is a list of list of components, i.e. `Vec<ListArray<StructArray>>`:

would match the actual type a bit closer, confusing me slightly less :)

teh-cmc (Member, Author): The type of `Self::chunks` is `Vec<ListArray<ListArray<StructArray>>>`, but the type of each chunk is indeed exactly `ListArray<ListArray<StructArray>>`.

jleibs (Member, Dec 18, 2022): I agree with Emil here: a chunk is (and should be) still just a single `ListArray` of components. I don't think we should ever have a `ListArray<ListArray>`. A `ListArray` already has the double-list property you're describing:

| Component  |
+------------+
| [ a, b, c] |
| [ d, e]    |

The array index corresponds to the different rows.
The list index within a row corresponds to the different instances within a single row.

teh-cmc (Member, Author, Dec 18, 2022): Oh yeah, no, re-reading this now I can see you're definitely both right; got my head all messed up earlier.

Will fix all these docs first thing tomorrow, thanks 👍
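
To make that layout concrete, here is a minimal sketch of pulling one row's instances out of a chunk. The `row_instances` helper is hypothetical, but it is built from the same downcast-and-`value` calls used in `ComponentBucket::get` in the store_read.rs diff below:

```rust
use arrow2::array::{Array, ListArray};

/// Hypothetical helper: fetch the instances stored at `row` in a single chunk.
/// The array index selects the row; the list at that index holds the
/// per-instance component values (e.g. a `StructArray` of `{x, y}` points).
fn row_instances(chunk: &dyn Array, row: usize) -> Box<dyn Array> {
    chunk
        .as_any()
        .downcast_ref::<ListArray<i32>>()
        .expect("a chunk is always a list array")
        .value(row) // shallow: a view into the chunk's values
}
```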

+    /// - the first list layer corresponds to the different rows,
+    /// - the second list layer corresponds to the different instances within a single row,
+    /// - and finally the struct layer is the component itself.
+    /// E.g.:
+    /// ```ignore
+    /// [
+    ///   [{x: 8.687487, y: 1.9590926}, {x: 2.0559108, y: 0.1494348}, {x: 7.09219, y: 0.9616637}],
+    ///   [{x: 7.158843, y: 0.68897724}, {x: 8.934421, y: 2.8420508}],
+    /// ]
+    /// ```
+    ///
+    /// During the active lifespan of the bucket, this can contain any number of chunks,
+    /// depending on how the data was inserted (e.g. single insertions vs. batches).
+    /// All of these chunks get compacted into one contiguous array when the bucket is retired,
+    /// i.e. when the bucket is full and a new one is created.
+    ///
+    /// Note that, as of today, we do not actually support batched insertion, nor do we support
+    /// chunks of non-unit length (batches are inserted on a per-row basis internally).
+    /// As a result, chunks always contain one and only one row's worth of data, at least until
+    /// the bucket is retired and compacted.
+    /// See also #589.
+    pub(crate) chunks: Vec<Box<dyn Array>>,

+    /// The total number of rows present in this bucket, across all chunks.
+    pub(crate) total_rows: u64,
+
+    /// The size of this bucket in bytes, across all chunks.
+    ///
+    /// Accurately computing the size of arrow arrays is surprisingly costly, which is why we
+    /// cache this.
+    pub(crate) total_size_bytes: u64,
 }

impl std::fmt::Display for ComponentBucket {
@@ -862,14 +898,16 @@ impl std::fmt::Display for ComponentBucket {
         // - all buckets that follow are lazily instantiated when data get inserted
         //
         // TODO(#439): is that still true with deletion?
+        // TODO(#589): support for non-unit-length chunks
         self.row_offset.as_u64()
             + self
-                .data
+                .chunks
                 .len()
                 .checked_sub(1)
                 .expect("buckets are never empty") as u64,
         ))?;

f.write_fmt(format_args!("retired: {}\n", self.retired))?;
f.write_str("time ranges:\n")?;
for (timeline, time_range) in &self.time_ranges {
f.write_fmt(format_args!(
Expand All @@ -878,7 +916,12 @@ impl std::fmt::Display for ComponentBucket {
))?;
}

let chunk = Chunk::new(vec![self.data()]);
let rows = {
use arrow2::compute::concatenate::concatenate;
let chunks = self.chunks.iter().map(|chunk| &**chunk).collect::<Vec<_>>();
vec![concatenate(&chunks).unwrap()]
};
let chunk = Chunk::new(rows);
f.write_str(&arrow2::io::print::write(&[chunk], &[self.name.as_str()]))?;

Ok(())
@@ -888,12 +931,12 @@
 impl ComponentBucket {
     /// Returns the number of rows stored across this bucket.
     pub fn total_rows(&self) -> u64 {
-        self.data.len() as u64
+        self.total_rows
     }

     /// Returns the size of the data stored across this bucket, in bytes.
     pub fn total_size_bytes(&self) -> u64 {
-        arrow2::compute::aggregate::estimated_bytes_size(&*self.data) as u64
+        self.total_size_bytes
     }
 }
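
These accessors are O(1) because the totals are maintained incrementally at insertion time (see the "incrementally compute and cache bucket sizes" commit). A minimal sketch of that bookkeeping, with a hypothetical `push_chunk` helper; `estimated_bytes_size` is the same `arrow2` function the old code called on every query:

```rust
use arrow2::{array::Array, compute::aggregate::estimated_bytes_size};

/// Hypothetical sketch: update the cached totals as each chunk is inserted,
/// instead of re-scanning every chunk on every query.
fn push_chunk(
    chunks: &mut Vec<Box<dyn Array>>,
    total_rows: &mut u64,
    total_size_bytes: &mut u64,
    chunk: Box<dyn Array>,
) {
    *total_rows += chunk.len() as u64; // one list entry per row
    *total_size_bytes += estimated_bytes_size(&*chunk) as u64;
    chunks.push(chunk);
}
```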

38 changes: 25 additions & 13 deletions crates/re_arrow_store/src/store_read.rs
@@ -1,7 +1,7 @@
 use std::sync::atomic::Ordering;

 use arrow2::{
-    array::{Array, Int64Array, MutableArray, UInt64Array, UInt64Vec},
+    array::{Array, Int64Array, ListArray, MutableArray, UInt64Array, UInt64Vec},
     datatypes::{DataType, TimeUnit},
 };

@@ -110,8 +110,7 @@ impl DataStore {
     ///         .map(|(&component, col)| Series::try_from((component, col)).unwrap())
     ///         .collect();
     ///
-    ///     let df = DataFrame::new(series).unwrap();
-    ///     df.explode(df.get_column_names()).unwrap()
+    ///     DataFrame::new(series).unwrap()
     /// };
     ///
     /// df
@@ -176,8 +175,8 @@ impl DataStore {

     /// Retrieves the data associated with a list of `components` at the specified `indices`.
     ///
-    /// If the associated data is found, it will be written to returned array at the appropriate
-    /// index, or `None` otherwise.
+    /// If the associated data is found, it will be written into the returned array at the
+    /// appropriate index, or `None` otherwise.
     ///
     /// `row_indices` takes a list of options so that one can easily re-use the results obtained
     /// from [`Self::query`].
@@ -575,25 +574,38 @@ impl ComponentTable {
 }

 impl ComponentBucket {
-    /// Get this `ComponentBucket`s debug name
+    /// Returns the name of the component stored in this bucket.
+    #[allow(dead_code)]
     pub fn name(&self) -> &str {
         &self.name
     }

-    // Panics on out-of-bounds
+    /// Returns a shallow clone of the row data present at the given `row_idx`.
     pub fn get(&self, row_idx: RowIndex) -> Box<dyn Array> {
         let row_idx = row_idx.as_u64() - self.row_offset.as_u64();
-        self.data.slice(row_idx as usize, 1)
+        // This has to be safe to unwrap, otherwise it would never have made it past insertion.
+        if self.retired {
+            self.chunks[0]
+                .as_any()
+                .downcast_ref::<ListArray<i32>>()
+                .unwrap()
+                .value(row_idx as _)
+        } else {
+            self.chunks[row_idx as usize]
+                .as_any()
+                .downcast_ref::<ListArray<i32>>()
+                .unwrap()
+                .value(0)
+        }
     }

-    /// Returns the entire data Array in this component
-    pub fn data(&self) -> Box<dyn Array> {
-        // shallow copy
-        self.data.clone()
+    /// Returns a shallow clone of all the chunks in this bucket.
+    #[allow(dead_code)]
+    pub fn data(&self) -> Vec<Box<dyn Array>> {
+        self.chunks.clone() // shallow
     }

-    /// Return an iterator over the time ranges in this bucket
+    /// Return an iterator over the time ranges in this bucket.
     #[allow(dead_code)]
     pub fn iter_time_ranges(&self) -> impl Iterator<Item = (&Timeline, &TimeRange)> {
         self.time_ranges.iter()