Skip to content

Commit

Permalink
Support unparsing plans after applying optimize_projections rule (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
sgrebnov committed Nov 5, 2024
1 parent a6586cc commit 2c69bd4
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 30 deletions.
4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,10 @@ config_namespace! {
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
pub expand_views_at_output: bool, default = false

/// When set to true, the `optimize_projections` rule will not attempt to move, add, or remove existing projections.
/// This flag helps maintain the original structure of the `LogicalPlan` when converting it back into SQL via the `unparser` module. It ensures the query layout remains simple and readable, relying on the underlying SQL engine to apply its own optimizations during execution.
pub optimize_projections_preserve_existing_projections: bool, default = false
}
}

Expand Down
9 changes: 9 additions & 0 deletions datafusion/execution/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,15 @@ impl SessionConfig {
self
}

/// When set to true, the `optimize_projections` rule will not attempt to move, add, or remove existing projections.
/// This is useful when optimization is used alongside unparsing logic to preserve the original layout and simplify the overall query structure.
///
/// [optimize_projections_preserve_existing_projections]: datafusion_common::config::OptimizerOptions::optimize_projections_preserve_existing_projections
pub fn with_optimize_projections_preserve_existing_projections(mut self, enabled: bool) -> Self {
self.options.optimizer.optimize_projections_preserve_existing_projections = enabled;
self
}

/// Enables or disables the use of pruning predicate for parquet readers to skip row groups
pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
self.options.execution.parquet.pruning = enabled;
Expand Down
32 changes: 25 additions & 7 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ fn optimize_projections(
// that its input only contains absolutely necessary columns for
// the aggregate expressions. Note that necessary_indices refer to
// fields in `aggregate.input.schema()`.
add_projection_on_top_if_helpful(aggregate_input, necessary_exprs)
add_projection_on_top_if_helpful(aggregate_input, necessary_exprs, config)
})?
.map_data(|aggregate_input| {
// Create a new aggregate plan with the updated input and only the
Expand Down Expand Up @@ -233,9 +233,14 @@ fn optimize_projections(
// refers to that schema
let required_exprs =
required_indices.get_required_exprs(&input_schema);
let window_child =
add_projection_on_top_if_helpful(window_child, required_exprs)?
.data;

let window_child = add_projection_on_top_if_helpful(
window_child,
required_exprs,
config,
)?
.data;

Window::try_new(new_window_expr, Arc::new(window_child))
.map(LogicalPlan::Window)
.map(Transformed::yes)
Expand Down Expand Up @@ -409,7 +414,7 @@ fn optimize_projections(
optimize_projections(child, config, required_indices)?.transform_data(
|new_input| {
if projection_beneficial {
add_projection_on_top_if_helpful(new_input, project_exprs)
add_projection_on_top_if_helpful(new_input, project_exprs, config)
} else {
Ok(Transformed::no(new_input))
}
Expand Down Expand Up @@ -708,16 +713,23 @@ fn split_join_requirements(
///
/// * `plan` - The input `LogicalPlan` to potentially add a projection to.
/// * `project_exprs` - A list of expressions for the projection.
/// * `config` - A reference to the optimizer configuration.
///
/// # Returns
///
/// A `Transformed` indicating if a projection was added
fn add_projection_on_top_if_helpful(
plan: LogicalPlan,
project_exprs: Vec<Expr>,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
// Make sure projection decreases the number of columns, otherwise it is unnecessary.
if project_exprs.len() >= plan.schema().fields().len() {
if config
.options()
.optimizer
.optimize_projections_preserve_existing_projections
|| project_exprs.len() >= plan.schema().fields().len()
{
Ok(Transformed::no(plan))
} else {
Projection::try_new(project_exprs, Arc::new(plan))
Expand Down Expand Up @@ -759,7 +771,7 @@ fn rewrite_projection_given_requirements(
// projection down
optimize_projections(Arc::unwrap_or_clone(input), config, required_indices)?
.transform_data(|input| {
if is_projection_unnecessary(&input, &exprs_used)? {
if is_projection_unnecessary(&input, &exprs_used, config)? {
Ok(Transformed::yes(input))
} else {
Projection::try_new(exprs_used, Arc::new(input))
Expand All @@ -770,9 +782,15 @@ fn rewrite_projection_given_requirements(
}

/// Projection is unnecessary, when
/// - `optimize_projections_preserve_existing_projections` optimizer config is false, and
/// - input schema of the projection, output schema of the projection are same, and
/// - all projection expressions are either Column or Literal
fn is_projection_unnecessary(input: &LogicalPlan, proj_exprs: &[Expr]) -> Result<bool> {
!config
.options()
.optimizer
.optimize_projections_preserve_existing_projections

let proj_schema = projection_schema(input, proj_exprs)?;
Ok(&proj_schema == input.schema() && proj_exprs.iter().all(is_expr_trivial))
}
Expand Down
56 changes: 33 additions & 23 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ impl Unparser<'_> {
match plan {
LogicalPlan::TableScan(scan) => {
if let Some(unparsed_table_scan) =
Self::unparse_table_scan_pushdown(plan, None)?
Self::unparse_table_scan_pushdown(plan, None, select.already_projected())?
{
return self.select_to_sql_recursively(
&unparsed_table_scan,
Expand Down Expand Up @@ -567,7 +567,8 @@ impl Unparser<'_> {
let unparsed_table_scan = Self::unparse_table_scan_pushdown(
plan,
Some(plan_alias.alias.clone()),
)?;
select.already_projected(),
)?;
// if the child plan is a TableScan with pushdown operations, we don't need to
// create an additional subquery for it
if !select.already_projected() && unparsed_table_scan.is_none() {
Expand Down Expand Up @@ -696,6 +697,7 @@ impl Unparser<'_> {
fn unparse_table_scan_pushdown(
plan: &LogicalPlan,
alias: Option<TableReference>,
already_projected: bool,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::TableScan(table_scan) => {
Expand Down Expand Up @@ -725,24 +727,29 @@ impl Unparser<'_> {
}
}

if let Some(project_vec) = &table_scan.projection {
let project_columns = project_vec
.iter()
.cloned()
.map(|i| {
let schema = table_scan.source.schema();
let field = schema.field(i);
if alias.is_some() {
Column::new(alias.clone(), field.name().clone())
} else {
Column::new(
Some(table_scan.table_name.clone()),
field.name().clone(),
)
}
})
.collect::<Vec<_>>();
builder = builder.project(project_columns)?;
// Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists.
// For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection
// information included in the TableScan node.
if !already_projected {
if let Some(project_vec) = &table_scan.projection {
let project_columns = project_vec
.iter()
.cloned()
.map(|i| {
let schema = table_scan.source.schema();
let field = schema.field(i);
if alias.is_some() {
Column::new(alias.clone(), field.name().clone())
} else {
Column::new(
Some(table_scan.table_name.clone()),
field.name().clone(),
)
}
})
.collect::<Vec<_>>();
builder = builder.project(project_columns)?;
}
}

let filter_expr: Result<Option<Expr>> = table_scan
Expand Down Expand Up @@ -787,14 +794,17 @@ impl Unparser<'_> {
Self::unparse_table_scan_pushdown(
&subquery_alias.input,
Some(subquery_alias.alias.clone()),
already_projected,
)
}
// SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
// The inner table scan could be a scan with pushdown operations.
LogicalPlan::Projection(projection) => {
if let Some(plan) =
Self::unparse_table_scan_pushdown(&projection.input, alias.clone())?
{
if let Some(plan) = Self::unparse_table_scan_pushdown(
&projection.input,
alias.clone(),
already_projected,
)? {
let exprs = if alias.is_some() {
let mut alias_rewriter =
alias.as_ref().map(|alias_name| TableAliasRewriter {
Expand Down
1 change: 1 addition & 0 deletions datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,7 @@ fn test_table_scan_pushdown() -> Result<()> {
let query_from_table_scan_with_projection = LogicalPlanBuilder::from(
table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?,
)
.project(vec![col("id"), col("age")])?
.project(vec![wildcard()])?
.build()?;
let query_from_table_scan_with_projection =
Expand Down

0 comments on commit 2c69bd4

Please sign in to comment.