Dataframe queries 3: dense range #7341

Merged (4 commits, Sep 4, 2024)
2 changes: 1 addition & 1 deletion crates/store/re_chunk_store/src/dataframe.rs
@@ -598,7 +598,7 @@ impl ChunkStore {
    /// * second, the time columns in lexical order (`frame_nr`, `log_time`, ...);
    /// * third, the component columns in lexical order (`Color`, `Radius`, ...).
    ///
-    /// This does not run a full-blown query, but rather just inspects [`Chunk`]-level metadata,
+    /// This does not run a full-blown query, but rather just inspects `Chunk`-level metadata,
    /// which can lead to false positives, but makes this very cheap to compute.
    pub fn schema_for_query(&self, query: &QueryExpression) -> Vec<ColumnDescriptor> {
        re_tracing::profile_function!(format!("{query:?}"));
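To make the ordering concrete, a hypothetical query touching the columns named in that doc comment would yield descriptors in this order (an illustrative sketch, not actual output; the control column being `RowId` is inferred from the rest of this diff):

    RowId      (control columns, lexical order)
    frame_nr   (time columns, lexical order)
    log_time
    Color      (component columns, lexical order)
    Radius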
62 changes: 62 additions & 0 deletions crates/store/re_dataframe/examples/range.rs
@@ -0,0 +1,62 @@
use itertools::Itertools as _;

use re_chunk_store::{
    ChunkStore, ChunkStoreConfig, ComponentColumnDescriptor, RangeQueryExpression, Timeline,
    VersionPolicy,
};
use re_dataframe::QueryEngine;
use re_log_types::{ResolvedTimeRange, StoreKind};

fn main() -> anyhow::Result<()> {
    let args = std::env::args().collect_vec();

    let get_arg = |i| {
        let Some(value) = args.get(i) else {
            eprintln!(
                "Usage: {} <path_to_rrd_with_position3ds> <entity_path_pov> [entity_path_expr]",
                args.first().map_or("$BIN", |s| s.as_str())
            );
            std::process::exit(1);
        };
        value
    };

    let path_to_rrd = get_arg(1);
    let entity_path_pov = get_arg(2).as_str();
    let entity_path_expr = args.get(3).map_or("/**", |s| s.as_str());

    let stores = ChunkStore::from_rrd_filepath(
        &ChunkStoreConfig::DEFAULT,
        path_to_rrd,
        VersionPolicy::Warn,
    )?;

    for (store_id, store) in &stores {
        if store_id.kind != StoreKind::Recording {
            continue;
        }

        let cache = re_dataframe::external::re_query::Caches::new(store);
        let engine = QueryEngine {
            store,
            cache: &cache,
        };

        let query = RangeQueryExpression {
            entity_path_expr: entity_path_expr.into(),
            timeline: Timeline::log_tick(),
            time_range: ResolvedTimeRange::new(0, 30),
            pov: ComponentColumnDescriptor::new::<re_types::components::Position3D>(
                entity_path_pov.into(),
            ),
        };

        let query_handle = engine.range(&query, None /* columns */);
        eprintln!("{query}:");
        for batch in query_handle.into_iter() {
            eprintln!("{batch}");
        }
    }

    Ok(())
}
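For reference, a hypothetical invocation of this example (the example name is inferred from the file path; the arguments follow the usage string above):

    cargo run -p re_dataframe --example range -- recording.rrd /entity/with/positions

The optional third argument defaults to `/**`, i.e. the query ranges over the entire entity tree.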
13 changes: 3 additions & 10 deletions crates/store/re_dataframe/src/engine.rs
@@ -4,7 +4,7 @@ use re_chunk_store::{
};
use re_query::Caches;

-use crate::LatestAtQueryHandle;
+use crate::{LatestAtQueryHandle, RangeQueryHandle};

// ---

@@ -15,8 +15,6 @@ use crate::LatestAtQueryHandle;
// TODO(cmc): add an `arrow` feature to transportchunk in a follow-up pr and call it a day.
pub type RecordBatch = TransportChunk;

-pub struct RangeQueryHandle<'a>(&'a ());
-
/// A generic handle to a query that is ready to be executed.
pub enum QueryHandle<'a> {
    LatestAt(LatestAtQueryHandle<'a>),
@@ -80,7 +78,7 @@ impl QueryEngine<'_> {
    /// * second, the time columns in lexical order (`frame_nr`, `log_time`, ...);
    /// * third, the component columns in lexical order (`Color`, `Radius`, ...).
    ///
-    /// This does not run a full-blown query, but rather just inspects [`Chunk`]-level metadata,
+    /// This does not run a full-blown query, but rather just inspects `Chunk`-level metadata,
    /// which can lead to false positives, but makes this very cheap to compute.
    #[inline]
    pub fn schema_for_query(&self, query: &QueryExpression) -> Vec<ColumnDescriptor> {
@@ -93,7 +91,6 @@ impl QueryEngine<'_> {
    /// * [`Self::latest_at`]
    /// * [`Self::range`]
    #[inline]
-    #[allow(clippy::unimplemented, clippy::needless_pass_by_value)]
    pub fn query(
        &self,
        query: &QueryExpression,
@@ -145,15 +142,11 @@ impl QueryEngine<'_> {
    /// for a range of data on the `frame` timeline, but still include the `log_time` timeline in
    /// the result.
    #[inline]
-    #[allow(clippy::unimplemented, clippy::needless_pass_by_value)]
    pub fn range(
        &self,
        query: &RangeQueryExpression,
        columns: Option<Vec<ColumnDescriptor>>,
    ) -> RangeQueryHandle<'_> {
-        _ = self;
-        _ = query;
-        _ = columns;
-        unimplemented!("TODO(cmc)")
+        RangeQueryHandle::new(self, query.clone(), columns)
    }
}
2 changes: 2 additions & 0 deletions crates/store/re_dataframe/src/lib.rs
@@ -2,9 +2,11 @@

mod engine;
mod latest_at;
+mod range;

pub use self::engine::{QueryEngine, QueryHandle, RecordBatch};
pub use self::latest_at::LatestAtQueryHandle;
+pub use self::range::RangeQueryHandle;

pub mod external {
    pub use re_chunk;
259 changes: 259 additions & 0 deletions crates/store/re_dataframe/src/range.rs
@@ -0,0 +1,259 @@
use std::{collections::VecDeque, sync::OnceLock};

use ahash::HashMap;
use arrow2::{
    array::{Array as ArrowArray, ListArray as ArrowListArray},
    chunk::Chunk as ArrowChunk,
    datatypes::Schema as ArrowSchema,
};
use itertools::Itertools;

use re_chunk::{Chunk, LatestAtQuery, RangeQuery};
use re_chunk_store::{ColumnDescriptor, ComponentColumnDescriptor, RangeQueryExpression};

use crate::{QueryEngine, RecordBatch};

// ---

/// A handle to a range query, ready to be executed.
///
/// Cheaply created via [`QueryEngine::range`].
///
/// See [`RangeQueryHandle::next_page`].
//
// TODO(cmc): pagination support
// TODO(cmc): intra-timestamp decimation support
pub struct RangeQueryHandle<'a> {
    /// Handle to the [`QueryEngine`].
    pub(crate) engine: &'a QueryEngine<'a>,

    /// The original query expression used to instantiate this handle.
    pub(crate) query: RangeQueryExpression,

    /// The user-specified schema that describes any data returned through this handle, if any.
    pub(crate) user_columns: Option<Vec<ColumnDescriptor>>,

    /// Internal private state. Lazily computed.
    ///
    /// It is important that handles stay cheap to create.
    state: OnceLock<RangeQueryHandleState>,
}

/// Internal private state. Lazily computed.
struct RangeQueryHandleState {
    /// The final schema.
    columns: Vec<ColumnDescriptor>,

    /// All the [`Chunk`]s for the active point-of-view.
    ///
    /// These are already sorted and vertically sliced according to the query.
    pov_chunks: Option<VecDeque<Chunk>>,
}

impl<'a> RangeQueryHandle<'a> {
    pub(crate) fn new(
        engine: &'a QueryEngine<'a>,
        query: RangeQueryExpression,
        user_columns: Option<Vec<ColumnDescriptor>>,
    ) -> Self {
        Self {
            engine,
            query,
            user_columns,
            state: Default::default(),
        }
    }
}

impl RangeQueryHandle<'_> {
    /// Lazily initializes the internal private state.
    ///
    /// It is important that handles stay cheap to create.
    fn init(&self) -> &RangeQueryHandleState {
        self.state.get_or_init(|| {
            re_tracing::profile_scope!("init");

            let columns = {
                re_tracing::profile_scope!("compute schema");

                self.user_columns.clone().unwrap_or_else(|| {
                    self.engine
                        .store
                        .schema_for_query(&self.query.clone().into())
                })
            };

            let pov_chunks = {
                re_tracing::profile_scope!("gather pov timestamps");

                let query = RangeQuery::new(self.query.timeline, self.query.time_range)
                    .keep_extra_timelines(true) // we want all the timelines we can get!
                    .keep_extra_components(false);

                let results = self.engine.cache.range(
                    self.engine.store,
                    &query,
                    &self.query.pov.entity_path,
                    [self.query.pov.component_name],
                );

                results
                    .components
                    .into_iter()
                    .find_map(|(component_name, chunks)| {
                        (component_name == self.query.pov.component_name).then_some(chunks)
                    })
                    .map(Into::into)
            };

            RangeQueryHandleState {
                columns,
                pov_chunks,
            }
        })
    }

    /// All results returned by this handle will strictly follow this schema.
    ///
    /// Columns that do not yield any data will still be present in the results, filled with null values.
    pub fn schema(&self) -> &[ColumnDescriptor] {
        &self.init().columns
    }

    /// Partially executes the range query until the next natural page of results.
    ///
    /// Returns a single [`RecordBatch`] containing as many rows as are available in the page, or
    /// `None` once the entire dataset has been returned.
    /// Each cell in the result corresponds to the latest known value at that particular point in
    /// time for its respective `ColumnDescriptor`.
    ///
    /// The schema of the returned [`RecordBatch`] is guaranteed to match the one returned by
    /// [`Self::schema`].
    /// Columns that do not yield any data will still be present in the results, filled with null values.
    ///
    /// "Natural pages" refers to pages of data that map 1:1 to the underlying storage.
    /// The size of each page cannot be known in advance, as it depends on unspecified
    /// implementation details.
    /// This is the most performant way to iterate over the dataset.
    ///
    /// ```ignore
    /// while let Some(batch) = query_handle.next_page() {
    ///     // …
    /// }
    /// ```
    pub fn next_page(&mut self) -> Option<RecordBatch> {
        re_tracing::profile_function!(format!("{:?}", self.query));

        _ = self.init();
        let pov_chunk = self.state.get_mut()?.pov_chunks.as_mut()?.pop_front()?;
        let pov_time_column = pov_chunk.timelines().get(&self.query.timeline)?;
        let columns = self.schema();

        // TODO(cmc): There are more efficient, albeit infinitely more complicated ways to do this.
        // Let's first implement all features (multi-PoV, pagination, timestamp streaming, etc) and
        // see if this ever becomes an issue before going down this road.
Comment on lines +152 to +154 (Member):

Makes sense. We'll always need this as a worst-case fallback path, but I can definitely imagine this being replaced with a series of checks that determine whether particular happy-path join operations are applicable with more efficient implementations for those cases.

        //
        // TODO(cmc): Opportunities for parallelization, if it proves to be a net positive in practice.
        let list_arrays: HashMap<&ComponentColumnDescriptor, ArrowListArray<i32>> = {
            re_tracing::profile_scope!("queries");

            columns
                .iter()
                .filter_map(|descr| match descr {
                    ColumnDescriptor::Component(descr) => Some(descr),
                    _ => None,
                })
                .filter_map(|descr| {
                    let arrays = pov_time_column
                        .times()
                        .map(|time| {
                            let query = LatestAtQuery::new(self.query.timeline, time);

                            let results = self.engine.cache.latest_at(
                                self.engine.store,
                                &query,
                                &descr.entity_path,
                                [descr.component_name],
                            );

                            results
                                .components
                                .get(&descr.component_name)
                                .and_then(|unit| {
                                    unit.component_batch_raw(&descr.component_name).clone()
                                })
                        })
                        .collect_vec();
                    let arrays = arrays
                        .iter()
                        .map(|array| array.as_ref().map(|array| &**array as &dyn ArrowArray))
                        .collect_vec();

                    let list_array =
                        re_chunk::util::arrays_to_list_array(descr.datatype.clone(), &arrays);

                    if cfg!(debug_assertions) {
                        #[allow(clippy::unwrap_used)] // want to crash in dev
                        Some((descr, list_array.unwrap()))
                    } else {
                        // NOTE: Technically cannot ever happen, but I'd rather that than an unwrap.
                        list_array.map(|list_array| (descr, list_array))
                    }
                })
                .collect()
        };

        // NOTE: Keep in mind this must match the ordering specified by `Self::schema`.
        let packed_arrays = {
            re_tracing::profile_scope!("packing");

            columns
                .iter()
                .map(|descr| match descr {
                    ColumnDescriptor::Control(_descr) => pov_chunk.row_ids_array().to_boxed(),

                    ColumnDescriptor::Time(descr) => {
                        let time_column = pov_chunk.timelines().get(&descr.timeline).cloned();
                        time_column.map_or_else(
                            || {
                                arrow2::array::new_null_array(
                                    descr.datatype.clone(),
                                    pov_chunk.num_rows(),
                                )
                            },
                            |time_column| time_column.times_array().to_boxed(),
                        )
                    }

                    ColumnDescriptor::Component(descr) => list_arrays.get(descr).map_or_else(
                        || {
                            arrow2::array::new_null_array(
                                descr.datatype.clone(),
                                pov_time_column.num_rows(),
                            )
                        },
                        |list_array| list_array.to_boxed(),
                    ),
                })
                .collect_vec()
        };

        Some(RecordBatch {
            schema: ArrowSchema {
                fields: columns
                    .iter()
                    .map(ColumnDescriptor::to_arrow_field)
                    .collect(),
                metadata: Default::default(),
            },
            data: ArrowChunk::new(packed_arrays),
        })
    }
}

impl<'a> RangeQueryHandle<'a> {
    #[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
    pub fn into_iter(mut self) -> impl Iterator<Item = RecordBatch> + 'a {
        std::iter::from_fn(move || self.next_page())
    }
}
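Putting the new API together, a minimal consumption sketch (illustrative only, not part of the PR; it assumes `engine` and `query` are set up as in examples/range.rs above):

    let mut handle = engine.range(&query, None /* columns */);

    // The schema is fixed up front; every batch below is guaranteed to match it.
    eprintln!("{} columns", handle.schema().len());

    // Drain the handle page by page; page sizes follow the underlying storage.
    while let Some(batch) = handle.next_page() {
        eprintln!("{batch}");
    }

    // Alternatively, hand ownership over and consume the handle as an iterator:
    // for batch in engine.range(&query, None).into_iter() {
    //     eprintln!("{batch}");
    // }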