perf: Remove some redundant copying of batches #816
Conversation
Codecov Report: all modified and coverable lines are covered by tests ✅

Coverage Diff:
                main     #816      +/-
  Coverage    33.94%   33.77%   -0.17%
  Complexity     874      863      -11
  Files          112      112
  Lines        42916    42864      -52
  Branches      9464     9461       -3
  Hits         14567    14478      -89
  Misses       25379    25397      +18
  Partials      2970     2989      +19

View full report in Codecov by Sentry.
@@ -200,3 +217,56 @@ impl RecordBatchStream for CopyStream {
        self.schema.clone()
    }
}
/// Copy an Arrow Array
fn copy_array(array: &dyn Array) -> ArrayRef {
This function was moved from mod.rs with no changes.
/// is a dictionary array, we will cast the dictionary array to primitive type
/// (i.e., unpack the dictionary array) and copy the primitive array. If the input
/// array is a primitive array, we simply copy the array.
fn copy_or_unpack_array(array: &Arc<dyn Array>, mode: &CopyMode) -> Result<ArrayRef, ArrowError> {
This function was moved from mod.rs but was also modified.
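Conceptually, the function dispatches on whether the input is dictionary-encoded and on the copy mode. A minimal std-only sketch of that logic (the `ModelArray`, `CopyMode`, and `copy_or_unpack` names here are illustrative models, not the actual Arrow or Comet types):

```rust
// Illustrative model of dictionary unpacking: a dictionary-encoded array
// stores small integer keys plus a table of distinct values; unpacking
// materializes the flat value for each row.
#[derive(Clone, Debug, PartialEq)]
enum ModelArray {
    // flat ("primitive") values
    Flat(Vec<i64>),
    // dictionary-encoded: each key indexes into `values`
    Dict { keys: Vec<usize>, values: Vec<i64> },
}

#[derive(Clone, Copy)]
enum CopyMode {
    UnpackOrDeepCopy, // unpack dictionaries, deep-copy everything else
    UnpackOrClone,    // unpack dictionaries, cheaply clone everything else
}

fn copy_or_unpack(array: &ModelArray, mode: CopyMode) -> ModelArray {
    match array {
        ModelArray::Dict { keys, values } => {
            // Both modes unpack dictionaries into a flat array.
            ModelArray::Flat(keys.iter().map(|&k| values[k]).collect())
        }
        ModelArray::Flat(v) => match mode {
            // In the real code the deep copy duplicates the Arrow buffers,
            // while the clone path is a cheap reference-count bump; the
            // model only shows that flat values pass through unchanged.
            CopyMode::UnpackOrDeepCopy | CopyMode::UnpackOrClone => {
                ModelArray::Flat(v.clone())
            }
        },
    }
}

fn main() {
    let dict = ModelArray::Dict { keys: vec![0, 1, 0], values: vec![10, 20] };
    let unpacked = copy_or_unpack(&dict, CopyMode::UnpackOrClone);
    println!("{:?}", unpacked); // Flat([10, 20, 10])
}
```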
@@ -1205,15 +1210,15 @@ impl PhysicalPlanner {
     // to copy the input batch to avoid the data corruption from reusing the input
     // batch.
     let left = if can_reuse_input_batch(&left) {
-        Arc::new(CopyExec::new(left))
+        Arc::new(CopyExec::new(left, CopyMode::UnpackOrDeepCopy))
Note that can_reuse_input_batch returns true in fewer cases now, so we are less likely to create the deep-copy version of CopyExec. Previously we would create a CopyExec around any projection or filter, but now we only do so if the projection or filter is wrapping a scan.
     } else {
-        left
+        Arc::new(CopyExec::new(left, CopyMode::UnpackOrClone))
We need to unpack dictionaries before joins currently. I did see a small performance improvement with this change when running TPC-DS, but it was not statistically significant.
@@ -1775,10 +1780,14 @@ impl From<ExpressionError> for DataFusionError {
 /// modification. This is used to determine if we need to copy the input batch to avoid
 /// data corruption from reusing the input batch.
 fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool {
     op.as_any().is::<ScanExec>()
Hmm, but our native scan actually reuses batches.
yes, it is still being checked here:
} else {
op.as_any().is::<ScanExec>()
}
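The review thread above hinges on the recursive shape of the new check: pass-through operators (projection, filter, limit) only reuse batches if the operator they wrap does, and the scan check is still the base case. A rough std-only model of that recursion (the `Plan` enum here is illustrative; the real function walks `ExecutionPlan` nodes):

```rust
// Illustrative plan tree: Scan reuses its output buffers; Projection and
// Filter pass batches through unchanged; Join produces fresh batches.
enum Plan {
    Scan,
    Projection(Box<Plan>),
    Filter(Box<Plan>),
    Join(Box<Plan>, Box<Plan>),
}

// A batch may be mutated by its producer only if it ultimately comes from
// a Scan, reached through pass-through operators.
fn can_reuse_input_batch(op: &Plan) -> bool {
    match op {
        // Pass-through operators: recurse into the child.
        Plan::Projection(child) | Plan::Filter(child) => can_reuse_input_batch(child),
        // Base case: the native scan reuses batches.
        Plan::Scan => true,
        // Joins (and other batch-producing operators) emit fresh batches.
        Plan::Join(..) => false,
    }
}

fn main() {
    assert!(can_reuse_input_batch(&Plan::Filter(Box::new(Plan::Scan))));
    assert!(!can_reuse_input_batch(&Plan::Projection(Box::new(Plan::Join(
        Box::new(Plan::Scan),
        Box::new(Plan::Scan)
    )))));
    println!("ok");
}
```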
Ah, okay, I missed it.
Makes sense to me. Thanks @andygrove
Thanks for the review @viirya
(cherry picked from commit acb7828)
Which issue does this PR close?
N/A
Rationale for this change
We use CopyExec to wrap the inputs to operators that can cache batches. This avoids corruption issues when the input batches come from a ScanExec, where arrays are re-used, or from other operators that may be wrapping a ScanExec and can pass input batches through unchanged (projection, limit, or filter). CopyExec also unpacks dictionary arrays (not just primitives, but Utf8 as well).

When we have nested joins, I see that CopyExec is processing the output of the nested joins, which is redundant because joins do not re-use arrays. This PR aims to avoid deep copies of arrays in cases where they are not necessary.
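The corruption risk that motivates CopyExec can be illustrated with a std-only sketch: if an operator caches a cheap shared clone of a buffer that the producer then overwrites for the next batch, the cached data changes underneath it, while a deep copy is unaffected. (Here `Rc<RefCell<..>>` stands in for Arrow's shared buffers; this is a model, not Comet code.)

```rust
use std::cell::RefCell;
use std::rc::Rc;

fn main() {
    // A producer that reuses its output buffer, like ScanExec.
    let buffer = Rc::new(RefCell::new(vec![1, 2, 3]));

    // A downstream operator caches the batch in two ways.
    let shallow = Rc::clone(&buffer); // shares the same storage
    let deep: Vec<i32> = buffer.borrow().clone(); // independent copy

    // The producer overwrites the buffer for the next batch.
    *buffer.borrow_mut() = vec![9, 9, 9];

    // The shallow clone now sees the new data (the "corruption"),
    // while the deep copy still holds the original values.
    assert_eq!(*shallow.borrow(), vec![9, 9, 9]);
    assert_eq!(deep, vec![1, 2, 3]);
    println!("ok");
}
```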
What changes are included in this PR?
How are these changes tested?
Existing tests