Dataframe queries 4: paginated dense range #7345

Merged · 5 commits · Sep 4, 2024
105 changes: 105 additions & 0 deletions crates/store/re_dataframe/examples/range_paginated.rs
@@ -0,0 +1,105 @@
#![allow(clippy::unwrap_used)]

use itertools::Itertools as _;

use re_chunk_store::{
ChunkStore, ChunkStoreConfig, ComponentColumnDescriptor, RangeQueryExpression, Timeline,
VersionPolicy,
};
use re_dataframe::{QueryEngine, RecordBatch};
use re_log_types::{ResolvedTimeRange, StoreKind};

fn main() -> anyhow::Result<()> {
let args = std::env::args().collect_vec();

let get_arg = |i| {
let Some(value) = args.get(i) else {
eprintln!(
"Usage: {} <path_to_rrd_with_position3ds> <entity_path_pov> [entity_path_expr]",
args.first().map_or("$BIN", |s| s.as_str())
);
std::process::exit(1);
};
value
};

let path_to_rrd = get_arg(1);
let entity_path_pov = get_arg(2).as_str();
let entity_path_expr = args.get(3).map_or("/**", |s| s.as_str());

let stores = ChunkStore::from_rrd_filepath(
&ChunkStoreConfig::ALL_DISABLED,
path_to_rrd,
VersionPolicy::Warn,
)?;

for (store_id, store) in &stores {
if store_id.kind != StoreKind::Recording {
continue;
}

let cache = re_dataframe::external::re_query::Caches::new(store);
let engine = QueryEngine {
store,
cache: &cache,
};

let query = RangeQueryExpression {
entity_path_expr: entity_path_expr.into(),
timeline: Timeline::log_tick(),
time_range: ResolvedTimeRange::new(0, 30),
pov: ComponentColumnDescriptor::new::<re_types::components::Position3D>(
entity_path_pov.into(),
),
};

let query_handle = engine.range(&query, None /* columns */);
println!("{query}:");
println!(
"num_chunks:{} num_rows:{}",
query_handle.num_chunks(),
query_handle.num_rows()
);

let (offset, len) = (0, 4);
println!("offset:{offset} len:{len}");
concat_and_print(query_handle.get(offset, len));

let (offset, len) = (2, 4);
println!("offset:{offset} len:{len}");
concat_and_print(query_handle.get(offset, len));

let (offset, len) = (10, 5);
println!("offset:{offset} len:{len}");
concat_and_print(query_handle.get(offset, len));

let (offset, len) = (0, 15);
println!("offset:{offset} len:{len}");
concat_and_print(query_handle.get(offset, len));
}

Ok(())
}

fn concat_and_print(chunks: Vec<RecordBatch>) {
use re_chunk::external::arrow2::{
chunk::Chunk as ArrowChunk, compute::concatenate::concatenate,
};

let chunk = chunks.into_iter().reduce(|acc, chunk| RecordBatch {
schema: chunk.schema.clone(),
data: ArrowChunk::new(
acc.data
.iter()
.zip(chunk.data.iter())
.map(|(l, r)| concatenate(&[&**l, &**r]).unwrap())
.collect(),
),
});

if let Some(chunk) = chunk {
println!("{chunk}");
} else {
println!("<empty>");
}
}
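For comparison with the random-access `get(offset, len)` calls above, here is a minimal sketch of draining the same handle with the stateful `next_page()` cursor instead. The loop is illustrative and not part of this diff; it assumes the `engine`, `query`, and `concat_and_print` from the example above:

// Hypothetical pagination loop: `next_page()` advances an internal cursor over the
// "natural pages" (one per PoV chunk) and yields one RecordBatch per call until the
// chunks are exhausted. It takes `&mut self`, hence the mutable binding.
let mut query_handle = engine.range(&query, None /* columns */);
while let Some(batch) = query_handle.next_page() {
    concat_and_print(vec![batch]);
}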
109 changes: 104 additions & 5 deletions crates/store/re_dataframe/src/range.rs
@@ -1,4 +1,4 @@
use std::{collections::VecDeque, sync::OnceLock};
use std::sync::{atomic::AtomicU64, OnceLock};

use ahash::HashMap;
use arrow2::{
@@ -47,7 +47,12 @@ struct RangeQuerytHandleState {
/// All the [`Chunk`]s for the active point-of-view.
///
/// These are already sorted and vertically sliced according to the query.
pov_chunks: Option<VecDeque<Chunk>>,
pov_chunks: Option<Vec<Chunk>>,

/// Tracks the current page index. Used for [`RangeQueryHandle::next_page`].
//
// NOTE: The state is behind a `OnceLock`; the atomic just makes some things simpler down the road.
cur_page: AtomicU64,
}

impl<'a> RangeQueryHandle<'a> {
@@ -103,12 +108,12 @@ impl RangeQueryHandle<'_> {
.find_map(|(component_name, chunks)| {
(component_name == self.query.pov.component_name).then_some(chunks)
})
.map(Into::into)
};

RangeQuerytHandleState {
columns,
pov_chunks,
cur_page: AtomicU64::new(0),
}
})
}
@@ -144,8 +149,102 @@ impl RangeQueryHandle<'_> {
pub fn next_page(&mut self) -> Option<RecordBatch> {
re_tracing::profile_function!(format!("{:?}", self.query));

_ = self.init();
let pov_chunk = self.state.get_mut()?.pov_chunks.as_mut()?.pop_front()?;
let state = self.init();
let cur_page = state.cur_page.load(std::sync::atomic::Ordering::Relaxed);
let pov_chunk = state.pov_chunks.as_ref()?.get(cur_page as usize)?;
_ = state
.cur_page
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

self.dense_batch_at_pov(pov_chunk)
}

/// Partially executes the range query in order to return the specified range of rows.
///
/// Returns a vector of [`RecordBatch`]es: as many as required to fill the specified range.
/// Each [`RecordBatch`] will correspond to a "natural page" of data, including the first and last batches,
/// although those might be cut off at the edges.
/// Each cell in the result corresponds to the latest known value at that particular point in time
/// for each respective `ColumnDescriptor`.
///
/// The schema of the returned [`RecordBatch`]es is guaranteed to match the one returned by
/// [`Self::schema`].
/// Columns that do not yield any data will still be present in the results, filled with null values.
///
/// "Natural pages" refers to pages of data that match 1:1 to the underlying storage.
/// The size of each page cannot be known in advance, as it depends on unspecified
/// implementation details.
/// This is the most performant way to iterate over the dataset.
//
// TODO(cmc): This could be turned into an actual lazy iterator at some point.
pub fn get(&self, offset: u64, mut len: u64) -> Vec<RecordBatch> {
let mut results = Vec::new();

let state = self.init();
let Some(pov_chunks) = state.pov_chunks.as_ref() else {
return results;
};
let mut pov_chunks = pov_chunks.iter();

let mut cur_offset = 0;
let Some(mut cur_pov_chunk) = pov_chunks.next().cloned() else {
return results;
};

// Fast-forward until the first relevant PoV chunk.
//
// TODO(cmc): should keep an extra sorted data structure and use a binary search instead.
while (cur_offset + cur_pov_chunk.num_rows() as u64) < offset {
cur_offset += cur_pov_chunk.num_rows() as u64;

let Some(next_pov_chunk) = pov_chunks.next().cloned() else {
return results;
};
cur_pov_chunk = next_pov_chunk;
}

// Fast-forward until the first relevant row in the PoV chunk.
let mut offset = if cur_offset < offset {
offset.saturating_sub(cur_offset)
} else {
0
};

// Repeatedly compute dense ranges until we've returned `len` rows.
while len > 0 {
cur_pov_chunk = cur_pov_chunk.row_sliced(offset as _, len as _);
results.extend(self.dense_batch_at_pov(&cur_pov_chunk));

offset = 0; // always start at the first row after the first chunk
len = len.saturating_sub(cur_pov_chunk.num_rows() as u64);

let Some(next_pov_chunk) = pov_chunks.next().cloned() else {
break;
};
cur_pov_chunk = next_pov_chunk;
}

results
}

/// How many chunks / natural pages of data will be returned?
#[inline]
pub fn num_chunks(&self) -> u64 {
self.init()
.pov_chunks
.as_ref()
.map_or(0, |pov_chunks| pov_chunks.len() as _)
}

/// How many rows of data will be returned?
#[inline]
pub fn num_rows(&self) -> u64 {
self.init().pov_chunks.as_ref().map_or(0, |pov_chunks| {
pov_chunks.iter().map(|chunk| chunk.num_rows() as u64).sum()
})
}

fn dense_batch_at_pov(&self, pov_chunk: &Chunk) -> Option<RecordBatch> {
let pov_time_column = pov_chunk.timelines().get(&self.query.timeline)?;
let columns = self.schema();

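The offset/len arithmetic that `get()` applies to the PoV chunks (fast-forward over whole chunks, then slice) can be exercised in isolation. Below is a minimal, self-contained sketch of that logic; `paginate`, the chunk sizes, and the offsets are made up for illustration and are not part of the PR:

/// Stand-in for the fast-forward + slice logic in `RangeQueryHandle::get`:
/// `chunk_lens` are the row counts of the "natural pages"; the result lists, for each
/// page touched, the (chunk_index, row_offset_within_chunk, row_count) to slice out.
fn paginate(chunk_lens: &[u64], mut offset: u64, mut len: u64) -> Vec<(usize, u64, u64)> {
    let mut out = Vec::new();
    for (i, &n) in chunk_lens.iter().enumerate() {
        if len == 0 {
            break;
        }
        if offset >= n {
            // Fast-forward: this whole chunk lies before the requested range.
            offset -= n;
            continue;
        }
        // Slice the remainder of this chunk, clamped to what's left of `len`.
        let take = len.min(n - offset);
        out.push((i, offset, take));
        len -= take;
        offset = 0; // subsequent chunks always start at their first row
    }
    out
}

fn main() {
    // Three "natural pages" of 6, 4, and 8 rows; request 15 rows starting at offset 2.
    assert_eq!(
        paginate(&[6, 4, 8], 2, 15),
        vec![(0, 2, 4), (1, 0, 4), (2, 0, 7)]
    );
}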