Skip to content

Commit

Permalink
Add binary type coercion to logical plan and do not allow CAST to cha…
Browse files Browse the repository at this point in the history
…nge an expression name
  • Loading branch information
andygrove committed Sep 3, 2022
1 parent 786c319 commit f5fbe25
Show file tree
Hide file tree
Showing 20 changed files with 753 additions and 554 deletions.
13 changes: 8 additions & 5 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ use datafusion_optimizer::filter_null_join_keys::FilterNullJoinKeys;
use datafusion_optimizer::pre_cast_lit_in_comparison::PreCastLitInComparisonExpressions;
use datafusion_optimizer::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate;
use datafusion_optimizer::scalar_subquery_to_join::ScalarSubqueryToJoin;
use datafusion_optimizer::type_coercion::TypeCoercion;
use datafusion_sql::{
parser::DFParser,
planner::{ContextProvider, SqlToRel},
Expand Down Expand Up @@ -1401,6 +1402,8 @@ impl SessionState {
}
rules.push(Arc::new(ReduceOuterJoin::new()));
rules.push(Arc::new(FilterPushDown::new()));
// we do type coercion after filter push down so that we don't push CAST filters to Parquet
rules.push(Arc::new(TypeCoercion::new()));
rules.push(Arc::new(LimitPushDown::new()));
rules.push(Arc::new(SingleDistinctToGroupBy::new()));

Expand Down Expand Up @@ -1872,11 +1875,11 @@ mod tests {
.await?;

let expected = vec![
"+----------------------+------------------------+------------------------+",
"| @@version | @name | @integer Plus Int64(1) |",
"+----------------------+------------------------+------------------------+",
"| system-var-@@version | user-defined-var-@name | 42 |",
"+----------------------+------------------------+------------------------+",
"+----------------------+------------------------+---------------------+",
"| @@version | @name | @integer + Int64(1) |",
"+----------------------+------------------------+---------------------+",
"| system-var-@@version | user-defined-var-@name | 42 |",
"+----------------------+------------------------+---------------------+",
];
assert_batches_eq!(expected, &results);

Expand Down
14 changes: 7 additions & 7 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
Expr::BinaryExpr { left, op, right } => {
let left = create_physical_name(left, false)?;
let right = create_physical_name(right, false)?;
Ok(format!("{} {:?} {}", left, op, right))
Ok(format!("{} {} {}", left, op, right))
}
Expr::Case {
expr,
Expand All @@ -128,13 +128,13 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
name += "END";
Ok(name)
}
Expr::Cast { expr, data_type } => {
let expr = create_physical_name(expr, false)?;
Ok(format!("CAST({} AS {:?})", expr, data_type))
Expr::Cast { expr, .. } => {
// CAST does not change the expression name
create_physical_name(expr, false)
}
Expr::TryCast { expr, data_type } => {
let expr = create_physical_name(expr, false)?;
Ok(format!("TRY_CAST({} AS {:?})", expr, data_type))
Expr::TryCast { expr, .. } => {
// CAST does not change the expression name
create_physical_name(expr, false)
}
Expr::Not(expr) => {
let expr = create_physical_name(expr, false)?;
Expand Down
16 changes: 8 additions & 8 deletions datafusion/core/tests/dataframe_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,14 +667,14 @@ async fn test_fn_substr() -> Result<()> {
async fn test_cast() -> Result<()> {
let expr = cast(col("b"), DataType::Float64);
let expected = vec![
"+-------------------------+",
"| CAST(test.b AS Float64) |",
"+-------------------------+",
"| 1 |",
"| 10 |",
"| 10 |",
"| 100 |",
"+-------------------------+",
"+--------+",
"| test.b |",
"+--------+",
"| 1 |",
"| 10 |",
"| 10 |",
"| 100 |",
"+--------+",
];

assert_fn_batches!(expr, expected);
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/parquet_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,7 @@ impl ContextWithParquet {
let pretty_input = pretty_format_batches(&input).unwrap().to_string();

let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan");

let physical_plan = self
.ctx
.create_physical_plan(&logical_plan)
Expand Down
38 changes: 28 additions & 10 deletions datafusion/core/tests/sql/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,11 +462,11 @@ async fn csv_query_external_table_sum() {
"SELECT SUM(CAST(c7 AS BIGINT)), SUM(CAST(c8 AS BIGINT)) FROM aggregate_test_100";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+-------------------------------------------+-------------------------------------------+",
"| SUM(CAST(aggregate_test_100.c7 AS Int64)) | SUM(CAST(aggregate_test_100.c8 AS Int64)) |",
"+-------------------------------------------+-------------------------------------------+",
"| 13060 | 3017641 |",
"+-------------------------------------------+-------------------------------------------+",
"+----------------------------+----------------------------+",
"| SUM(aggregate_test_100.c7) | SUM(aggregate_test_100.c8) |",
"+----------------------------+----------------------------+",
"| 13060 | 3017641 |",
"+----------------------------+----------------------------+",
];
assert_batches_eq!(expected, &actual);
}
Expand Down Expand Up @@ -571,6 +571,24 @@ async fn csv_query_approx_count() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn csv_query_approx_count_dupe_expr_aliased() -> Result<()> {
let ctx = SessionContext::new();
register_aggregate_csv(&ctx).await?;
let sql =
"SELECT approx_distinct(c9) a, approx_distinct(c9) b FROM aggregate_test_100";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+-----+-----+",
"| a | b |",
"+-----+-----+",
"| 100 | 100 |",
"+-----+-----+",
];
assert_batches_eq!(expected, &actual);
Ok(())
}

// This test executes the APPROX_PERCENTILE_CONT aggregation against the test
// data, asserting the estimated quantiles are ±5% their actual values.
//
Expand Down Expand Up @@ -1815,11 +1833,11 @@ async fn aggregate_avg_add() -> Result<()> {
assert_eq!(results.len(), 1);

let expected = vec![
"+--------------+----------------------------+----------------------------+----------------------------+",
"| AVG(test.c1) | AVG(test.c1) Plus Int64(1) | AVG(test.c1) Plus Int64(2) | Int64(1) Plus AVG(test.c1) |",
"+--------------+----------------------------+----------------------------+----------------------------+",
"| 1.5 | 2.5 | 3.5 | 2.5 |",
"+--------------+----------------------------+----------------------------+----------------------------+",
"+--------------+-------------------------+-------------------------+-------------------------+",
"| AVG(test.c1) | AVG(test.c1) + Int64(1) | AVG(test.c1) + Int64(2) | Int64(1) + AVG(test.c1) |",
"+--------------+-------------------------+-------------------------+-------------------------+",
"| 1.5 | 2.5 | 3.5 | 2.5 |",
"+--------------+-------------------------+-------------------------+-------------------------+",
];
assert_batches_sorted_eq!(expected, &results);

Expand Down
Loading

0 comments on commit f5fbe25

Please sign in to comment.