Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate duplicate function LogicalPlan::with_new_inputs #8707

Merged
merged 2 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ impl LogicalPlanBuilder {
)
})
.collect::<Result<Vec<_>>>()?;
curr_plan.with_new_inputs(&new_inputs)
curr_plan.with_new_exprs(curr_plan.expressions(), &new_inputs)
}
}
}
Expand Down
47 changes: 5 additions & 42 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,35 +541,9 @@ impl LogicalPlan {
}

/// Returns a copy of this `LogicalPlan` with the new inputs
#[deprecated(since = "35.0.0", note = "please use `with_new_exprs` instead")]
pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) -> Result<LogicalPlan> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we mark this deprecated for at least a release or two to help users migrate? I always apprecate the hint in the deprecation message that tells me what to change so I don't have to go poking around in the commit log to see what happened.

Perhaps it could call through to the with_new_exprs API, something like (untested):

    #[deprecated(since = "35.0.0", note = "please use `with_new_exprs` instead")]
    pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) -> Result<LogicalPlan> {
      self.with_new_exprs(self, self.expressions(), inputs)
    }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Marked it as deprecated one.

// with_new_inputs use original expression,
// so we don't need to recompute Schema.
match &self {
LogicalPlan::Projection(projection) => {
// Schema of the projection may change
// when its input changes. Hence we should use
// `try_new` method instead of `try_new_with_schema`.
Projection::try_new(projection.expr.to_vec(), Arc::new(inputs[0].clone()))
.map(LogicalPlan::Projection)
}
LogicalPlan::Window(Window { window_expr, .. }) => Ok(LogicalPlan::Window(
Window::try_new(window_expr.to_vec(), Arc::new(inputs[0].clone()))?,
)),
LogicalPlan::Aggregate(Aggregate {
group_expr,
aggr_expr,
..
}) => Aggregate::try_new(
// Schema of the aggregate may change
// when its input changes. Hence we should use
// `try_new` method instead of `try_new_with_schema`.
Arc::new(inputs[0].clone()),
group_expr.to_vec(),
aggr_expr.to_vec(),
)
.map(LogicalPlan::Aggregate),
_ => self.with_new_exprs(self.expressions(), inputs),
}
self.with_new_exprs(self.expressions(), inputs)
}

/// Returns a new `LogicalPlan` based on `self` with inputs and
Expand All @@ -591,10 +565,6 @@ impl LogicalPlan {
/// // create new plan using rewritten_exprs in same position
/// let new_plan = plan.new_with_exprs(rewritten_exprs, new_inputs);
/// ```
///
/// Note: sometimes [`Self::with_new_exprs`] will use schema of
/// original plan, it will not change the scheam. Such as
/// `Projection/Aggregate/Window`
Comment on lines -595 to -597
Copy link
Member Author

@viirya viirya Jan 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment looks incorrect. For Projection/Aggregate/Window, the schema might change if input/expressions are changed.

pub fn with_new_exprs(
&self,
mut expr: Vec<Expr>,
Expand Down Expand Up @@ -706,17 +676,10 @@ impl LogicalPlan {
}))
}
},
LogicalPlan::Window(Window {
window_expr,
schema,
..
}) => {
LogicalPlan::Window(Window { window_expr, .. }) => {
assert_eq!(window_expr.len(), expr.len());
Ok(LogicalPlan::Window(Window {
input: Arc::new(inputs[0].clone()),
window_expr: expr,
schema: schema.clone(),
}))
Comment on lines 680 to -719
Copy link
Member Author

@viirya viirya Jan 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a potential bug. Since input/window_expr will change, we shouldn't reuse previous schema.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe @mustafasrepo remembers the context -- I vaguely remember something like this but can't remember the specifics

Copy link
Contributor

@mustafasrepo mustafasrepo Jan 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a potential bug. Since input/window_expr will change, we shouldn't reuse previous schema.

Agree that, previous version wasn't robust to expression and input changes. This version is much better. Thanks @viirya for catching this.

Window::try_new(expr, Arc::new(inputs[0].clone()))
.map(LogicalPlan::Window)
}
LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
// group exprs are the first expressions
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/tree_node/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl TreeNode for LogicalPlan {
.zip(new_children.iter())
.any(|(c1, c2)| c1 != &c2)
{
self.with_new_inputs(new_children.as_slice())
self.with_new_exprs(self.expressions(), new_children.as_slice())
} else {
Ok(self)
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion/optimizer/src/eliminate_outer_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ impl OptimizerRule for EliminateOuterJoin {
schema: join.schema.clone(),
null_equals_null: join.null_equals_null,
});
let new_plan = plan.with_new_inputs(&[new_join])?;
let new_plan =
plan.with_new_exprs(plan.expressions(), &[new_join])?;
Ok(Some(new_plan))
}
_ => Ok(None),
Expand Down
3 changes: 2 additions & 1 deletion datafusion/optimizer/src/optimize_projections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,8 @@ fn optimize_projections(
// `old_child` during construction:
.map(|(new_input, old_child)| new_input.unwrap_or_else(|| old_child.clone()))
.collect::<Vec<_>>();
plan.with_new_inputs(&new_inputs).map(Some)
plan.with_new_exprs(plan.expressions(), &new_inputs)
.map(Some)
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ impl Optimizer {
})
.collect::<Vec<_>>();

Ok(Some(plan.with_new_inputs(&new_inputs)?))
Ok(Some(plan.with_new_exprs(plan.expressions(), &new_inputs)?))
}

/// Use a rule to optimize the whole plan.
Expand Down
28 changes: 19 additions & 9 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,9 +691,11 @@ impl OptimizerRule for PushDownFilter {
| LogicalPlan::Distinct(_)
| LogicalPlan::Sort(_) => {
// commutable
let new_filter =
plan.with_new_inputs(&[child_plan.inputs()[0].clone()])?;
child_plan.with_new_inputs(&[new_filter])?
let new_filter = plan.with_new_exprs(
plan.expressions(),
&[child_plan.inputs()[0].clone()],
)?;
child_plan.with_new_exprs(child_plan.expressions(), &[new_filter])?
}
LogicalPlan::SubqueryAlias(subquery_alias) => {
let mut replace_map = HashMap::new();
Expand All @@ -716,7 +718,7 @@ impl OptimizerRule for PushDownFilter {
new_predicate,
subquery_alias.input.clone(),
)?);
child_plan.with_new_inputs(&[new_filter])?
child_plan.with_new_exprs(child_plan.expressions(), &[new_filter])?
}
LogicalPlan::Projection(projection) => {
// A projection is filter-commutable if it do not contain volatile predicates or contain volatile
Expand Down Expand Up @@ -760,10 +762,15 @@ impl OptimizerRule for PushDownFilter {
)?);

match conjunction(keep_predicates) {
None => child_plan.with_new_inputs(&[new_filter])?,
None => child_plan.with_new_exprs(
child_plan.expressions(),
&[new_filter],
)?,
Some(keep_predicate) => {
let child_plan =
child_plan.with_new_inputs(&[new_filter])?;
let child_plan = child_plan.with_new_exprs(
child_plan.expressions(),
&[new_filter],
)?;
LogicalPlan::Filter(Filter::try_new(
keep_predicate,
Arc::new(child_plan),
Expand Down Expand Up @@ -837,7 +844,9 @@ impl OptimizerRule for PushDownFilter {
)?),
None => (*agg.input).clone(),
};
let new_agg = filter.input.with_new_inputs(&vec![child])?;
let new_agg = filter
.input
.with_new_exprs(filter.input.expressions(), &vec![child])?;
match conjunction(keep_predicates) {
Some(predicate) => LogicalPlan::Filter(Filter::try_new(
predicate,
Expand Down Expand Up @@ -942,7 +951,8 @@ impl OptimizerRule for PushDownFilter {
None => extension_plan.node.inputs().into_iter().cloned().collect(),
};
// extension with new inputs.
let new_extension = child_plan.with_new_inputs(&new_children)?;
let new_extension =
child_plan.with_new_exprs(child_plan.expressions(), &new_children)?;

match conjunction(keep_predicates) {
Some(predicate) => LogicalPlan::Filter(Filter::try_new(
Expand Down
23 changes: 13 additions & 10 deletions datafusion/optimizer/src/push_down_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl OptimizerRule for PushDownLimit {
fetch: scan.fetch.map(|x| min(x, limit)).or(Some(limit)),
projected_schema: scan.projected_schema.clone(),
});
Some(plan.with_new_inputs(&[new_input])?)
Some(plan.with_new_exprs(plan.expressions(), &[new_input])?)
}
}
LogicalPlan::Union(union) => {
Expand All @@ -145,7 +145,7 @@ impl OptimizerRule for PushDownLimit {
inputs: new_inputs,
schema: union.schema.clone(),
});
Some(plan.with_new_inputs(&[union])?)
Some(plan.with_new_exprs(plan.expressions(), &[union])?)
}

LogicalPlan::CrossJoin(cross_join) => {
Expand All @@ -166,15 +166,16 @@ impl OptimizerRule for PushDownLimit {
right: Arc::new(new_right),
schema: plan.schema().clone(),
});
Some(plan.with_new_inputs(&[new_cross_join])?)
Some(plan.with_new_exprs(plan.expressions(), &[new_cross_join])?)
}

LogicalPlan::Join(join) => {
let new_join = push_down_join(join, fetch + skip);
match new_join {
Some(new_join) => {
Some(plan.with_new_inputs(&[LogicalPlan::Join(new_join)])?)
}
Some(new_join) => Some(plan.with_new_exprs(
plan.expressions(),
&[LogicalPlan::Join(new_join)],
)?),
None => None,
}
}
Expand All @@ -192,14 +193,16 @@ impl OptimizerRule for PushDownLimit {
input: Arc::new((*sort.input).clone()),
fetch: new_fetch,
});
Some(plan.with_new_inputs(&[new_sort])?)
Some(plan.with_new_exprs(plan.expressions(), &[new_sort])?)
}
}
LogicalPlan::Projection(_) | LogicalPlan::SubqueryAlias(_) => {
// commute
let new_limit =
plan.with_new_inputs(&[child_plan.inputs()[0].clone()])?;
Some(child_plan.with_new_inputs(&[new_limit])?)
let new_limit = plan.with_new_exprs(
plan.expressions(),
&[child_plan.inputs()[0].clone()],
)?;
Some(child_plan.with_new_exprs(child_plan.expressions(), &[new_limit])?)
}
_ => None,
};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub fn optimize_children(
new_inputs.push(new_input.unwrap_or_else(|| input.clone()))
}
if plan_is_changed {
Ok(Some(plan.with_new_inputs(&new_inputs)?))
Ok(Some(plan.with_new_exprs(plan.expressions(), &new_inputs)?))
} else {
Ok(None)
}
Expand Down