diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 51e85e25e077f..ae272a200e793 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -1295,21 +1295,32 @@ fn construct_prefix_orderings( relevant_sort_expr: &PhysicalSortExpr, dependency_map: &DependencyMap, ) -> Vec { + let mut processed_dependencies = IndexSet::new(); dependency_map[relevant_sort_expr] .dependencies .iter() - .flat_map(|dep| construct_orderings(dep, dependency_map)) + .flat_map(|dep| { + construct_orderings(dep, &mut processed_dependencies, dependency_map) + }) .collect() } -/// Given a set of relevant dependencies (`relevant_deps`) and a map of dependencies -/// (`dependency_map`), this function generates all possible prefix orderings -/// based on the given dependencies. +/// Generates all possible orderings where dependencies are satisfied for the +/// current projection expression. +/// +/// # Examaple +/// If `dependences` is `a + b ASC` and the dependency map holds dependencies +/// * `a ASC` --> `[c ASC]` +/// * `b ASC` --> `[d DESC]`, +/// +/// This function generates these two sort orders +/// * `[c ASC, d DESC, a + b ASC]` +/// * `[d DESC, c ASC, a + b ASC]` /// /// # Parameters /// -/// * `dependencies` - A reference to the dependencies. -/// * `dependency_map` - A reference to the map of dependencies for expressions. +/// * `dependencies` - Set of relevant expressions. +/// * `dependency_map` - Map of dependencies for expressions that may appear in `dependencies` /// /// # Returns /// @@ -1335,11 +1346,6 @@ fn generate_dependency_orderings( return vec![vec![]]; } - // Generate all possible orderings where dependencies are satisfied for the - // current projection expression. For example, if expression is `a + b ASC`, - // and the dependency for `a ASC` is `[c ASC]`, the dependency for `b ASC` - // is `[d DESC]`, then we generate `[c ASC, d DESC, a + b ASC]` and - // `[d DESC, c ASC, a + b ASC]`. relevant_prefixes .into_iter() .multi_cartesian_product() @@ -1421,7 +1427,7 @@ struct DependencyNode { } impl DependencyNode { - // Insert dependency to the state (if exists). + /// Insert dependency to the state (if exists). fn insert_dependency(&mut self, dependency: Option<&PhysicalSortExpr>) { if let Some(dep) = dependency { self.dependencies.insert(dep.clone()); @@ -1453,10 +1459,16 @@ type Dependencies = IndexSet; /// /// A vector of lexicographical orderings (`Vec`) based on the given /// sort expression and its dependencies. -fn construct_orderings( - referred_sort_expr: &PhysicalSortExpr, - dependency_map: &DependencyMap, +fn construct_orderings<'a>( + referred_sort_expr: &'a PhysicalSortExpr, + processed_dependencies: &mut IndexSet<&'a PhysicalSortExpr>, + dependency_map: &'a DependencyMap, ) -> Vec { + // if we have already processed this dependency don't do it again + if !processed_dependencies.insert(referred_sort_expr) { + return vec![]; + } + // We are sure that `referred_sort_expr` is inside `dependency_map`. let node = &dependency_map[referred_sort_expr]; // Since we work on intermediate nodes, we are sure `val.target_sort_expr` @@ -1468,7 +1480,8 @@ fn construct_orderings( node.dependencies .iter() .flat_map(|dep| { - let mut orderings = construct_orderings(dep, dependency_map); + let mut orderings = + construct_orderings(dep, processed_dependencies, dependency_map); for ordering in orderings.iter_mut() { ordering.push(target_sort_expr.clone()) } @@ -1763,6 +1776,50 @@ mod tests { Ok(()) } + #[test] + fn project_equivalence_properties_test_multi() -> Result<()> { + // test multiple input orderings with equivalence properties + let input_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Int64, true), + Field::new("c", DataType::Int64, true), + Field::new("d", DataType::Int64, true), + ])); + + let mut input_properties = EquivalenceProperties::new(Arc::clone(&input_schema)); + // add equivalent ordering [a, b, c, d] + input_properties.add_new_ordering(vec![ + parse_sort_expr("a", &input_schema), + parse_sort_expr("b", &input_schema), + parse_sort_expr("c", &input_schema), + parse_sort_expr("d", &input_schema), + ]); + + // add equivalent ordering [a, c, b, d] + input_properties.add_new_ordering(vec![ + parse_sort_expr("a", &input_schema), + parse_sort_expr("c", &input_schema), + parse_sort_expr("b", &input_schema), // NB b and c are swapped + parse_sort_expr("d", &input_schema), + ]); + + // simply project all the columns in order + let proj_exprs = vec![ + (col("a", &input_schema)?, "a".to_string()), + (col("b", &input_schema)?, "b".to_string()), + (col("c", &input_schema)?, "c".to_string()), + (col("d", &input_schema)?, "d".to_string()), + ]; + let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &input_schema)?; + let out_properties = input_properties.project(&projection_mapping, input_schema); + + assert_eq!( + out_properties.to_string(), + "order: [[a@0 ASC,b@1 ASC,c@2 ASC,d@3 ASC]]" + ); + Ok(()) + } + #[test] fn test_join_equivalence_properties() -> Result<()> { let schema = create_test_schema()?;