Skip to content

Commit

Permalink
Filter empty from scan batches (#1996)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Jan 17, 2025
1 parent 2f04a70 commit c00256a
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 14 deletions.
8 changes: 5 additions & 3 deletions vortex-file/src/exec/inline.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures_util::future::BoxFuture;
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
use futures_util::{FutureExt, StreamExt};
use vortex_array::ArrayData;
use vortex_error::VortexResult;

Expand All @@ -12,8 +12,10 @@ pub struct InlineDriver;
impl ExecDriver for InlineDriver {
fn drive(
&self,
stream: BoxStream<'static, BoxFuture<'static, VortexResult<ArrayData>>>,
stream: BoxStream<'static, BoxFuture<'static, VortexResult<Option<ArrayData>>>>,
) -> BoxStream<'static, VortexResult<ArrayData>> {
stream.then(|future| future).boxed()
stream
.filter_map(|future| future.map(|result| result.transpose()))
.boxed()
}
}
2 changes: 1 addition & 1 deletion vortex-file/src/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ use vortex_error::VortexResult;
pub trait ExecDriver: Send + Sync {
fn drive(
&self,
stream: BoxStream<'static, BoxFuture<'static, VortexResult<ArrayData>>>,
stream: BoxStream<'static, BoxFuture<'static, VortexResult<Option<ArrayData>>>>,
) -> BoxStream<'static, VortexResult<ArrayData>>;
}
5 changes: 4 additions & 1 deletion vortex-file/src/exec/tokio.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::future::ready;

use futures_util::future::BoxFuture;
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
Expand All @@ -13,7 +15,7 @@ pub struct TokioDriver(pub Handle);
impl ExecDriver for TokioDriver {
fn drive(
&self,
stream: BoxStream<'static, BoxFuture<'static, VortexResult<ArrayData>>>,
stream: BoxStream<'static, BoxFuture<'static, VortexResult<Option<ArrayData>>>>,
) -> BoxStream<'static, VortexResult<ArrayData>> {
let handle = self.0.clone();

Expand All @@ -29,6 +31,7 @@ impl ExecDriver for TokioDriver {
Ok(result) => result,
Err(e) => Err(vortex_err!("Failed to join Tokio result {}", e)),
})
.filter_map(|r| ready(r.transpose()))
.boxed()
}
}
2 changes: 1 addition & 1 deletion vortex-file/src/open/split_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl SplitBy {
.into_iter()
.tuple_windows()
.map(|(start, end)| start..end)
.collect()
.collect::<Vec<_>>()
}
SplitBy::RowCount(n) => {
let row_count = layout.row_count();
Expand Down
2 changes: 2 additions & 0 deletions vortex-layout/src/layouts/flat/eval_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ impl ExprEvaluator for FlatReader {
row_mask: RowMask,
expr: ExprRef,
) -> VortexResult<ArrayData> {
assert!(row_mask.true_count() > 0);

// Fetch all the array buffers.
let mut buffers = try_join_all(
self.layout()
Expand Down
18 changes: 10 additions & 8 deletions vortex-scan/src/range_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::ops::{BitAnd, Range};
use std::sync::Arc;

use vortex_array::compute::{fill_null, FilterMask};
use vortex_array::{ArrayData, Canonical, IntoArrayData};
use vortex_array::ArrayData;
use vortex_error::{VortexExpect, VortexResult};
use vortex_expr::ExprRef;

Expand All @@ -22,12 +22,12 @@ enum State {
// Then we project the selected rows.
Project((FilterMask, ExprRef)),
// And then we're done.
Ready(ArrayData),
Ready(Option<ArrayData>),
}

pub enum NextOp {
/// The finished result of the scan.
Ready(ArrayData),
Ready(Option<ArrayData>),
/// The next expression to evaluate.
/// The caller **must** first apply the mask before evaluating the expression.
Eval((Range<u64>, FilterMask, ExprRef)),
Expand Down Expand Up @@ -124,8 +124,7 @@ impl RangeScanner {
// Then move onto the projection
if mask.true_count() == 0 {
// If the mask is empty, then we're done.
self.state =
State::Ready(Canonical::empty(self.scan.result_dtype())?.into_array());
self.state = State::Ready(None);
} else if !conjuncts_rev.is_empty() {
self.mask = mask;
let mask = if self.mask.selectivity() < APPLY_FILTER_SELECTIVITY_THRESHOLD {
Expand All @@ -141,15 +140,16 @@ impl RangeScanner {
}
State::Project(_) => {
// We're done.
self.state = State::Ready(result);
assert!(!result.is_empty(), "projected array cannot be empty");
self.state = State::Ready(Some(result));
}
State::Ready(_) => {}
}
Ok(self)
}

/// Evaluate the [`RangeScanner`] operation using a synchronous expression evaluator.
pub fn evaluate<E>(mut self, evaluator: E) -> VortexResult<ArrayData>
pub fn evaluate<E>(mut self, evaluator: E) -> VortexResult<Option<ArrayData>>
where
E: Fn(RowMask, ExprRef) -> VortexResult<ArrayData>,
{
Expand All @@ -165,7 +165,7 @@ impl RangeScanner {
}

/// Evaluate the [`RangeScanner`] operation using an async expression evaluator.
pub async fn evaluate_async<E, F>(mut self, evaluator: E) -> VortexResult<ArrayData>
pub async fn evaluate_async<E, F>(mut self, evaluator: E) -> VortexResult<Option<ArrayData>>
where
E: Fn(RowMask, ExprRef) -> F,
F: Future<Output = VortexResult<ArrayData>>,
Expand Down Expand Up @@ -246,6 +246,7 @@ mod tests {

filter(&arr, mask.filter_mask())
})
.unwrap()
.unwrap();

assert!(res.as_struct_array().is_some());
Expand Down Expand Up @@ -301,6 +302,7 @@ mod tests {

filter(&arr, mask.filter_mask())
})
.unwrap()
.unwrap();

assert!(res.as_struct_array().is_some());
Expand Down

0 comments on commit c00256a

Please sign in to comment.