From 88252ccd98f357f2b409873433857923c4c25955 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 15 Aug 2024 20:10:43 -0600 Subject: [PATCH 01/15] Use custom FilterExec that always uses take with a selection vector --- native/Cargo.lock | 1 + native/Cargo.toml | 1 + native/core/Cargo.toml | 1 + .../core/src/execution/datafusion/planner.rs | 3 +- native/core/src/execution/operators/filter.rs | 445 ++++++++++++++++++ native/core/src/execution/operators/mod.rs | 2 + 6 files changed, 451 insertions(+), 2 deletions(-) create mode 100644 native/core/src/execution/operators/filter.rs diff --git a/native/Cargo.lock b/native/Cargo.lock index 5d8dce283..27bc3828c 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -881,6 +881,7 @@ dependencies = [ "datafusion-comet-proto", "datafusion-comet-spark-expr", "datafusion-common", + "datafusion-execution", "datafusion-expr", "datafusion-functions-nested", "datafusion-physical-expr", diff --git a/native/Cargo.toml b/native/Cargo.toml index a41934fe3..9977ceece 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -44,6 +44,7 @@ datafusion = { default-features = false, git = "https://github.com/apache/datafu datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", features = ["crypto_expressions"] } datafusion-functions-nested = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false } datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false } +datafusion-execution = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false } datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false } datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false } datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false } diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 55c46e1d6..45252f879 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -69,6 +69,7 @@ datafusion-common = { workspace = true } datafusion = { workspace = true } datafusion-functions-nested = { workspace = true } datafusion-expr = { workspace = true } +datafusion-execution = { workspace = true } datafusion-physical-expr-common = { workspace = true } datafusion-physical-expr = { workspace = true } once_cell = "1.18.0" diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 122a24ed3..d6ebc66a7 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -19,7 +19,7 @@ use super::expressions::EvalMode; use crate::execution::datafusion::expressions::comet_scalar_funcs::create_comet_physical_fun; -use crate::execution::operators::CopyMode; +use crate::execution::operators::{CopyMode, FilterExec}; use crate::{ errors::ExpressionError, execution::{ @@ -73,7 +73,6 @@ use datafusion::{ physical_optimizer::join_selection::swap_hash_join, physical_plan::{ aggregates::{AggregateMode as DFAggregateMode, PhysicalGroupBy}, - filter::FilterExec, joins::{utils::JoinFilter, HashJoinExec, PartitionMode, SortMergeJoinExec}, limit::LocalLimitExec, projection::ProjectionExec, diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs new file mode 
100644 index 000000000..22b299373 --- /dev/null +++ b/native/core/src/execution/operators/filter.rs @@ -0,0 +1,445 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{ready, Context, Poll}; + +use datafusion::physical_plan::{ + metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, + ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, + PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, +}; + +use arrow::compute::take; +use arrow::datatypes::{DataType, SchemaRef}; +use arrow::record_batch::RecordBatch; +use arrow_array::builder::Int32Builder; +use arrow_array::{BooleanArray, RecordBatchOptions}; +use arrow_schema::ArrowError; +use datafusion_common::cast::as_boolean_array; +use datafusion_common::stats::Precision; +use datafusion_common::{internal_err, plan_err, DataFusionError, Result}; +use datafusion_execution::TaskContext; +use datafusion_expr::Operator; +use datafusion_physical_expr::expressions::BinaryExpr; +use datafusion_physical_expr::intervals::utils::check_support; +use datafusion_physical_expr::utils::collect_columns; +use datafusion_physical_expr::{ + analyze, split_conjunction, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, +}; + +use futures::stream::{Stream, StreamExt}; +use log::trace; + +/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to +/// include in its output batches. +#[derive(Debug)] +pub struct FilterExec { + /// The expression to filter on. This expression must evaluate to a boolean value. + predicate: Arc<dyn PhysicalExpr>, + /// The input plan + input: Arc<dyn ExecutionPlan>, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, + /// Selectivity for statistics. 0 = no rows, 100 = all rows + default_selectivity: u8, + /// Properties equivalence properties, partitioning, etc. + cache: PlanProperties, +} + +impl FilterExec { + /// Create a FilterExec on an input + pub fn try_new( + predicate: Arc<dyn PhysicalExpr>, + input: Arc<dyn ExecutionPlan>, + ) -> Result<Self> { + match predicate.data_type(input.schema().as_ref())?
{ + DataType::Boolean => { + let default_selectivity = 20; + let cache = Self::compute_properties(&input, &predicate, default_selectivity)?; + Ok(Self { + predicate, + input: Arc::clone(&input), + metrics: ExecutionPlanMetricsSet::new(), + default_selectivity, + cache, + }) + } + other => { + plan_err!("Filter predicate must return BOOLEAN values, got {other:?}") + } + } + } + + pub fn with_default_selectivity( + mut self, + default_selectivity: u8, + ) -> Result<Self, DataFusionError> { + if default_selectivity > 100 { + return plan_err!( + "Default filter selectivity value needs to be less than or equal to 100" + ); + } + self.default_selectivity = default_selectivity; + Ok(self) + } + + /// The expression to filter on. This expression must evaluate to a boolean value. + pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> { + &self.predicate + } + + /// The input plan + pub fn input(&self) -> &Arc<dyn ExecutionPlan> { + &self.input + } + + /// The default selectivity + pub fn default_selectivity(&self) -> u8 { + self.default_selectivity + } + + /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics. + fn statistics_helper( + input: &Arc<dyn ExecutionPlan>, + predicate: &Arc<dyn PhysicalExpr>, + default_selectivity: u8, + ) -> Result<Statistics> { + let input_stats = input.statistics()?; + let schema = input.schema(); + if !check_support(predicate, &schema) { + let selectivity = default_selectivity as f64 / 100.0; + let mut stats = input_stats.into_inexact(); + stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity); + stats.total_byte_size = stats + .total_byte_size + .with_estimated_selectivity(selectivity); + return Ok(stats); + } + + let num_rows = input_stats.num_rows; + let total_byte_size = input_stats.total_byte_size; + let input_analysis_ctx = + AnalysisContext::try_from_statistics(&input.schema(), &input_stats.column_statistics)?; + + let analysis_ctx = analyze(predicate, input_analysis_ctx, &schema)?; + + // Estimate (inexact) selectivity of predicate + let selectivity = analysis_ctx.selectivity.unwrap_or(1.0); + let num_rows = num_rows.with_estimated_selectivity(selectivity); + let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity); + + let column_statistics = + collect_new_statistics(&input_stats.column_statistics, analysis_ctx.boundaries); + Ok(Statistics { + num_rows, + total_byte_size, + column_statistics, + }) + } + + fn extend_constants( + input: &Arc<dyn ExecutionPlan>, + predicate: &Arc<dyn PhysicalExpr>, + ) -> Vec<ConstExpr> { + let mut res_constants = Vec::new(); + let input_eqs = input.equivalence_properties(); + + let conjunctions = split_conjunction(predicate); + for conjunction in conjunctions { + if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>() { + if binary.op() == &Operator::Eq { + // Filter evaluates to single value for all partitions + if input_eqs.is_expr_constant(binary.left()) { + res_constants + .push(ConstExpr::from(binary.right()).with_across_partitions(true)) + } else if input_eqs.is_expr_constant(binary.right()) { + res_constants + .push(ConstExpr::from(binary.left()).with_across_partitions(true)) + } + } + } + } + res_constants + } + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
+ fn compute_properties( + input: &Arc<dyn ExecutionPlan>, + predicate: &Arc<dyn PhysicalExpr>, + default_selectivity: u8, + ) -> Result<PlanProperties> { + // Combine the equal predicates with the input equivalence properties + // to construct the equivalence properties: + let stats = Self::statistics_helper(input, predicate, default_selectivity)?; + let mut eq_properties = input.equivalence_properties().clone(); + let (equal_pairs, _) = collect_columns_from_predicate(predicate); + for (lhs, rhs) in equal_pairs { + eq_properties.add_equal_conditions(lhs, rhs)? + } + // Add the columns that have only one viable value (singleton) after + // filtering to constants. + let constants = collect_columns(predicate) + .into_iter() + .filter(|column| stats.column_statistics[column.index()].is_singleton()) + .map(|column| { + let expr = Arc::new(column) as _; + ConstExpr::new(expr).with_across_partitions(true) + }); + // this is for statistics + eq_properties = eq_properties.add_constants(constants); + // this is for logical constant (for example: a = '1', then a could be marked as a constant) + // to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0) + eq_properties = eq_properties.add_constants(Self::extend_constants(input, predicate)); + Ok(PlanProperties::new( + eq_properties, + input.output_partitioning().clone(), // Output Partitioning + input.execution_mode(), // Execution Mode + )) + } +} + +impl DisplayAs for FilterExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "FilterExec: {}", self.predicate) + } + } + } +} + +impl ExecutionPlan for FilterExec { + fn name(&self) -> &'static str { + "FilterExec" + } + + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![&self.input] + } + + fn maintains_input_order(&self) -> Vec<bool> { + // tell optimizer this operator doesn't reorder its input + vec![true] + } + + fn with_new_children( + self: Arc<Self>, + mut children: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0)) + .and_then(|e| { + let selectivity = e.default_selectivity(); + e.with_default_selectivity(selectivity) + }) + .map(|e| Arc::new(e) as _) + } + + fn execute( + &self, + partition: usize, + context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + trace!( + "Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", + partition, + context.session_id(), + context.task_id() + ); + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + Ok(Box::pin(FilterExecStream { + schema: self.input.schema(), + predicate: Arc::clone(&self.predicate), + input: self.input.execute(partition, context)?, + baseline_metrics, + })) + } + + fn metrics(&self) -> Option<MetricsSet> { + Some(self.metrics.clone_inner()) + } + + /// The output statistics of a filtering operation can be estimated if the + /// predicate's selectivity value can be determined for the incoming data. + fn statistics(&self) -> Result<Statistics> { + Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity) + } +} + +/// This function ensures that all bounds in the `ExprBoundaries` vector are +/// converted to closed bounds. If a lower/upper bound is initially open, it +/// is adjusted by using the next/previous value for its data type to convert +/// it into a closed bound.
+fn collect_new_statistics( + input_column_stats: &[ColumnStatistics], + analysis_boundaries: Vec<ExprBoundaries>, +) -> Vec<ColumnStatistics> { + analysis_boundaries + .into_iter() + .enumerate() + .map( + |( + idx, + ExprBoundaries { + interval, + distinct_count, + .. + }, + )| { + let (lower, upper) = interval.into_bounds(); + let (min_value, max_value) = if lower.eq(&upper) { + (Precision::Exact(lower), Precision::Exact(upper)) + } else { + (Precision::Inexact(lower), Precision::Inexact(upper)) + }; + ColumnStatistics { + null_count: input_column_stats[idx].null_count.clone().to_inexact(), + max_value, + min_value, + distinct_count: distinct_count.to_inexact(), + } + }, + ) + .collect() +} + +/// The FilterExec streams wraps the input iterator and applies the predicate expression to +/// determine which rows to include in its output batches +struct FilterExecStream { + /// Output schema, which is the same as the input schema for this operator + schema: SchemaRef, + /// The expression to filter on. This expression must evaluate to a boolean value. + predicate: Arc<dyn PhysicalExpr>, + /// The input partition to filter. + input: SendableRecordBatchStream, + /// runtime metrics recording + baseline_metrics: BaselineMetrics, +} + +pub(crate) fn batch_filter( + batch: &RecordBatch, + predicate: &Arc<dyn PhysicalExpr>, +) -> Result<RecordBatch> { + predicate + .evaluate(batch) + .and_then(|v| v.into_array(batch.num_rows())) + .and_then(|array| { + Ok(match as_boolean_array(&array) { + // apply filter array to record batch + Ok(filter_array) => filter_record_batch(batch, filter_array)?, + Err(_) => { + return internal_err!("Cannot create filter_array from non-boolean predicates"); + } + }) + }) +} + +pub fn filter_record_batch( + record_batch: &RecordBatch, + predicate: &BooleanArray, +) -> std::result::Result<RecordBatch, ArrowError> { + // turn predicate into selection vector + let mut sv = Int32Builder::with_capacity(predicate.true_count()); + for i in 0..predicate.len() { + if predicate.value(i) { + sv.append_value(i as i32); + } + } + let sv = sv.finish(); + + let filtered_arrays = record_batch + .columns() + .iter() + .map(|a| take(a, &sv, None)) + .collect::<Result<Vec<_>, _>>()?; + let options = RecordBatchOptions::default().with_row_count(Some(sv.len())); + RecordBatch::try_new_with_options(record_batch.schema(), filtered_arrays, &options) +} + +impl Stream for FilterExecStream { + type Item = Result<RecordBatch>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let poll; + loop { + match ready!(self.input.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + let timer = self.baseline_metrics.elapsed_compute().timer(); + let filtered_batch = batch_filter(&batch, &self.predicate)?; + timer.done(); + // skip entirely filtered batches + if filtered_batch.num_rows() == 0 { + continue; + } + poll = Poll::Ready(Some(Ok(filtered_batch))); + break; + } + value => { + poll = Poll::Ready(value); + break; + } + } + } + self.baseline_metrics.record_poll(poll) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + // same number of record batches + self.input.size_hint() + } +} + +impl RecordBatchStream for FilterExecStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +/// Return the equals Column-Pairs and Non-equals Column-Pairs +fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> EqualAndNonEqual { + let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new(); + let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new(); + + let predicates = split_conjunction(predicate); + predicates.into_iter().for_each(|p| { + if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() { + match binary.op() { + Operator::Eq =>
eq_predicate_columns.push((binary.left(), binary.right())), + Operator::NotEq => ne_predicate_columns.push((binary.left(), binary.right())), + _ => {} + } + } + }); + + (eq_predicate_columns, ne_predicate_columns) +} + +/// Pair of `Arc<dyn PhysicalExpr>`s +pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>); + +/// The equals Column-Pairs and Non-equals Column-Pairs in the Predicates +pub type EqualAndNonEqual<'a> = (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>); diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs index 09e05ef26..17842d107 100644 --- a/native/core/src/execution/operators/mod.rs +++ b/native/core/src/execution/operators/mod.rs @@ -22,9 +22,11 @@ use std::fmt::Debug; use jni::objects::GlobalRef; pub use copy::*; +pub use filter::FilterExec; pub use scan::*; mod copy; +mod filter; mod scan; /// Error returned during executing operators. From 67ef326bf1d64386dfa36000d52598f49f886706 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 15 Aug 2024 21:34:37 -0600 Subject: [PATCH 02/15] Remove CopyExec around FilterExec --- native/core/src/execution/datafusion/planner.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index d6ebc66a7..81c7e69e0 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1781,7 +1781,6 @@ impl From<ExpressionError> for DataFusionError { fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool { if op.as_any().is::<ProjectionExec>() || op.as_any().is::<LocalLimitExec>() - || op.as_any().is::<FilterExec>() { can_reuse_input_batch(op.children()[0]) } else { op.as_any().is::<ScanExec>() From 500500698a4e0db19eb0bc1e35d5726e55fc33be Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 15 Aug 2024 22:28:27 -0600 Subject: [PATCH 03/15] remove CopyExec on FilterExec inputs to joins --- native/core/src/execution/datafusion/planner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 81c7e69e0..9c15ea309 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1211,13 +1211,13 @@ impl PhysicalPlanner { let left = if can_reuse_input_batch(&left) { Arc::new(CopyExec::new(left, CopyMode::UnpackOrDeepCopy)) } else { - Arc::new(CopyExec::new(left, CopyMode::UnpackOrClone)) + left }; let right = if can_reuse_input_batch(&right) { Arc::new(CopyExec::new(right, CopyMode::UnpackOrDeepCopy)) } else { - Arc::new(CopyExec::new(right, CopyMode::UnpackOrClone)) + right }; Ok(( From 160deb83f3016263e1cc63f1ef09e04d07caf9e1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 16 Aug 2024 06:18:47 -0600 Subject: [PATCH 04/15] remove copy before sort in some cases --- native/core/src/execution/datafusion/planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 9c15ea309..662fa8006 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -862,7 +862,7 @@ impl PhysicalPlanner { let copy_exec = if can_reuse_input_batch(&child) { Arc::new(CopyExec::new(child, CopyMode::UnpackOrDeepCopy)) } else { - Arc::new(CopyExec::new(child, CopyMode::UnpackOrClone)) + child }; Ok(( From 9d5df9da4ef6109da3e97b720c6f13794f49dcff Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 16 Aug 2024 06:24:16 -0600 Subject: [PATCH 05/15] add comments ---
native/core/src/execution/operators/filter.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs index 22b299373..283613398 100644 --- a/native/core/src/execution/operators/filter.rs +++ b/native/core/src/execution/operators/filter.rs @@ -47,8 +47,9 @@ use datafusion_physical_expr::{ use futures::stream::{Stream, StreamExt}; use log::trace; -/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to -/// include in its output batches. +/// This is a copy of DataFusion's FilterExec with one modification to ensure that input +/// batches are never passed through unchanged. The changes are between the comments +/// `BEGIN Comet changes` and `END Comet changes`. #[derive(Debug)] pub struct FilterExec { /// The expression to filter on. This expression must evaluate to a boolean value. @@ -358,6 +359,7 @@ pub(crate) fn batch_filter( }) } +// BEGIN Comet changes pub fn filter_record_batch( record_batch: &RecordBatch, predicate: &BooleanArray, @@ -379,6 +381,7 @@ pub fn filter_record_batch( let options = RecordBatchOptions::default().with_row_count(Some(sv.len())); RecordBatch::try_new_with_options(record_batch.schema(), filtered_arrays, &options) } +// END Comet changes impl Stream for FilterExecStream { From 4267f57f025a1515ff6fb2ed7b6e724114ad895e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 16 Aug 2024 06:45:38 -0600 Subject: [PATCH 06/15] cargo fmt --- native/core/src/execution/datafusion/planner.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 662fa8006..d5543ec34 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1779,9 +1779,7 @@ impl From<ExpressionError> for DataFusionError { /// modification. This is used to determine if we need to copy the input batch to avoid /// data corruption from reusing the input batch.
fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool { - if op.as_any().is::<ProjectionExec>() - || op.as_any().is::<LocalLimitExec>() - { + if op.as_any().is::<ProjectionExec>() || op.as_any().is::<LocalLimitExec>() { can_reuse_input_batch(op.children()[0]) } else { op.as_any().is::<ScanExec>() From 9458cfe09b309df5d5a2407e20552c43a512781b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 16 Aug 2024 07:51:00 -0600 Subject: [PATCH 07/15] bug fix: check for null when building selection vector --- native/core/src/execution/operators/filter.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs index 283613398..000b62855 100644 --- a/native/core/src/execution/operators/filter.rs +++ b/native/core/src/execution/operators/filter.rs @@ -30,7 +30,7 @@ use arrow::compute::take; use arrow::datatypes::{DataType, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::builder::Int32Builder; -use arrow_array::{BooleanArray, RecordBatchOptions}; +use arrow_array::{Array, BooleanArray, RecordBatchOptions}; use arrow_schema::ArrowError; use datafusion_common::cast::as_boolean_array; use datafusion_common::stats::Precision; @@ -367,7 +367,7 @@ pub fn filter_record_batch( // turn predicate into selection vector let mut sv = Int32Builder::with_capacity(predicate.true_count()); for i in 0..predicate.len() { - if predicate.value(i) { + if !predicate.is_null(i) && predicate.value(i) { sv.append_value(i as i32); } } From b0c951b2c67947b58d88aeb7395232e0b21ca925 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 16 Aug 2024 10:16:23 -0600 Subject: [PATCH 08/15] revert --- native/core/src/execution/datafusion/planner.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index d5543ec34..24ea41422 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -862,7 +862,7 @@ impl PhysicalPlanner { let copy_exec = if can_reuse_input_batch(&child) { Arc::new(CopyExec::new(child, CopyMode::UnpackOrDeepCopy)) } else { - child + Arc::new(CopyExec::new(child, CopyMode::UnpackOrClone)) }; Ok(( @@ -1211,13 +1211,13 @@ impl PhysicalPlanner { let left = if can_reuse_input_batch(&left) { Arc::new(CopyExec::new(left, CopyMode::UnpackOrDeepCopy)) } else { - left + Arc::new(CopyExec::new(left, CopyMode::UnpackOrClone)) }; let right = if can_reuse_input_batch(&right) { Arc::new(CopyExec::new(right, CopyMode::UnpackOrDeepCopy)) } else { - right + Arc::new(CopyExec::new(right, CopyMode::UnpackOrClone)) }; Ok(( From d38bacda9c569e0bc91af55b410ce2e65aa7761e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 16 Aug 2024 10:19:25 -0600 Subject: [PATCH 09/15] use arrow kernel --- native/core/src/execution/operators/filter.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs index 000b62855..5845f28f9 100644 --- a/native/core/src/execution/operators/filter.rs +++ b/native/core/src/execution/operators/filter.rs @@ -26,7 +26,7 @@ use datafusion::physical_plan::{ PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; -use arrow::compute::take; +use arrow::compute::{take, take_record_batch}; use arrow::datatypes::{DataType, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::builder::Int32Builder; use arrow_array::{Array, BooleanArray, RecordBatchOptions}; use arrow_schema::ArrowError; @@ -372,14 +372,8 @@ pub fn filter_record_batch( } } let sv =
sv.finish(); - - let filtered_arrays = record_batch - .columns() - .iter() - .map(|a| take(a, &sv, None)) - .collect::<Result<Vec<_>, _>>()?; - let options = RecordBatchOptions::default().with_row_count(Some(sv.len())); - RecordBatch::try_new_with_options(record_batch.schema(), filtered_arrays, &options) + // note that this does not unpack dictionary-encoded arrays + take_record_batch(record_batch, &sv) } // END Comet changes From adfd6cb439ec9d4296192caeec8dfe336b094424 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 16 Aug 2024 10:27:43 -0600 Subject: [PATCH 10/15] remove unused imports --- native/core/src/execution/operators/filter.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs index 5845f28f9..976bb8155 100644 --- a/native/core/src/execution/operators/filter.rs +++ b/native/core/src/execution/operators/filter.rs @@ -26,11 +26,11 @@ use datafusion::physical_plan::{ PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; -use arrow::compute::{take, take_record_batch}; +use arrow::compute::take_record_batch; use arrow::datatypes::{DataType, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::builder::Int32Builder; -use arrow_array::{Array, BooleanArray, RecordBatchOptions}; +use arrow_array::{Array, BooleanArray}; use arrow_schema::ArrowError; use datafusion_common::cast::as_boolean_array; use datafusion_common::stats::Precision; From 3edac2728c1e16102c3d3eccf62357ca2a106699 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 16 Aug 2024 13:10:24 -0600 Subject: [PATCH 11/15] add criterion benchmark --- native/core/Cargo.toml | 4 + native/core/benches/filter.rs | 94 +++++++++++++++++++ native/core/src/execution/operators/filter.rs | 4 +- native/core/src/execution/operators/mod.rs | 1 + 4 files changed, 101 insertions(+), 2 deletions(-) create mode 100644 native/core/benches/filter.rs diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 45252f879..13a6a3695 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -119,3 +119,7 @@ harness = false [[bench]] name = "parquet_decode" harness = false + +[[bench]] +name = "filter" +harness = false diff --git a/native/core/benches/filter.rs b/native/core/benches/filter.rs new file mode 100644 index 000000000..1e6cca968 --- /dev/null +++ b/native/core/benches/filter.rs @@ -0,0 +1,94 @@ +use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, RecordBatch, StringBuilder}; +use arrow::compute::filter_record_batch; +use arrow::datatypes::{DataType, Field, Schema}; +use comet::execution::operators::comet_filter_record_batch; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use std::sync::Arc; +use std::time::Duration; + +fn criterion_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("filter"); + + let num_rows = 8192; + let num_int_cols = 4; + let num_string_cols = 4; + + let batch = create_record_batch(num_rows, num_int_cols, num_string_cols); + + // create some different predicates + let mut predicate_select_few = BooleanBuilder::with_capacity(num_rows); + let mut predicate_select_many = BooleanBuilder::with_capacity(num_rows); + let mut predicate_select_all = BooleanBuilder::with_capacity(num_rows); + for i in 0..num_rows { + predicate_select_few.append_value(i % 10 == 0); + predicate_select_many.append_value(i % 10 > 0); + predicate_select_all.append_value(true); + } + let predicate_select_few = predicate_select_few.finish(); + let
predicate_select_many = predicate_select_many.finish(); + let predicate_select_all = predicate_select_all.finish(); + + // baseline uses Arrow's filter_record_batch method + group.bench_function("arrow_filter_record_batch - few", |b| { + b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_few))) + }); + group.bench_function("arrow_filter_record_batch - many", |b| { + b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_many))) + }); + group.bench_function("arrow_filter_record_batch - all", |b| { + b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_all))) + }); + + group.bench_function("comet_filter - few", |b| { + b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_few))) + }); + group.bench_function("comet_filter - many", |b| { + b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_many))) + }); + group.bench_function("comet_filter - all", |b| { + b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_all))) + }); + + group.finish(); +} + +fn create_record_batch(num_rows: usize, num_int_cols: i32, num_string_cols: i32) -> RecordBatch { + let mut int32_builder = Int32Builder::with_capacity(num_rows); + let mut string_builder = StringBuilder::with_capacity(num_rows, num_rows * 32); + for i in 0..num_rows { + int32_builder.append_value(i as i32); + string_builder.append_value(format!("this is string #{i}")); + } + let int32_array = Arc::new(int32_builder.finish()); + let string_array = Arc::new(string_builder.finish()); + + let mut fields = vec![]; + let mut columns: Vec<ArrayRef> = vec![]; + let mut i = 0; + for _ in 0..num_int_cols { + fields.push(Field::new(format!("c{i}"), DataType::Int32, false)); + columns.push(int32_array.clone()); // note this is just copying a reference to the array + i += 1; + } + for _ in 0..num_string_cols { + fields.push(Field::new(format!("c{i}"), DataType::Utf8, false)); + columns.push(string_array.clone()); // note this is just copying a reference to the array + i += 1; + } + let schema = Schema::new(fields); + let batch = RecordBatch::try_new(Arc::new(schema), columns).unwrap(); + batch +} + +fn config() -> Criterion { + Criterion::default() + .measurement_time(Duration::from_millis(500)) + .warm_up_time(Duration::from_millis(500)) +} + +criterion_group!
{ + name = benches; + config = config(); + targets = criterion_benchmark } criterion_main!(benches); diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs index 976bb8155..d180a7908 100644 --- a/native/core/src/execution/operators/filter.rs +++ b/native/core/src/execution/operators/filter.rs @@ -351,7 +351,7 @@ pub(crate) fn batch_filter( .and_then(|array| { Ok(match as_boolean_array(&array) { // apply filter array to record batch - Ok(filter_array) => filter_record_batch(batch, filter_array)?, + Ok(filter_array) => comet_filter_record_batch(batch, filter_array)?, Err(_) => { return internal_err!("Cannot create filter_array from non-boolean predicates"); } }) }) } // BEGIN Comet changes -pub fn filter_record_batch( +pub fn comet_filter_record_batch( record_batch: &RecordBatch, predicate: &BooleanArray, ) -> std::result::Result<RecordBatch, ArrowError> { diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs index 17842d107..bdc233e94 100644 --- a/native/core/src/execution/operators/mod.rs +++ b/native/core/src/execution/operators/mod.rs @@ -22,6 +22,7 @@ use std::fmt::Debug; use jni::objects::GlobalRef; pub use copy::*; +pub use filter::comet_filter_record_batch; pub use filter::FilterExec; pub use scan::*; From 37c883b5e08359bec239714c3f28168d8aa22d32 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 16 Aug 2024 13:24:29 -0600 Subject: [PATCH 12/15] address initial feedback --- native/core/benches/filter.rs | 15 ++++----- native/core/src/execution/operators/filter.rs | 32 ++++++++++++------- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/native/core/benches/filter.rs b/native/core/benches/filter.rs index 1e6cca968..40fa21b4b 100644 --- a/native/core/benches/filter.rs +++ b/native/core/benches/filter.rs @@ -29,23 +29,23 @@ fn criterion_benchmark(c: &mut Criterion) { let predicate_select_all = predicate_select_all.finish(); // baseline uses Arrow's filter_record_batch method - group.bench_function("arrow_filter_record_batch - few", |b| { + group.bench_function("arrow_filter_record_batch - few rows selected", |b| { b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_few))) }); - group.bench_function("arrow_filter_record_batch - many", |b| { + group.bench_function("arrow_filter_record_batch - many rows selected", |b| { b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_many))) }); - group.bench_function("arrow_filter_record_batch - all", |b| { + group.bench_function("arrow_filter_record_batch - all rows selected", |b| { b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_all))) }); - group.bench_function("comet_filter - few", |b| { + group.bench_function("comet_filter_record_batch - few rows selected", |b| { b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_few))) }); - group.bench_function("comet_filter - many", |b| { + group.bench_function("comet_filter_record_batch - many rows selected", |b| { b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_many))) }); - group.bench_function("comet_filter - all", |b| { + group.bench_function("comet_filter_record_batch - all rows selected", |b| { b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_all))) }); group.finish(); } @@ -76,8 +76,7 @@ fn create_record_batch(num_rows: usize, num_int_cols: i32, num_string_cols: i32) i += 1; } let schema = Schema::new(fields); -
let batch = RecordBatch::try_new(Arc::new(schema), columns).unwrap(); - batch + RecordBatch::try_new(Arc::new(schema), columns).unwrap() } fn config() -> Criterion { diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs index d180a7908..4eb6d342c 100644 --- a/native/core/src/execution/operators/filter.rs +++ b/native/core/src/execution/operators/filter.rs @@ -26,11 +26,11 @@ use datafusion::physical_plan::{ PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; -use arrow::compute::take_record_batch; +use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, SchemaRef}; use arrow::record_batch::RecordBatch; -use arrow_array::builder::Int32Builder; -use arrow_array::{Array, BooleanArray}; +use arrow_array::{make_array, Array, ArrayRef, BooleanArray, RecordBatchOptions}; +use arrow_data::transform::MutableArrayData; use arrow_schema::ArrowError; use datafusion_common::cast::as_boolean_array; use datafusion_common::stats::Precision; @@ -364,16 +364,24 @@ pub fn comet_filter_record_batch( record_batch: &RecordBatch, predicate: &BooleanArray, ) -> std::result::Result<RecordBatch, ArrowError> { - // turn predicate into selection vector - let mut sv = Int32Builder::with_capacity(predicate.true_count()); - for i in 0..predicate.len() { - if !predicate.is_null(i) && predicate.value(i) { - sv.append_value(i as i32); - } + if predicate.true_count() == record_batch.num_rows() { + // special case where we just make an exact copy + let arrays: Vec<ArrayRef> = record_batch + .columns() + .iter() + .map(|array| { + let capacity = array.len(); + let data = array.to_data(); + let mut mutable = MutableArrayData::new(vec![&data], false, capacity); + mutable.extend(0, 0, capacity); + make_array(mutable.freeze()) + }) + .collect(); + let options = RecordBatchOptions::new().with_row_count(Some(record_batch.num_rows())); + RecordBatch::try_new_with_options(record_batch.schema().clone(), arrays, &options) + } else { + filter_record_batch(record_batch, predicate) } - let sv = sv.finish(); - // note that this does not unpack dictionary-encoded arrays - take_record_batch(record_batch, &sv) } From 8bc5e8ac7ee0a28513227a019fa075ef7660444a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 16 Aug 2024 13:29:44 -0600 Subject: [PATCH 13/15] add ASF header --- native/core/benches/filter.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/native/core/benches/filter.rs b/native/core/benches/filter.rs index 40fa21b4b..815ca09dc 100644 --- a/native/core/benches/filter.rs +++ b/native/core/benches/filter.rs @@ -1,4 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License.use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, RecordBatch, StringBuilder}; + use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Field, Schema}; use comet::execution::operators::comet_filter_record_batch; From 14e4b1688040877449a3a3e24e891303e8be5764 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 16 Aug 2024 13:43:15 -0600 Subject: [PATCH 14/15] fix missing imports --- native/core/benches/filter.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/native/core/benches/filter.rs b/native/core/benches/filter.rs index 815ca09dc..10ad5a9e7 100644 --- a/native/core/benches/filter.rs +++ b/native/core/benches/filter.rs @@ -17,6 +17,8 @@ use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Field, Schema}; +use arrow_array::builder::{BooleanBuilder, Int32Builder, StringBuilder}; +use arrow_array::{ArrayRef, RecordBatch}; use comet::execution::operators::comet_filter_record_batch; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use std::sync::Arc; From de305d57a94a50f3eccdec0713bae7b543161a43 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 16 Aug 2024 13:44:09 -0600 Subject: [PATCH 15/15] Update native/core/src/execution/operators/filter.rs Co-authored-by: Liang-Chi Hsieh --- native/core/src/execution/operators/filter.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs index 4eb6d342c..902529ba5 100644 --- a/native/core/src/execution/operators/filter.rs +++ b/native/core/src/execution/operators/filter.rs @@ -360,6 +360,8 @@ pub(crate) fn batch_filter( } // BEGIN Comet changes +// `FilterExec` could modify input batch or return input batch without change. Instead of always +// adding `CopyExec` on top of it, we only copy input batch for the special case. pub fn comet_filter_record_batch( record_batch: &RecordBatch, predicate: &BooleanArray,
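
A standalone sketch of the idea behind this series, for readers who want to experiment outside of Comet: `take` gathers the selected rows into freshly allocated arrays, while Arrow's `filter` kernel may return arrays that alias the input buffers (for example when the predicate selects every row), which is why Comet previously needed a `CopyExec` above the filter. The program below, written against the arrow crate only, contrasts the two. It is not code from the patches: `filter_with_selection_vector` is a hypothetical helper mirroring patch 01 plus the null check from patch 07, and whether `filter` actually aliases its input is an assumption about a version-dependent fast path, so treat the printed result as illustrative.

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BooleanArray, Int32Array, Int32Builder};
use arrow::compute::{filter_record_batch, take};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Hypothetical helper mirroring the selection-vector approach from patch 01,
// including the null check added in patch 07. `take` copies the selected rows
// into new arrays, so the output never shares buffers with the input batch.
fn filter_with_selection_vector(
    batch: &RecordBatch,
    predicate: &BooleanArray,
) -> Result<RecordBatch, ArrowError> {
    let mut sv = Int32Builder::with_capacity(predicate.true_count());
    for i in 0..predicate.len() {
        // a NULL predicate value must not select the row
        if !predicate.is_null(i) && predicate.value(i) {
            sv.append_value(i as i32);
        }
    }
    let sv = sv.finish();
    let columns = batch
        .columns()
        .iter()
        .map(|c| take(c, &sv, None))
        .collect::<Result<Vec<_>, _>>()?;
    RecordBatch::try_new(batch.schema(), columns)
}

fn main() -> Result<(), ArrowError> {
    let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_from_iter(vec![("a", col)])?;
    let select_all = BooleanArray::from(vec![true; 4]);

    // The filter kernel may hand back the input data unchanged when the
    // predicate selects every row (an assumption about arrow's fast path).
    let filtered = filter_record_batch(&batch, &select_all)?;
    let aliased = std::ptr::eq(
        batch.column(0).to_data().buffers()[0].as_ptr(),
        filtered.column(0).to_data().buffers()[0].as_ptr(),
    );
    println!("filter kernel aliases input buffers: {aliased}");

    // The take-based version always materializes a new copy.
    let taken = filter_with_selection_vector(&batch, &select_all)?;
    let aliased = std::ptr::eq(
        batch.column(0).to_data().buffers()[0].as_ptr(),
        taken.column(0).to_data().buffers()[0].as_ptr(),
    );
    println!("take-based filter aliases input buffers: {aliased}");
    Ok(())
}

This property is what lets patch 02 drop `FilterExec` from `can_reuse_input_batch`, and what motivated removing the `CopyExec(UnpackOrClone)` wrappers in patches 03 and 04 (later reverted in patch 08); patch 12 then narrowed the copying to the all-rows-selected case, falling back to the stock `filter_record_batch` kernel otherwise.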