From 7ba63c3a338ec623816f993414399e2f13e65355 Mon Sep 17 00:00:00 2001 From: nglime Date: Mon, 28 Oct 2024 19:36:26 -0500 Subject: [PATCH] Format and fix type errors. Adjusted expected output when using `LexOrdering`. --- .../src/physical_optimizer/enforce_distribution.rs | 4 +++- .../core/src/physical_optimizer/enforce_sorting.rs | 9 +++++++-- .../core/src/physical_optimizer/projection_pushdown.rs | 10 ++++++++-- .../replace_with_order_preserving_variants.rs | 9 ++++++--- .../core/src/physical_optimizer/sort_pushdown.rs | 4 +++- datafusion/core/src/physical_optimizer/test_utils.rs | 4 +++- datafusion/core/src/physical_planner.rs | 5 +++-- datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs | 8 ++++---- .../fuzz_cases/aggregation_fuzzer/data_generator.rs | 2 +- datafusion/core/tests/fuzz_cases/sort_fuzz.rs | 2 +- .../fuzz_cases/sort_preserving_repartition_fuzz.rs | 2 +- datafusion/core/tests/fuzz_cases/window_fuzz.rs | 2 +- datafusion/functions-aggregate/benches/count.rs | 3 ++- datafusion/functions-aggregate/benches/sum.rs | 3 ++- datafusion/physical-expr-common/src/sort_expr.rs | 2 +- datafusion/physical-optimizer/src/topk_aggregation.rs | 2 +- datafusion/physical-plan/src/repartition/mod.rs | 6 +++--- .../src/windows/bounded_window_agg_exec.rs | 4 ++-- datafusion/proto/src/physical_plan/mod.rs | 4 +++- .../proto/tests/cases/roundtrip_physical_plan.rs | 5 ++++- 20 files changed, 59 insertions(+), 31 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 9def510d2003f..6af84b83beec1 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1491,7 +1491,9 @@ pub(crate) mod tests { if self.expr.is_empty() { vec![None] } else { - vec![Some(PhysicalSortRequirement::from_sort_exprs(self.expr.iter()))] + vec![Some(PhysicalSortRequirement::from_sort_exprs( + self.expr.iter(), + ))] } } diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 36500235051a7..20a628ccad05a 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -393,7 +393,9 @@ fn analyze_immediate_sort_removal( // If this sort is unnecessary, we should remove it: if sort_input .equivalence_properties() - .ordering_satisfy(LexOrderingRef::new(sort_exec.properties().output_ordering().unwrap_or(&[]))) + .ordering_satisfy(LexOrderingRef::new( + sort_exec.properties().output_ordering().unwrap_or(&[]), + )) { node.plan = if !sort_exec.preserve_partitioning() && sort_input.output_partitioning().partition_count() > 1 @@ -620,7 +622,10 @@ fn remove_corresponding_sort_from_sub_plan( // `SortPreservingMergeExec` instead of a `CoalescePartitionsExec`. let plan = node.plan.clone(); let plan = if let Some(ordering) = plan.output_ordering() { - Arc::new(SortPreservingMergeExec::new(LexOrdering::new(ordering.to_vec()), plan)) as _ + Arc::new(SortPreservingMergeExec::new( + LexOrdering::new(ordering.to_vec()), + plan, + )) as _ } else { Arc::new(CoalescePartitionsExec::new(plan)) as _ }; diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index c9076f0b014ba..529a981c0adb6 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -915,8 +915,14 @@ fn try_swapping_with_sym_hash_join( new_filter, sym_join.join_type(), sym_join.null_equals_null(), - sym_join.right().output_ordering().map(|p| LexOrdering::new(p.to_vec())), - sym_join.left().output_ordering().map(|p| LexOrdering::new(p.to_vec())), + sym_join + .right() + .output_ordering() + .map(|p| LexOrdering::new(p.to_vec())), + sym_join + .left() + .output_ordering() + .map(|p| LexOrdering::new(p.to_vec())), sym_join.partition_mode(), )?))) } diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 0dc8ceb8efbb2..b4ab93029cb19 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -33,8 +33,8 @@ use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::tree_node::PlanContext; use datafusion_physical_plan::ExecutionPlanProperties; -use itertools::izip; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef}; +use itertools::izip; /// For a given `plan`, this object carries the information one needs from its /// descendants to decide whether it is beneficial to replace order-losing (but @@ -132,7 +132,8 @@ fn plan_with_order_preserving_variants( if let Some(ordering) = child.output_ordering().map(Vec::from) { // When the input of a `CoalescePartitionsExec` has an ordering, // replace it with a `SortPreservingMergeExec` if appropriate: - let spm = SortPreservingMergeExec::new(LexOrdering::new(ordering), child.clone()); + let spm = + SortPreservingMergeExec::new(LexOrdering::new(ordering), child.clone()); sort_input.plan = Arc::new(spm) as _; sort_input.children[0].data = true; return Ok(sort_input); @@ -256,7 +257,9 @@ pub(crate) fn replace_with_order_preserving_variants( if alternate_plan .plan .equivalence_properties() - .ordering_satisfy(LexOrderingRef::new(requirements.plan.output_ordering().unwrap_or(&[]))) + .ordering_satisfy(LexOrderingRef::new( + requirements.plan.output_ordering().unwrap_or(&[]), + )) { for child in alternate_plan.children.iter_mut() { child.data = false; diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index 96319901b3b97..43b191a83893b 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -277,7 +277,9 @@ fn pushdown_requirement_to_children( spm_eqs = spm_eqs.with_reorder(new_ordering); // Do not push-down through SortPreservingMergeExec when // ordering requirement invalidates requirement of sort preserving merge exec. - if !spm_eqs.ordering_satisfy(LexOrderingRef::new(plan.output_ordering().unwrap_or(&[]))) { + if !spm_eqs + .ordering_satisfy(LexOrderingRef::new(plan.output_ordering().unwrap_or(&[]))) + { Ok(None) } else { // Can push-down through SortPreservingMergeExec, because parent requirement is finer diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index 9348cc3d0a196..d5ae9c56ab7f4 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -56,7 +56,9 @@ use datafusion_physical_plan::{ use async_trait::async_trait; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; -use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement, PhysicalSortRequirement}; +use datafusion_physical_expr_common::sort_expr::{ + LexOrdering, LexRequirement, PhysicalSortRequirement, +}; async fn register_current_csv( ctx: &SessionContext, diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 69d2ad3762380..544b5aa7d8c9c 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1595,8 +1595,9 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter( None => None, }; - let ordering_reqs: LexOrdering = - physical_sort_exprs.clone().unwrap_or(LexOrdering::default()); + let ordering_reqs: LexOrdering = physical_sort_exprs + .clone() + .unwrap_or(LexOrdering::default()); let agg_expr = AggregateExprBuilder::new(func.to_owned(), physical_args.to_vec()) diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index 2a760166bac59..a162ba6819cef 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -39,14 +39,14 @@ use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_plan::InputOrderMode; use test_utils::{add_empty_batches, StringBatchGenerator}; +use crate::fuzz_cases::aggregation_fuzzer::{ + AggregationFuzzerBuilder, ColumnDescr, DatasetGeneratorConfig, QueryBuilder, +}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use hashbrown::HashMap; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; use tokio::task::JoinSet; -use datafusion_physical_expr_common::sort_expr::LexOrdering; -use crate::fuzz_cases::aggregation_fuzzer::{ - AggregationFuzzerBuilder, ColumnDescr, DatasetGeneratorConfig, QueryBuilder, -}; // ======================================================================== // The new aggregation fuzz tests based on [`AggregationFuzzer`] diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs index 13e2f5b203a64..1cb5a8b0fd3a1 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs @@ -25,12 +25,12 @@ use arrow_array::{ArrayRef, RecordBatch}; use arrow_schema::{DataType, Field, Schema}; use datafusion_common::{arrow_datafusion_err, DataFusionError, Result}; use datafusion_physical_expr::{expressions::col, PhysicalSortExpr}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::sorts::sort::sort_batch; use rand::{ rngs::{StdRng, ThreadRng}, thread_rng, Rng, SeedableRng, }; -use datafusion_physical_expr_common::sort_expr::LexOrdering; use test_utils::{ array_gen::{PrimitiveArrayGenerator, StringArrayGenerator}, stagger_batch, diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs index 1e9f61deb77ae..e4acb96f49305 100644 --- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs @@ -30,9 +30,9 @@ use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_execution::memory_pool::GreedyMemoryPool; use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use rand::Rng; use std::sync::Arc; -use datafusion_physical_expr_common::sort_expr::LexOrdering; use test_utils::{batches_to_vec, partitions_to_sorted_vec}; const KB: usize = 1 << 10; diff --git a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs index 490e03674cffa..25d5bd1f6b1a8 100644 --- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs @@ -45,9 +45,9 @@ mod sp_repartition_fuzz_tests { }; use test_utils::add_empty_batches; + use datafusion_physical_expr_common::sort_expr::LexOrdering; use itertools::izip; use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng}; - use datafusion_physical_expr_common::sort_expr::LexOrdering; // Generate a schema which consists of 6 columns (a, b, c, d, e, f) fn create_test_schema() -> Result { diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs index c7f670244302f..7432dbeb774c0 100644 --- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs @@ -47,11 +47,11 @@ use test_utils::add_empty_batches; use datafusion::functions_window::row_number::row_number_udwf; use datafusion_functions_window::lead_lag::{lag_udwf, lead_udwf}; use datafusion_functions_window::rank::{dense_rank_udwf, rank_udwf}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use hashbrown::HashMap; use rand::distributions::Alphanumeric; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; -use datafusion_physical_expr_common::sort_expr::LexOrdering; #[tokio::test(flavor = "multi_thread", worker_threads = 16)] async fn window_bounded_window_random_comparison() -> Result<()> { diff --git a/datafusion/functions-aggregate/benches/count.rs b/datafusion/functions-aggregate/benches/count.rs index 65956cb8a1dea..1c8266ed5b896 100644 --- a/datafusion/functions-aggregate/benches/count.rs +++ b/datafusion/functions-aggregate/benches/count.rs @@ -23,6 +23,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use datafusion_expr::{function::AccumulatorArgs, AggregateUDFImpl, GroupsAccumulator}; use datafusion_functions_aggregate::count::Count; use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr_common::sort_expr::LexOrderingRef; use std::sync::Arc; fn prepare_accumulator() -> Box { @@ -31,7 +32,7 @@ fn prepare_accumulator() -> Box { return_type: &DataType::Int64, schema: &schema, ignore_nulls: false, - ordering_req: &[], + ordering_req: LexOrderingRef::default(), is_reversed: false, name: "COUNT(f)", is_distinct: false, diff --git a/datafusion/functions-aggregate/benches/sum.rs b/datafusion/functions-aggregate/benches/sum.rs index 652d447129dc1..81f3f986c0e90 100644 --- a/datafusion/functions-aggregate/benches/sum.rs +++ b/datafusion/functions-aggregate/benches/sum.rs @@ -23,6 +23,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use datafusion_expr::{function::AccumulatorArgs, AggregateUDFImpl, GroupsAccumulator}; use datafusion_functions_aggregate::sum::Sum; use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr_common::sort_expr::LexOrderingRef; use std::sync::Arc; fn prepare_accumulator(data_type: &DataType) -> Box { @@ -31,7 +32,7 @@ fn prepare_accumulator(data_type: &DataType) -> Box { return_type: data_type, schema: &schema, ignore_nulls: false, - ordering_req: &[], + ordering_req: LexOrderingRef::empty(), is_reversed: false, name: "SUM(f)", is_distinct: false, diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index e005bfdf40adc..fcd26d8d5462a 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -432,7 +432,7 @@ impl Display for LexOrdering { } impl FromIterator for LexOrdering { - fn from_iter>(iter: T) -> Self { + fn from_iter>(iter: T) -> Self { let mut lex_ordering = LexOrdering::default(); for i in iter { diff --git a/datafusion/physical-optimizer/src/topk_aggregation.rs b/datafusion/physical-optimizer/src/topk_aggregation.rs index 5ebe2426907f1..0e5fb82d9e93e 100644 --- a/datafusion/physical-optimizer/src/topk_aggregation.rs +++ b/datafusion/physical-optimizer/src/topk_aggregation.rs @@ -25,13 +25,13 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::Result; use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::LexOrdering; use datafusion_physical_plan::aggregates::AggregateExec; use datafusion_physical_plan::execution_plan::CardinalityEffect; use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::ExecutionPlan; use itertools::Itertools; -use datafusion_physical_expr::LexOrdering; /// An optimizer rule that passes a `limit` hint to aggregations if the whole result is not needed #[derive(Debug)] diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index e408efab28d4b..ed252e75af88b 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -50,12 +50,12 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; use crate::execution_plan::CardinalityEffect; +use datafusion_physical_expr_common::sort_expr::{LexOrderingRef, PhysicalSortExpr}; use futures::stream::Stream; use futures::{FutureExt, StreamExt, TryStreamExt}; use hashbrown::HashMap; use log::trace; use parking_lot::Mutex; -use datafusion_physical_expr_common::sort_expr::{LexOrderingRef, PhysicalSortExpr}; mod distributor_channels; @@ -1558,10 +1558,10 @@ mod tests { mod test { use arrow_schema::{DataType, Field, Schema, SortOptions}; - use datafusion_physical_expr::expressions::col; - use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use crate::memory::MemoryExec; use crate::union::UnionExec; + use datafusion_physical_expr::expressions::col; + use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use super::*; diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index a36c3a2bc824c..e3febd44da502 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -1717,8 +1717,8 @@ mod tests { let plan = projection_exec(window)?; let expected_plan = vec![ - "ProjectionExec: expr=[sn@0 as sn, hash@1 as hash, count([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }]]@2 as col_2]", - " BoundedWindowAggExec: wdw=[count([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }]]: Ok(Field { name: \"count([Column { name: \\\"sn\\\", index: 0 }]) PARTITION BY: [[Column { name: \\\"hash\\\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \\\"sn\\\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }]]\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt64(1)), is_causal: false }], mode=[Linear]", + "ProjectionExec: expr=[sn@0 as sn, hash@1 as hash, count([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [LexOrdering { inner: [PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }] }]@2 as col_2]", + " BoundedWindowAggExec: wdw=[count([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [LexOrdering { inner: [PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }] }]: Ok(Field { name: \"count([Column { name: \\\"sn\\\", index: 0 }]) PARTITION BY: [[Column { name: \\\"hash\\\", index: 1 }]], ORDER BY: [LexOrdering { inner: [PhysicalSortExpr { expr: Column { name: \\\"sn\\\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }] }]\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt64(1)), is_causal: false }], mode=[Linear]", " StreamingTableExec: partition_sizes=1, projection=[sn, hash], infinite_source=true, output_ordering=[sn@0 ASC NULLS LAST]", ]; diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 83ba06f43150b..af63bbf2f4aa4 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -1104,7 +1104,9 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { &sink_schema, extension_codec, ) - .map(|item| PhysicalSortRequirement::from_sort_exprs(&item.inner)) + .map(|item| { + PhysicalSortRequirement::from_sort_exprs(&item.inner) + }) }) .transpose()?; Ok(Arc::new(DataSinkExec::new( diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 42d7e28f60a79..70b429d97b4ef 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -51,7 +51,10 @@ use datafusion::functions_aggregate::sum::sum_udaf; use datafusion::logical_expr::{create_udf, JoinType, Operator, Volatility}; use datafusion::physical_expr::expressions::Literal; use datafusion::physical_expr::window::SlidingAggregateWindowExpr; -use datafusion::physical_expr::{LexOrdering, LexOrderingRef, LexRequirement, PhysicalSortRequirement, ScalarFunctionExpr}; +use datafusion::physical_expr::{ + LexOrdering, LexOrderingRef, LexRequirement, PhysicalSortRequirement, + ScalarFunctionExpr, +}; use datafusion::physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, };