Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor DependencyMap and Dependencies into structs #12761

Merged
merged 3 commits into from
Oct 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 179 additions & 22 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::fmt;
use std::fmt::Display;
use std::hash::{Hash, Hasher};
use std::iter::Peekable;
Expand Down Expand Up @@ -709,7 +710,7 @@ impl EquivalenceProperties {
/// c ASC: Node {None, HashSet{a ASC}}
/// ```
fn construct_dependency_map(&self, mapping: &ProjectionMapping) -> DependencyMap {
let mut dependency_map = IndexMap::new();
let mut dependency_map = DependencyMap::new();
for ordering in self.normalized_oeq_class().iter() {
for (idx, sort_expr) in ordering.iter().enumerate() {
let target_sort_expr =
Expand All @@ -731,13 +732,11 @@ impl EquivalenceProperties {
let dependency = idx.checked_sub(1).map(|a| &ordering[a]);
// Add sort expressions that can be projected or referred to
// by any of the projection expressions to the dependency map:
dependency_map
.entry(sort_expr.clone())
.or_insert_with(|| DependencyNode {
target_sort_expr: target_sort_expr.clone(),
dependencies: IndexSet::new(),
})
.insert_dependency(dependency);
dependency_map.insert(
sort_expr,
target_sort_expr.as_ref(),
dependency,
);
}
if !is_projected {
// If we can not project, stop constructing the dependency
Expand Down Expand Up @@ -1257,7 +1256,7 @@ fn referred_dependencies(
// Associate `PhysicalExpr`s with `PhysicalSortExpr`s that contain them:
let mut expr_to_sort_exprs = IndexMap::<ExprWrapper, Dependencies>::new();
for sort_expr in dependency_map
.keys()
.sort_exprs()
.filter(|sort_expr| expr_refers(source, &sort_expr.expr))
{
let key = ExprWrapper(Arc::clone(&sort_expr.expr));
Expand All @@ -1270,10 +1269,16 @@ fn referred_dependencies(
// Generate all valid dependencies for the source. For example, if the source
// is `a + b` and the map is `[a -> (a ASC, a DESC), b -> (b ASC)]`, we get
// `vec![HashSet(a ASC, b ASC), HashSet(a DESC, b ASC)]`.
expr_to_sort_exprs
.values()
let dependencies = expr_to_sort_exprs
.into_values()
.map(Dependencies::into_inner)
.collect::<Vec<_>>();
dependencies
.iter()
.multi_cartesian_product()
.map(|referred_deps| referred_deps.into_iter().cloned().collect())
.map(|referred_deps| {
Dependencies::new_from_iter(referred_deps.into_iter().cloned())
})
.collect()
}

Expand All @@ -1296,7 +1301,9 @@ fn construct_prefix_orderings(
dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
let mut dep_enumerator = DependencyEnumerator::new();
dependency_map[relevant_sort_expr]
dependency_map
.get(relevant_sort_expr)
.expect("no relevant sort expr found")
.dependencies
.iter()
.flat_map(|dep| dep_enumerator.construct_orderings(dep, dependency_map))
Expand Down Expand Up @@ -1433,13 +1440,161 @@ impl DependencyNode {
}
}

// Using `IndexMap` and `IndexSet` makes sure to generate consistent results across different executions for the same query.
// We could have used `HashSet`, `HashMap` in place of them without any loss of functionality.
// As an example, if existing orderings are `[a ASC, b ASC]`, `[c ASC]` for output ordering
// both `[a ASC, b ASC, c ASC]` and `[c ASC, a ASC, b ASC]` are valid (e.g. concatenated version of the alternative orderings).
// When using `HashSet`, `HashMap` it is not guaranteed to generate consistent result, among the possible 2 results in the example above.
type DependencyMap = IndexMap<PhysicalSortExpr, DependencyNode>;
type Dependencies = IndexSet<PhysicalSortExpr>;
impl Display for DependencyNode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(target) = &self.target_sort_expr {
write!(f, "(target: {}, ", target)?;
} else {
write!(f, "(")?;
}
write!(f, "dependencies: [{}])", self.dependencies)
}
}

/// Maps an expression --> DependencyNode
///
/// # Debugging / deplaying `DependencyMap`
///
/// This structure implements `Display` to assist debugging. For example:
///
/// ```text
/// DependencyMap: {
/// a@0 ASC --> (target: a@0 ASC, dependencies: [[]])
/// b@1 ASC --> (target: b@1 ASC, dependencies: [[a@0 ASC, c@2 ASC]])
/// c@2 ASC --> (target: c@2 ASC, dependencies: [[b@1 ASC, a@0 ASC]])
/// d@3 ASC --> (target: d@3 ASC, dependencies: [[c@2 ASC, b@1 ASC]])
/// }
/// ```
///
/// # Note on IndexMap Rationale
///
/// Using `IndexMap` (which preserves insert order) to ensure consistent results
/// across different executions for the same query. We could have used
/// `HashSet`, `HashMap` in place of them without any loss of functionality.
///
/// As an example, if existing orderings are
/// 1. `[a ASC, b ASC]`
/// 2. `[c ASC]` for
///
/// Then both the following output orderings are valid
/// 1. `[a ASC, b ASC, c ASC]`
/// 2. `[c ASC, a ASC, b ASC]`
///
/// (this are both valid as they are concatenated versions of the alternative
/// orderings). When using `HashSet`, `HashMap` it is not guaranteed to generate
/// consistent result, among the possible 2 results in the example above.
#[derive(Debug)]
struct DependencyMap {
inner: IndexMap<PhysicalSortExpr, DependencyNode>,
}

impl DependencyMap {
fn new() -> Self {
Self {
inner: IndexMap::new(),
}
}

/// Insert a new dependency `sort_expr` --> `dependency` into the map.
///
/// If `target_sort_expr` is none, a new entry is created with empty dependencies.
fn insert(
&mut self,
sort_expr: &PhysicalSortExpr,
target_sort_expr: Option<&PhysicalSortExpr>,
dependency: Option<&PhysicalSortExpr>,
) {
self.inner
.entry(sort_expr.clone())
.or_insert_with(|| DependencyNode {
target_sort_expr: target_sort_expr.cloned(),
dependencies: Dependencies::new(),
})
.insert_dependency(dependency)
}

/// Iterator over (sort_expr, DependencyNode) pairs
fn iter(&self) -> impl Iterator<Item = (&PhysicalSortExpr, &DependencyNode)> {
self.inner.iter()
}

/// iterator over all sort exprs
fn sort_exprs(&self) -> impl Iterator<Item = &PhysicalSortExpr> {
self.inner.keys()
}

/// Return the dependency node for the given sort expression, if any
fn get(&self, sort_expr: &PhysicalSortExpr) -> Option<&DependencyNode> {
self.inner.get(sort_expr)
}
}

impl Display for DependencyMap {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "DependencyMap: {{")?;
for (sort_expr, node) in self.inner.iter() {
writeln!(f, " {sort_expr} --> {node}")?;
}
writeln!(f, "}}")
}
}

/// A list of sort expressions that can be calculated from a known set of
/// dependencies.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
struct Dependencies {
inner: IndexSet<PhysicalSortExpr>,
}

impl Display for Dependencies {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "[")?;
let mut iter = self.inner.iter();
if let Some(dep) = iter.next() {
write!(f, "{}", dep)?;
}
for dep in iter {
write!(f, ", {}", dep)?;
}
write!(f, "]")
}
}

impl Dependencies {
/// Create a new empty `Dependencies` instance.
fn new() -> Self {
Self {
inner: IndexSet::new(),
}
}

/// Create a new `Dependencies` from an iterator of `PhysicalSortExpr`.
fn new_from_iter(iter: impl IntoIterator<Item = PhysicalSortExpr>) -> Self {
Self {
inner: iter.into_iter().collect(),
}
}

/// Insert a new dependency into the set.
fn insert(&mut self, sort_expr: PhysicalSortExpr) {
self.inner.insert(sort_expr);
}

/// Iterator over dependencies in the set
fn iter(&self) -> impl Iterator<Item = &PhysicalSortExpr> + Clone {
self.inner.iter()
}

/// Return the inner set of dependencies
fn into_inner(self) -> IndexSet<PhysicalSortExpr> {
self.inner
}

/// Returns true if there are no dependencies
fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}

/// Contains a mapping of all dependencies we have processed for each sort expr
struct DependencyEnumerator<'a> {
Expand Down Expand Up @@ -1487,8 +1642,9 @@ impl<'a> DependencyEnumerator<'a> {
referred_sort_expr: &'a PhysicalSortExpr,
dependency_map: &'a DependencyMap,
) -> Vec<LexOrdering> {
// We are sure that `referred_sort_expr` is inside `dependency_map`.
let node = &dependency_map[referred_sort_expr];
let node = dependency_map
.get(referred_sort_expr)
.expect("`referred_sort_expr` should be inside `dependency_map`");
// Since we work on intermediate nodes, we are sure `val.target_sort_expr`
// exists.
let target_sort_expr = node.target_sort_expr.as_ref().unwrap();
Expand All @@ -1506,6 +1662,7 @@ impl<'a> DependencyEnumerator<'a> {
} else {
vec![]
};

for ordering in orderings.iter_mut() {
ordering.push(target_sort_expr.clone())
}
Expand Down