
perf: Remove redundant copying of batches after FilterExec #835

Merged (15 commits) on Aug 16, 2024

Conversation

@andygrove (Member) commented Aug 16, 2024

Which issue does this PR close?

Closes #757

Rationale for this change

We were often creating a filtered batch in FilterExec and then making a copy of that batch in CopyExec, resulting in redundant copying when FilterExec had already created a new batch with the filtered data. CopyExec was necessary because FilterExec can pass through input batches when the predicate evaluates to true for all rows, which is not safe in Comet because we re-use arrays in ScanExec.

This PR introduces a customized version of FilterExec that uses Arrow's take_record_batch kernel to always create new batches, even in the case where the predicate evaluates to true for all rows. This removes the need to wrap FilterExec in a CopyExec.
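The selection-vector approach described above can be sketched in plain Rust. This is a std-only illustration with simplified stand-in types (slices instead of Arrow's `RecordBatch`/`BooleanArray`), not the actual Comet code:

```rust
// Std-only sketch of the idea above: convert the boolean predicate into
// a selection vector of row indices, then "take" those rows into a
// freshly allocated output. Because the output is always a new
// allocation, it never aliases the input batch -- even when every row
// passes the predicate.
fn filter_by_take(rows: &[i64], predicate: &[bool]) -> Vec<i64> {
    // build the selection vector of kept row indices
    let sv: Vec<usize> = predicate
        .iter()
        .enumerate()
        .filter_map(|(i, &keep)| keep.then_some(i))
        .collect();
    // "take": gather the selected rows into a new vector
    sv.into_iter().map(|i| rows[i]).collect()
}
```

The always-allocate behavior is what removes the need for a trailing CopyExec.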

This reduces our TPC-DS time by ~25 seconds.

[chart: tpcds_allqueries]

The majority of queries are faster with this change:

[chart: tpcds_queries_speedup]

What changes are included in this PR?

  • Copy DataFusion's FilterExec into Comet and make a one-line change so that FilterExec never passes through input batches -- I will suggest some changes upstream so that we can customize rather than duplicate
  • Stop wrapping FilterExec in CopyExec

How are these changes tested?

  • Existing tests
  • I ran TPC-DS without error

@andygrove andygrove requested a review from viirya August 16, 2024 12:27
@codecov-commenter:

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 33.95%. Comparing base (380f03d) to head (9458cfe).
Report is 1 commit behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main     #835      +/-   ##
============================================
+ Coverage     33.83%   33.95%   +0.12%     
- Complexity      870      880      +10     
============================================
  Files           112      112              
  Lines         42970    42993      +23     
  Branches       9466     9473       +7     
============================================
+ Hits          14538    14598      +60     
+ Misses        25446    25404      -42     
- Partials       2986     2991       +5     


@andygrove andygrove changed the title perf: Remove redundant copies of batches after FilterExec perf: Remove redundant copying of batches after FilterExec Aug 16, 2024
Comment on lines 362 to 378
// BEGIN Comet changes
pub fn filter_record_batch(
    record_batch: &RecordBatch,
    predicate: &BooleanArray,
) -> std::result::Result<RecordBatch, ArrowError> {
    // turn predicate into selection vector
    let mut sv = Int32Builder::with_capacity(predicate.true_count());
    for i in 0..predicate.len() {
        if !predicate.is_null(i) && predicate.value(i) {
            sv.append_value(i as i32);
        }
    }
    let sv = sv.finish();
    // note that this does not unpack dictionary-encoded arrays
    take_record_batch(record_batch, &sv)
}
// END Comet changes
andygrove (Member, Author):

This is the modification to FilterExec

@andygrove (Member, Author):

@mbutrovich you may be interested in reviewing this PR

    predicate: &BooleanArray,
) -> std::result::Result<RecordBatch, ArrowError> {
    // turn predicate into selection vector
    let mut sv = Int32Builder::with_capacity(predicate.true_count());
@Dandandan commented Aug 16, 2024:

Probably special-casing the edge case where predicate.true_count() == record_batch.num_rows() and defaulting to filter otherwise would avoid regressing "normal" filtering.

    }
    let sv = sv.finish();
    // note that this does not unpack dictionary-encoded arrays
    take_record_batch(record_batch, &sv)
@Dandandan commented Aug 16, 2024:

A faster way to copy is probably to use MutableArrayData, as done in concatenate:

https://docs.rs/arrow-select/52.2.0/src/arrow_select/concat.rs.html#180

@Dandandan:

Ah, this is done in CopyExec already - this should be faster.

andygrove (Member, Author):

The goal of this PR is to no longer need to use CopyExec after a FilterExec (to avoid copying twice in some cases)

@Dandandan:

Yes, got it. It might be possible, though, to reuse the same code from CopyExec here for copying the RecordBatch?

    }
    let sv = sv.finish();
    // note that this does not unpack dictionary-encoded arrays
    take_record_batch(record_batch, &sv)
Member:

I think take_record_batch actually copies values based on indices. So even if the batch is not filtered (all rows selected), it will copy the batch. How is that different from CopyExec?

andygrove (Member, Author):

That is correct. The goal is to ensure that FilterExec always copies instead of only copying most of the time, so that we avoid also having a CopyExec that performs yet another copy.

I have implemented the suggestion from @Dandandan to specialize for the case where all rows are selected.

@andygrove (Member, Author):

Thanks @Dandandan @viirya for the feedback so far. I have now added criterion benchmarks before I start addressing the feedback. Here are the current results.

filter/arrow_filter_record_batch - few
                        time:   [15.546 µs 15.601 µs 15.682 µs]
Found 9 outliers among 100 measurements (9.00%)
  3 (3.00%) high mild
  6 (6.00%) high severe
filter/arrow_filter_record_batch - many
                        time:   [74.935 µs 75.129 µs 75.324 µs]
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild
filter/arrow_filter_record_batch - all
                        time:   [396.57 ns 397.80 ns 399.05 ns]
filter/comet_filter - few
                        time:   [21.772 µs 22.012 µs 22.337 µs]
Found 14 outliers among 100 measurements (14.00%)
  2 (2.00%) high mild
  12 (12.00%) high severe
Benchmarking filter/comet_filter - many: Warming up for 500.00 ms
Warning: Unable to complete 100 samples in 500.0ms. You may wish to increase target time to 762.3ms, enable flat sampling, or reduce sample count to 50.
filter/comet_filter - many
                        time:   [146.44 µs 147.25 µs 148.01 µs]
Found 12 outliers among 100 measurements (12.00%)
  12 (12.00%) low mild
Benchmarking filter/comet_filter - all: Warming up for 500.00 ms
Warning: Unable to complete 100 samples in 500.0ms. You may wish to increase target time to 895.1ms, enable flat sampling, or reduce sample count to 50.
filter/comet_filter - all
                        time:   [177.25 µs 183.04 µs 190.27 µs]
Found 13 outliers among 100 measurements (13.00%)
  2 (2.00%) high mild
  11 (11.00%) high severe

    // turn predicate into selection vector
    let mut sv = Int32Builder::with_capacity(predicate.true_count());
    for i in 0..predicate.len() {
        if !predicate.is_null(i) && predicate.value(i) {
Contributor:

Another possible special case that we should measure the impact of: we could elide the !predicate.is_null(i) check if we know there are no nulls in the predicate array. You end up with duplicated loops (one with the null check and one without), but depending on the branch predictor this may be worth it.
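The loop-duplication idea could look like the following std-only sketch, where a slice of `Option<bool>` stands in for Arrow's nullable BooleanArray (names are illustrative, not from the PR):

```rust
// Sketch of eliding the null check: when the predicate contains no
// nulls, run a tighter loop without the per-element is-null test.
// `Option<bool>` stands in for a nullable BooleanArray element.
fn build_selection_vector(predicate: &[Option<bool>]) -> Vec<i32> {
    let has_nulls = predicate.iter().any(|v| v.is_none());
    let mut sv = Vec::new();
    if has_nulls {
        // loop with the null check (a null counts as "not selected")
        for (i, v) in predicate.iter().enumerate() {
            if *v == Some(true) {
                sv.push(i as i32);
            }
        }
    } else {
        // null-free fast path: no per-element null test
        for (i, v) in predicate.iter().enumerate() {
            if v.unwrap() {
                sv.push(i as i32);
            }
        }
    }
    sv
}
```

In Arrow terms the `has_nulls` test would be a single `null_count() == 0` check on the predicate array, paid once per batch rather than once per row.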

}

// BEGIN Comet changes
pub fn comet_filter_record_batch(
Member:

Suggested change:
-pub fn comet_filter_record_batch(
+fn comet_filter_record_batch(

andygrove (Member, Author):

I think it has to be public so that we can access it from the criterion benchmark but I will double check this.

Member:

Okay.

@andygrove (Member, Author):

After addressing the first round of feedback, we now have:

pub fn comet_filter_record_batch(
    record_batch: &RecordBatch,
    predicate: &BooleanArray,
) -> std::result::Result<RecordBatch, ArrowError> {
    if predicate.true_count() == record_batch.num_rows() {
        // special case where we just make an exact copy
        let arrays: Vec<ArrayRef> = record_batch
            .columns()
            .iter()
            .map(|array| {
                let capacity = array.len();
                let data = array.to_data();
                let mut mutable = MutableArrayData::new(vec![&data], false, capacity);
                mutable.extend(0, 0, capacity);
                make_array(mutable.freeze())
            })
            .collect();
        let options = RecordBatchOptions::new().with_row_count(Some(record_batch.num_rows()));
        RecordBatch::try_new_with_options(record_batch.schema().clone(), arrays, &options)
    } else {
        filter_record_batch(record_batch, predicate)
    }
}

New benchmark results:

filter/comet_filter - few
                        time:   [14.650 µs 14.727 µs 14.831 µs]
                        change: [-36.702% -35.128% -33.681%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high severe
filter/comet_filter - many
                        time:   [75.962 µs 76.172 µs 76.381 µs]
                        change: [-48.681% -48.501% -48.303%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high severe
filter/comet_filter - all
                        time:   [34.497 µs 34.628 µs 34.764 µs]
                        change: [-80.854% -80.527% -80.256%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  6 (6.00%) low mild
  1 (1.00%) high mild
  1 (1.00%) high severe

This certainly looks a lot better. I am running TPC-DS again to make sure this really is always copying. I had tried an approach like this in the past but ran into data corruption issues.

        let options = RecordBatchOptions::new().with_row_count(Some(record_batch.num_rows()));
        RecordBatch::try_new_with_options(record_batch.schema().clone(), arrays, &options)
    } else {
        filter_record_batch(record_batch, predicate)
Member:

Yeah, it is good to use this arrow-rs kernel instead of calling the take kernel, as filter_record_batch has some optimizations for filter selectivity.

@@ -44,6 +44,7 @@ datafusion = { default-features = false, git = "https://github.com/apache/datafu
datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", features = ["crypto_expressions"] }
datafusion-functions-nested = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false }
datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false }
datafusion-execution = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false }
Member:

We need datafusion-execution?

andygrove (Member, Author):

We use datafusion::execution::TaskContext. I guess we were just pulling this in transitively before via the datafusion crate rather than being explicit.

We may want to avoid bringing in the core datafusion crate and just depend directly on the crates that we need.

Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
@andygrove (Member, Author):

The results are even better now 🎉

[chart: tpcds_allqueries]

[chart: tpcds_queries_speedup]

            .map(|array| {
                let capacity = array.len();
                let data = array.to_data();
                let mut mutable = MutableArrayData::new(vec![&data], false, capacity);
@mbutrovich (Contributor):

More of a Rust question for myself to dig into: is there a reason we can't do array.to_data().clone() and then call make_array on that? It seems inefficient to allocate a resizable data structure, truncate it with extend() then freeze() it, but I am still very new to this. Thankfully there are good benchmarks in this PR for me to explore that on my own. :)

@viirya (Member):

clone makes a shallow copy where the buffers are still shared, but we actually need to copy the data.

@mbutrovich (Contributor) commented Aug 16, 2024:

I thought copy() would make a shallow copy, while clone() would deep copy the data? If that were the case, we could possibly clone() the whole RecordBatch.

@viirya (Member):

The buffer implementation under the arrays holds references. clone will just copy the references without copying the data.

@mbutrovich (Contributor) commented Aug 16, 2024:

So RecordBatch stores a Vec of reference counted arrays columns: Vec<Arc<dyn Array>>? That makes sense to me that if we clone() RecordBatch we just bump ref counts, and we don't get the desired result. But the underlying ArrayData should be safe to clone() because its underlying Vec will deep copy its buffer.

@mbutrovich (Contributor):

Ah, I see the reference counted stuff all the way down inside Buffer. I understand why it won't work now. Thanks!

@viirya (Member) commented Aug 16, 2024:

This is the snippet of Buffer implementation in arrow-rs:

pub struct Buffer {
    /// the internal byte buffer.
    data: Arc<Bytes>,
    ...
}

And ArrayData:

pub struct ArrayData {
    ...
    buffers: Vec<Buffer>,
    ...
}

When you clone it, Buffer won't copy data for you but you only get a clone of Arc, i.e., a new reference to the existing data.
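The shared-reference behavior described above can be demonstrated with std alone, using `Arc<Vec<u8>>` as a stand-in for Buffer's `Arc<Bytes>` (a sketch, not arrow-rs code):

```rust
use std::sync::Arc;

// Demonstrates the point above: cloning an Arc-backed buffer is a
// shallow copy that shares the underlying allocation, so a deep copy
// requires explicitly copying the bytes into a new allocation.
fn shares_allocation(a: &Arc<Vec<u8>>, b: &Arc<Vec<u8>>) -> bool {
    // true when both Arcs point at the same heap allocation
    Arc::ptr_eq(a, b)
}

fn deep_copy(buf: &Arc<Vec<u8>>) -> Arc<Vec<u8>> {
    // clones the bytes themselves, producing an independent allocation
    Arc::new(buf.as_ref().clone())
}
```

This is why the PR copies through MutableArrayData rather than calling `clone()` on the batch: `RecordBatch::clone` only bumps reference counts all the way down to the buffers.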

Contributor (quoting the PR description):

It was necessary to have the CopyExec because FilterExec can pass through input batches in the case where the predicate evaluates to true for all rows and this is not safe in Comet because we re-use arrays in the ScanExec.

I wonder if you can use the machinery with unary_mut to know when it is safe to reuse the arrays and when a copy is required.

So that would look like the scan using something like unary_mut, which can check if there are other existing references or not.

For example, see the examples at https://docs.rs/arrow/latest/arrow/array/struct.PrimitiveArray.html#method.unary_mut.
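The copy-on-write idea behind unary_mut can be sketched with std alone, using `Arc::try_unwrap` to play the role of Arrow's internal refcount check (illustrative, not the actual arrow-rs API):

```rust
use std::sync::Arc;

// Sketch of the unary_mut idea: mutate the buffer in place when we hold
// the only reference, and fall back to copying when other references
// (e.g. a re-used scan array) still point at it.
fn negate_or_copy(buf: Arc<Vec<i64>>) -> Arc<Vec<i64>> {
    match Arc::try_unwrap(buf) {
        // sole owner: safe to mutate without copying
        Ok(mut owned) => {
            for v in owned.iter_mut() {
                *v = -*v;
            }
            Arc::new(owned)
        }
        // shared: copy so the other reference sees unchanged data
        Err(shared) => Arc::new(shared.iter().map(|v| -v).collect()),
    }
}
```

Arrow's `PrimitiveArray::unary_mut` makes the same distinction, returning the mutated array on success or the original (still-shared) array on failure.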

@andygrove (Member, Author) commented Aug 16, 2024:

I created a PR against DataFusion to add a flag to FilterExec so that we can switch back to using the upstream version, assuming that PR gets accepted.

apache/datafusion#12039

@andygrove (Member, Author):

Thanks for the reviews @viirya @Dandandan @mbutrovich

@andygrove andygrove merged commit 3f826a3 into apache:main Aug 16, 2024
74 checks passed
@andygrove andygrove deleted the filter-always-copy branch August 16, 2024 22:42
himadripal pushed a commit to himadripal/datafusion-comet that referenced this pull request Sep 7, 2024
* Use custom FilterExec that always uses take with a selection vector

* Remove CopyExec around FilterExec

* remove CopyExec on FilterExec inputs to joins

* remove copy before sort in some cases

* add comments

* cargo fmt

* bug fix: check for null when building selection vector

* revert

* use arrow kernel

* remove unused imports

* add criterion benchmark

* address initial feedback

* add ASF header

* fix missing imports

* Update native/core/src/execution/operators/filter.rs

Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>

---------

Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Successfully merging this pull request may close these issues.

Avoid copying batches twice in scan->filter->join
6 participants