From c00256a461f984002c16be90b58b0df78942462a Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 17 Jan 2025 14:54:10 +0000 Subject: [PATCH] Filter empty from scan batches (#1996) --- vortex-file/src/exec/inline.rs | 8 +++++--- vortex-file/src/exec/mod.rs | 2 +- vortex-file/src/exec/tokio.rs | 5 ++++- vortex-file/src/open/split_by.rs | 2 +- vortex-layout/src/layouts/flat/eval_expr.rs | 2 ++ vortex-scan/src/range_scan.rs | 18 ++++++++++-------- 6 files changed, 23 insertions(+), 14 deletions(-) diff --git a/vortex-file/src/exec/inline.rs b/vortex-file/src/exec/inline.rs index 33dd96e82..1b1b31f99 100644 --- a/vortex-file/src/exec/inline.rs +++ b/vortex-file/src/exec/inline.rs @@ -1,6 +1,6 @@ use futures_util::future::BoxFuture; use futures_util::stream::BoxStream; -use futures_util::StreamExt; +use futures_util::{FutureExt, StreamExt}; use vortex_array::ArrayData; use vortex_error::VortexResult; @@ -12,8 +12,10 @@ pub struct InlineDriver; impl ExecDriver for InlineDriver { fn drive( &self, - stream: BoxStream<'static, BoxFuture<'static, VortexResult>>, + stream: BoxStream<'static, BoxFuture<'static, VortexResult>>>, ) -> BoxStream<'static, VortexResult> { - stream.then(|future| future).boxed() + stream + .filter_map(|future| future.map(|result| result.transpose())) + .boxed() } } diff --git a/vortex-file/src/exec/mod.rs b/vortex-file/src/exec/mod.rs index c17edeeff..796338fe7 100644 --- a/vortex-file/src/exec/mod.rs +++ b/vortex-file/src/exec/mod.rs @@ -18,6 +18,6 @@ use vortex_error::VortexResult; pub trait ExecDriver: Send + Sync { fn drive( &self, - stream: BoxStream<'static, BoxFuture<'static, VortexResult>>, + stream: BoxStream<'static, BoxFuture<'static, VortexResult>>>, ) -> BoxStream<'static, VortexResult>; } diff --git a/vortex-file/src/exec/tokio.rs b/vortex-file/src/exec/tokio.rs index cd6285456..0be845af7 100644 --- a/vortex-file/src/exec/tokio.rs +++ b/vortex-file/src/exec/tokio.rs @@ -1,3 +1,5 @@ +use std::future::ready; + use futures_util::future::BoxFuture; use futures_util::stream::BoxStream; use futures_util::StreamExt; @@ -13,7 +15,7 @@ pub struct TokioDriver(pub Handle); impl ExecDriver for TokioDriver { fn drive( &self, - stream: BoxStream<'static, BoxFuture<'static, VortexResult>>, + stream: BoxStream<'static, BoxFuture<'static, VortexResult>>>, ) -> BoxStream<'static, VortexResult> { let handle = self.0.clone(); @@ -29,6 +31,7 @@ impl ExecDriver for TokioDriver { Ok(result) => result, Err(e) => Err(vortex_err!("Failed to join Tokio result {}", e)), }) + .filter_map(|r| ready(r.transpose())) .boxed() } } diff --git a/vortex-file/src/open/split_by.rs b/vortex-file/src/open/split_by.rs index 5f27acdd7..7724f0c4a 100644 --- a/vortex-file/src/open/split_by.rs +++ b/vortex-file/src/open/split_by.rs @@ -32,7 +32,7 @@ impl SplitBy { .into_iter() .tuple_windows() .map(|(start, end)| start..end) - .collect() + .collect::>() } SplitBy::RowCount(n) => { let row_count = layout.row_count(); diff --git a/vortex-layout/src/layouts/flat/eval_expr.rs b/vortex-layout/src/layouts/flat/eval_expr.rs index fa8d1687a..5f4bee6bb 100644 --- a/vortex-layout/src/layouts/flat/eval_expr.rs +++ b/vortex-layout/src/layouts/flat/eval_expr.rs @@ -20,6 +20,8 @@ impl ExprEvaluator for FlatReader { row_mask: RowMask, expr: ExprRef, ) -> VortexResult { + assert!(row_mask.true_count() > 0); + // Fetch all the array buffers. let mut buffers = try_join_all( self.layout() diff --git a/vortex-scan/src/range_scan.rs b/vortex-scan/src/range_scan.rs index 17a1c846e..ca083704c 100644 --- a/vortex-scan/src/range_scan.rs +++ b/vortex-scan/src/range_scan.rs @@ -3,7 +3,7 @@ use std::ops::{BitAnd, Range}; use std::sync::Arc; use vortex_array::compute::{fill_null, FilterMask}; -use vortex_array::{ArrayData, Canonical, IntoArrayData}; +use vortex_array::ArrayData; use vortex_error::{VortexExpect, VortexResult}; use vortex_expr::ExprRef; @@ -22,12 +22,12 @@ enum State { // Then we project the selected rows. Project((FilterMask, ExprRef)), // And then we're done. - Ready(ArrayData), + Ready(Option), } pub enum NextOp { /// The finished result of the scan. - Ready(ArrayData), + Ready(Option), /// The next expression to evaluate. /// The caller **must** first apply the mask before evaluating the expression. Eval((Range, FilterMask, ExprRef)), @@ -124,8 +124,7 @@ impl RangeScanner { // Then move onto the projection if mask.true_count() == 0 { // If the mask is empty, then we're done. - self.state = - State::Ready(Canonical::empty(self.scan.result_dtype())?.into_array()); + self.state = State::Ready(None); } else if !conjuncts_rev.is_empty() { self.mask = mask; let mask = if self.mask.selectivity() < APPLY_FILTER_SELECTIVITY_THRESHOLD { @@ -141,7 +140,8 @@ impl RangeScanner { } State::Project(_) => { // We're done. - self.state = State::Ready(result); + assert!(!result.is_empty(), "projected array cannot be empty"); + self.state = State::Ready(Some(result)); } State::Ready(_) => {} } @@ -149,7 +149,7 @@ impl RangeScanner { } /// Evaluate the [`RangeScanner`] operation using a synchronous expression evaluator. - pub fn evaluate(mut self, evaluator: E) -> VortexResult + pub fn evaluate(mut self, evaluator: E) -> VortexResult> where E: Fn(RowMask, ExprRef) -> VortexResult, { @@ -165,7 +165,7 @@ impl RangeScanner { } /// Evaluate the [`RangeScanner`] operation using an async expression evaluator. - pub async fn evaluate_async(mut self, evaluator: E) -> VortexResult + pub async fn evaluate_async(mut self, evaluator: E) -> VortexResult> where E: Fn(RowMask, ExprRef) -> F, F: Future>, @@ -246,6 +246,7 @@ mod tests { filter(&arr, mask.filter_mask()) }) + .unwrap() .unwrap(); assert!(res.as_struct_array().is_some()); @@ -301,6 +302,7 @@ mod tests { filter(&arr, mask.filter_mask()) }) + .unwrap() .unwrap(); assert!(res.as_struct_array().is_some());