Skip to content

Commit

Permalink
Format and fix type errors. Adjusted expected output when using `LexOrdering`.
Browse files Browse the repository at this point in the history
  • Loading branch information
nglime committed Oct 29, 2024
1 parent ff430c1 commit 7ba63c3
Show file tree
Hide file tree
Showing 20 changed files with 59 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1491,7 +1491,9 @@ pub(crate) mod tests {
if self.expr.is_empty() {
vec![None]
} else {
vec![Some(PhysicalSortRequirement::from_sort_exprs(self.expr.iter()))]
vec![Some(PhysicalSortRequirement::from_sort_exprs(
self.expr.iter(),
))]
}
}

Expand Down
9 changes: 7 additions & 2 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,9 @@ fn analyze_immediate_sort_removal(
// If this sort is unnecessary, we should remove it:
if sort_input
.equivalence_properties()
.ordering_satisfy(LexOrderingRef::new(sort_exec.properties().output_ordering().unwrap_or(&[])))
.ordering_satisfy(LexOrderingRef::new(
sort_exec.properties().output_ordering().unwrap_or(&[]),
))
{
node.plan = if !sort_exec.preserve_partitioning()
&& sort_input.output_partitioning().partition_count() > 1
Expand Down Expand Up @@ -620,7 +622,10 @@ fn remove_corresponding_sort_from_sub_plan(
// `SortPreservingMergeExec` instead of a `CoalescePartitionsExec`.
let plan = node.plan.clone();
let plan = if let Some(ordering) = plan.output_ordering() {
Arc::new(SortPreservingMergeExec::new(LexOrdering::new(ordering.to_vec()), plan)) as _
Arc::new(SortPreservingMergeExec::new(
LexOrdering::new(ordering.to_vec()),
plan,
)) as _
} else {
Arc::new(CoalescePartitionsExec::new(plan)) as _
};
Expand Down
10 changes: 8 additions & 2 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -915,8 +915,14 @@ fn try_swapping_with_sym_hash_join(
new_filter,
sym_join.join_type(),
sym_join.null_equals_null(),
sym_join.right().output_ordering().map(|p| LexOrdering::new(p.to_vec())),
sym_join.left().output_ordering().map(|p| LexOrdering::new(p.to_vec())),
sym_join
.right()
.output_ordering()
.map(|p| LexOrdering::new(p.to_vec())),
sym_join
.left()
.output_ordering()
.map(|p| LexOrdering::new(p.to_vec())),
sym_join.partition_mode(),
)?)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::tree_node::PlanContext;
use datafusion_physical_plan::ExecutionPlanProperties;

use itertools::izip;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
use itertools::izip;

/// For a given `plan`, this object carries the information one needs from its
/// descendants to decide whether it is beneficial to replace order-losing (but
Expand Down Expand Up @@ -132,7 +132,8 @@ fn plan_with_order_preserving_variants(
if let Some(ordering) = child.output_ordering().map(Vec::from) {
// When the input of a `CoalescePartitionsExec` has an ordering,
// replace it with a `SortPreservingMergeExec` if appropriate:
let spm = SortPreservingMergeExec::new(LexOrdering::new(ordering), child.clone());
let spm =
SortPreservingMergeExec::new(LexOrdering::new(ordering), child.clone());
sort_input.plan = Arc::new(spm) as _;
sort_input.children[0].data = true;
return Ok(sort_input);
Expand Down Expand Up @@ -256,7 +257,9 @@ pub(crate) fn replace_with_order_preserving_variants(
if alternate_plan
.plan
.equivalence_properties()
.ordering_satisfy(LexOrderingRef::new(requirements.plan.output_ordering().unwrap_or(&[])))
.ordering_satisfy(LexOrderingRef::new(
requirements.plan.output_ordering().unwrap_or(&[]),
))
{
for child in alternate_plan.children.iter_mut() {
child.data = false;
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ fn pushdown_requirement_to_children(
spm_eqs = spm_eqs.with_reorder(new_ordering);
// Do not push-down through SortPreservingMergeExec when
// ordering requirement invalidates requirement of sort preserving merge exec.
if !spm_eqs.ordering_satisfy(LexOrderingRef::new(plan.output_ordering().unwrap_or(&[]))) {
if !spm_eqs
.ordering_satisfy(LexOrderingRef::new(plan.output_ordering().unwrap_or(&[])))
{
Ok(None)
} else {
// Can push-down through SortPreservingMergeExec, because parent requirement is finer
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ use datafusion_physical_plan::{

use async_trait::async_trait;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement, PhysicalSortRequirement};
use datafusion_physical_expr_common::sort_expr::{
LexOrdering, LexRequirement, PhysicalSortRequirement,
};

async fn register_current_csv(
ctx: &SessionContext,
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1595,8 +1595,9 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
None => None,
};

let ordering_reqs: LexOrdering =
physical_sort_exprs.clone().unwrap_or(LexOrdering::default());
let ordering_reqs: LexOrdering = physical_sort_exprs
.clone()
.unwrap_or(LexOrdering::default());

let agg_expr =
AggregateExprBuilder::new(func.to_owned(), physical_args.to_vec())
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::InputOrderMode;
use test_utils::{add_empty_batches, StringBatchGenerator};

use crate::fuzz_cases::aggregation_fuzzer::{
AggregationFuzzerBuilder, ColumnDescr, DatasetGeneratorConfig, QueryBuilder,
};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use hashbrown::HashMap;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use tokio::task::JoinSet;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use crate::fuzz_cases::aggregation_fuzzer::{
AggregationFuzzerBuilder, ColumnDescr, DatasetGeneratorConfig, QueryBuilder,
};

// ========================================================================
// The new aggregation fuzz tests based on [`AggregationFuzzer`]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result};
use datafusion_physical_expr::{expressions::col, PhysicalSortExpr};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::sorts::sort::sort_batch;
use rand::{
rngs::{StdRng, ThreadRng},
thread_rng, Rng, SeedableRng,
};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use test_utils::{
array_gen::{PrimitiveArrayGenerator, StringArrayGenerator},
stagger_batch,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/fuzz_cases/sort_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_execution::memory_pool::GreedyMemoryPool;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use rand::Rng;
use std::sync::Arc;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use test_utils::{batches_to_vec, partitions_to_sorted_vec};

const KB: usize = 1 << 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ mod sp_repartition_fuzz_tests {
};
use test_utils::add_empty_batches;

use datafusion_physical_expr_common::sort_expr::LexOrdering;
use itertools::izip;
use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
use datafusion_physical_expr_common::sort_expr::LexOrdering;

// Generate a schema which consists of 6 columns (a, b, c, d, e, f)
fn create_test_schema() -> Result<SchemaRef> {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ use test_utils::add_empty_batches;
use datafusion::functions_window::row_number::row_number_udwf;
use datafusion_functions_window::lead_lag::{lag_udwf, lead_udwf};
use datafusion_functions_window::rank::{dense_rank_udwf, rank_udwf};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use hashbrown::HashMap;
use rand::distributions::Alphanumeric;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use datafusion_physical_expr_common::sort_expr::LexOrdering;

#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
async fn window_bounded_window_random_comparison() -> Result<()> {
Expand Down
3 changes: 2 additions & 1 deletion datafusion/functions-aggregate/benches/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_expr::{function::AccumulatorArgs, AggregateUDFImpl, GroupsAccumulator};
use datafusion_functions_aggregate::count::Count;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
use std::sync::Arc;

fn prepare_accumulator() -> Box<dyn GroupsAccumulator> {
Expand All @@ -31,7 +32,7 @@ fn prepare_accumulator() -> Box<dyn GroupsAccumulator> {
return_type: &DataType::Int64,
schema: &schema,
ignore_nulls: false,
ordering_req: &[],
ordering_req: LexOrderingRef::default(),
is_reversed: false,
name: "COUNT(f)",
is_distinct: false,
Expand Down
3 changes: 2 additions & 1 deletion datafusion/functions-aggregate/benches/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_expr::{function::AccumulatorArgs, AggregateUDFImpl, GroupsAccumulator};
use datafusion_functions_aggregate::sum::Sum;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
use std::sync::Arc;

fn prepare_accumulator(data_type: &DataType) -> Box<dyn GroupsAccumulator> {
Expand All @@ -31,7 +32,7 @@ fn prepare_accumulator(data_type: &DataType) -> Box<dyn GroupsAccumulator> {
return_type: data_type,
schema: &schema,
ignore_nulls: false,
ordering_req: &[],
ordering_req: LexOrderingRef::empty(),
is_reversed: false,
name: "SUM(f)",
is_distinct: false,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ impl Display for LexOrdering {
}

impl FromIterator<PhysicalSortExpr> for LexOrdering {
fn from_iter<T: IntoIterator<Item=PhysicalSortExpr>>(iter: T) -> Self {
fn from_iter<T: IntoIterator<Item = PhysicalSortExpr>>(iter: T) -> Self {
let mut lex_ordering = LexOrdering::default();

for i in iter {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-optimizer/src/topk_aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::aggregates::AggregateExec;
use datafusion_physical_plan::execution_plan::CardinalityEffect;
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::ExecutionPlan;
use itertools::Itertools;
use datafusion_physical_expr::LexOrdering;

/// An optimizer rule that passes a `limit` hint to aggregations if the whole result is not needed
#[derive(Debug)]
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};

use crate::execution_plan::CardinalityEffect;
use datafusion_physical_expr_common::sort_expr::{LexOrderingRef, PhysicalSortExpr};
use futures::stream::Stream;
use futures::{FutureExt, StreamExt, TryStreamExt};
use hashbrown::HashMap;
use log::trace;
use parking_lot::Mutex;
use datafusion_physical_expr_common::sort_expr::{LexOrderingRef, PhysicalSortExpr};

mod distributor_channels;

Expand Down Expand Up @@ -1558,10 +1558,10 @@ mod tests {
mod test {
use arrow_schema::{DataType, Field, Schema, SortOptions};

use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use crate::memory::MemoryExec;
use crate::union::UnionExec;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};

use super::*;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1717,8 +1717,8 @@ mod tests {
let plan = projection_exec(window)?;

let expected_plan = vec![
"ProjectionExec: expr=[sn@0 as sn, hash@1 as hash, count([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }]]@2 as col_2]",
" BoundedWindowAggExec: wdw=[count([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }]]: Ok(Field { name: \"count([Column { name: \\\"sn\\\", index: 0 }]) PARTITION BY: [[Column { name: \\\"hash\\\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \\\"sn\\\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }]]\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt64(1)), is_causal: false }], mode=[Linear]",
"ProjectionExec: expr=[sn@0 as sn, hash@1 as hash, count([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [LexOrdering { inner: [PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }] }]@2 as col_2]",
" BoundedWindowAggExec: wdw=[count([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [LexOrdering { inner: [PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }] }]: Ok(Field { name: \"count([Column { name: \\\"sn\\\", index: 0 }]) PARTITION BY: [[Column { name: \\\"hash\\\", index: 1 }]], ORDER BY: [LexOrdering { inner: [PhysicalSortExpr { expr: Column { name: \\\"sn\\\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }] }]\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt64(1)), is_causal: false }], mode=[Linear]",
" StreamingTableExec: partition_sizes=1, projection=[sn, hash], infinite_source=true, output_ordering=[sn@0 ASC NULLS LAST]",
];

Expand Down
4 changes: 3 additions & 1 deletion datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1104,7 +1104,9 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
&sink_schema,
extension_codec,
)
.map(|item| PhysicalSortRequirement::from_sort_exprs(&item.inner))
.map(|item| {
PhysicalSortRequirement::from_sort_exprs(&item.inner)
})
})
.transpose()?;
Ok(Arc::new(DataSinkExec::new(
Expand Down
5 changes: 4 additions & 1 deletion datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::logical_expr::{create_udf, JoinType, Operator, Volatility};
use datafusion::physical_expr::expressions::Literal;
use datafusion::physical_expr::window::SlidingAggregateWindowExpr;
use datafusion::physical_expr::{LexOrdering, LexOrderingRef, LexRequirement, PhysicalSortRequirement, ScalarFunctionExpr};
use datafusion::physical_expr::{
LexOrdering, LexOrderingRef, LexRequirement, PhysicalSortRequirement,
ScalarFunctionExpr,
};
use datafusion::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
Expand Down

0 comments on commit 7ba63c3

Please sign in to comment.