From b568403b7ae913b9cff87c5174aa72003c2c9999 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Thu, 29 Aug 2024 16:59:41 +0200 Subject: [PATCH] implement latest-at dataframe queries --- .../store/re_dataframe/examples/latest_at.rs | 64 +++++ crates/store/re_dataframe/src/engine.rs | 17 +- crates/store/re_dataframe/src/latest_at.rs | 233 ++++++++++++++++++ crates/store/re_dataframe/src/lib.rs | 2 + 4 files changed, 306 insertions(+), 10 deletions(-) create mode 100644 crates/store/re_dataframe/examples/latest_at.rs create mode 100644 crates/store/re_dataframe/src/latest_at.rs diff --git a/crates/store/re_dataframe/examples/latest_at.rs b/crates/store/re_dataframe/examples/latest_at.rs new file mode 100644 index 0000000000000..38edfc44121d8 --- /dev/null +++ b/crates/store/re_dataframe/examples/latest_at.rs @@ -0,0 +1,64 @@ +use itertools::Itertools as _; + +use re_chunk::{TimeInt, Timeline}; +use re_chunk_store::{ChunkStore, ChunkStoreConfig, LatestAtQueryExpression, VersionPolicy}; +use re_dataframe::QueryEngine; +use re_log_types::StoreKind; + +fn main() -> anyhow::Result<()> { + let args = std::env::args().collect_vec(); + + let Some(path_to_rrd) = args.get(1) else { + eprintln!( + "Usage: {} ", + args.first().map_or("$BIN", |s| s.as_str()) + ); + std::process::exit(1); + }; + + let stores = ChunkStore::from_rrd_filepath( + &ChunkStoreConfig::DEFAULT, + path_to_rrd, + VersionPolicy::Warn, + )?; + + for (store_id, store) in &stores { + if store_id.kind != StoreKind::Recording { + continue; + } + + let cache = re_dataframe::external::re_query::Caches::new(store); + let engine = QueryEngine { + store, + cache: &cache, + }; + + { + let query = LatestAtQueryExpression { + entity_path_expr: "/**".into(), + timeline: Timeline::log_tick(), + at: TimeInt::new_temporal(30), + }; + + let query_handle = engine.latest_at(&query, None /* columns */); + let batch = query_handle.get(); + + eprintln!("{query}:\n{batch}"); + } + eprintln!("---"); + { + let query = 
LatestAtQueryExpression { + entity_path_expr: "/helix/structure/scaffolding/**".into(), + timeline: Timeline::log_tick(), + at: TimeInt::new_temporal(30), + }; + + let query_handle = engine.latest_at(&query, None /* columns */); + let batch = query_handle.get(); + + eprintln!("{query}:\n{batch}"); + } + } + + Ok(()) +} diff --git a/crates/store/re_dataframe/src/engine.rs b/crates/store/re_dataframe/src/engine.rs index 0cc390ba1eceb..b8959b468d899 100644 --- a/crates/store/re_dataframe/src/engine.rs +++ b/crates/store/re_dataframe/src/engine.rs @@ -4,6 +4,8 @@ use re_chunk_store::{ }; use re_query::Caches; +use crate::LatestAtQueryHandle; + // --- // TODO(#3741): `arrow2` has no concept of a `RecordBatch`, so for now we just use our trustworthy @@ -13,7 +15,6 @@ use re_query::Caches; // TODO(cmc): add an `arrow` feature to transportchunk in a follow-up pr and call it a day. pub type RecordBatch = TransportChunk; -pub struct LatestAtQueryHandle<'a>(&'a ()); pub struct RangeQueryHandle<'a>(&'a ()); /// A generic handle to a query that is ready to be executed. @@ -98,10 +99,10 @@ impl QueryEngine<'_> { query: &QueryExpression, columns: Option>, ) -> QueryHandle<'_> { - _ = self; - _ = query; - _ = columns; - unimplemented!("TODO(cmc)") + match query { + QueryExpression::LatestAt(query) => self.latest_at(query, columns).into(), + QueryExpression::Range(query) => self.range(query, columns).into(), + } } /// Creates a new [`LatestAtQueryHandle`], which can be used to perform a latest-at query. @@ -120,16 +121,12 @@ impl QueryEngine<'_> { /// for a range of data on the `frame` timeline, but still include the `log_time` timeline in /// the result. 
#[inline] - #[allow(clippy::unimplemented, clippy::needless_pass_by_value)] pub fn latest_at( &self, query: &LatestAtQueryExpression, columns: Option>, ) -> LatestAtQueryHandle<'_> { - _ = self; - _ = query; - _ = columns; - unimplemented!("TODO(cmc)") + LatestAtQueryHandle::new(self, query.clone(), columns) } /// Creates a new [`RangeQueryHandle`], which can be used to perform a range query. diff --git a/crates/store/re_dataframe/src/latest_at.rs b/crates/store/re_dataframe/src/latest_at.rs new file mode 100644 index 0000000000000..a11a79716719e --- /dev/null +++ b/crates/store/re_dataframe/src/latest_at.rs @@ -0,0 +1,233 @@ +use std::sync::OnceLock; + +use ahash::HashMap; +use arrow2::{ + array::Array as ArrowArray, chunk::Chunk as ArrowChunk, datatypes::Schema as ArrowSchema, +}; +use itertools::Itertools; + +use re_chunk::{LatestAtQuery, TimeInt, Timeline, UnitChunkShared}; +use re_chunk_store::{ColumnDescriptor, ComponentColumnDescriptor, LatestAtQueryExpression}; + +use crate::{QueryEngine, RecordBatch}; + +// --- + +/// A handle to a latest-at query, ready to be executed. +/// +/// Cheaply created via [`QueryEngine::latest_at`]. +/// +/// See [`LatestAtQueryHandle::get`]. +pub struct LatestAtQueryHandle<'a> { + /// Handle to the [`QueryEngine`]. + engine: &'a QueryEngine<'a>, + + /// The original query expression used to instantiate this handle. + query: LatestAtQueryExpression, + + /// The user-specified schema that describes any data returned through this handle, if any. + user_columns: Option>, + + /// Internal private state. Lazily computed. + /// + /// It is important that handles stay cheap to create. + state: OnceLock, +} + +/// Internal private state. Lazily computed. +struct LatestAtQueryHandleState { + /// The final schema. 
+ columns: Vec,
+}
+
+impl<'a> LatestAtQueryHandle<'a> {
+ pub(crate) fn new(
+ engine: &'a QueryEngine<'a>,
+ query: LatestAtQueryExpression,
+ user_columns: Option>,
+ ) -> Self {
+ Self {
+ engine,
+ query,
+ user_columns,
+ state: Default::default(),
+ }
+ }
+}
+
+impl LatestAtQueryHandle<'_> {
+ /// All results returned by this handle will strictly follow this schema.
+ ///
+ /// Columns that do not yield any data will still be present in the results, filled with null values.
+ pub fn schema(&self) -> &[ColumnDescriptor] {
+ let state = self.state.get_or_init(|| {
+ let columns = {
+ re_tracing::profile_scope!("compute schema");
+
+ self.user_columns
+ .clone()
+ .unwrap_or_else(|| {
+ self.engine
+ .store
+ .schema_for_query(&self.query.clone().into())
+ })
+ .into_iter()
+ // NOTE: We drop `RowId`, as it doesn't make any sense in a compound row like the
+ // one we are returning.
+ .filter(|descr| !matches!(descr, ColumnDescriptor::Control(_)))
+ .collect()
+ };
+
+ LatestAtQueryHandleState { columns }
+ });
+
+ &state.columns
+ }
+
+ /// Performs the latest-at query.
+ ///
+ /// Returns a single [`RecordBatch`] containing a single row, where each cell corresponds to
+ /// the latest known value at that particular point in time for each respective `ColumnDescriptor`.
+ ///
+ /// The schema of the returned [`RecordBatch`] is guaranteed to match the one returned by
+ /// [`Self::schema`].
+ /// Columns that do not yield any data will still be present in the results, filled with null values.
+ pub fn get(&self) -> RecordBatch {
+ re_tracing::profile_function!(format!("{:?}", self.query));
+
+ let columns = self.schema();
+
+ let schema = ArrowSchema {
+ fields: columns
+ .iter()
+ .map(ColumnDescriptor::to_arrow_field)
+ .collect(),
+
+ // TODO(#6889): probably some sorbet stuff we want to get in there at some point.
+ metadata: Default::default(),
+ };
+
+ let all_units: HashMap<&ComponentColumnDescriptor, UnitChunkShared> = {
+ re_tracing::profile_scope!("queries");
+
+ // TODO(cmc): Opportunities for parallelization, if it proves to be a net positive in practice.
+ let query = LatestAtQuery::new(self.query.timeline, self.query.at);
+ columns
+ .iter()
+ .filter_map(|descr| match descr {
+ ColumnDescriptor::Component(descr) => {
+ let results = self.engine.cache.latest_at(
+ self.engine.store,
+ &query,
+ &descr.entity_path,
+ [descr.component_name],
+ );
+
+ results
+ .components
+ .get(&descr.component_name)
+ .cloned()
+ .map(|chunk| (descr, chunk))
+ }
+
+ _ => None,
+ })
+ .collect()
+ };
+
+ let mut max_time_per_timeline = HashMap::::default();
+ {
+ re_tracing::profile_scope!("compound times");
+
+ let timelines = columns
+ .iter()
+ .filter_map(|descr| match descr {
+ ColumnDescriptor::Time(descr) => Some(descr.timeline),
+ _ => None,
+ })
+ .collect_vec();
+
+ for unit in all_units.values() {
+ for &timeline in &timelines {
+ if let Some((time, _)) = unit.index(&timeline) {
+ max_time_per_timeline
+ .entry(timeline)
+ .and_modify(|(cur_time, cur_unit)| {
+ if *cur_time < time {
+ *cur_time = time;
+ *cur_unit = unit.clone();
+ }
+ })
+ .or_insert_with(|| (time, unit.clone()));
+ }
+ }
+ }
+ }
+
+ // NOTE: Keep in mind this must match the ordering specified by `Self::schema`.
+ let packed_arrays = {
+ re_tracing::profile_scope!("packing");
+
+ columns
+ .iter()
+ .filter_map(|descr| match descr {
+ ColumnDescriptor::Control(_) => {
+ if cfg!(debug_assertions) {
+ unreachable!("filtered out during schema computation");
+ } else {
+ // NOTE: Technically cannot ever happen, but I'd rather that than an unwrap.
+ None
+ }
+ }
+
+ ColumnDescriptor::Time(descr) => {
+ let time_column = max_time_per_timeline
+ .remove(&descr.timeline)
+ .and_then(|(_, chunk)| chunk.timelines().get(&descr.timeline).cloned());
+
+ Some(if cfg!(debug_assertions) {
+ #[allow(clippy::unwrap_used)]
+ time_column.unwrap().times_array().to_boxed()
+ } else {
+ // NOTE: Technically cannot ever happen, but I'd rather that than an unwrap.
+ time_column.map_or_else(
+ || arrow2::array::new_null_array(descr.datatype.clone(), 1),
+ |time_column| time_column.times_array().to_boxed(),
+ )
+ })
+ }
+
+ ColumnDescriptor::Component(descr) => Some(
+ all_units
+ .get(descr)
+ .and_then(|chunk| chunk.components().get(&descr.component_name))
+ .map_or_else(
+ || arrow2::array::new_null_array(descr.datatype.clone(), 1),
+ |list_array| list_array.to_boxed(),
+ ),
+ ),
+ })
+ .collect()
+ };
+
+ RecordBatch {
+ schema,
+ data: ArrowChunk::new(packed_arrays),
+ }
+ }
+}
+
+impl<'a> LatestAtQueryHandle<'a> {
+ #[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
+ pub fn into_iter(self) -> impl Iterator + 'a {
+ let mut yielded = false;
+ std::iter::from_fn(move || {
+ if yielded {
+ None
+ } else {
+ yielded = true;
+ Some(self.get())
+ }
+ })
+ }
+}
diff --git a/crates/store/re_dataframe/src/lib.rs b/crates/store/re_dataframe/src/lib.rs
index 7f1b2dc39ceac..9ded485f5731e 100644
--- a/crates/store/re_dataframe/src/lib.rs
+++ b/crates/store/re_dataframe/src/lib.rs
@@ -1,8 +1,10 @@
 //! The Rerun public data APIs. Get dataframes back from your Rerun datastore.
 
 mod engine;
+mod latest_at;
 
 pub use self::engine::{QueryEngine, QueryHandle, RecordBatch};
+pub use self::latest_at::LatestAtQueryHandle;
 
 pub mod external {
 pub use re_chunk;