re_datastore: component chunks & streamlining batches #584

Merged · 28 commits · Dec 19, 2022

Commits
940364b
get is supposed to return a row, not a [row]
teh-cmc Dec 18, 2022
fcf6d5a
unwrap note
teh-cmc Dec 18, 2022
1a86bee
the bench too
teh-cmc Dec 18, 2022
9e22ac1
self review
teh-cmc Dec 18, 2022
2fac6d2
doc test also
teh-cmc Dec 18, 2022
8682229
and re_query ofc!
teh-cmc Dec 18, 2022
b7e5fd5
slicing is _very_ slow, don't do it if you don't have to
teh-cmc Dec 18, 2022
da816d1
no more col_arrays in re_query
teh-cmc Dec 18, 2022
b22eecc
there's actually no need for concatenating at all
teh-cmc Dec 16, 2022
4a7b7ef
incrementally compute and cache bucket sizes
teh-cmc Dec 17, 2022
f88f248
cleaning up and documenting existing limitations
teh-cmc Dec 17, 2022
2bcd47e
introducing bucket retirement
teh-cmc Dec 17, 2022
c8b40b6
issue ref
teh-cmc Dec 17, 2022
36ce4db
some more doc stuff
teh-cmc Dec 17, 2022
751c2e8
self-review
teh-cmc Dec 17, 2022
6776365
polars/fmt should always be there for tests
teh-cmc Dec 18, 2022
9652013
streamlining batch support
teh-cmc Dec 18, 2022
bce700e
take list header into account
teh-cmc Dec 18, 2022
5c6fec8
it's fine
teh-cmc Dec 18, 2022
e97eab6
self-review
teh-cmc Dec 18, 2022
37cd9b2
just something i want to keep around for later
teh-cmc Dec 18, 2022
4682c60
(un)wrapping lists is a bit slow... and slicing them is _extremely_ s…
teh-cmc Dec 18, 2022
5abfffe
merge cmc/datastore/get_a_single_row (#590)
teh-cmc Dec 18, 2022
02170b9
no more col_arrays in re_query
teh-cmc Dec 18, 2022
d3a33cf
addressing PR comments, I hope
teh-cmc Dec 18, 2022
9091e29
Merge remote-tracking branch 'origin/main' into cmc/datastore/get_rid…
teh-cmc Dec 18, 2022
af046b5
missed a couple
teh-cmc Dec 18, 2022
5d19e5f
addressed PR comments
teh-cmc Dec 19, 2022

Files changed
93 changes: 90 additions & 3 deletions Cargo.lock


6 changes: 6 additions & 0 deletions crates/re_arrow_store/Cargo.toml
@@ -39,6 +39,12 @@ thiserror.workspace = true
criterion = "0.4"
itertools = "0.10"
mimalloc = "0.1"
polars-core = { workspace = true, features = [
"dtype-date",
"dtype-time",
"dtype-struct",
"fmt",
] }
tracing-subscriber = "0.3"

[lib]
65 changes: 59 additions & 6 deletions crates/re_arrow_store/src/store.rs
@@ -836,14 +836,60 @@ pub struct ComponentBucket {
/// The offset of this bucket in the global table.
pub(crate) row_offset: RowIndex,

/// Has this bucket been archived yet?
///
/// For every `ComponentTable`, there can only be one active bucket at a time (i.e. the bucket
/// that is currently accepting write requests), all the others are archived.
/// When the currently active bucket is full, it is archived in turn, and a new bucket is
/// created to take its place.
///
/// Archiving a bucket is a good opportunity to run some maintenance tasks on it, e.g.
/// compaction (concatenating all chunks down to a single one).
/// Currently, an archived bucket is guaranteed to have these properties:
/// - the bucket is full (it has reached the maximum allowed length and/or size),
/// - the bucket has been compacted,
/// - the bucket is only used for reads.
pub(crate) archived: bool,

/// The time ranges (plural!) covered by this bucket.
/// Buckets are never sorted over time, so these time ranges can grow arbitrarily large.
///
/// These are only used for garbage collection.
pub(crate) time_ranges: HashMap<Timeline, TimeRange>,

/// All the data for this bucket. This is a single column!
pub(crate) data: Box<dyn Array>,
/// All the data for this bucket: many rows of a single column.
///
/// Each chunk is a list of arrays of structs, i.e. `ListArray<StructArray>`:
/// - the list layer corresponds to the different rows,
/// - the array layer corresponds to the different instances within a single row,
/// - and finally the struct layer holds the components themselves.
/// E.g.:
/// ```ignore
/// [
/// [{x: 8.687487, y: 1.9590926}, {x: 2.0559108, y: 0.1494348}, {x: 7.09219, y: 0.9616637}],
/// [{x: 7.158843, y: 0.68897724}, {x: 8.934421, y: 2.8420508}],
/// ]
/// ```
///
/// During the active lifespan of the bucket, this can contain any number of chunks,
/// depending on how the data was inserted (e.g. single insertions vs. batches).
/// All of these chunks get compacted into one contiguous array when the bucket is archived,
/// i.e. when the bucket is full and a new one is created.
///
/// Note that, as of today, we do not actually support batched insertion nor do we support
/// chunks of non-unit length (batches are inserted on a per-row basis internally).
/// As a result, chunks always contain one and only one row's worth of data, at least until
/// the bucket is archived and compacted.
/// See also #589.
pub(crate) chunks: Vec<Box<dyn Array>>,

/// The total number of rows present in this bucket, across all chunks.
pub(crate) total_rows: u64,
/// The size of this bucket in bytes, across all chunks.
///
/// Accurately computing the size of arrow arrays is surprisingly costly, which is why we
/// cache this.
pub(crate) total_size_bytes: u64,
}

impl std::fmt::Display for ComponentBucket {
@@ -862,14 +908,16 @@ impl std::fmt::Display for ComponentBucket {
// - all buckets that follow are lazily instantiated when data get inserted
//
// TODO(#439): is that still true with deletion?
// TODO(#589): support for non-unit-length chunks
self.row_offset.as_u64()
+ self
.data
.chunks
.len()
.checked_sub(1)
.expect("buckets are never empty") as u64,
))?;

f.write_fmt(format_args!("archived: {}\n", self.archived))?;
f.write_str("time ranges:\n")?;
for (timeline, time_range) in &self.time_ranges {
f.write_fmt(format_args!(
@@ -878,7 +926,12 @@
))?;
}

let chunk = Chunk::new(vec![self.data()]);
let rows = {
use arrow2::compute::concatenate::concatenate;
let chunks = self.chunks.iter().map(|chunk| &**chunk).collect::<Vec<_>>();
vec![concatenate(&chunks).unwrap()]
};
let chunk = Chunk::new(rows);
f.write_str(&arrow2::io::print::write(&[chunk], &[self.name.as_str()]))?;

Ok(())
@@ -888,12 +941,12 @@ impl ComponentBucket {
impl ComponentBucket {
/// Returns the number of rows stored across this bucket.
pub fn total_rows(&self) -> u64 {
self.data.len() as u64
self.total_rows
}

/// Returns the size of the data stored across this bucket, in bytes.
pub fn total_size_bytes(&self) -> u64 {
arrow2::compute::aggregate::estimated_bytes_size(&*self.data) as u64
self.total_size_bytes
}
}

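The archiving and compaction behavior documented on `ComponentBucket` above can be illustrated with a small sketch. The helper below is hypothetical (the name `archive_bucket` and its free-function signature are not part of this PR); it shows how the pending chunks could be concatenated into a single contiguous array with arrow2's `concatenate` kernel, and how the cached `total_size_bytes` could be refreshed with `estimated_bytes_size`, the costly computation that the cache is meant to amortize.

```rust
use arrow2::array::Array;
use arrow2::compute::{aggregate::estimated_bytes_size, concatenate::concatenate};

/// Hypothetical sketch of archiving a bucket under the new `chunks` + cached-size
/// layout; not the PR's actual code.
fn archive_bucket(
    chunks: &mut Vec<Box<dyn Array>>,
    total_size_bytes: &mut u64,
    archived: &mut bool,
) {
    // Compact all pending (unit-length) chunks down to a single contiguous array.
    let refs = chunks.iter().map(|chunk| &**chunk).collect::<Vec<_>>();
    let compacted = concatenate(&refs).expect("chunks share the same datatype");

    // Re-derive the cached size from the compacted array.
    *total_size_bytes = estimated_bytes_size(&*compacted) as u64;

    *chunks = vec![compacted];
    *archived = true;
}
```

Once a bucket is archived it only serves reads, so the compacted chunk and the cached totals never need to be recomputed again.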
37 changes: 23 additions & 14 deletions crates/re_arrow_store/src/store_read.rs
@@ -175,8 +175,8 @@ impl DataStore {

/// Retrieves the data associated with a list of `components` at the specified `indices`.
///
/// If the associated data is found, it will be written to returned array at the appropriate
/// index, or `None` otherwise.
/// If the associated data is found, it will be written into the returned array at the
/// appropriate index, or `None` otherwise.
///
/// `row_indices` takes a list of options so that one can easily re-use the results obtained
/// from [`Self::query`].
@@ -574,30 +574,39 @@ impl ComponentTable {
}

impl ComponentBucket {
/// Get this `ComponentBucket`s debug name
/// Returns the name of the component stored in this bucket.
#[allow(dead_code)]
pub fn name(&self) -> &str {
&self.name
}

/// Returns a shallow clone of the row data for the given `row_idx`.
/// Returns a shallow clone of the row data present at the given `row_idx`.
pub fn get(&self, row_idx: RowIndex) -> Box<dyn Array> {
let row_idx = row_idx.as_u64() - self.row_offset.as_u64();
// This has to be safe to unwrap, otherwise it would never have made it past insertion.
self.data
.as_any()
.downcast_ref::<ListArray<i32>>()
.unwrap()
.value(row_idx as _)
if self.archived {
debug_assert_eq!(self.chunks.len(), 1);
self.chunks[0]
.as_any()
.downcast_ref::<ListArray<i32>>()
.unwrap()
.value(row_idx as _)
} else {
self.chunks[row_idx as usize]
.as_any()
.downcast_ref::<ListArray<i32>>()
.unwrap()
.value(0)
}
}

/// Returns the entire data Array in this component
pub fn data(&self) -> Box<dyn Array> {
// shallow copy
self.data.clone()
/// Returns a shallow clone of all the chunks in this bucket.
#[allow(dead_code)]
pub fn data(&self) -> Vec<Box<dyn Array>> {
self.chunks.clone() // shallow
}

/// Return an iterator over the time ranges in this bucket
/// Return an iterator over the time ranges in this bucket.
#[allow(dead_code)]
pub fn iter_time_ranges(&self) -> impl Iterator<Item = (&Timeline, &TimeRange)> {
self.time_ranges.iter()
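For reference, the two branches of `ComponentBucket::get` added above boil down to the following standalone sketch (`get_row` is a hypothetical free function, not the PR's code): an active bucket stores one unit-length chunk per row, so the row index selects the chunk, while an archived bucket has been compacted into a single chunk that is indexed by row directly.

```rust
use arrow2::array::{Array, ListArray};

/// Hypothetical restatement of the row-lookup strategy in `ComponentBucket::get`.
fn get_row(chunks: &[Box<dyn Array>], archived: bool, row_idx: usize) -> Box<dyn Array> {
    let (chunk, row_in_chunk) = if archived {
        // Archived bucket: exactly one compacted chunk, indexed by row.
        (&chunks[0], row_idx)
    } else {
        // Active bucket: one unit-length chunk per row (see #589), so the row
        // index selects the chunk and we always read its first entry.
        (&chunks[row_idx], 0)
    };

    chunk
        .as_any()
        .downcast_ref::<ListArray<i32>>()
        .expect("component chunks are always list arrays")
        .value(row_in_chunk)
}
```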