From 9a994a01422e7f3ef800c6448ec0e245424d00fc Mon Sep 17 00:00:00 2001
From: Clement Rey
Date: Wed, 4 Sep 2024 10:29:45 +0200
Subject: [PATCH] Dataframe queries 3: dense range (#7341)

Implements the dense range dataframe APIs.

Examples:
```
cargo r --all-features -p re_dataframe --example range -- /tmp/data.rrd /helix/structure/scaffolding/beads
cargo r --all-features -p re_dataframe --example range -- /tmp/data.rrd /helix/structure/scaffolding/beads /helix/structure/scaffolding/**
```
```rust
use itertools::Itertools as _;

use re_chunk_store::{
    ChunkStore, ChunkStoreConfig, ComponentColumnDescriptor, RangeQueryExpression, Timeline,
    VersionPolicy,
};
use re_dataframe::QueryEngine;
use re_log_types::{ResolvedTimeRange, StoreKind};

fn main() -> anyhow::Result<()> {
    let args = std::env::args().collect_vec();

    let get_arg = |i| {
        let Some(value) = args.get(i) else {
            eprintln!(
                "Usage: {} <path_to_rrd> <entity_path_pov> [entity_path_expr]",
                args.first().map_or("$BIN", |s| s.as_str())
            );
            std::process::exit(1);
        };
        value
    };

    let path_to_rrd = get_arg(1);
    let entity_path_pov = get_arg(2).as_str();
    let entity_path_expr = args.get(3).map_or("/**", |s| s.as_str());

    let stores = ChunkStore::from_rrd_filepath(
        &ChunkStoreConfig::DEFAULT,
        path_to_rrd,
        VersionPolicy::Warn,
    )?;

    for (store_id, store) in &stores {
        if store_id.kind != StoreKind::Recording {
            continue;
        }

        let cache = re_dataframe::external::re_query::Caches::new(store);
        let engine = QueryEngine {
            store,
            cache: &cache,
        };

        let query = RangeQueryExpression {
            entity_path_expr: entity_path_expr.into(),
            timeline: Timeline::log_tick(),
            time_range: ResolvedTimeRange::new(0, 30),
            pov: ComponentColumnDescriptor::new::<re_types::components::Position3D>(
                entity_path_pov.into(),
            ),
        };

        let query_handle = engine.range(&query, None /* columns */);
        eprintln!("{query}:");
        for batch in query_handle.into_iter() {
            eprintln!("{batch}");
        }
    }

    Ok(())
}
```
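
The handle can also be driven page by page rather than through the iterator
adapter. A minimal sketch, reusing the `engine` and `query` bindings from the
example above (`next_page` takes `&mut self`, hence the `mut` binding):
```rust
// Each call to `next_page()` yields one "natural page" of results as a
// `RecordBatch`, or `None` once the entire dataset has been returned.
let mut query_handle = engine.range(&query, None /* columns */);
while let Some(batch) = query_handle.next_page() {
    eprintln!("{batch}");
}
```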

* Fixes #7284

---

Dataframe APIs PR series:
- #7338
- #7339
- #7340
- #7341
- #7345

---
 crates/store/re_chunk_store/src/dataframe.rs |   2 +-
 crates/store/re_dataframe/examples/range.rs  |  62 +++++
 crates/store/re_dataframe/src/engine.rs      |  13 +-
 crates/store/re_dataframe/src/lib.rs         |   2 +
 crates/store/re_dataframe/src/range.rs       | 259 +++++++++++++++++++
 5 files changed, 327 insertions(+), 11 deletions(-)
 create mode 100644 crates/store/re_dataframe/examples/range.rs
 create mode 100644 crates/store/re_dataframe/src/range.rs

diff --git a/crates/store/re_chunk_store/src/dataframe.rs b/crates/store/re_chunk_store/src/dataframe.rs
index dcee8a27c219..16e0c68c055c 100644
--- a/crates/store/re_chunk_store/src/dataframe.rs
+++ b/crates/store/re_chunk_store/src/dataframe.rs
@@ -598,7 +598,7 @@ impl ChunkStore {
     /// * second, the time columns in lexical order (`frame_nr`, `log_time`, ...);
     /// * third, the component columns in lexical order (`Color`, `Radius, ...`).
     ///
-    /// This does not run a full-blown query, but rather just inspects [`Chunk`]-level metadata,
+    /// This does not run a full-blown query, but rather just inspects `Chunk`-level metadata,
     /// which can lead to false positives, but makes this very cheap to compute.
     pub fn schema_for_query(&self, query: &QueryExpression) -> Vec<ColumnDescriptor> {
         re_tracing::profile_function!(format!("{query:?}"));
diff --git a/crates/store/re_dataframe/examples/range.rs b/crates/store/re_dataframe/examples/range.rs
new file mode 100644
index 000000000000..9e844c5828b9
--- /dev/null
+++ b/crates/store/re_dataframe/examples/range.rs
@@ -0,0 +1,62 @@
+use itertools::Itertools as _;
+
+use re_chunk_store::{
+    ChunkStore, ChunkStoreConfig, ComponentColumnDescriptor, RangeQueryExpression, Timeline,
+    VersionPolicy,
+};
+use re_dataframe::QueryEngine;
+use re_log_types::{ResolvedTimeRange, StoreKind};
+
+fn main() -> anyhow::Result<()> {
+    let args = std::env::args().collect_vec();
+
+    let get_arg = |i| {
+        let Some(value) = args.get(i) else {
+            eprintln!(
+                "Usage: {} <path_to_rrd> <entity_path_pov> [entity_path_expr]",
+                args.first().map_or("$BIN", |s| s.as_str())
+            );
+            std::process::exit(1);
+        };
+        value
+    };
+
+    let path_to_rrd = get_arg(1);
+    let entity_path_pov = get_arg(2).as_str();
+    let entity_path_expr = args.get(3).map_or("/**", |s| s.as_str());
+
+    let stores = ChunkStore::from_rrd_filepath(
+        &ChunkStoreConfig::DEFAULT,
+        path_to_rrd,
+        VersionPolicy::Warn,
+    )?;
+
+    for (store_id, store) in &stores {
+        if store_id.kind != StoreKind::Recording {
+            continue;
+        }
+
+        let cache = re_dataframe::external::re_query::Caches::new(store);
+        let engine = QueryEngine {
+            store,
+            cache: &cache,
+        };
+
+        let query = RangeQueryExpression {
+            entity_path_expr: entity_path_expr.into(),
+            timeline: Timeline::log_tick(),
+            time_range: ResolvedTimeRange::new(0, 30),
+            pov: ComponentColumnDescriptor::new::<re_types::components::Position3D>(
+                entity_path_pov.into(),
+            ),
+        };
+
+        let query_handle = engine.range(&query, None /* columns */);
+        eprintln!("{query}:");
+        for batch in query_handle.into_iter() {
+            eprintln!("{batch}");
+        }
+    }
+
+    Ok(())
+}
diff --git a/crates/store/re_dataframe/src/engine.rs b/crates/store/re_dataframe/src/engine.rs
index b8959b468d89..ad3c318ac688 100644
--- a/crates/store/re_dataframe/src/engine.rs
+++ b/crates/store/re_dataframe/src/engine.rs
@@ -4,7 +4,7 @@ use re_chunk_store::{
 };
 use re_query::Caches;
 
-use crate::LatestAtQueryHandle;
+use crate::{LatestAtQueryHandle, RangeQueryHandle};
 
 // ---
 
@@ -15,8 +15,6 @@ use crate::LatestAtQueryHandle;
 // TODO(cmc): add an `arrow` feature to transportchunk in a follow-up pr and call it a day.
 pub type RecordBatch = TransportChunk;
 
-pub struct RangeQueryHandle<'a>(&'a ());
-
 /// A generic handle to a query that is ready to be executed.
 pub enum QueryHandle<'a> {
     LatestAt(LatestAtQueryHandle<'a>),
@@ -80,7 +78,7 @@ impl QueryEngine<'_> {
     /// * second, the time columns in lexical order (`frame_nr`, `log_time`, ...);
     /// * third, the component columns in lexical order (`Color`, `Radius, ...`).
     ///
-    /// This does not run a full-blown query, but rather just inspects [`Chunk`]-level metadata,
+    /// This does not run a full-blown query, but rather just inspects `Chunk`-level metadata,
     /// which can lead to false positives, but makes this very cheap to compute.
     #[inline]
     pub fn schema_for_query(&self, query: &QueryExpression) -> Vec<ColumnDescriptor> {
@@ -93,7 +91,6 @@ impl QueryEngine<'_> {
     /// * [`Self::latest_at`]
     /// * [`Self::range`]
     #[inline]
-    #[allow(clippy::unimplemented, clippy::needless_pass_by_value)]
     pub fn query(
         &self,
         query: &QueryExpression,
@@ -145,15 +142,11 @@ impl QueryEngine<'_> {
     /// for a range of data on the `frame` timeline, but still include the `log_time` timeline in
     /// the result.
     #[inline]
-    #[allow(clippy::unimplemented, clippy::needless_pass_by_value)]
     pub fn range(
         &self,
         query: &RangeQueryExpression,
         columns: Option<Vec<ColumnDescriptor>>,
     ) -> RangeQueryHandle<'_> {
-        _ = self;
-        _ = query;
-        _ = columns;
-        unimplemented!("TODO(cmc)")
+        RangeQueryHandle::new(self, query.clone(), columns)
     }
 }
diff --git a/crates/store/re_dataframe/src/lib.rs b/crates/store/re_dataframe/src/lib.rs
index 9ded485f5731..4cac3cb7007e 100644
--- a/crates/store/re_dataframe/src/lib.rs
+++ b/crates/store/re_dataframe/src/lib.rs
@@ -2,9 +2,11 @@
 
 mod engine;
 mod latest_at;
+mod range;
 
 pub use self::engine::{QueryEngine, QueryHandle, RecordBatch};
 pub use self::latest_at::LatestAtQueryHandle;
+pub use self::range::RangeQueryHandle;
 
 pub mod external {
     pub use re_chunk;
diff --git a/crates/store/re_dataframe/src/range.rs b/crates/store/re_dataframe/src/range.rs
new file mode 100644
index 000000000000..14239783670d
--- /dev/null
+++ b/crates/store/re_dataframe/src/range.rs
@@ -0,0 +1,259 @@
+use std::{collections::VecDeque, sync::OnceLock};
+
+use ahash::HashMap;
+use arrow2::{
+    array::{Array as ArrowArray, ListArray as ArrowListArray},
+    chunk::Chunk as ArrowChunk,
+    datatypes::Schema as ArrowSchema,
+};
+use itertools::Itertools;
+
+use re_chunk::{Chunk, LatestAtQuery, RangeQuery};
+use re_chunk_store::{ColumnDescriptor, ComponentColumnDescriptor, RangeQueryExpression};
+
+use crate::{QueryEngine, RecordBatch};
+
+// ---
+
+/// A handle to a range query, ready to be executed.
+///
+/// Cheaply created via [`QueryEngine::range`].
+///
+/// See [`RangeQueryHandle::next_page`].
+//
+// TODO(cmc): pagination support
+// TODO(cmc): intra-timestamp decimation support
+pub struct RangeQueryHandle<'a> {
+    /// Handle to the [`QueryEngine`].
+    pub(crate) engine: &'a QueryEngine<'a>,
+
+    /// The original query expression used to instantiate this handle.
+    pub(crate) query: RangeQueryExpression,
+
+    /// The user-specified schema that describes any data returned through this handle, if any.
+    pub(crate) user_columns: Option<Vec<ColumnDescriptor>>,
+
+    /// Internal private state. Lazily computed.
+    ///
+    /// It is important that handles stay cheap to create.
+    state: OnceLock<RangeQueryHandleState>,
+}
+
+/// Internal private state. Lazily computed.
+struct RangeQueryHandleState {
+    /// The final schema.
+    columns: Vec<ColumnDescriptor>,
+
+    /// All the [`Chunk`]s for the active point-of-view.
+    ///
+    /// These are already sorted and vertically sliced according to the query.
+    pov_chunks: Option<VecDeque<Chunk>>,
+}
+
+impl<'a> RangeQueryHandle<'a> {
+    pub(crate) fn new(
+        engine: &'a QueryEngine<'a>,
+        query: RangeQueryExpression,
+        user_columns: Option<Vec<ColumnDescriptor>>,
+    ) -> Self {
+        Self {
+            engine,
+            query,
+            user_columns,
+            state: Default::default(),
+        }
+    }
+}
+
+impl RangeQueryHandle<'_> {
+    /// Lazily initialize internal private state.
+    ///
+    /// It is important that handles stay cheap to create.
+    fn init(&self) -> &RangeQueryHandleState {
+        self.state.get_or_init(|| {
+            re_tracing::profile_scope!("init");
+
+            let columns = {
+                re_tracing::profile_scope!("compute schema");
+
+                self.user_columns.clone().unwrap_or_else(|| {
+                    self.engine
+                        .store
+                        .schema_for_query(&self.query.clone().into())
+                })
+            };
+
+            let pov_chunks = {
+                re_tracing::profile_scope!("gather pov timestamps");
+
+                let query = RangeQuery::new(self.query.timeline, self.query.time_range)
+                    .keep_extra_timelines(true) // we want all the timelines we can get!
+                    .keep_extra_components(false);
+
+                let results = self.engine.cache.range(
+                    self.engine.store,
+                    &query,
+                    &self.query.pov.entity_path,
+                    [self.query.pov.component_name],
+                );
+
+                results
+                    .components
+                    .into_iter()
+                    .find_map(|(component_name, chunks)| {
+                        (component_name == self.query.pov.component_name).then_some(chunks)
+                    })
+                    .map(Into::into)
+            };
+
+            RangeQueryHandleState {
+                columns,
+                pov_chunks,
+            }
+        })
+    }
+
+    /// All results returned by this handle will strictly follow this schema.
+    ///
+    /// Columns that do not yield any data will still be present in the results, filled with null values.
+    pub fn schema(&self) -> &[ColumnDescriptor] {
+        &self.init().columns
+    }
+
+    /// Partially executes the range query until the next natural page of results.
+    ///
+    /// Returns a single [`RecordBatch`] containing as many rows as available in the page, or
+    /// `None` once the entire dataset has been returned.
+    /// Each cell in the result corresponds to the latest known value at that particular point
+    /// in time for its respective `ColumnDescriptor`.
+    ///
+    /// The schema of the returned [`RecordBatch`] is guaranteed to match the one returned by
+    /// [`Self::schema`].
+    /// Columns that do not yield any data will still be present in the results, filled with null values.
+    ///
+    /// "Natural pages" refers to pages of data that map 1:1 to the underlying storage.
+    /// The size of each page cannot be known in advance, as it depends on unspecified
+    /// implementation details.
+    /// This is the most performant way to iterate over the dataset.
+    ///
+    /// ```ignore
+    /// while let Some(batch) = query_handle.next_page() {
+    ///     // …
+    /// }
+    /// ```
+    pub fn next_page(&mut self) -> Option<RecordBatch> {
+        re_tracing::profile_function!(format!("{:?}", self.query));
+
+        _ = self.init();
+        let pov_chunk = self.state.get_mut()?.pov_chunks.as_mut()?.pop_front()?;
+        let pov_time_column = pov_chunk.timelines().get(&self.query.timeline)?;
+        let columns = self.schema();
+
+        // TODO(cmc): There are more efficient, albeit infinitely more complicated ways to do this.
+        // Let's first implement all features (multi-PoV, pagination, timestamp streaming, etc) and
+        // see if this ever becomes an issue before going down this road.
+        //
+        // TODO(cmc): Opportunities for parallelization, if it proves to be a net positive in practice.
+        let list_arrays: HashMap<&ComponentColumnDescriptor, ArrowListArray<i32>> = {
+            re_tracing::profile_scope!("queries");
+
+            columns
+                .iter()
+                .filter_map(|descr| match descr {
+                    ColumnDescriptor::Component(descr) => Some(descr),
+                    _ => None,
+                })
+                .filter_map(|descr| {
+                    let arrays = pov_time_column
+                        .times()
+                        .map(|time| {
+                            let query = LatestAtQuery::new(self.query.timeline, time);
+
+                            let results = self.engine.cache.latest_at(
+                                self.engine.store,
+                                &query,
+                                &descr.entity_path,
+                                [descr.component_name],
+                            );
+
+                            results
+                                .components
+                                .get(&descr.component_name)
+                                .and_then(|unit| {
+                                    unit.component_batch_raw(&descr.component_name).clone()
+                                })
+                        })
+                        .collect_vec();
+                    let arrays = arrays
+                        .iter()
+                        .map(|array| array.as_ref().map(|array| &**array as &dyn ArrowArray))
+                        .collect_vec();
+
+                    let list_array =
+                        re_chunk::util::arrays_to_list_array(descr.datatype.clone(), &arrays);
+
+                    if cfg!(debug_assertions) {
+                        #[allow(clippy::unwrap_used)] // want to crash in dev
+                        Some((descr, list_array.unwrap()))
+                    } else {
+                        // NOTE: Technically cannot ever happen, but I'd rather that than an unwrap.
+                        list_array.map(|list_array| (descr, list_array))
+                    }
+                })
+                .collect()
+        };
+
+        // NOTE: Keep in mind this must match the ordering specified by `Self::schema`.
+        let packed_arrays = {
+            re_tracing::profile_scope!("packing");
+
+            columns
+                .iter()
+                .map(|descr| match descr {
+                    ColumnDescriptor::Control(_descr) => pov_chunk.row_ids_array().to_boxed(),
+
+                    ColumnDescriptor::Time(descr) => {
+                        let time_column = pov_chunk.timelines().get(&descr.timeline).cloned();
+                        time_column.map_or_else(
+                            || {
+                                arrow2::array::new_null_array(
+                                    descr.datatype.clone(),
+                                    pov_chunk.num_rows(),
+                                )
+                            },
+                            |time_column| time_column.times_array().to_boxed(),
+                        )
+                    }
+
+                    ColumnDescriptor::Component(descr) => list_arrays.get(descr).map_or_else(
+                        || {
+                            arrow2::array::new_null_array(
+                                descr.datatype.clone(),
+                                pov_time_column.num_rows(),
+                            )
+                        },
+                        |list_array| list_array.to_boxed(),
+                    ),
+                })
+                .collect_vec()
+        };
+
+        Some(RecordBatch {
+            schema: ArrowSchema {
+                fields: columns
+                    .iter()
+                    .map(ColumnDescriptor::to_arrow_field)
+                    .collect(),
+                metadata: Default::default(),
+            },
+            data: ArrowChunk::new(packed_arrays),
+        })
+    }
+}
+
+impl<'a> RangeQueryHandle<'a> {
+    #[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
+    pub fn into_iter(mut self) -> impl Iterator<Item = RecordBatch> + 'a {
+        std::iter::from_fn(move || self.next_page())
+    }
+}
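
---

`into_iter` above is an inherent method rather than an `IntoIterator`
implementation: the iterator is produced by `std::iter::from_fn` over an
anonymous closure, whose type cannot be named in an associated `IntoIter`
type (hence the `clippy::should_implement_trait` allow). A minimal sketch of
consuming the handle through it, assuming a `query_handle` obtained from
`QueryEngine::range` as in the example at the top:

```rust
// Each item yielded by the adapter is one `RecordBatch`,
// i.e. one natural page of results.
for batch in query_handle.into_iter() {
    eprintln!("{batch}");
}
```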