diff --git a/native/core/src/execution/datafusion/expressions/avg.rs b/native/core/src/execution/datafusion/expressions/avg.rs index d1ebc67db..c3277bf7a 100644 --- a/native/core/src/execution/datafusion/expressions/avg.rs +++ b/native/core/src/execution/datafusion/expressions/avg.rs @@ -96,7 +96,7 @@ impl AggregateExpr for Avg { } fn expressions(&self) -> Vec> { - vec![self.expr.clone()] + vec![Arc::clone(&self.expr)] } fn name(&self) -> &str { diff --git a/native/core/src/execution/datafusion/expressions/avg_decimal.rs b/native/core/src/execution/datafusion/expressions/avg_decimal.rs index 7a6e988e0..d29dc6dab 100644 --- a/native/core/src/execution/datafusion/expressions/avg_decimal.rs +++ b/native/core/src/execution/datafusion/expressions/avg_decimal.rs @@ -107,7 +107,7 @@ impl AggregateExpr for AvgDecimal { } fn expressions(&self) -> Vec> { - vec![self.expr.clone()] + vec![Arc::clone(&self.expr)] } fn name(&self) -> &str { diff --git a/native/core/src/execution/datafusion/expressions/bitwise_not.rs b/native/core/src/execution/datafusion/expressions/bitwise_not.rs index 06ead2670..c7b2bc067 100644 --- a/native/core/src/execution/datafusion/expressions/bitwise_not.rs +++ b/native/core/src/execution/datafusion/expressions/bitwise_not.rs @@ -113,7 +113,7 @@ impl PhysicalExpr for BitwiseNotExpr { self: Arc, children: Vec>, ) -> Result> { - Ok(Arc::new(BitwiseNotExpr::new(children[0].clone()))) + Ok(Arc::new(BitwiseNotExpr::new(Arc::clone(&children[0])))) } fn dyn_hash(&self, state: &mut dyn Hasher) { diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs b/native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs index b922119f8..b66ca5b2c 100644 --- a/native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs +++ b/native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs @@ -138,8 +138,8 @@ impl PhysicalExpr for BloomFilterMightContain { children: Vec>, ) -> Result> { Ok(Arc::new(BloomFilterMightContain::try_new( - children[0].clone(), - children[1].clone(), + Arc::clone(&children[0]), + Arc::clone(&children[1]), )?)) } diff --git a/native/core/src/execution/datafusion/expressions/checkoverflow.rs b/native/core/src/execution/datafusion/expressions/checkoverflow.rs index e4f54a1b8..ff2cffd42 100644 --- a/native/core/src/execution/datafusion/expressions/checkoverflow.rs +++ b/native/core/src/execution/datafusion/expressions/checkoverflow.rs @@ -158,7 +158,7 @@ impl PhysicalExpr for CheckOverflow { children: Vec>, ) -> datafusion_common::Result> { Ok(Arc::new(CheckOverflow::new( - children[0].clone(), + Arc::clone(&children[0]), self.data_type.clone(), self.fail_on_error, ))) diff --git a/native/core/src/execution/datafusion/expressions/correlation.rs b/native/core/src/execution/datafusion/expressions/correlation.rs index 78a3f3419..642c7d866 100644 --- a/native/core/src/execution/datafusion/expressions/correlation.rs +++ b/native/core/src/execution/datafusion/expressions/correlation.rs @@ -115,7 +115,7 @@ impl AggregateExpr for Correlation { } fn expressions(&self) -> Vec> { - vec![self.expr1.clone(), self.expr2.clone()] + vec![Arc::clone(&self.expr1), Arc::clone(&self.expr2)] } fn name(&self) -> &str { @@ -209,13 +209,21 @@ impl Accumulator for CorrelationAccumulator { fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { let states_c = [ - states[0].clone(), - states[1].clone(), - states[2].clone(), - states[3].clone(), + Arc::clone(&states[0]), + Arc::clone(&states[1]), + Arc::clone(&states[2]), + Arc::clone(&states[3]), + ]; + let states_s1 = [ + Arc::clone(&states[0]), + Arc::clone(&states[1]), + Arc::clone(&states[4]), + ]; + let states_s2 = [ + Arc::clone(&states[0]), + Arc::clone(&states[2]), + Arc::clone(&states[5]), ]; - let states_s1 = [states[0].clone(), states[1].clone(), states[4].clone()]; - let states_s2 = [states[0].clone(), states[2].clone(), states[5].clone()]; if states[0].len() > 0 && states[1].len() > 0 && states[2].len() > 0 { self.covar.merge_batch(&states_c)?; diff --git a/native/core/src/execution/datafusion/expressions/covariance.rs b/native/core/src/execution/datafusion/expressions/covariance.rs index 151fb61a2..20a6fab94 100644 --- a/native/core/src/execution/datafusion/expressions/covariance.rs +++ b/native/core/src/execution/datafusion/expressions/covariance.rs @@ -112,7 +112,7 @@ impl AggregateExpr for Covariance { } fn expressions(&self) -> Vec> { - vec![self.expr1.clone(), self.expr2.clone()] + vec![Arc::clone(&self.expr1), Arc::clone(&self.expr2)] } fn name(&self) -> &str { diff --git a/native/core/src/execution/datafusion/expressions/negative.rs b/native/core/src/execution/datafusion/expressions/negative.rs index 9e82812be..fbcd194f0 100644 --- a/native/core/src/execution/datafusion/expressions/negative.rs +++ b/native/core/src/execution/datafusion/expressions/negative.rs @@ -200,7 +200,7 @@ impl PhysicalExpr for NegativeExpr { children: Vec>, ) -> Result> { Ok(Arc::new(NegativeExpr::new( - children[0].clone(), + Arc::clone(&children[0]), self.fail_on_error, ))) } diff --git a/native/core/src/execution/datafusion/expressions/normalize_nan.rs b/native/core/src/execution/datafusion/expressions/normalize_nan.rs index 3bd5feea5..d2192feef 100644 --- a/native/core/src/execution/datafusion/expressions/normalize_nan.rs +++ b/native/core/src/execution/datafusion/expressions/normalize_nan.rs @@ -87,7 +87,7 @@ impl PhysicalExpr for NormalizeNaNAndZero { ) -> datafusion_common::Result> { Ok(Arc::new(NormalizeNaNAndZero::new( self.data_type.clone(), - children[0].clone(), + Arc::clone(&children[0]), ))) } diff --git a/native/core/src/execution/datafusion/expressions/stddev.rs b/native/core/src/execution/datafusion/expressions/stddev.rs index bbddf9aa4..50d66463a 100644 --- a/native/core/src/execution/datafusion/expressions/stddev.rs +++ b/native/core/src/execution/datafusion/expressions/stddev.rs @@ -102,7 +102,7 @@ impl AggregateExpr for Stddev { } fn expressions(&self) -> Vec> { - vec![self.expr.clone()] + vec![Arc::clone(&self.expr)] } fn name(&self) -> &str { diff --git a/native/core/src/execution/datafusion/expressions/strings.rs b/native/core/src/execution/datafusion/expressions/strings.rs index 9112c256b..96e45eae2 100644 --- a/native/core/src/execution/datafusion/expressions/strings.rs +++ b/native/core/src/execution/datafusion/expressions/strings.rs @@ -228,7 +228,7 @@ impl PhysicalExpr for SubstringExpr { children: Vec>, ) -> datafusion_common::Result> { Ok(Arc::new(SubstringExpr::new( - children[0].clone(), + Arc::clone(&children[0]), self.start, self.len, ))) @@ -292,7 +292,7 @@ impl PhysicalExpr for StringSpaceExpr { self: Arc, children: Vec>, ) -> datafusion_common::Result> { - Ok(Arc::new(StringSpaceExpr::new(children[0].clone()))) + Ok(Arc::new(StringSpaceExpr::new(Arc::clone(&children[0])))) } fn dyn_hash(&self, state: &mut dyn Hasher) { diff --git a/native/core/src/execution/datafusion/expressions/sum_decimal.rs b/native/core/src/execution/datafusion/expressions/sum_decimal.rs index 2afbbf011..4eb36cbd0 100644 --- a/native/core/src/execution/datafusion/expressions/sum_decimal.rs +++ b/native/core/src/execution/datafusion/expressions/sum_decimal.rs @@ -94,7 +94,7 @@ impl AggregateExpr for SumDecimal { } fn expressions(&self) -> Vec> { - vec![self.expr.clone()] + vec![Arc::clone(&self.expr)] } fn name(&self) -> &str { diff --git a/native/core/src/execution/datafusion/expressions/variance.rs b/native/core/src/execution/datafusion/expressions/variance.rs index f996c13d8..19322aa63 100644 --- a/native/core/src/execution/datafusion/expressions/variance.rs +++ b/native/core/src/execution/datafusion/expressions/variance.rs @@ -100,7 +100,7 @@ impl AggregateExpr for Variance { } fn expressions(&self) -> Vec> { - vec![self.expr.clone()] + vec![Arc::clone(&self.expr)] } fn name(&self) -> &str { diff --git a/native/core/src/execution/datafusion/operators/expand.rs b/native/core/src/execution/datafusion/operators/expand.rs index 67171212f..a3dd06507 100644 --- a/native/core/src/execution/datafusion/operators/expand.rs +++ b/native/core/src/execution/datafusion/operators/expand.rs @@ -52,7 +52,7 @@ impl CometExpandExec { schema: SchemaRef, ) -> Self { let cache = PlanProperties::new( - EquivalenceProperties::new(schema.clone()), + EquivalenceProperties::new(Arc::clone(&schema)), Partitioning::UnknownPartitioning(1), ExecutionMode::Bounded, ); @@ -93,7 +93,7 @@ impl ExecutionPlan for CometExpandExec { } fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } fn children(&self) -> Vec<&Arc> { @@ -106,8 +106,8 @@ impl ExecutionPlan for CometExpandExec { ) -> datafusion_common::Result> { let new_expand = CometExpandExec::new( self.projections.clone(), - children[0].clone(), - self.schema.clone(), + Arc::clone(&children[0]), + Arc::clone(&self.schema), ); Ok(Arc::new(new_expand)) } @@ -118,8 +118,11 @@ impl ExecutionPlan for CometExpandExec { context: Arc, ) -> datafusion_common::Result { let child_stream = self.child.execute(partition, context)?; - let expand_stream = - ExpandStream::new(self.projections.clone(), child_stream, self.schema.clone()); + let expand_stream = ExpandStream::new( + self.projections.clone(), + child_stream, + Arc::clone(&self.schema), + ); Ok(Box::pin(expand_stream)) } @@ -174,7 +177,7 @@ impl ExpandStream { })?; let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); - RecordBatch::try_new_with_options(self.schema.clone(), columns, &options) + RecordBatch::try_new_with_options(Arc::clone(&self.schema), columns, &options) .map_err(|e| e.into()) } } @@ -210,6 +213,6 @@ impl Stream for ExpandStream { impl RecordBatchStream for ExpandStream { fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index fe6ef9f7b..b0137bf85 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -163,7 +163,7 @@ impl PhysicalPlanner { Self { exec_context_id, execution_props: self.execution_props, - session_ctx: self.session_ctx.clone(), + session_ctx: Arc::clone(&self.session_ctx), } } @@ -210,37 +210,43 @@ impl PhysicalPlanner { input_schema, ), ExprStruct::Eq(expr) => { - let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; let op = DataFusionOperator::Eq; Ok(Arc::new(BinaryExpr::new(left, op, right))) } ExprStruct::Neq(expr) => { - let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; let op = DataFusionOperator::NotEq; Ok(Arc::new(BinaryExpr::new(left, op, right))) } ExprStruct::Gt(expr) => { - let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; let op = DataFusionOperator::Gt; Ok(Arc::new(BinaryExpr::new(left, op, right))) } ExprStruct::GtEq(expr) => { - let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; let op = DataFusionOperator::GtEq; Ok(Arc::new(BinaryExpr::new(left, op, right))) } ExprStruct::Lt(expr) => { - let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; let op = DataFusionOperator::Lt; Ok(Arc::new(BinaryExpr::new(left, op, right))) } ExprStruct::LtEq(expr) => { - let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; let op = DataFusionOperator::LtEq; Ok(Arc::new(BinaryExpr::new(left, op, right))) @@ -272,13 +278,15 @@ impl PhysicalPlanner { Ok(Arc::new(IsNullExpr::new(child))) } ExprStruct::And(and) => { - let left = self.create_expr(and.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(and.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(and.right.as_ref().unwrap(), input_schema)?; let op = DataFusionOperator::And; Ok(Arc::new(BinaryExpr::new(left, op, right))) } ExprStruct::Or(or) => { - let left = self.create_expr(or.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(or.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(or.right.as_ref().unwrap(), input_schema)?; let op = DataFusionOperator::Or; Ok(Arc::new(BinaryExpr::new(left, op, right))) @@ -396,13 +404,15 @@ impl PhysicalPlanner { Ok(Arc::new(SecondExpr::new(child, timezone))) } ExprStruct::TruncDate(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema.clone())?; + let child = + self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; let format = self.create_expr(expr.format.as_ref().unwrap(), input_schema)?; Ok(Arc::new(DateTruncExpr::new(child, format))) } ExprStruct::TruncTimestamp(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema.clone())?; + let child = + self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; let format = self.create_expr(expr.format.as_ref().unwrap(), input_schema)?; let timezone = expr.timezone.clone(); @@ -427,31 +437,36 @@ impl PhysicalPlanner { Ok(Arc::new(StringSpaceExpr::new(child))) } ExprStruct::Contains(expr) => { - let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; Ok(Arc::new(Contains::new(left, right))) } ExprStruct::StartsWith(expr) => { - let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; Ok(Arc::new(StartsWith::new(left, right))) } ExprStruct::EndsWith(expr) => { - let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; Ok(Arc::new(EndsWith::new(left, right))) } ExprStruct::Like(expr) => { - let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; Ok(Arc::new(Like::new(left, right))) } ExprStruct::Rlike(expr) => { - let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; match right.as_any().downcast_ref::().unwrap().value() { ScalarValue::Utf8(Some(pattern)) => { @@ -475,19 +490,22 @@ impl PhysicalPlanner { } ExprStruct::ScalarFunc(expr) => self.create_scalar_function_expr(expr, input_schema), ExprStruct::EqNullSafe(expr) => { - let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; let op = DataFusionOperator::IsNotDistinctFrom; Ok(Arc::new(BinaryExpr::new(left, op, right))) } ExprStruct::NeqNullSafe(expr) => { - let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; let op = DataFusionOperator::IsDistinctFrom; Ok(Arc::new(BinaryExpr::new(left, op, right))) } ExprStruct::BitwiseAnd(expr) => { - let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; let op = DataFusionOperator::BitwiseAnd; Ok(Arc::new(BinaryExpr::new(left, op, right))) @@ -497,32 +515,36 @@ impl PhysicalPlanner { Ok(Arc::new(BitwiseNotExpr::new(child))) } ExprStruct::BitwiseOr(expr) => { - let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; let op = DataFusionOperator::BitwiseOr; Ok(Arc::new(BinaryExpr::new(left, op, right))) } ExprStruct::BitwiseXor(expr) => { - let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; let op = DataFusionOperator::BitwiseXor; Ok(Arc::new(BinaryExpr::new(left, op, right))) } ExprStruct::BitwiseShiftRight(expr) => { - let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; let op = DataFusionOperator::BitwiseShiftRight; Ok(Arc::new(BinaryExpr::new(left, op, right))) } ExprStruct::BitwiseShiftLeft(expr) => { - let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let left = + self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; let op = DataFusionOperator::BitwiseShiftLeft; Ok(Arc::new(BinaryExpr::new(left, op, right))) } // https://github.com/apache/datafusion-comet/issues/666 // ExprStruct::Abs(expr) => { - // let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema.clone())?; + // let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; // let return_type = child.data_type(&input_schema)?; // let args = vec![child]; // let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; @@ -537,12 +559,12 @@ impl PhysicalPlanner { let when_then_pairs = case_when .when .iter() - .map(|x| self.create_expr(x, input_schema.clone())) + .map(|x| self.create_expr(x, Arc::clone(&input_schema))) .zip( case_when .then .iter() - .map(|then| self.create_expr(then, input_schema.clone())), + .map(|then| self.create_expr(then, Arc::clone(&input_schema))), ) .try_fold(Vec::new(), |mut acc, (a, b)| { acc.push((a?, b?)); @@ -565,20 +587,20 @@ impl PhysicalPlanner { } ExprStruct::In(expr) => { let value = - self.create_expr(expr.in_value.as_ref().unwrap(), input_schema.clone())?; + self.create_expr(expr.in_value.as_ref().unwrap(), Arc::clone(&input_schema))?; let list = expr .lists .iter() - .map(|x| self.create_expr(x, input_schema.clone())) + .map(|x| self.create_expr(x, Arc::clone(&input_schema))) .collect::, _>>()?; in_list(value, list, &expr.negated, input_schema.as_ref()).map_err(|e| e.into()) } ExprStruct::If(expr) => { let if_expr = - self.create_expr(expr.if_expr.as_ref().unwrap(), input_schema.clone())?; + self.create_expr(expr.if_expr.as_ref().unwrap(), Arc::clone(&input_schema))?; let true_expr = - self.create_expr(expr.true_expr.as_ref().unwrap(), input_schema.clone())?; + self.create_expr(expr.true_expr.as_ref().unwrap(), Arc::clone(&input_schema))?; let false_expr = self.create_expr(expr.false_expr.as_ref().unwrap(), input_schema)?; Ok(Arc::new(IfExpr::new(if_expr, true_expr, false_expr))) @@ -589,7 +611,7 @@ impl PhysicalPlanner { } ExprStruct::UnaryMinus(expr) => { let child: Arc = - self.create_expr(expr.child.as_ref().unwrap(), input_schema.clone())?; + self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; let result = negative::create_negate_expr(child, expr.fail_on_error); result.map_err(|e| ExecutionError::GeneralError(e.to_string())) } @@ -604,8 +626,10 @@ impl PhysicalPlanner { Ok(Arc::new(Subquery::new(self.exec_context_id, id, data_type))) } ExprStruct::BloomFilterMightContain(expr) => { - let bloom_filter_expr = - self.create_expr(expr.bloom_filter.as_ref().unwrap(), input_schema.clone())?; + let bloom_filter_expr = self.create_expr( + expr.bloom_filter.as_ref().unwrap(), + Arc::clone(&input_schema), + )?; let value_expr = self.create_expr(expr.value.as_ref().unwrap(), input_schema)?; Ok(Arc::new(BloomFilterMightContain::try_new( bloom_filter_expr, @@ -616,13 +640,14 @@ impl PhysicalPlanner { let values = expr .values .iter() - .map(|expr| self.create_expr(expr, input_schema.clone())) + .map(|expr| self.create_expr(expr, Arc::clone(&input_schema))) .collect::, _>>()?; let names = expr.names.clone(); Ok(Arc::new(CreateNamedStruct::new(values, names))) } ExprStruct::GetStructField(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema.clone())?; + let child = + self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; Ok(Arc::new(GetStructField::new(child, expr.ordinal as usize))) } expr => Err(ExecutionError::GeneralError(format!( @@ -669,8 +694,8 @@ impl PhysicalPlanner { op: DataFusionOperator, input_schema: SchemaRef, ) -> Result, ExecutionError> { - let left = self.create_expr(left, input_schema.clone())?; - let right = self.create_expr(right, input_schema.clone())?; + let left = self.create_expr(left, Arc::clone(&input_schema))?; + let right = self.create_expr(right, Arc::clone(&input_schema))?; match ( &op, left.data_type(&input_schema), @@ -801,7 +826,7 @@ impl PhysicalPlanner { let agg_exprs: PhyAggResult = agg .agg_exprs .iter() - .map(|expr| self.create_agg_expr(expr, schema.clone())) + .map(|expr| self.create_agg_expr(expr, Arc::clone(&schema))) .collect(); let num_agg = agg.agg_exprs.len(); @@ -811,8 +836,8 @@ impl PhysicalPlanner { group_by, agg_exprs?, vec![None; num_agg], // no filter expressions - child.clone(), - schema.clone(), + Arc::clone(&child), + Arc::clone(&schema), )?, ); let result_exprs: PhyExprResult = agg @@ -1039,13 +1064,13 @@ impl PhysicalPlanner { let sort_exprs: Result, ExecutionError> = wnd .order_by_list .iter() - .map(|expr| self.create_sort_expr(expr, input_schema.clone())) + .map(|expr| self.create_sort_expr(expr, Arc::clone(&input_schema))) .collect(); let partition_exprs: Result>, ExecutionError> = wnd .partition_by_list .iter() - .map(|expr| self.create_expr(expr, input_schema.clone())) + .map(|expr| self.create_expr(expr, Arc::clone(&input_schema))) .collect(); let sort_exprs = &sort_exprs?; @@ -1057,7 +1082,7 @@ impl PhysicalPlanner { .map(|expr| { self.create_window_expr( expr, - input_schema.clone(), + Arc::clone(&input_schema), partition_exprs, sort_exprs, ) @@ -1147,7 +1172,7 @@ impl PhysicalPlanner { val_type.as_ref().clone(), f.is_nullable(), )), - _ => f.clone(), + _ => Arc::clone(f), }) .collect(); @@ -1248,17 +1273,17 @@ impl PhysicalPlanner { let children = expr .children .iter() - .map(|child| self.create_expr(child, schema.clone())) + .map(|child| self.create_expr(child, Arc::clone(&schema))) .collect::, _>>()?; // create `IS NOT NULL expr` and join them with `AND` if there are multiple let not_null_expr: Arc = children.iter().skip(1).fold( - Arc::new(IsNotNullExpr::new(children[0].clone())) as Arc, + Arc::new(IsNotNullExpr::new(Arc::clone(&children[0]))) as Arc, |acc, child| { Arc::new(BinaryExpr::new( acc, DataFusionOperator::And, - Arc::new(IsNotNullExpr::new(child.clone())), + Arc::new(IsNotNullExpr::new(Arc::clone(child))), )) }, ); @@ -1283,7 +1308,7 @@ impl PhysicalPlanner { .map_err(|e| ExecutionError::DataFusionError(e.to_string())) } AggExprStruct::Min(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; + let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); let child = Arc::new(CastExpr::new(child, datatype.clone(), None)); create_aggregate_expr( @@ -1300,7 +1325,7 @@ impl PhysicalPlanner { .map_err(|e| ExecutionError::DataFusionError(e.to_string())) } AggExprStruct::Max(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; + let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); let child = Arc::new(CastExpr::new(child, datatype.clone(), None)); create_aggregate_expr( @@ -1317,7 +1342,7 @@ impl PhysicalPlanner { .map_err(|e| ExecutionError::DataFusionError(e.to_string())) } AggExprStruct::Sum(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; + let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); match datatype { @@ -1344,7 +1369,7 @@ impl PhysicalPlanner { } } AggExprStruct::Avg(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; + let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); let input_datatype = to_arrow_datatype(expr.sum_datatype.as_ref().unwrap()); match datatype { @@ -1364,7 +1389,7 @@ impl PhysicalPlanner { } } AggExprStruct::First(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; + let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?; let func = datafusion_expr::AggregateUDF::new_from_impl(FirstValue::new()); create_aggregate_expr( &func, @@ -1380,7 +1405,7 @@ impl PhysicalPlanner { .map_err(|e| e.into()) } AggExprStruct::Last(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; + let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?; let func = datafusion_expr::AggregateUDF::new_from_impl(LastValue::new()); create_aggregate_expr( &func, @@ -1396,7 +1421,7 @@ impl PhysicalPlanner { .map_err(|e| e.into()) } AggExprStruct::BitAndAgg(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; + let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?; create_aggregate_expr( &bit_and_udaf(), &[child], @@ -1411,7 +1436,7 @@ impl PhysicalPlanner { .map_err(|e| e.into()) } AggExprStruct::BitOrAgg(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; + let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?; create_aggregate_expr( &bit_or_udaf(), &[child], @@ -1426,7 +1451,7 @@ impl PhysicalPlanner { .map_err(|e| e.into()) } AggExprStruct::BitXorAgg(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; + let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?; create_aggregate_expr( &bit_xor_udaf(), &[child], @@ -1441,8 +1466,10 @@ impl PhysicalPlanner { .map_err(|e| e.into()) } AggExprStruct::Covariance(expr) => { - let child1 = self.create_expr(expr.child1.as_ref().unwrap(), schema.clone())?; - let child2 = self.create_expr(expr.child2.as_ref().unwrap(), schema.clone())?; + let child1 = + self.create_expr(expr.child1.as_ref().unwrap(), Arc::clone(&schema))?; + let child2 = + self.create_expr(expr.child2.as_ref().unwrap(), Arc::clone(&schema))?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); match expr.stats_type { 0 => Ok(Arc::new(Covariance::new( @@ -1468,7 +1495,7 @@ impl PhysicalPlanner { } } AggExprStruct::Variance(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; + let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); match expr.stats_type { 0 => Ok(Arc::new(Variance::new( @@ -1492,7 +1519,7 @@ impl PhysicalPlanner { } } AggExprStruct::Stddev(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; + let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); match expr.stats_type { 0 => Ok(Arc::new(Stddev::new( @@ -1516,8 +1543,10 @@ impl PhysicalPlanner { } } AggExprStruct::Correlation(expr) => { - let child1 = self.create_expr(expr.child1.as_ref().unwrap(), schema.clone())?; - let child2 = self.create_expr(expr.child2.as_ref().unwrap(), schema.clone())?; + let child1 = + self.create_expr(expr.child1.as_ref().unwrap(), Arc::clone(&schema))?; + let child2 = + self.create_expr(expr.child2.as_ref().unwrap(), Arc::clone(&schema))?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); Ok(Arc::new(Correlation::new( child1, @@ -1547,7 +1576,7 @@ impl PhysicalPlanner { window_args = f .args .iter() - .map(|expr| self.create_expr(expr, input_schema.clone())) + .map(|expr| self.create_expr(expr, Arc::clone(&input_schema))) .collect::, ExecutionError>>()?; } other => { @@ -1557,7 +1586,7 @@ impl PhysicalPlanner { } }; } else if let Some(agg_func) = &spark_expr.agg_func { - let result = self.process_agg_func(agg_func, input_schema.clone())?; + let result = self.process_agg_func(agg_func, Arc::clone(&input_schema))?; window_func_name = result.0; window_args = result.1; } else { @@ -1654,20 +1683,20 @@ impl PhysicalPlanner { let children = expr .children .iter() - .map(|child| self.create_expr(child, schema.clone())) + .map(|child| self.create_expr(child, Arc::clone(&schema))) .collect::, _>>()?; Ok(("count".to_string(), children)) } Some(AggExprStruct::Min(expr)) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; + let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?; Ok(("min".to_string(), vec![child])) } Some(AggExprStruct::Max(expr)) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; + let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?; Ok(("max".to_string(), vec![child])) } Some(AggExprStruct::Sum(expr)) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; + let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?; let arrow_type = to_arrow_datatype(expr.datatype.as_ref().unwrap()); let datatype = child.data_type(&schema)?; @@ -1708,7 +1737,7 @@ impl PhysicalPlanner { let exprs: PartitionPhyExprResult = hash_partition .hash_expression .iter() - .map(|x| self.create_expr(x, input_schema.clone())) + .map(|x| self.create_expr(x, Arc::clone(&input_schema))) .collect(); Ok(Partitioning::Hash( exprs?, @@ -1727,7 +1756,7 @@ impl PhysicalPlanner { let args = expr .args .iter() - .map(|x| self.create_expr(x, input_schema.clone())) + .map(|x| self.create_expr(x, Arc::clone(&input_schema))) .collect::, _>>()?; let fun_name = &expr.func; @@ -2134,7 +2163,7 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); - let stream = datafusion_plan.execute(0, task_ctx.clone()).unwrap(); + let stream = datafusion_plan.execute(0, Arc::clone(&task_ctx)).unwrap(); let output = collect(stream).await.unwrap(); assert!(output.is_empty()); } diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs index f7c97beaa..751484dbb 100644 --- a/native/core/src/execution/datafusion/shuffle_writer.rs +++ b/native/core/src/execution/datafusion/shuffle_writer.rs @@ -114,7 +114,7 @@ impl ExecutionPlan for ShuffleWriterExec { ) -> Result> { match children.len() { 1 => Ok(Arc::new(ShuffleWriterExec::try_new( - children[0].clone(), + Arc::clone(&children[0]), self.partitioning.clone(), self.output_data_file.clone(), self.output_index_file.clone(), @@ -128,7 +128,7 @@ impl ExecutionPlan for ShuffleWriterExec { partition: usize, context: Arc, ) -> Result { - let input = self.input.execute(partition, context.clone())?; + let input = self.input.execute(partition, Arc::clone(&context))?; let metrics = ShuffleRepartitionerMetrics::new(&self.metrics, 0); Ok(Box::pin(RecordBatchStreamAdapter::new( @@ -175,7 +175,7 @@ impl ShuffleWriterExec { output_index_file: String, ) -> Result { let cache = PlanProperties::new( - EquivalenceProperties::new(input.schema().clone()), + EquivalenceProperties::new(Arc::clone(&input.schema())), partitioning.clone(), ExecutionMode::Bounded, ); @@ -284,7 +284,7 @@ impl PartitionBuffer { self.num_active_rows = 0; mem_diff -= self.active_slots_mem_size as isize; - let frozen_batch = make_batch(self.schema.clone(), active, num_rows)?; + let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?; let frozen_capacity_old = self.frozen.capacity(); let mut cursor = Cursor::new(&mut self.frozen); @@ -634,10 +634,10 @@ impl ShuffleRepartitioner { Self { output_data_file, output_index_file, - schema: schema.clone(), + schema: Arc::clone(&schema), buffered_partitions: Mutex::new( (0..num_output_partitions) - .map(|_| PartitionBuffer::new(schema.clone(), batch_size)) + .map(|_| PartitionBuffer::new(Arc::clone(&schema), batch_size)) .collect::>(), ), spills: Mutex::new(vec![]), @@ -863,7 +863,7 @@ impl ShuffleRepartitioner { self.reservation.shrink(used); // shuffle writer always has empty output - Ok(Box::pin(EmptyStream::try_new(self.schema.clone())?)) + Ok(Box::pin(EmptyStream::try_new(Arc::clone(&self.schema))?)) } fn used(&self) -> usize { @@ -973,7 +973,7 @@ async fn external_shuffle( partition_id, output_data_file, output_index_file, - schema.clone(), + Arc::clone(&schema), partitioning, metrics, context.runtime_env(), @@ -1409,7 +1409,7 @@ impl Stream for EmptyStream { impl RecordBatchStream for EmptyStream { /// Get the schema fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } @@ -1467,7 +1467,7 @@ mod test { b.append_value(format!("{i}")); } let array = b.finish(); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap(); + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap(); let batches = vec![batch.clone()]; diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 587defaaa..3ad822cc4 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -348,14 +348,14 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( // Because we don't know if input arrays are dictionary-encoded when we create // query plan, we need to defer stream initialization to first time execution. if exec_context.root_op.is_none() { - let planner = PhysicalPlanner::new(exec_context.session_ctx.clone()) + let planner = PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx)) .with_exec_id(exec_context_id); let (scans, root_op) = planner.create_plan( &exec_context.spark_plan, &mut exec_context.input_sources.clone(), )?; - exec_context.root_op = Some(root_op.clone()); + exec_context.root_op = Some(Arc::clone(&root_op)); exec_context.scans = scans; if exec_context.explain_native { diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs index 0705a3b7c..d6c095a77 100644 --- a/native/core/src/execution/operators/copy.rs +++ b/native/core/src/execution/operators/copy.rs @@ -74,7 +74,7 @@ impl CopyExec { let schema = Arc::new(Schema::new(fields)); let cache = PlanProperties::new( - EquivalenceProperties::new(schema.clone()), + EquivalenceProperties::new(Arc::clone(&schema)), Partitioning::UnknownPartitioning(1), ExecutionMode::Bounded, ); @@ -105,7 +105,7 @@ impl ExecutionPlan for CopyExec { } fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } fn children(&self) -> Vec<&Arc> { @@ -116,11 +116,11 @@ impl ExecutionPlan for CopyExec { self: Arc, children: Vec>, ) -> DataFusionResult> { - let input = self.input.clone(); + let input = Arc::clone(&self.input); let new_input = input.with_new_children(children)?; Ok(Arc::new(CopyExec { input: new_input, - schema: self.schema.clone(), + schema: Arc::clone(&self.schema), cache: self.cache.clone(), metrics: self.metrics.clone(), mode: self.mode.clone(), @@ -193,8 +193,9 @@ impl CopyStream { .collect::, _>>()?; let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); - let maybe_batch = RecordBatch::try_new_with_options(self.schema.clone(), vectors, &options) - .map_err(|e| arrow_datafusion_err!(e)); + let maybe_batch = + RecordBatch::try_new_with_options(Arc::clone(&self.schema), vectors, &options) + .map_err(|e| arrow_datafusion_err!(e)); timer.stop(); self.baseline_metrics.record_output(batch.num_rows()); maybe_batch @@ -214,7 +215,7 @@ impl Stream for CopyStream { impl RecordBatchStream for CopyStream { fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs index 902529ba5..fff656b43 100644 --- a/native/core/src/execution/operators/filter.rs +++ b/native/core/src/execution/operators/filter.rs @@ -380,7 +380,7 @@ pub fn comet_filter_record_batch( }) .collect(); let options = RecordBatchOptions::new().with_row_count(Some(record_batch.num_rows())); - RecordBatch::try_new_with_options(record_batch.schema().clone(), arrays, &options) + RecordBatch::try_new_with_options(Arc::clone(&record_batch.schema()), arrays, &options) } else { filter_record_batch(record_batch, predicate) } diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 0d5b33934..59616efbb 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -366,7 +366,7 @@ impl<'a> ScanStream<'a> { }) .collect::, _>>()?; let options = RecordBatchOptions::new().with_row_count(Some(num_rows)); - RecordBatch::try_new_with_options(self.schema.clone(), new_columns, &options) + RecordBatch::try_new_with_options(Arc::clone(&self.schema), new_columns, &options) .map_err(|e| arrow_datafusion_err!(e)) } } @@ -406,7 +406,7 @@ impl<'a> Stream for ScanStream<'a> { impl<'a> RecordBatchStream for ScanStream<'a> { /// Get the schema fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } diff --git a/native/core/src/lib.rs b/native/core/src/lib.rs index b608461ea..4a647d96a 100644 --- a/native/core/src/lib.rs +++ b/native/core/src/lib.rs @@ -19,7 +19,11 @@ #![allow(non_camel_case_types)] #![allow(dead_code)] #![allow(clippy::upper_case_acronyms)] -#![allow(clippy::derive_partial_eq_without_eq)] // For prost generated struct +// For prost generated struct +#![allow(clippy::derive_partial_eq_without_eq)] +// The clippy throws an error if the reference clone not wrapped into `Arc::clone` +// The lint makes easier for code reader/reviewer separate references clones from more heavyweight ones +#![deny(clippy::clone_on_ref_ptr)] use jni::{ objects::{JClass, JString}, diff --git a/native/core/src/parquet/read/column.rs b/native/core/src/parquet/read/column.rs index feb342719..73f8df956 100644 --- a/native/core/src/parquet/read/column.rs +++ b/native/core/src/parquet/read/column.rs @@ -801,13 +801,13 @@ impl TypedColumnReader { let mut page_buffer = page_data; let bit_width = log2(self.desc.max_rep_level() as u64 + 1) as u8; - let mut rl_decoder = LevelDecoder::new(self.desc.clone(), bit_width, true); + let mut rl_decoder = LevelDecoder::new(Arc::clone(&self.desc), bit_width, true); let offset = rl_decoder.set_data(page_value_count, &page_buffer); self.rep_level_decoder = Some(rl_decoder); page_buffer = page_buffer.slice(offset); let bit_width = log2(self.desc.max_def_level() as u64 + 1) as u8; - let mut dl_decoder = LevelDecoder::new(self.desc.clone(), bit_width, true); + let mut dl_decoder = LevelDecoder::new(Arc::clone(&self.desc), bit_width, true); let offset = dl_decoder.set_data(page_value_count, &page_buffer); self.def_level_decoder = Some(dl_decoder); page_buffer = page_buffer.slice(offset); @@ -829,12 +829,12 @@ impl TypedColumnReader { self.check_dictionary(&encoding); let bit_width = log2(self.desc.max_rep_level() as u64 + 1) as u8; - let mut rl_decoder = LevelDecoder::new(self.desc.clone(), bit_width, false); + let mut rl_decoder = LevelDecoder::new(Arc::clone(&self.desc), bit_width, false); rl_decoder.set_data(page_value_count, &rep_level_data); self.rep_level_decoder = Some(rl_decoder); let bit_width = log2(self.desc.max_def_level() as u64 + 1) as u8; - let mut dl_decoder = LevelDecoder::new(self.desc.clone(), bit_width, false); + let mut dl_decoder = LevelDecoder::new(Arc::clone(&self.desc), bit_width, false); dl_decoder.set_data(page_value_count, &def_level_data); self.def_level_decoder = Some(dl_decoder); @@ -987,7 +987,7 @@ impl TypedColumnReader { value_data, page_value_count, encoding, - self.desc.clone(), + Arc::clone(&self.desc), self.read_options, ) } diff --git a/native/core/src/parquet/util/buffer.rs b/native/core/src/parquet/util/buffer.rs index d584ac0b1..72cfa3fe3 100644 --- a/native/core/src/parquet/util/buffer.rs +++ b/native/core/src/parquet/util/buffer.rs @@ -87,7 +87,7 @@ impl BufferRef { ); Self { - inner: self.inner.clone(), + inner: Arc::clone(&self.inner), offset: self.offset + offset, len, } diff --git a/native/core/src/parquet/util/memory.rs b/native/core/src/parquet/util/memory.rs index a2bbbfdde..ed2ab9807 100644 --- a/native/core/src/parquet/util/memory.rs +++ b/native/core/src/parquet/util/memory.rs @@ -182,7 +182,7 @@ impl Buffer { let old_data = mem::take(&mut self.data); let mut result = BufferPtr::new(old_data); if let Some(ref mc) = self.mem_tracker { - result = result.with_mem_tracker(mc.clone()); + result = result.with_mem_tracker(Arc::clone(mc)); } result } @@ -361,7 +361,7 @@ impl BufferPtr { /// Reference counted pointer to the data is copied. pub fn all(&self) -> BufferPtr { BufferPtr { - data: self.data.clone(), + data: Arc::clone(&self.data), start: self.start, len: self.len, mem_tracker: self.mem_tracker.as_ref().cloned(), @@ -372,7 +372,7 @@ impl BufferPtr { pub fn start_from(&self, start: usize) -> BufferPtr { assert!(start <= self.len); BufferPtr { - data: self.data.clone(), + data: Arc::clone(&self.data), start: self.start + start, len: self.len - start, mem_tracker: self.mem_tracker.as_ref().cloned(), @@ -383,7 +383,7 @@ impl BufferPtr { pub fn range(&self, start: usize, len: usize) -> BufferPtr { assert!(start + len <= self.len); BufferPtr { - data: self.data.clone(), + data: Arc::clone(&self.data), start: self.start + start, len, mem_tracker: self.mem_tracker.as_ref().cloned(), @@ -431,7 +431,7 @@ mod tests { fn test_byte_buffer_mem_tracker() { let mem_tracker = Arc::new(MemTracker::new()); - let mut buffer = ByteBuffer::new().with_mem_tracker(mem_tracker.clone()); + let mut buffer = ByteBuffer::new().with_mem_tracker(Arc::clone(&mem_tracker)); buffer.set_data(vec![0; 10]); assert_eq!(mem_tracker.memory_usage(), buffer.capacity() as i64); buffer.set_data(vec![0; 20]); @@ -439,7 +439,7 @@ mod tests { assert_eq!(mem_tracker.memory_usage(), capacity); let max_capacity = { - let mut buffer2 = ByteBuffer::new().with_mem_tracker(mem_tracker.clone()); + let mut buffer2 = ByteBuffer::new().with_mem_tracker(Arc::clone(&mem_tracker)); buffer2.reserve(30); assert_eq!( mem_tracker.memory_usage(), @@ -467,7 +467,7 @@ mod tests { fn test_byte_ptr_mem_tracker() { let mem_tracker = Arc::new(MemTracker::new()); - let mut buffer = ByteBuffer::new().with_mem_tracker(mem_tracker.clone()); + let mut buffer = ByteBuffer::new().with_mem_tracker(Arc::clone(&mem_tracker)); buffer.set_data(vec![0; 60]); { diff --git a/native/core/src/parquet/util/test_common/page_util.rs b/native/core/src/parquet/util/test_common/page_util.rs index efd3f38e3..6303943c4 100644 --- a/native/core/src/parquet/util/test_common/page_util.rs +++ b/native/core/src/parquet/util/test_common/page_util.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::{collections::VecDeque, mem}; +use std::{collections::VecDeque, mem, sync::Arc}; use rand::distributions::uniform::SampleUniform; @@ -252,7 +252,7 @@ pub fn make_pages( let max_def_level = desc.max_def_level(); let max_rep_level = desc.max_rep_level(); - let mut dict_encoder = DictEncoder::::new(desc.clone()); + let mut dict_encoder = DictEncoder::::new(Arc::clone(&desc)); for i in 0..num_pages { let mut num_values_cur_page = 0; @@ -275,7 +275,8 @@ pub fn make_pages( // Generate the current page - let mut pb = DataPageBuilderImpl::new(desc.clone(), num_values_cur_page as u32, use_v2); + let mut pb = + DataPageBuilderImpl::new(Arc::clone(&desc), num_values_cur_page as u32, use_v2); if max_rep_level > 0 { pb.add_rep_levels(max_rep_level, &rep_levels[level_range.clone()]); }