diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 336513035036..4ebfe76726f5 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -636,6 +636,10 @@ config_namespace! { /// then the output will be coerced to a non-view. /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. pub expand_views_at_output: bool, default = false + + /// When set to true, the `optimize_projections` rule will not attempt to move, add, or remove existing projections. + /// This flag helps maintain the original structure of the `LogicalPlan` when converting it back into SQL via the `unparser` module. It ensures the query layout remains simple and readable, relying on the underlying SQL engine to apply its own optimizations during execution. + pub optimize_projections_preserve_existing_projections: bool, default = false } } diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index 53646dc5b468..78b4bac6bc17 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -338,6 +338,15 @@ impl SessionConfig { self } + /// When set to true, the `optimize_projections` rule will not attempt to move, add, or remove existing projections. + /// This is useful when optimization is used alongside unparsing logic to preserve the original layout and simplify the overall query structure. 
+ /// + /// [optimize_projections_preserve_existing_projections]: datafusion_common::config::OptimizerOptions::optimize_projections_preserve_existing_projections + pub fn with_optimize_projections_preserve_existing_projections(mut self, enabled: bool) -> Self { + self.options.optimizer.optimize_projections_preserve_existing_projections = enabled; + self + } + /// Enables or disables the use of pruning predicate for parquet readers to skip row groups pub fn with_parquet_pruning(mut self, enabled: bool) -> Self { self.options.execution.parquet.pruning = enabled; diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 67d888abda52..7b2fc216c436 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -189,7 +189,7 @@ fn optimize_projections( // that its input only contains absolutely necessary columns for // the aggregate expressions. Note that necessary_indices refer to // fields in `aggregate.input.schema()`. - add_projection_on_top_if_helpful(aggregate_input, necessary_exprs) + add_projection_on_top_if_helpful(aggregate_input, necessary_exprs, config) })? .map_data(|aggregate_input| { // Create a new aggregate plan with the updated input and only the @@ -233,9 +233,14 @@ fn optimize_projections( // refers to that schema let required_exprs = required_indices.get_required_exprs(&input_schema); - let window_child = - add_projection_on_top_if_helpful(window_child, required_exprs)? - .data; + + let window_child = add_projection_on_top_if_helpful( + window_child, + required_exprs, + config, + )? 
+ .data; + Window::try_new(new_window_expr, Arc::new(window_child)) .map(LogicalPlan::Window) .map(Transformed::yes) @@ -409,7 +414,7 @@ fn optimize_projections( optimize_projections(child, config, required_indices)?.transform_data( |new_input| { if projection_beneficial { - add_projection_on_top_if_helpful(new_input, project_exprs) + add_projection_on_top_if_helpful(new_input, project_exprs, config) } else { Ok(Transformed::no(new_input)) } @@ -708,6 +713,7 @@ fn split_join_requirements( /// /// * `plan` - The input `LogicalPlan` to potentially add a projection to. /// * `project_exprs` - A list of expressions for the projection. +/// * `config` - A reference to the optimizer configuration. /// /// # Returns /// @@ -715,9 +721,15 @@ fn split_join_requirements( fn add_projection_on_top_if_helpful( plan: LogicalPlan, project_exprs: Vec, + config: &dyn OptimizerConfig, ) -> Result> { // Make sure projection decreases the number of columns, otherwise it is unnecessary. - if project_exprs.len() >= plan.schema().fields().len() { + if config + .options() + .optimizer + .optimize_projections_preserve_existing_projections + || project_exprs.len() >= plan.schema().fields().len() + { Ok(Transformed::no(plan)) } else { Projection::try_new(project_exprs, Arc::new(plan)) @@ -759,7 +771,7 @@ fn rewrite_projection_given_requirements( // projection down optimize_projections(Arc::unwrap_or_clone(input), config, required_indices)? .transform_data(|input| { - if is_projection_unnecessary(&input, &exprs_used)? { + if is_projection_unnecessary(&input, &exprs_used, config)? 
{
                     Ok(Transformed::yes(input))
                 } else {
                     Projection::try_new(exprs_used, Arc::new(input))
@@ -770,9 +782,26 @@ fn rewrite_projection_given_requirements(
 }
 
 /// Projection is unnecessary, when
+/// - `optimize_projections_preserve_existing_projections` optimizer config is false, and
 /// - input schema of the projection, output schema of the projection are same, and
 /// - all projection expressions are either Column or Literal
+///
+/// Returns an error only if computing the projection schema fails.
-fn is_projection_unnecessary(input: &LogicalPlan, proj_exprs: &[Expr]) -> Result<bool> {
+fn is_projection_unnecessary(
+    input: &LogicalPlan,
+    proj_exprs: &[Expr],
+    config: &dyn OptimizerConfig,
+) -> Result<bool> {
+    // When the preserve flag is set every existing projection must be kept,
+    // so answer "not unnecessary" before doing any schema comparison.
+    if config
+        .options()
+        .optimizer
+        .optimize_projections_preserve_existing_projections
+    {
+        return Ok(false);
+    }
+
     let proj_schema = projection_schema(input, proj_exprs)?;
     Ok(&proj_schema == input.schema() && proj_exprs.iter().all(is_expr_trivial))
 }
diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs
index 8167ddacffb4..d5aead6e7de6 100644
--- a/datafusion/sql/src/unparser/plan.rs
+++ b/datafusion/sql/src/unparser/plan.rs
@@ -279,7 +279,7 @@ impl Unparser<'_> {
         match plan {
             LogicalPlan::TableScan(scan) => {
                 if let Some(unparsed_table_scan) =
-                    Self::unparse_table_scan_pushdown(plan, None)?
+                    Self::unparse_table_scan_pushdown(plan, None, select.already_projected())?
{ return self.select_to_sql_recursively( &unparsed_table_scan, @@ -567,7 +567,8 @@ impl Unparser<'_> { let unparsed_table_scan = Self::unparse_table_scan_pushdown( plan, Some(plan_alias.alias.clone()), - )?; + select.already_projected(), + )?; // if the child plan is a TableScan with pushdown operations, we don't need to // create an additional subquery for it if !select.already_projected() && unparsed_table_scan.is_none() { @@ -696,6 +697,7 @@ impl Unparser<'_> { fn unparse_table_scan_pushdown( plan: &LogicalPlan, alias: Option, + already_projected: bool, ) -> Result> { match plan { LogicalPlan::TableScan(table_scan) => { @@ -725,24 +727,29 @@ impl Unparser<'_> { } } - if let Some(project_vec) = &table_scan.projection { - let project_columns = project_vec - .iter() - .cloned() - .map(|i| { - let schema = table_scan.source.schema(); - let field = schema.field(i); - if alias.is_some() { - Column::new(alias.clone(), field.name().clone()) - } else { - Column::new( - Some(table_scan.table_name.clone()), - field.name().clone(), - ) - } - }) - .collect::>(); - builder = builder.project(project_columns)?; + // Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists. + // For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection + // information included in the TableScan node. 
+ if !already_projected { + if let Some(project_vec) = &table_scan.projection { + let project_columns = project_vec + .iter() + .cloned() + .map(|i| { + let schema = table_scan.source.schema(); + let field = schema.field(i); + if alias.is_some() { + Column::new(alias.clone(), field.name().clone()) + } else { + Column::new( + Some(table_scan.table_name.clone()), + field.name().clone(), + ) + } + }) + .collect::>(); + builder = builder.project(project_columns)?; + } } let filter_expr: Result> = table_scan @@ -787,14 +794,17 @@ impl Unparser<'_> { Self::unparse_table_scan_pushdown( &subquery_alias.input, Some(subquery_alias.alias.clone()), + already_projected, ) } // SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns]. // The inner table scan could be a scan with pushdown operations. LogicalPlan::Projection(projection) => { - if let Some(plan) = - Self::unparse_table_scan_pushdown(&projection.input, alias.clone())? - { + if let Some(plan) = Self::unparse_table_scan_pushdown( + &projection.input, + alias.clone(), + already_projected, + )? { let exprs = if alias.is_some() { let mut alias_rewriter = alias.as_ref().map(|alias_name| TableAliasRewriter { diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 669f9f06f035..d1e921110072 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -882,6 +882,7 @@ fn test_table_scan_pushdown() -> Result<()> { let query_from_table_scan_with_projection = LogicalPlanBuilder::from( table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?, ) + .project(vec![col("id"), col("age")])? .project(vec![wildcard()])? .build()?; let query_from_table_scan_with_projection =