From 5155636fe12686169051b68063753df3dfccdcad Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 4 Sep 2024 10:45:48 +0200 Subject: [PATCH] Dataframe queries 4: paginated dense range (#7345) Implements the paginated dense range dataframe APIs. If there's no off-by-one anywhere in there, I will eat my hat. Getting this in the hands of people is the highest prio though, I'll add tests later. ![image](https://github.com/user-attachments/assets/e865ba62-21db-41c1-9899-35a0e7aea134) ![image](https://github.com/user-attachments/assets/32934ba8-2673-401a-aafc-409dfbe9b2c5) * Fixes #7284 --- Dataframe APIs PR series: - #7338 - #7339 - #7340 - #7341 - #7345 --- .../re_dataframe/examples/range_paginated.rs | 105 +++++++++++++++++ crates/store/re_dataframe/src/range.rs | 109 +++++++++++++++++- 2 files changed, 209 insertions(+), 5 deletions(-) create mode 100644 crates/store/re_dataframe/examples/range_paginated.rs diff --git a/crates/store/re_dataframe/examples/range_paginated.rs b/crates/store/re_dataframe/examples/range_paginated.rs new file mode 100644 index 000000000000..1efeb9484f67 --- /dev/null +++ b/crates/store/re_dataframe/examples/range_paginated.rs @@ -0,0 +1,105 @@ +#![allow(clippy::unwrap_used)] + +use itertools::Itertools as _; + +use re_chunk_store::{ + ChunkStore, ChunkStoreConfig, ComponentColumnDescriptor, RangeQueryExpression, Timeline, + VersionPolicy, +}; +use re_dataframe::{QueryEngine, RecordBatch}; +use re_log_types::{ResolvedTimeRange, StoreKind}; + +fn main() -> anyhow::Result<()> { + let args = std::env::args().collect_vec(); + + let get_arg = |i| { + let Some(value) = args.get(i) else { + eprintln!( + "Usage: {} [entity_path_expr]", + args.first().map_or("$BIN", |s| s.as_str()) + ); + std::process::exit(1); + }; + value + }; + + let path_to_rrd = get_arg(1); + let entity_path_pov = get_arg(2).as_str(); + let entity_path_expr = args.get(3).map_or("/**", |s| s.as_str()); + + let stores = ChunkStore::from_rrd_filepath( + 
&ChunkStoreConfig::ALL_DISABLED, + path_to_rrd, + VersionPolicy::Warn, + )?; + + for (store_id, store) in &stores { + if store_id.kind != StoreKind::Recording { + continue; + } + + let cache = re_dataframe::external::re_query::Caches::new(store); + let engine = QueryEngine { + store, + cache: &cache, + }; + + let query = RangeQueryExpression { + entity_path_expr: entity_path_expr.into(), + timeline: Timeline::log_tick(), + time_range: ResolvedTimeRange::new(0, 30), + pov: ComponentColumnDescriptor::new::( + entity_path_pov.into(), + ), + }; + + let query_handle = engine.range(&query, None /* columns */); + println!("{query}:"); + println!( + "num_chunks:{} num_rows:{}", + query_handle.num_chunks(), + query_handle.num_rows() + ); + + let (offset, len) = (0, 4); + println!("offset:{offset} len:{len}"); + concat_and_print(query_handle.get(offset, len)); + + let (offset, len) = (2, 4); + println!("offset:{offset} len:{len}"); + concat_and_print(query_handle.get(offset, len)); + + let (offset, len) = (10, 5); + println!("offset:{offset} len:{len}"); + concat_and_print(query_handle.get(offset, len)); + + let (offset, len) = (0, 15); + println!("offset:{offset} len:{len}"); + concat_and_print(query_handle.get(offset, len)); + } + + Ok(()) +} + +fn concat_and_print(chunks: Vec) { + use re_chunk::external::arrow2::{ + chunk::Chunk as ArrowChunk, compute::concatenate::concatenate, + }; + + let chunk = chunks.into_iter().reduce(|acc, chunk| RecordBatch { + schema: chunk.schema.clone(), + data: ArrowChunk::new( + acc.data + .iter() + .zip(chunk.data.iter()) + .map(|(l, r)| concatenate(&[&**l, &**r]).unwrap()) + .collect(), + ), + }); + + if let Some(chunk) = chunk { + println!("{chunk}"); + } else { + println!(""); + } +} diff --git a/crates/store/re_dataframe/src/range.rs b/crates/store/re_dataframe/src/range.rs index 14239783670d..822acddb5753 100644 --- a/crates/store/re_dataframe/src/range.rs +++ b/crates/store/re_dataframe/src/range.rs @@ -1,4 +1,4 @@ -use 
std::{collections::VecDeque, sync::OnceLock}; +use std::sync::{atomic::AtomicU64, OnceLock}; use ahash::HashMap; use arrow2::{ @@ -47,7 +47,12 @@ struct RangeQuerytHandleState { /// All the [`Chunk`]s for the active point-of-view. /// /// These are already sorted and vertically sliced according to the query. - pov_chunks: Option>, + pov_chunks: Option>, + + /// Tracks the current page index. Used for [`RangeQueryHandle::next_page`]. + // + // NOTE: The state is behind a `OnceLock`, the atomic just makes some things simpler down the road. + cur_page: AtomicU64, } impl<'a> RangeQueryHandle<'a> { @@ -103,12 +108,12 @@ impl RangeQueryHandle<'_> { .find_map(|(component_name, chunks)| { (component_name == self.query.pov.component_name).then_some(chunks) }) - .map(Into::into) }; RangeQuerytHandleState { columns, pov_chunks, + cur_page: AtomicU64::new(0), } }) } @@ -144,8 +149,102 @@ impl RangeQueryHandle<'_> { pub fn next_page(&mut self) -> Option { re_tracing::profile_function!(format!("{:?}", self.query)); - _ = self.init(); - let pov_chunk = self.state.get_mut()?.pov_chunks.as_mut()?.pop_front()?; + let state = self.init(); + let cur_page = state.cur_page.load(std::sync::atomic::Ordering::Relaxed); + let pov_chunk = state.pov_chunks.as_ref()?.get(cur_page as usize)?; + _ = state + .cur_page + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + self.dense_batch_at_pov(pov_chunk) + } + + /// Partially executes the range query in order to return the specified range of rows. + /// + /// Returns a vector of [`RecordBatch`]es: as many as required to fill the specified range. + /// Each [`RecordBatch`] will correspond to a "natural page" of data, even the first and last batch, + /// although they might be cut off at the edge. + /// Each cell in the result corresponds to the latest known value at that particular point in time + /// for each respective `ColumnDescriptor`. 
+ /// + /// The schema of the returned [`RecordBatch`]es is guaranteed to match the one returned by + /// [`Self::schema`]. + /// Columns that do not yield any data will still be present in the results, filled with null values. + /// + /// "Natural pages" refers to pages of data that match 1:1 to the underlying storage. + /// The size of each page cannot be known in advance, as it depends on unspecified + /// implementation details. + /// This is the most performant way to iterate over the dataset. + // + // TODO(cmc): This could be turned into an actual lazy iterator at some point. + pub fn get(&self, offset: u64, mut len: u64) -> Vec { + let mut results = Vec::new(); + + let state = self.init(); + let Some(pov_chunks) = state.pov_chunks.as_ref() else { + return results; + }; + let mut pov_chunks = pov_chunks.iter(); + + let mut cur_offset = 0; + let Some(mut cur_pov_chunk) = pov_chunks.next().cloned() else { + return results; + }; + + // Fast-forward until the first relevant PoV chunk. + // + // TODO(cmc): should keep an extra sorted data structure and use a binsearch instead. + while (cur_offset + cur_pov_chunk.num_rows() as u64) < offset { + cur_offset += cur_pov_chunk.num_rows() as u64; + + let Some(next_pov_chunk) = pov_chunks.next().cloned() else { + return results; + }; + cur_pov_chunk = next_pov_chunk; + } + + // Fast-forward until the first relevant row in the PoV chunk. + let mut offset = if cur_offset < offset { + offset.saturating_sub(cur_offset) + } else { + 0 + }; + + // Repeatedly compute dense ranges until we've returned `len` rows. 
+ while len > 0 { + cur_pov_chunk = cur_pov_chunk.row_sliced(offset as _, len as _); + results.extend(self.dense_batch_at_pov(&cur_pov_chunk)); + + offset = 0; // always start at the first row after the first chunk + len = len.saturating_sub(cur_pov_chunk.num_rows() as u64); + + let Some(next_pov_chunk) = pov_chunks.next().cloned() else { + break; + }; + cur_pov_chunk = next_pov_chunk; + } + + results + } + + /// How many chunks / natural pages of data will be returned? + #[inline] + pub fn num_chunks(&self) -> u64 { + self.init() + .pov_chunks + .as_ref() + .map_or(0, |pov_chunks| pov_chunks.len() as _) + } + + /// How many rows of data will be returned? + #[inline] + pub fn num_rows(&self) -> u64 { + self.init().pov_chunks.as_ref().map_or(0, |pov_chunks| { + pov_chunks.iter().map(|chunk| chunk.num_rows() as u64).sum() + }) + } + + fn dense_batch_at_pov(&self, pov_chunk: &Chunk) -> Option { let pov_time_column = pov_chunk.timelines().get(&self.query.timeline)?; let columns = self.schema();