perf: Remove some redundant copying of batches (apache#816)
(cherry picked from commit acb7828)
andygrove authored and huaxingao committed Aug 22, 2024
1 parent 1e8693f commit 86b261c
Showing 3 changed files with 100 additions and 77 deletions.
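The CopyMode enum introduced in this commit distinguishes a cheap shallow clone from a full deep copy of a batch's columns: UnpackOrClone only clones the Arc behind each array (the buffers stay shared), while UnpackOrDeepCopy materializes fresh buffers. A minimal standalone sketch of that difference (not part of the diff), assuming only the arrow_array and arrow_data crates this change already depends on:

use std::sync::Arc;

use arrow_array::{make_array, Array, ArrayRef, Int32Array};
use arrow_data::transform::MutableArrayData;

fn main() {
    let original: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));

    // UnpackOrClone-style: cloning the Arc shares the underlying buffers,
    // so no values are copied.
    let shallow: ArrayRef = Arc::clone(&original);

    // UnpackOrDeepCopy-style: MutableArrayData allocates new buffers and
    // copies the values, like copy_array in copy.rs below.
    let data = original.to_data();
    let mut mutable = MutableArrayData::new(vec![&data], false, original.len());
    mutable.extend(0, 0, original.len());
    let deep: ArrayRef = make_array(mutable.freeze());

    // The shallow clone aliases the original values buffer; the deep copy
    // points at a fresh allocation.
    assert_eq!(
        data.buffers()[0].as_ptr(),
        shallow.to_data().buffers()[0].as_ptr()
    );
    assert_ne!(
        data.buffers()[0].as_ptr(),
        deep.to_data().buffers()[0].as_ptr()
    );
}

The performance win comes from choosing the cheap clone whenever the child operator is already known to produce fresh output arrays.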
27 changes: 18 additions & 9 deletions native/core/src/execution/datafusion/planner.rs
@@ -19,6 +19,7 @@
 use super::expressions::EvalMode;
 use crate::execution::datafusion::expressions::comet_scalar_funcs::create_comet_physical_fun;
+use crate::execution::operators::CopyMode;
 use crate::{
     errors::ExpressionError,
     execution::{
@@ -859,7 +860,11 @@ impl PhysicalPlanner

                let fetch = sort.fetch.map(|num| num as usize);

-                let copy_exec = Arc::new(CopyExec::new(child));
+                let copy_exec = if can_reuse_input_batch(&child) {
+                    Arc::new(CopyExec::new(child, CopyMode::UnpackOrDeepCopy))
+                } else {
+                    Arc::new(CopyExec::new(child, CopyMode::UnpackOrClone))
+                };

                Ok((
                    scans,
@@ -949,8 +954,8 @@ impl PhysicalPlanner {
                 // the data corruption. Note that we only need to copy the input batch
                 // if the child operator is `ScanExec`, because other operators after `ScanExec`
                 // will create new arrays for the output batch.
-                let child = if child.as_any().is::<ScanExec>() {
-                    Arc::new(CopyExec::new(child))
+                let child = if can_reuse_input_batch(&child) {
+                    Arc::new(CopyExec::new(child, CopyMode::UnpackOrDeepCopy))
                 } else {
                     child
                 };
@@ -1205,15 +1210,15 @@ impl PhysicalPlanner {
                 // to copy the input batch to avoid the data corruption from reusing the input
                 // batch.
                 let left = if can_reuse_input_batch(&left) {
-                    Arc::new(CopyExec::new(left))
+                    Arc::new(CopyExec::new(left, CopyMode::UnpackOrDeepCopy))
                 } else {
-                    left
+                    Arc::new(CopyExec::new(left, CopyMode::UnpackOrClone))
                 };

                 let right = if can_reuse_input_batch(&right) {
-                    Arc::new(CopyExec::new(right))
+                    Arc::new(CopyExec::new(right, CopyMode::UnpackOrDeepCopy))
                 } else {
-                    right
+                    Arc::new(CopyExec::new(right, CopyMode::UnpackOrClone))
                 };

                 Ok((
@@ -1775,10 +1780,14 @@ impl From<ExpressionError> for DataFusionError {
 /// modification. This is used to determine if we need to copy the input batch to avoid
 /// data corruption from reusing the input batch.
 fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool {
-    op.as_any().is::<ScanExec>()
+    if op.as_any().is::<ProjectionExec>()
         || op.as_any().is::<LocalLimitExec>()
-        || op.as_any().is::<ProjectionExec>()
+        || op.as_any().is::<FilterExec>()
+    {
+        can_reuse_input_batch(op.children()[0])
+    } else {
+        op.as_any().is::<ScanExec>()
+    }
 }

 /// Collects the indices of the columns in the input schema that are used in the expression
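With this change, can_reuse_input_batch recurses through operators that can forward their child's arrays unmodified (projection, filter, local limit) and reports true only when the chain bottoms out at a ScanExec, whose batches reuse buffers. A toy model of the same walk (not Comet code), with a hypothetical Op enum standing in for DataFusion's ExecutionPlan types:

// Toy stand-in for the ExecutionPlan nodes checked by can_reuse_input_batch.
enum Op {
    Scan,
    Projection(Box<Op>),
    Filter(Box<Op>),
    LocalLimit(Box<Op>),
    Sort(Box<Op>),
}

// Mirrors the recursion above: pass-through operators defer to their child;
// a scan reuses its buffers; anything else allocates fresh output arrays.
fn can_reuse_input_batch(op: &Op) -> bool {
    match op {
        Op::Projection(child) | Op::Filter(child) | Op::LocalLimit(child) => {
            can_reuse_input_batch(child)
        }
        Op::Scan => true,
        Op::Sort(_) => false,
    }
}

fn main() {
    // Filter -> Projection -> Scan: the scan's reused buffers flow through,
    // so a deep copy is still required downstream.
    assert!(can_reuse_input_batch(&Op::Filter(Box::new(Op::Projection(
        Box::new(Op::Scan)
    )))));

    // LocalLimit -> Sort -> Scan: the sort materializes new arrays, so a
    // cheap Arc clone (CopyMode::UnpackOrClone) is enough.
    assert!(!can_reuse_input_batch(&Op::LocalLimit(Box::new(Op::Sort(
        Box::new(Op::Scan)
    )))));
}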
88 changes: 79 additions & 9 deletions native/core/src/execution/operators/copy.rs
@@ -15,24 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.

+use arrow::compute::{cast_with_options, CastOptions};
+use futures::{Stream, StreamExt};
 use std::{
     any::Any,
     pin::Pin,
     sync::Arc,
     task::{Context, Poll},
 };

-use futures::{Stream, StreamExt};
-
-use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions};
-use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
+use arrow_array::{
+    downcast_dictionary_array, make_array, Array, ArrayRef, RecordBatch, RecordBatchOptions,
+};
+use arrow_data::transform::MutableArrayData;
+use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, SchemaRef};

 use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::{execution::TaskContext, physical_expr::*, physical_plan::*};
 use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};

-use super::copy_or_cast_array;
-
 /// An utility execution node which makes deep copies of input batches.
 ///
 /// In certain scenarios like sort, DF execution nodes only make shallow copy of input batches.
@@ -44,10 +45,20 @@ pub struct CopyExec {
     schema: SchemaRef,
     cache: PlanProperties,
     metrics: ExecutionPlanMetricsSet,
+    mode: CopyMode,
 }

+#[derive(Debug, PartialEq, Clone)]
+pub enum CopyMode {
+    UnpackOrDeepCopy,
+    UnpackOrClone,
+}
+
 impl CopyExec {
-    pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
+    pub fn new(input: Arc<dyn ExecutionPlan>, mode: CopyMode) -> Self {
+        // change schema to remove dictionary types because CopyExec always unpacks
+        // dictionaries
+
         let fields: Vec<Field> = input
             .schema()
             .fields
@@ -73,6 +84,7 @@ impl CopyExec {
             schema,
             cache,
             metrics: ExecutionPlanMetricsSet::default(),
+            mode,
         }
     }
 }
@@ -81,7 +93,7 @@ impl DisplayAs for CopyExec {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "CopyExec")
+                write!(f, "CopyExec [{:?}]", self.mode)
             }
         }
     }
@@ -111,6 +123,7 @@ impl ExecutionPlan for CopyExec {
             schema: self.schema.clone(),
             cache: self.cache.clone(),
             metrics: self.metrics.clone(),
+            mode: self.mode.clone(),
         }))
     }

@@ -125,6 +138,7 @@ impl ExecutionPlan for CopyExec {
             self.schema(),
             child_stream,
             partition,
+            self.mode.clone(),
         )))
     }

@@ -149,6 +163,7 @@ struct CopyStream {
     schema: SchemaRef,
     child_stream: SendableRecordBatchStream,
     baseline_metrics: BaselineMetrics,
+    mode: CopyMode,
 }

 impl CopyStream {
@@ -157,11 +172,13 @@ impl CopyStream {
         schema: SchemaRef,
         child_stream: SendableRecordBatchStream,
         partition: usize,
+        mode: CopyMode,
     ) -> Self {
         Self {
             schema,
             child_stream,
             baseline_metrics: BaselineMetrics::new(&exec.metrics, partition),
+            mode,
         }
     }

@@ -172,7 +189,7 @@ impl CopyStream {
         let vectors = batch
             .columns()
             .iter()
-            .map(|v| copy_or_cast_array(v))
+            .map(|v| copy_or_unpack_array(v, &self.mode))
             .collect::<Result<Vec<ArrayRef>, _>>()?;

         let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
@@ -200,3 +217,56 @@ impl RecordBatchStream for CopyStream {
         self.schema.clone()
     }
 }
+
+/// Copy an Arrow Array
+fn copy_array(array: &dyn Array) -> ArrayRef {
+    let capacity = array.len();
+    let data = array.to_data();
+
+    let mut mutable = MutableArrayData::new(vec![&data], false, capacity);
+
+    mutable.extend(0, 0, capacity);
+
+    if matches!(array.data_type(), DataType::Dictionary(_, _)) {
+        let copied_dict = make_array(mutable.freeze());
+        let ref_copied_dict = &copied_dict;
+
+        downcast_dictionary_array!(
+            ref_copied_dict => {
+                // Copying dictionary value array
+                let values = ref_copied_dict.values();
+                let data = values.to_data();
+
+                let mut mutable = MutableArrayData::new(vec![&data], false, values.len());
+                mutable.extend(0, 0, values.len());
+
+                let copied_dict = ref_copied_dict.with_values(make_array(mutable.freeze()));
+                Arc::new(copied_dict)
+            }
+            t => unreachable!("Should not reach here: {}", t)
+        )
+    } else {
+        make_array(mutable.freeze())
+    }
+}
+
+/// Copy an Arrow Array or cast to primitive type if it is a dictionary array.
+/// This is used for `CopyExec` to copy/cast the input array. If the input array
+/// is a dictionary array, we will cast the dictionary array to primitive type
+/// (i.e., unpack the dictionary array) and copy the primitive array. If the input
+/// array is a primitive array, we simply copy the array.
+fn copy_or_unpack_array(array: &Arc<dyn Array>, mode: &CopyMode) -> Result<ArrayRef, ArrowError> {
+    match array.data_type() {
+        DataType::Dictionary(_, value_type) => {
+            let options = CastOptions::default();
+            cast_with_options(array, value_type.as_ref(), &options)
+        }
+        _ => {
+            if mode == &CopyMode::UnpackOrDeepCopy {
+                Ok(copy_array(array))
+            } else {
+                Ok(Arc::clone(array))
+            }
+        }
+    }
+}
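
For reference, the unpack path above is just an Arrow cast from the dictionary type to its value type. A small standalone sketch of that step (not part of the diff), assuming the same arrow crates imported at the top of this file:

use std::sync::Arc;

use arrow::compute::{cast_with_options, CastOptions};
use arrow_array::{types::Int32Type, Array, ArrayRef, DictionaryArray, StringArray};
use arrow_schema::{ArrowError, DataType};

fn main() -> Result<(), ArrowError> {
    // A dictionary-encoded string column: keys [0, 0, 1] into values ["a", "b"].
    let dict: DictionaryArray<Int32Type> = vec!["a", "a", "b"].into_iter().collect();
    let array: ArrayRef = Arc::new(dict);

    // Unpacking as copy_or_unpack_array does for DataType::Dictionary inputs:
    // cast the array to its value type, producing a plain StringArray.
    let unpacked = cast_with_options(&array, &DataType::Utf8, &CastOptions::default())?;

    let strings = unpacked.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(strings.value(0), "a");
    assert_eq!(strings.value(2), "b");
    Ok(())
}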
62 changes: 3 additions & 59 deletions native/core/src/execution/operators/mod.rs
@@ -17,22 +17,15 @@

 //! Operators
-use arrow::{
-    array::{make_array, Array, ArrayRef, MutableArrayData},
-    datatypes::DataType,
-    downcast_dictionary_array,
-};
+use std::fmt::Debug;

-use arrow::compute::{cast_with_options, CastOptions};
-use arrow_schema::ArrowError;
 use jni::objects::GlobalRef;
-use std::{fmt::Debug, sync::Arc};

-mod scan;
+pub use copy::*;
 pub use scan::*;

 mod copy;
-pub use copy::*;
+mod scan;

 /// Error returned during executing operators.
 #[derive(thiserror::Error, Debug)]
@@ -61,52 +54,3 @@ pub enum ExecutionError {
         throwable: GlobalRef,
     },
 }
-
-/// Copy an Arrow Array
-pub fn copy_array(array: &dyn Array) -> ArrayRef {
-    let capacity = array.len();
-    let data = array.to_data();
-
-    let mut mutable = MutableArrayData::new(vec![&data], false, capacity);
-
-    mutable.extend(0, 0, capacity);
-
-    if matches!(array.data_type(), DataType::Dictionary(_, _)) {
-        let copied_dict = make_array(mutable.freeze());
-        let ref_copied_dict = &copied_dict;
-
-        downcast_dictionary_array!(
-            ref_copied_dict => {
-                // Copying dictionary value array
-                let values = ref_copied_dict.values();
-                let data = values.to_data();
-
-                let mut mutable = MutableArrayData::new(vec![&data], false, values.len());
-                mutable.extend(0, 0, values.len());
-
-                let copied_dict = ref_copied_dict.with_values(make_array(mutable.freeze()));
-                Arc::new(copied_dict)
-            }
-            t => unreachable!("Should not reach here: {}", t)
-        )
-    } else {
-        make_array(mutable.freeze())
-    }
-}
-
-/// Copy an Arrow Array or cast to primitive type if it is a dictionary array.
-/// This is used for `CopyExec` to copy/cast the input array. If the input array
-/// is a dictionary array, we will cast the dictionary array to primitive type
-/// (i.e., unpack the dictionary array) and copy the primitive array. If the input
-/// array is a primitive array, we simply copy the array.
-pub fn copy_or_cast_array(array: &dyn Array) -> Result<ArrayRef, ArrowError> {
-    match array.data_type() {
-        DataType::Dictionary(_, value_type) => {
-            let options = CastOptions::default();
-            let casted = cast_with_options(array, value_type.as_ref(), &options);
-
-            casted.and_then(|a| copy_or_cast_array(a.as_ref()))
-        }
-        _ => Ok(copy_array(array)),
-    }
-}
