Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eliminate more typ and arity calls #30884

Merged
merged 4 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/compute-types/src/plan/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,6 @@ impl Context {
equivalences,
implementation,
} => {
let input_mapper = JoinInputMapper::new(inputs);

// Plan each of the join inputs independently.
// The `plans` get surfaced upwards, and the `input_keys` should
// be used as part of join planning / to validate the existing
Expand All @@ -564,6 +562,9 @@ impl Context {
input_keys.push(keys);
}

let input_mapper =
JoinInputMapper::new_from_input_arities(input_arities.iter().copied());

// Extract temporal predicates as joins cannot currently absorb them.
let (plan, missing) = match implementation {
IndexedFilter(_coll_id, _idx_id, key, _val) => {
Expand Down
2 changes: 1 addition & 1 deletion src/expr/src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ pub fn enforce_linear_chains(expr: &mut MirRelationExpr) -> Result<(), ExplainEr
// .map(|id| LocalId::new(1000_u64 + u64::cast_from(id_map.len()) + id))
// .unwrap();
let id = id_gen.next().unwrap();
let value = input.take_safely();
let value = input.take_safely(None);
// generate a `let $fresh_id = $body in $fresh_id` to replace this input
let mut binding = MirRelationExpr::Let {
id,
Expand Down
2 changes: 2 additions & 0 deletions src/expr/src/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ impl MapFilterProject {
let (mfp, expr) = Self::extract_from_expression(input);
(mfp.project(outputs.iter().cloned()), expr)
}
// TODO: The recursion is quadratic in the number of Map/Filter/Project operators due to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// this call to `arity()`.
x => (Self::new(x.arity()), x),
}
}
Expand Down
48 changes: 38 additions & 10 deletions src/expr/src/relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use mz_ore::collections::CollectionExt;
use mz_ore::id_gen::IdGen;
use mz_ore::metrics::Histogram;
use mz_ore::num::NonNeg;
use mz_ore::soft_assert_no_log;
use mz_ore::stack::RecursionLimitError;
use mz_ore::str::Indent;
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
Expand Down Expand Up @@ -592,7 +593,7 @@ impl MirRelationExpr {
/// Reports the column types of the relation given the column types of the
/// input relations.
///
/// This method delegates to `try_col_with_input_cols`, panicing if an `Err`
/// This method delegates to `try_col_with_input_cols`, panicking if an `Err`
/// variant is returned.
pub fn col_with_input_cols<'a, I>(&self, input_types: I) -> Vec<ColumnType>
where
Expand Down Expand Up @@ -1687,9 +1688,27 @@ impl MirRelationExpr {
})
}

/// Take ownership of `self`, leaving an empty `MirRelationExpr::Constant` with the correct type.
pub fn take_safely(&mut self) -> MirRelationExpr {
let typ = self.typ();
/// Take ownership of `self`, leaving an empty `MirRelationExpr::Constant` with the optionally
/// given scalar types. The given scalar types should be `base_eq` with the types that `typ()`
/// would find. Keys and nullability are ignored in the given `RelationType`, and instead we set
/// the best possible key and nullability, since we are making an empty collection.
///
/// If `typ` is not given, then this calls `.typ()` (which is possibly expensive) to determine
/// the correct type.
pub fn take_safely(&mut self, typ: Option<RelationType>) -> MirRelationExpr {
if let Some(typ) = &typ {
soft_assert_no_log!(self
.typ()
.column_types
.iter()
.zip_eq(typ.column_types.iter())
.all(|(t1, t2)| t1.scalar_type.base_eq(&t2.scalar_type)));
}
let mut typ = typ.unwrap_or_else(|| self.typ());
typ.keys = vec![vec![]];
for ct in typ.column_types.iter_mut() {
ct.nullable = false;
}
std::mem::replace(
self,
MirRelationExpr::Constant {
Expand All @@ -1698,6 +1717,14 @@ impl MirRelationExpr {
},
)
}

/// Take ownership of `self`, leaving an empty `MirRelationExpr::Constant` with the given scalar
/// types. Nullability is ignored in the given `ColumnType`s, and instead we set the best
/// possible nullability, since we are making an empty collection.
pub fn take_safely_with_col_types(&mut self, typ: Vec<ColumnType>) -> MirRelationExpr {
self.take_safely(Some(RelationType::new(typ)))
}

/// Take ownership of `self`, leaving an empty `MirRelationExpr::Constant` with an **incorrect** type.
///
/// This should only be used if `self` is about to be dropped or otherwise overwritten.
Expand Down Expand Up @@ -1760,22 +1787,21 @@ impl MirRelationExpr {
.unzip();
assert_eq!(keys_and_values.arity() - self.arity(), data.len());
self.let_in(id_gen, |_id_gen, get_keys| {
let get_keys_arity = get_keys.arity();
Ok(MirRelationExpr::join(
vec![
// all the missing keys (with count 1)
keys_and_values
.distinct_by((0..get_keys.arity()).collect())
.distinct_by((0..get_keys_arity).collect())
.negate()
.union(get_keys.clone().distinct()),
// join with keys to get the correct counts
get_keys.clone(),
],
(0..get_keys.arity())
.map(|i| vec![(0, i), (1, i)])
.collect(),
(0..get_keys_arity).map(|i| vec![(0, i), (1, i)]).collect(),
)
// get rid of the extra copies of columns from keys
.project((0..get_keys.arity()).collect())
.project((0..get_keys_arity).collect())
// This join is logically equivalent to
// `.map(<default_expr>)`, but using a join allows for
// potential predicate pushdown and elision in the
Expand Down Expand Up @@ -2233,11 +2259,13 @@ impl MirRelationExpr {
value.visit_pre_mut(|e| {
if let MirRelationExpr::Get {
id: crate::Id::Local(id),
typ,
..
} = e
{
let typ = typ.clone();
if deadlist.contains(id) {
e.take_safely();
e.take_safely(Some(typ));
}
}
});
Expand Down
10 changes: 6 additions & 4 deletions src/sql/src/plan/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1399,7 +1399,8 @@ impl HirScalarExpr {

// Record input arity here so that any group_keys that need to mutate get_inner
// don't add those columns to the aggregate input.
let input_arity = get_inner.typ().arity();
let input_type = get_inner.typ();
let input_arity = input_type.arity();
// The reduction that computes the window function must be keyed on the columns
// from the outer context, plus the expressions in the partition key. The current
// subquery will be 'executed' for every distinct row from the outer context so
Expand Down Expand Up @@ -1428,8 +1429,6 @@ impl HirScalarExpr {
}

get_inner.let_in(id_gen, |id_gen, mut get_inner| {
let input_type = get_inner.typ();

// Original columns of the relation
let fields: Box<_> = input_type
.column_types
Expand Down Expand Up @@ -1575,12 +1574,14 @@ impl HirScalarExpr {
let inner_arity = get_inner.arity();
let mut total_arity = inner_arity;
let mut join_inputs = vec![get_inner];
let mut join_input_arities = vec![inner_arity];
for (expr, subquery) in subqueries.into_iter() {
// Avoid lowering duplicated subqueries
if !subquery_map.contains_key(&expr) {
let subquery_arity = subquery.arity();
assert_eq!(subquery_arity, inner_arity + 1);
join_inputs.push(subquery);
join_input_arities.push(subquery_arity);
total_arity += subquery_arity;

// Column with the value of the subquery
Expand All @@ -1590,7 +1591,8 @@ impl HirScalarExpr {
// Each subquery projects all the columns of the outer context (distinct_inner)
// plus 1 column, containing the result of the subquery. Those columns must be
// joined with the outer/main relation (get_inner).
let input_mapper = mz_expr::JoinInputMapper::new(&join_inputs);
let input_mapper =
mz_expr::JoinInputMapper::new_from_input_arities(join_input_arities);
let equivalences = (0..inner_arity)
.map(|col| {
join_inputs
Expand Down
2 changes: 1 addition & 1 deletion src/transform/src/canonicalization/flatmap_to_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl FlatMapToMap {
match (iter.next(), iter.next()) {
(None, _) => {
// If there are no elements in the literal argument, no output.
relation.take_safely();
relation.take_safely(None);
}
(Some((row, 1)), None) => {
*relation =
Expand Down
2 changes: 1 addition & 1 deletion src/transform/src/canonicalization/topk_elision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl TopKElision {
if limit.as_ref().map_or(true, |l| l.is_literal_null()) && *offset == 0 {
*relation = input.take_dangerous();
} else if limit.as_ref().and_then(|l| l.as_literal_int64()) == Some(0) {
relation.take_safely();
relation.take_safely(None);
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/transform/src/demand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl Demand {
),
MirRelationExpr::Map { input, scalars } => {
let relation_type = relation_type.as_ref().unwrap();
let arity = input.arity();
let arity = relation_type.arity() - scalars.len();
// contains columns whose supports have yet to be explored
let mut new_columns = columns.clone();
new_columns.retain(|c| *c >= arity);
Expand Down Expand Up @@ -219,7 +219,8 @@ impl Demand {
for expr in exprs {
expr.support_into(&mut columns);
}
columns.retain(|c| *c < input.arity());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ouch

let arity = input.arity();
columns.retain(|c| *c < arity);
self.action(input, columns, gets)
}
MirRelationExpr::Filter { input, predicates } => {
Expand Down
5 changes: 3 additions & 2 deletions src/transform/src/equivalence_propagation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ impl EquivalencePropagation {
let expr_type = derived
.value::<RelationType>()
.expect("RelationType required");
assert!(expr_type.is_some());
let expr_equivalences = derived
.value::<Equivalences>()
.expect("Equivalences required");
Expand All @@ -132,7 +133,7 @@ impl EquivalencePropagation {
let expr_equivalences = if let Some(e) = expr_equivalences {
e
} else {
expr.take_safely();
expr.take_safely_with_col_types(expr_type.clone().unwrap());
return;
};

Expand All @@ -147,7 +148,7 @@ impl EquivalencePropagation {

outer_equivalences.minimize(expr_type.as_ref().map(|x| &x[..]));
if outer_equivalences.unsatisfiable() {
expr.take_safely();
expr.take_safely_with_col_types(expr_type.clone().unwrap());
return;
}

Expand Down
4 changes: 2 additions & 2 deletions src/transform/src/fold_constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl FoldConstants {
.iter()
.any(|p| p.is_literal_false() || p.is_literal_null())
{
relation.take_safely();
relation.take_safely(Some(relation_type.clone()));
} else if let Some((rows, ..)) = (**input).as_const() {
// Evaluate errors last, to reduce risk of spurious errors.
predicates.sort_by_key(|p| p.is_literal_err());
Expand Down Expand Up @@ -291,7 +291,7 @@ impl FoldConstants {
..
} => {
if inputs.iter().any(|e| e.is_empty()) {
relation.take_safely();
relation.take_safely(Some(relation_type.clone()));
} else if let Some(e) = inputs.iter().find_map(|i| i.as_const_err()) {
*relation = MirRelationExpr::Constant {
rows: Err(e.clone()),
Expand Down
2 changes: 1 addition & 1 deletion src/transform/src/fusion/top_k.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl TopK {
}

if let Some(0) = limit.as_ref().and_then(|l| l.as_literal_int64()) {
relation.take_safely();
relation.take_safely(None);
break;
}

Expand Down
32 changes: 20 additions & 12 deletions src/transform/src/literal_constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use mz_ore::collections::CollectionExt;
use mz_ore::iter::IteratorExt;
use mz_ore::stack::RecursionLimitError;
use mz_ore::vec::swap_remove_multiple;
use mz_repr::{GlobalId, Row};
use mz_repr::{GlobalId, RelationType, Row};

use crate::canonicalize_mfp::CanonicalizeMfp;
use crate::notice::IndexTooWideForLiteralConstraints;
Expand Down Expand Up @@ -70,7 +70,9 @@ impl LiteralConstraints {
relation.try_visit_mut_children(|e| self.action(e, transform_ctx))?;

if let MirRelationExpr::Get {
id: Id::Global(id), ..
id: Id::Global(id),
ref typ,
..
} = *relation
{
let orig_mfp = mfp.clone();
Expand All @@ -87,10 +89,11 @@ impl LiteralConstraints {
mfp: &mut MapFilterProject,
orig_mfp: &MapFilterProject,
relation: &MirRelationExpr,
relation_type: RelationType,
) {
// undo list_of_predicates_to_and_of_predicates, distribute_and_over_or, unary_and
// (It undoes the latter 2 through `MirScalarExp::reduce`.)
LiteralConstraints::canonicalize_predicates(mfp, relation);
LiteralConstraints::canonicalize_predicates(mfp, relation, relation_type);
// undo inline_literal_constraints
mfp.optimize();
// We can usually undo, but sometimes not (see comment on `distribute_and_over_or`),
Expand All @@ -109,14 +112,16 @@ impl LiteralConstraints {
// todo: We might want to also call `canonicalize_equivalences`,
// see near the end of literal_constraints.slt.

let inp_typ = typ.clone();

let key_val = Self::detect_literal_constraints(&mfp, id, transform_ctx);

match key_val {
None => {
// We didn't find a usable index, so no chance to remove literal constraints.
// But, we might have removed contradicting OR args.
if removed_contradicting_or_args {
undo_preparation(&mut mfp, &orig_mfp, relation);
undo_preparation(&mut mfp, &orig_mfp, relation, inp_typ);
} else {
// We didn't remove anything, so let's go with the original MFP.
mfp = orig_mfp;
Expand All @@ -130,7 +135,7 @@ impl LiteralConstraints {
{
// We were able to remove the literal constraints or contradicting OR args,
// so we would like to use this new MFP, so we try undoing the preparation.
undo_preparation(&mut mfp, &orig_mfp, relation);
undo_preparation(&mut mfp, &orig_mfp, relation, inp_typ.clone());
} else {
// We were not able to remove the literal constraint, so `mfp` is
// equivalent to `orig_mfp`, but `orig_mfp` is often simpler (or the same).
Expand All @@ -140,7 +145,6 @@ impl LiteralConstraints {
// We transform the Get into a semi-join with a constant collection.

let inp_id = id.clone();
let inp_typ = relation.typ();
let filter_list = MirRelationExpr::Constant {
rows: Ok(possible_vals.iter().map(|val| (val.clone(), 1)).collect()),
typ: mz_repr::RelationType {
Expand All @@ -164,10 +168,7 @@ impl LiteralConstraints {
if possible_vals.is_empty() {
// Even better than what we were hoping for: Found contradicting
// literal constraints, so the whole relation is empty.
*relation = MirRelationExpr::Constant {
rows: Ok(Vec::new()),
typ: relation.typ(),
};
relation.take_safely(Some(inp_typ));
} else {
// The common case: We need to build the join which is the main point of
// this transform.
Expand Down Expand Up @@ -610,9 +611,16 @@ impl LiteralConstraints {
}

/// Call [mz_expr::canonicalize::canonicalize_predicates] on each of the predicates in the MFP.
fn canonicalize_predicates(mfp: &mut MapFilterProject, relation: &MirRelationExpr) {
fn canonicalize_predicates(
mfp: &mut MapFilterProject,
relation: &MirRelationExpr,
relation_type: RelationType,
) {
let (map, mut predicates, project) = mfp.as_map_filter_project();
let typ_after_map = relation.clone().map(map.clone()).typ();
let typ_after_map = relation
.clone()
.map(map.clone())
.typ_with_input_types(&[relation_type]);
canonicalize_predicates(&mut predicates, &typ_after_map.column_types);
// Rebuild the MFP with the new predicates.
*mfp = MapFilterProject::new(mfp.input_arity)
Expand Down
2 changes: 1 addition & 1 deletion src/transform/src/non_null_requirements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl NonNullRequirements {
{
// A null value was introduced in a marked column;
// the entire expression can be zeroed out.
relation.take_safely();
relation.take_safely(None);
Ok(())
} else {
// For each column, if it must be non-null, extract the expression's
Expand Down
2 changes: 1 addition & 1 deletion src/transform/src/predicate_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ impl PredicatePushdown {
.count()
> 1
{
relation.take_safely();
relation.take_safely(Some(relation.typ_with_input_types(&input_types)));
return Ok(());
}

Expand Down
Loading