
perf: Remove redundant copying of batches after FilterExec #835

Merged (15 commits, Aug 16, 2024)
1 change: 1 addition & 0 deletions native/Cargo.lock

(Generated file; diff not rendered by default.)

1 change: 1 addition & 0 deletions native/Cargo.toml
@@ -44,6 +44,7 @@ datafusion = { default-features = false, git = "https://github.com/apache/datafu
datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", features = ["crypto_expressions"] }
datafusion-functions-nested = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false }
datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false }
datafusion-execution = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false }
Member


We need datafusion-execution?

Member Author


We use datafusion::execution::TaskContext. I guess we were just pulling this in transitively before via the datafusion crate rather than being explicit.

We may want to avoid bringing in the core datafusion crate and just depend directly on the crates that we need.
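
For illustration, a minimal sketch of what the explicit dependency allows; the helper function is hypothetical and not code from this PR, and `datafusion::execution::TaskContext` is just the core crate's re-export of the same type:

```rust
// With `datafusion-execution` declared directly, the import no longer has to go
// through the `datafusion` facade crate.
use std::sync::Arc;
use datafusion_execution::TaskContext;

// Hypothetical helper, for illustration only: operators receive an
// Arc<TaskContext> like this in ExecutionPlan::execute.
fn target_partitions(ctx: &Arc<TaskContext>) -> usize {
    ctx.session_config().target_partitions()
}
```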

datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false }
datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false }
5 changes: 5 additions & 0 deletions native/core/Cargo.toml
@@ -69,6 +69,7 @@ datafusion-common = { workspace = true }
datafusion = { workspace = true }
datafusion-functions-nested = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-expr = { workspace = true }
once_cell = "1.18.0"
@@ -118,3 +119,7 @@ harness = false
[[bench]]
name = "parquet_decode"
harness = false

[[bench]]
name = "filter"
harness = false
111 changes: 111 additions & 0 deletions native/core/benches/filter.rs
@@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::builder::{BooleanBuilder, Int32Builder, StringBuilder};
use arrow_array::{ArrayRef, RecordBatch};
use comet::execution::operators::comet_filter_record_batch;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use std::sync::Arc;
use std::time::Duration;

fn criterion_benchmark(c: &mut Criterion) {
    let mut group = c.benchmark_group("filter");

    let num_rows = 8192;
    let num_int_cols = 4;
    let num_string_cols = 4;

    let batch = create_record_batch(num_rows, num_int_cols, num_string_cols);

    // create some different predicates
    let mut predicate_select_few = BooleanBuilder::with_capacity(num_rows);
    let mut predicate_select_many = BooleanBuilder::with_capacity(num_rows);
    let mut predicate_select_all = BooleanBuilder::with_capacity(num_rows);
    for i in 0..num_rows {
        predicate_select_few.append_value(i % 10 == 0);
        predicate_select_many.append_value(i % 10 > 0);
        predicate_select_all.append_value(true);
    }
    let predicate_select_few = predicate_select_few.finish();
    let predicate_select_many = predicate_select_many.finish();
    let predicate_select_all = predicate_select_all.finish();

    // baseline uses Arrow's filter_record_batch method
    group.bench_function("arrow_filter_record_batch - few rows selected", |b| {
        b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_few)))
    });
    group.bench_function("arrow_filter_record_batch - many rows selected", |b| {
        b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_many)))
    });
    group.bench_function("arrow_filter_record_batch - all rows selected", |b| {
        b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_all)))
    });

    group.bench_function("comet_filter_record_batch - few rows selected", |b| {
        b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_few)))
    });
    group.bench_function("comet_filter_record_batch - many rows selected", |b| {
        b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_many)))
    });
    group.bench_function("comet_filter_record_batch - all rows selected", |b| {
        b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_all)))
    });

    group.finish();
}

fn create_record_batch(num_rows: usize, num_int_cols: i32, num_string_cols: i32) -> RecordBatch {
    let mut int32_builder = Int32Builder::with_capacity(num_rows);
    let mut string_builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
    for i in 0..num_rows {
        int32_builder.append_value(i as i32);
        string_builder.append_value(format!("this is string #{i}"));
    }
    let int32_array = Arc::new(int32_builder.finish());
    let string_array = Arc::new(string_builder.finish());

    let mut fields = vec![];
    let mut columns: Vec<ArrayRef> = vec![];
    let mut i = 0;
    for _ in 0..num_int_cols {
        fields.push(Field::new(format!("c{i}"), DataType::Int32, false));
        columns.push(int32_array.clone()); // note this is just copying a reference to the array
        i += 1;
    }
    for _ in 0..num_string_cols {
        fields.push(Field::new(format!("c{i}"), DataType::Utf8, false));
        columns.push(string_array.clone()); // note this is just copying a reference to the array
        i += 1;
    }
    let schema = Schema::new(fields);
    RecordBatch::try_new(Arc::new(schema), columns).unwrap()
}

fn config() -> Criterion {
    Criterion::default()
        .measurement_time(Duration::from_millis(500))
        .warm_up_time(Duration::from_millis(500))
}

criterion_group! {
    name = benches;
    config = config();
    targets = criterion_benchmark
}
criterion_main!(benches);
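
A note on why the "all rows selected" case is benchmarked separately: arrow-rs's filter kernel has a fast path that returns a zero-copy slice of the input when every row passes the predicate, so the filtered batch can alias buffers that Comet's ScanExec will reuse for the next batch. The standalone snippet below (not part of this PR) makes that aliasing visible, assuming the arrow version in use keeps that fast path:

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BooleanArray, Int32Array};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
    let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(schema, vec![col]).unwrap();

    // A predicate that keeps every row can trigger the kernel's slice fast path.
    let keep_all = BooleanArray::from(vec![true; 4]);
    let filtered = filter_record_batch(&batch, &keep_all).unwrap();

    // If the fast path applied, the output column shares its data buffer with the
    // input column instead of owning a fresh copy.
    let input_data = batch.column(0).to_data();
    let output_data = filtered.column(0).to_data();
    let aliased = input_data.buffers()[0].as_ptr() == output_data.buffers()[0].as_ptr();
    println!("output aliases input: {aliased}");
}
```

Assuming the Criterion setup above and the `[[bench]]` entry added to `native/core/Cargo.toml`, the comparison should be runnable with `cargo bench --bench filter` from the `native/core` directory.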
8 changes: 2 additions & 6 deletions native/core/src/execution/datafusion/planner.rs
@@ -19,7 +19,7 @@

use super::expressions::EvalMode;
use crate::execution::datafusion::expressions::comet_scalar_funcs::create_comet_physical_fun;
use crate::execution::operators::CopyMode;
use crate::execution::operators::{CopyMode, FilterExec};
use crate::{
errors::ExpressionError,
execution::{
@@ -73,7 +73,6 @@ use datafusion::{
physical_optimizer::join_selection::swap_hash_join,
physical_plan::{
aggregates::{AggregateMode as DFAggregateMode, PhysicalGroupBy},
filter::FilterExec,
joins::{utils::JoinFilter, HashJoinExec, PartitionMode, SortMergeJoinExec},
limit::LocalLimitExec,
projection::ProjectionExec,
@@ -1780,10 +1779,7 @@ impl From<ExpressionError> for DataFusionError {
/// modification. This is used to determine if we need to copy the input batch to avoid
/// data corruption from reusing the input batch.
fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool {
if op.as_any().is::<ProjectionExec>()
|| op.as_any().is::<LocalLimitExec>()
|| op.as_any().is::<FilterExec>()
{
if op.as_any().is::<ProjectionExec>() || op.as_any().is::<LocalLimitExec>() {
can_reuse_input_batch(op.children()[0])
} else {
op.as_any().is::<ScanExec>()
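
To spell out what the change buys: this helper tells the planner whether an operator's output batches may still point at buffers that ScanExec will overwrite on the next poll, in which case a deep copy is needed before operators that hold onto batches. Below is a hypothetical sketch of that decision; the wrapper name, the `CopyExec::new` signature, and the `CopyMode` variant names are assumptions for illustration, not lines from this diff. With FilterExec dropped from the check (presumably because Comet's own FilterExec, imported above, produces self-contained batches), a filter's output no longer forces the deep-copy path, which is the redundant copying this PR removes.

```rust
// Hypothetical sketch only: the function name, CopyExec::new signature, and
// CopyMode variants are assumed rather than taken from this PR.
fn wrap_in_copy_exec(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    if can_reuse_input_batch(&plan) {
        // the child may hand back a reused buffer, so copy the data out
        Arc::new(CopyExec::new(plan, CopyMode::UnpackOrDeepCopy))
    } else {
        // the child already produces self-contained batches
        Arc::new(CopyExec::new(plan, CopyMode::UnpackOrClone))
    }
}
```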