Skip to content

Commit

Permalink
Add test for optimize_projections with preserve_projections option en…
Browse files Browse the repository at this point in the history
…abled
  • Loading branch information
sgrebnov committed Nov 5, 2024
1 parent 2c69bd4 commit 32da325
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 13 deletions.
3 changes: 2 additions & 1 deletion datafusion/execution/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,8 @@ impl SessionConfig {
}

/// When set to true, the `optimize_projections` rule will not attempt to move, add, or remove existing projections.
/// This is useful when optimization is used alongside unparsing logic to preserve the original layout and simplify the overall query structure.
/// This flag helps maintain the original structure of the `LogicalPlan` when converting it back into SQL via the `unparser` module.
/// It ensures the query layout remains simple and readable, relying on the underlying SQL engine to apply its own optimizations during execution.
///
/// [optimize_projections_preserve_existing_projections]: datafusion_common::config::OptimizerOptions::optimize_projections_preserve_existing_projections
pub fn with_optimize_projections_preserve_existing_projections(mut self, enabled: bool) -> Self {
Expand Down
63 changes: 58 additions & 5 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,12 +785,19 @@ fn rewrite_projection_given_requirements(
/// - `optimize_projections_preserve_existing_projections` optimizer config is false, and
/// - input schema of the projection, output schema of the projection are same, and
/// - all projection expressions are either Column or Literal
fn is_projection_unnecessary(input: &LogicalPlan, proj_exprs: &[Expr]) -> Result<bool> {
!config
fn is_projection_unnecessary(
    input: &LogicalPlan,
    proj_exprs: &[Expr],
    config: &dyn OptimizerConfig,
) -> Result<bool> {
    // With `optimize_projections_preserve_existing_projections` enabled, no
    // projection is ever reported as unnecessary.
    let preserve_existing = config
        .options()
        .optimizer
        .optimize_projections_preserve_existing_projections;
    if preserve_existing {
        return Ok(false);
    }

    // Otherwise the projection is unnecessary only when it leaves the input
    // schema unchanged and every expression is trivial. The schema is computed
    // first so that any error from `projection_schema` is still propagated.
    let projected = projection_schema(input, proj_exprs)?;
    let schema_unchanged = &projected == input.schema();
    Ok(schema_unchanged && proj_exprs.iter().all(is_expr_trivial))
}
Expand All @@ -807,11 +814,12 @@ mod tests {
use crate::optimize_projections::OptimizeProjections;
use crate::optimizer::Optimizer;
use crate::test::{
assert_fields_eq, assert_optimized_plan_eq, scan_empty, test_table_scan,
test_table_scan_fields, test_table_scan_with_name,
assert_fields_eq, assert_optimized_plan_eq, assert_optimized_plan_with_config_eq,
scan_empty, test_table_scan, test_table_scan_fields, test_table_scan_with_name,
};
use crate::{OptimizerContext, OptimizerRule};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{
Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference,
};
Expand Down Expand Up @@ -1998,4 +2006,49 @@ mod tests {
optimizer.optimize(plan, &OptimizerContext::new(), observe)?;
Ok(optimized_plan)
}

#[test]
fn aggregate_filter_pushdown_preserve_projections() -> Result<()> {
    // Aggregate with a plain count and a filtered count, wrapped in a projection
    // that simply re-selects the aggregate's output columns.
    let filtered_count = count_udaf()
        .call(vec![col("b")])
        .filter(col("c").gt(lit(42)))
        .build()?;
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .aggregate(
            vec![col("a")],
            vec![count(col("b")), filtered_count.alias("count2")],
        )?
        .project(vec![col("a"), col("count(test.b)"), col("count2")])?
        .build()?;

    // Expected plan when the option is disabled: no Projection node on top.
    let expected_default = "Aggregate: groupBy=[[test.a]], aggr=[[count(test.b), count(test.b) FILTER (WHERE test.c > Int32(42)) AS count2]]\
        \n TableScan: test projection=[a, b, c]";

    // Expected plan when the option is enabled: the Projection node is kept.
    let expected_preserve_projections = "Projection: test.a, count(test.b), count2\
        \n Aggregate: groupBy=[[test.a]], aggr=[[count(test.b), count(test.b) FILTER (WHERE test.c > Int32(42)) AS count2]]\
        \n TableScan: test projection=[a, b, c]";

    // Run the same plan through the rule once per setting of the option.
    for (preserve, expected) in [
        (false, expected_default),
        (true, expected_preserve_projections),
    ] {
        let mut options = ConfigOptions::new();
        options
            .optimizer
            .optimize_projections_preserve_existing_projections = preserve;
        let context = OptimizerContext::new_with_options(options);

        assert_optimized_plan_with_config_eq(
            Arc::new(OptimizeProjections::new()),
            plan.clone(),
            expected,
            &context,
        )?;
    }

    Ok(())
}
}
10 changes: 10 additions & 0 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ pub struct OptimizerContext {
/// Alias generator used to generate unique aliases for subqueries
alias_generator: Arc<AliasGenerator>,

/// Configuration options for the optimizer
options: ConfigOptions,
}

Expand All @@ -161,6 +162,15 @@ impl OptimizerContext {
}
}

/// Create an optimizer context that uses the supplied configuration options.
///
/// The query execution start time is set to the current UTC time and a new
/// alias generator is created.
pub fn new_with_options(options: ConfigOptions) -> Self {
    let query_execution_start_time = Utc::now();
    let alias_generator = Arc::new(AliasGenerator::new());

    Self {
        query_execution_start_time,
        alias_generator,
        options,
    }
}

/// Specify whether to enable the filter_null_keys rule
pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
self.options.optimizer.filter_null_join_keys = filter_null_keys;
Expand Down
13 changes: 11 additions & 2 deletions datafusion/optimizer/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::analyzer::{Analyzer, AnalyzerRule};
use crate::optimizer::Optimizer;
use crate::{OptimizerContext, OptimizerRule};
use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{assert_contains, Result};
Expand Down Expand Up @@ -173,8 +173,17 @@ pub fn assert_optimized_plan_eq(
// Apply the rule once
let opt_context = OptimizerContext::new().with_max_passes(1);

assert_optimized_plan_with_config_eq(rule, plan, expected, &opt_context)
}

pub fn assert_optimized_plan_with_config_eq(
rule: Arc<dyn OptimizerRule + Send + Sync>,
plan: LogicalPlan,
expected: &str,
config: &dyn OptimizerConfig,
) -> Result<()> {
let optimizer = Optimizer::with_rules(vec![Arc::clone(&rule)]);
let optimized_plan = optimizer.optimize(plan, &opt_context, observe)?;
let optimized_plan = optimizer.optimize(plan, config, observe)?;
let formatted_plan = format!("{optimized_plan}");
assert_eq!(formatted_plan, expected);

Expand Down
12 changes: 7 additions & 5 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,11 @@ impl Unparser<'_> {
) -> Result<()> {
match plan {
LogicalPlan::TableScan(scan) => {
if let Some(unparsed_table_scan) =
Self::unparse_table_scan_pushdown(plan, None, select.already_projected())?
{
if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
plan,
None,
select.already_projected(),
)? {
return self.select_to_sql_recursively(
&unparsed_table_scan,
query,
Expand Down Expand Up @@ -568,7 +570,7 @@ impl Unparser<'_> {
plan,
Some(plan_alias.alias.clone()),
select.already_projected(),
)?;
)?;
// if the child plan is a TableScan with pushdown operations, we don't need to
// create an additional subquery for it
if !select.already_projected() && unparsed_table_scan.is_none() {
Expand Down Expand Up @@ -728,7 +730,7 @@ impl Unparser<'_> {
}

// Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists.
// For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection
// For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection
// information included in the TableScan node.
if !already_projected {
if let Some(project_vec) = &table_scan.projection {
Expand Down

0 comments on commit 32da325

Please sign in to comment.