
Can't get the right logical plan after optimizer #3421

Closed
liukun4515 opened this issue Sep 9, 2022 · 11 comments · Fixed by #3459
Labels
bug Something isn't working

Comments

@liukun4515
Contributor

Describe the bug
Type coercion has moved into the logical optimizer, but the optimizer runs the filter_push_down rule before the TypeCoercion rule, so we can't get the right logical plan.

For example:

❯ \d test
+---------------+--------------+------------+-------------+-----------------------------+-------------+
| table_catalog | table_schema | table_name | column_name | data_type                   | is_nullable |
+---------------+--------------+------------+-------------+-----------------------------+-------------+
| datafusion    | public       | test       | 0           | Timestamp(Nanosecond, None) | YES         |
| datafusion    | public       | test       | 1           | Date32                      | YES         |
| datafusion    | public       | test       | 2           | Int64                       | YES         |
| datafusion    | public       | test       | 3           | Int64                       | YES         |
| datafusion    | public       | test       | 7           | Decimal128(9, 0)            | YES         |
| datafusion    | public       | test       | 8           | Float32                     | YES         |
| datafusion    | public       | test       | 11          | Utf8                        | YES         |
| datafusion    | public       | test       | 16          | Utf8                        | YES         |
| datafusion    | public       | test       | 100000      | Int64                       | YES         |
| datafusion    | public       | test       | 100001      | Int64                       | YES         |
| datafusion    | public       | test       | 100002      | Int64                       | YES         |
+---------------+--------------+------------+-------------+-----------------------------+-------------+
❯ explain select "1" from test where "8" = 1;
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                       |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: #test.1                                                                                                                                                                                                                                                        |
|               |   Filter: #test.8 = CAST(Int64(1) AS Float32)                                                                                                                                                                                                                              |
|               |     TableScan: test projection=[1, 8], partial_filters=[#test.8 = Int64(1)]                                                                                                                                                                                                |

❯ explain verbose select "1" from test where "8" = 1;
+-------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type                                             | plan                                                                                                                                                                                                                                                                       |
+-------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| initial_logical_plan                                  | Projection: #test.1                                                                                                                                                                                                                                                        |
|                                                       |   Filter: #test.8 = Int64(1)                                                                                                                                                                                                                                               |
|                                                       |     TableScan: test                                                                                                                                                                                                                                                        |
| logical_plan after simplify_expressions               | SAME TEXT AS ABOVE                                                                                                                                                                                                                                                         |
| logical_plan after pre_cast_lit_in_comparison         | SAME TEXT AS ABOVE                                                                                                                                                                                                                                                         |
| logical_plan after decorrelate_where_exists           | SAME TEXT AS ABOVE                                                                                                                                                                                                                                                         |
| logical_plan after decorrelate_where_in               | SAME TEXT AS ABOVE                                                                                                                                                                                                                                                         |
| logical_plan after scalar_subquery_to_join            | SAME TEXT AS ABOVE                                                                                                                                                                                                                                                         |
| logical_plan after subquery_filter_to_join            | SAME TEXT AS ABOVE                                                                                                                                                                                                                                                         |
| logical_plan after eliminate_filter                   | SAME TEXT AS ABOVE                                                                                                                                                                                                                                                         |
| logical_plan after common_sub_expression_eliminate    | SAME TEXT AS ABOVE                                                                                                                                                                                                                                                         |
| logical_plan after eliminate_limit                    | SAME TEXT AS ABOVE                                                                                                                                                                                                                                                         |
| logical_plan after projection_push_down               | Projection: #test.1                                                                                                                                                                                                                                                        |
|                                                       |   Filter: #test.8 = Int64(1)                                                                                                                                                                                                                                               |
|                                                       |     TableScan: test projection=[1, 8]                                                                                                                                                                                                                                      |
| logical_plan after rewrite_disjunctive_predicate      | SAME TEXT AS ABOVE                                                                                                                                                                                                                                                         |
| logical_plan after reduce_outer_join                  | SAME TEXT AS ABOVE                                                                                                                                                                                                                                                         |
| logical_plan after filter_push_down                   | Projection: #test.1                                                                                                                                                                                                                                                        |
|                                                       |   Filter: #test.8 = Int64(1)                                                                                                                                                                                                                                               |
|                                                       |     TableScan: test projection=[1, 8], partial_filters=[#test.8 = Int64(1)]                                                                                                                                                                                                |
| logical_plan after TypeCoercion                       | Projection: #test.1                                                                                                                                                                                                                                                        |
|                                                       |   Filter: #test.8 = CAST(Int64(1) AS Float32)                                                                                                                                                                                                                              |
|                                                       |     TableScan: test projection=[1, 8], partial_filters=[#test.8 = Int64(1)]

The expression in partial_filters was not cast to the right type, which will break pruning for Parquet.

Why do we apply the optimizer rules in this order?

@andygrove
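To make the mismatch concrete, here is a minimal, self-contained Rust sketch of a toy optimizer. It is an illustration under stated assumptions, not DataFusion code: `DataType`, `Expr`, `Plan`, `coerce`, `push_down`, `coerce_filter_only` and `explain` are all hypothetical, and coercion is simplified to casting the right-hand side of `=` to the left-hand side's type. `push_down` copies the Filter predicate into the scan's filters, and `coerce_filter_only` rewrites only the Filter predicate, matching the behaviour reported above. Running push-down first therefore leaves the scan with the uncast `Int64` literal.

```rust
use std::fmt;

// Hypothetical toy model of a logical plan; these are NOT DataFusion's real types.
#[derive(Debug, Clone, PartialEq)]
enum DataType { Int64, Float32 }

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String, DataType),
    LitInt64(i64),
    Cast(Box<Expr>, DataType),
    Eq(Box<Expr>, Box<Expr>),
}

// Render expressions roughly the way EXPLAIN prints them in this issue.
impl fmt::Display for Expr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Expr::Column(name, _) => write!(f, "#{}", name),
            Expr::LitInt64(v) => write!(f, "Int64({})", v),
            Expr::Cast(e, t) => write!(f, "CAST({} AS {:?})", e, t),
            Expr::Eq(l, r) => write!(f, "{} = {}", l, r),
        }
    }
}

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Filter { predicate: Expr, input: Box<Plan> },
    TableScan { table: String, filters: Vec<Expr> },
}

/// Result type of an expression; `None` for the boolean `=`, which is not modelled.
fn data_type(e: &Expr) -> Option<DataType> {
    match e {
        Expr::Column(_, t) | Expr::Cast(_, t) => Some(t.clone()),
        Expr::LitInt64(_) => Some(DataType::Int64),
        Expr::Eq(..) => None,
    }
}

/// Simplified coercion: if the two sides of `=` differ in type, cast the
/// right-hand side to the left-hand side's type (assumes the column is on the left).
fn coerce(e: Expr) -> Expr {
    match e {
        Expr::Eq(l, r) => {
            let (l, r) = (coerce(*l), coerce(*r));
            match (data_type(&l), data_type(&r)) {
                (Some(lt), Some(rt)) if lt != rt => {
                    Expr::Eq(Box::new(l), Box::new(Expr::Cast(Box::new(r), lt)))
                }
                _ => Expr::Eq(Box::new(l), Box::new(r)),
            }
        }
        other => other,
    }
}

/// Toy filter push-down: copy the Filter predicate into the scan's `filters`.
fn push_down(plan: Plan) -> Plan {
    match plan {
        Plan::Filter { predicate, input } => match *input {
            Plan::TableScan { table, mut filters } => {
                filters.push(predicate.clone());
                let scan = Plan::TableScan { table, filters };
                Plan::Filter { predicate, input: Box::new(scan) }
            }
            other => Plan::Filter { predicate, input: Box::new(push_down(other)) },
        },
        scan => scan,
    }
}

/// Coercion pass that only rewrites Filter predicates; scan filters keep their types.
fn coerce_filter_only(plan: Plan) -> Plan {
    match plan {
        Plan::Filter { predicate, input } => Plan::Filter {
            predicate: coerce(predicate),
            input: Box::new(coerce_filter_only(*input)),
        },
        scan => scan,
    }
}

/// Append one indented line per plan node, loosely imitating EXPLAIN output.
fn explain(plan: &Plan, depth: usize, out: &mut Vec<String>) {
    let pad = "  ".repeat(depth);
    match plan {
        Plan::Filter { predicate, input } => {
            out.push(format!("{}Filter: {}", pad, predicate));
            explain(input, depth + 1, out);
        }
        Plan::TableScan { table, filters } => {
            let fs: Vec<String> = filters.iter().map(|e| e.to_string()).collect();
            out.push(format!("{}TableScan: {} partial_filters=[{}]", pad, table, fs.join(", ")));
        }
    }
}
```

For a `Float32` column `test.8` compared with `Int64(1)`, running `push_down` and then `coerce_filter_only` renders the Filter as `#test.8 = CAST(Int64(1) AS Float32)` while the scan keeps `#test.8 = Int64(1)`, the same shape as the EXPLAIN output in this issue.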


liukun4515 added the bug (Something isn't working) label on Sep 9, 2022
@andygrove
Member

Thanks, @liukun4515. I will look at this today or over the weekend.

@andygrove
Member

> Why do we apply the optimizer rules in this order?

        // we do type coercion after filter push down so that we don't push CAST filters to Parquet
        // until https://github.com/apache/arrow-datafusion/issues/3289 is resolved

@liukun4515
Contributor Author

Related issue: #3289

liukun4515 self-assigned this on Sep 10, 2022
@andygrove
Member

This issue exists in DataFusion 11.0.0. I don't think adding the new TypeCoercion rule introduced any regression here.

DataFusion CLI v11.0.0
❯ create external table test (a int, b float) stored as csv location 'test.csv';
0 rows in set. Query took 0.011 seconds.
❯ explain verbose select a from test where b = 2;
+-------------------------------------------------------+----------------------------------------------------------------------------------------+
| plan_type                                             | plan                                                                                   |
+-------------------------------------------------------+----------------------------------------------------------------------------------------+
| initial_logical_plan                                  | Projection: #test.a                                                                    |
|                                                       |   Filter: #test.b = Int64(2)                                                           |
|                                                       |     TableScan: test                                                                    |
| logical_plan after simplify_expressions               | SAME TEXT AS ABOVE                                                                     |
| logical_plan after decorrelate_where_exists           | SAME TEXT AS ABOVE                                                                     |
| logical_plan after decorrelate_where_in               | SAME TEXT AS ABOVE                                                                     |
| logical_plan after decorrelate_scalar_subquery        | SAME TEXT AS ABOVE                                                                     |
| logical_plan after subquery_filter_to_join            | SAME TEXT AS ABOVE                                                                     |
| logical_plan after eliminate_filter                   | SAME TEXT AS ABOVE                                                                     |
| logical_plan after common_sub_expression_eliminate    | SAME TEXT AS ABOVE                                                                     |
| logical_plan after eliminate_limit                    | SAME TEXT AS ABOVE                                                                     |
| logical_plan after projection_push_down               | Projection: #test.a                                                                    |
|                                                       |   Filter: #test.b = Int64(2)                                                           |
|                                                       |     TableScan: test projection=[a, b]                                                  |
| logical_plan after rewrite_disjunctive_predicate      | SAME TEXT AS ABOVE                                                                     |
| logical_plan after reduce_outer_join                  | SAME TEXT AS ABOVE                                                                     |
| logical_plan after filter_push_down                   | Projection: #test.a                                                                    |
|                                                       |   Filter: #test.b = Int64(2)                                                           |
|                                                       |     TableScan: test projection=[a, b], partial_filters=[#test.b = Int64(2)]            |
| logical_plan after limit_push_down                    | SAME TEXT AS ABOVE                                                                     |
| logical_plan after SingleDistinctAggregationToGroupBy | SAME TEXT AS ABOVE                                                                     |
| logical_plan                                          | Projection: #test.a                                                                    |
|                                                       |   Filter: #test.b = Int64(2)                                                           |
|                                                       |     TableScan: test projection=[a, b], partial_filters=[#test.b = Int64(2)]            |
| initial_physical_plan                                 | ProjectionExec: expr=[a@0 as a]                                                        |
|                                                       |   FilterExec: b@1 = CAST(2 AS Float32)                                                 |
|                                                       |     CsvExec: files=[tmp/test.csv], has_header=false, limit=None, projection=[a, b]     |
|                                                       |                                                                                        |
| physical_plan after aggregate_statistics              | SAME TEXT AS ABOVE                                                                     |
| physical_plan after hash_build_probe_order            | SAME TEXT AS ABOVE                                                                     |
| physical_plan after coalesce_batches                  | ProjectionExec: expr=[a@0 as a]                                                        |
|                                                       |   CoalesceBatchesExec: target_batch_size=4096                                          |
|                                                       |     FilterExec: b@1 = CAST(2 AS Float32)                                               |
|                                                       |       CsvExec: files=[tmp/test.csv], has_header=false, limit=None, projection=[a, b]   |
|                                                       |                                                                                        |
| physical_plan after repartition                       | ProjectionExec: expr=[a@0 as a]                                                        |
|                                                       |   CoalesceBatchesExec: target_batch_size=4096                                          |
|                                                       |     FilterExec: b@1 = CAST(2 AS Float32)                                               |
|                                                       |       RepartitionExec: partitioning=RoundRobinBatch(48)                                |
|                                                       |         CsvExec: files=[tmp/test.csv], has_header=false, limit=None, projection=[a, b] |
|                                                       |                                                                                        |
| physical_plan after add_merge_exec                    | SAME TEXT AS ABOVE                                                                     |
| physical_plan                                         | ProjectionExec: expr=[a@0 as a]                                                        |
|                                                       |   CoalesceBatchesExec: target_batch_size=4096                                          |
|                                                       |     FilterExec: b@1 = CAST(2 AS Float32)                                               |
|                                                       |       RepartitionExec: partitioning=RoundRobinBatch(48)                                |
|                                                       |         CsvExec: files=[tmp/test.csv], has_header=false, limit=None, projection=[a, b] |
|                                                       |                                                                                        |
+-------------------------------------------------------+----------------------------------------------------------------------------------------+
23 rows in set. Query took 0.001 seconds.

@alamb
Contributor

alamb commented Sep 12, 2022

If there is no regression, I agree this issue shouldn't hold up the release (we can fix it as part of the next release).

@liukun4515
Contributor Author

> If there is no regression, I agree this issue shouldn't hold up the release (we can fix it as part of the next release).

That is because in 11.0.0, type coercion happens in the physical planning phase; the logical phase does no type coercion at all.

But now that type coercion has moved from the physical phase to the logical phase, the Filter operator and the TableScan's partial_filters end up with different expressions.

@Dandandan
Contributor

Instead of reordering the rules, I think type coercion should be applied to the partial_filters as well (any expression in the logical plan).
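A minimal sketch of this suggestion, using a hypothetical toy model rather than DataFusion's API (`Expr`, `Plan`, `coerce`, `coerce_all` and `explain` are illustrative names, and coercion is simplified to casting the right-hand side of `=` to the left-hand side's type). The plan is built in the shape it has after filter push-down, with the predicate duplicated into the scan. Because `coerce_all` rewrites the scan's filters as well as the Filter predicate, both end up with the same CAST.

```rust
use std::fmt;

// Hypothetical toy model of a logical plan; these are NOT DataFusion's real types.
#[derive(Debug, Clone, PartialEq)]
enum DataType { Int64, Float32 }

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String, DataType),
    LitInt64(i64),
    Cast(Box<Expr>, DataType),
    Eq(Box<Expr>, Box<Expr>),
}

// Render expressions roughly the way EXPLAIN prints them in this issue.
impl fmt::Display for Expr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Expr::Column(name, _) => write!(f, "#{}", name),
            Expr::LitInt64(v) => write!(f, "Int64({})", v),
            Expr::Cast(e, t) => write!(f, "CAST({} AS {:?})", e, t),
            Expr::Eq(l, r) => write!(f, "{} = {}", l, r),
        }
    }
}

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Filter { predicate: Expr, input: Box<Plan> },
    TableScan { table: String, filters: Vec<Expr> },
}

/// Result type of an expression; `None` for the boolean `=`, which is not modelled.
fn data_type(e: &Expr) -> Option<DataType> {
    match e {
        Expr::Column(_, t) | Expr::Cast(_, t) => Some(t.clone()),
        Expr::LitInt64(_) => Some(DataType::Int64),
        Expr::Eq(..) => None,
    }
}

/// Simplified coercion: if the two sides of `=` differ in type, cast the
/// right-hand side to the left-hand side's type (assumes the column is on the left).
fn coerce(e: Expr) -> Expr {
    match e {
        Expr::Eq(l, r) => {
            let (l, r) = (coerce(*l), coerce(*r));
            match (data_type(&l), data_type(&r)) {
                (Some(lt), Some(rt)) if lt != rt => {
                    Expr::Eq(Box::new(l), Box::new(Expr::Cast(Box::new(r), lt)))
                }
                _ => Expr::Eq(Box::new(l), Box::new(r)),
            }
        }
        other => other,
    }
}

/// Coerce every expression in the tree, including the filters held by the scan.
fn coerce_all(plan: Plan) -> Plan {
    match plan {
        Plan::Filter { predicate, input } => Plan::Filter {
            predicate: coerce(predicate),
            input: Box::new(coerce_all(*input)),
        },
        Plan::TableScan { table, filters } => Plan::TableScan {
            table,
            filters: filters.into_iter().map(coerce).collect(),
        },
    }
}

/// Append one indented line per plan node, loosely imitating EXPLAIN output.
fn explain(plan: &Plan, depth: usize, out: &mut Vec<String>) {
    let pad = "  ".repeat(depth);
    match plan {
        Plan::Filter { predicate, input } => {
            out.push(format!("{}Filter: {}", pad, predicate));
            explain(input, depth + 1, out);
        }
        Plan::TableScan { table, filters } => {
            let fs: Vec<String> = filters.iter().map(|e| e.to_string()).collect();
            out.push(format!("{}TableScan: {} partial_filters=[{}]", pad, table, fs.join(", ")));
        }
    }
}
```

The design choice here is coverage rather than ordering: whatever order the rules run in, a coercion pass that visits every expression cannot leave a stale copy behind.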

@liukun4515
Contributor Author

> Instead of reordering the rules, I think type coercion should be applied to the partial_filters as well (any expression in the logical plan).

Agreed, but I have already checked how partial_filters is handled in the type coercion rule.

I am also confused about why partial_filters was not affected when the type coercion rule ran.

@liukun4515
Contributor Author

liukun4515 commented Sep 13, 2022

I found the code below:

    /// returns all expressions (non-recursively) in the current
    /// logical plan node. This does not include expressions in any
    /// children
    pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
        match self {
            LogicalPlan::Projection(Projection { expr, .. }) => expr.clone(),
            LogicalPlan::Values(Values { values, .. }) => {
                values.iter().flatten().cloned().collect()
            }
            LogicalPlan::Filter(Filter { predicate, .. }) => vec![predicate.clone()],
            LogicalPlan::Repartition(Repartition {
                partitioning_scheme,
                ..
            }) => match partitioning_scheme {
                Partitioning::Hash(expr, _) => expr.clone(),
                Partitioning::DistributeBy(expr) => expr.clone(),
                Partitioning::RoundRobinBatch(_) => vec![],
            },
            LogicalPlan::Window(Window { window_expr, .. }) => window_expr.clone(),
            LogicalPlan::Aggregate(Aggregate {
                group_expr,
                aggr_expr,
                ..
            }) => group_expr.iter().chain(aggr_expr.iter()).cloned().collect(),
            LogicalPlan::Join(Join { on, filter, .. }) => on
                .iter()
                .flat_map(|(l, r)| vec![Expr::Column(l.clone()), Expr::Column(r.clone())])
                .chain(
                    filter
                        .as_ref()
                        .map(|expr| vec![expr.clone()])
                        .unwrap_or_default(),
                )
                .collect(),
            LogicalPlan::Sort(Sort { expr, .. }) => expr.clone(),
            LogicalPlan::Extension(extension) => extension.node.expressions(),
            // plans without expressions
            LogicalPlan::TableScan { .. }

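The filter expressions are not treated as part of the TableScan: TableScan is listed under "plans without expressions", so expressions() does not return its partial_filters.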

@Dandandan

cc @andygrove @alamb
Is this a bug in the expressions function?
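The consequence can be sketched with heavily simplified, hypothetical types (`Expr`, `LogicalPlan`, `expressions_before`, `expressions_after` and `visible_exprs` are illustrative names, not DataFusion's API). A rule that discovers expressions only through an accessor like expressions() never sees the scan's filters if TableScan returns an empty list; returning the filters makes them visible to every such rule, including type coercion.

```rust
// Hypothetical, heavily simplified stand-ins; NOT DataFusion's Expr or LogicalPlan.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    LitInt64(i64),
    Eq(Box<Expr>, Box<Expr>),
}

enum LogicalPlan {
    Filter { predicate: Expr, input: Box<LogicalPlan> },
    TableScan { filters: Vec<Expr> },
}

impl LogicalPlan {
    /// Mirrors the behaviour in the quoted snippet: TableScan is treated
    /// as a plan without expressions.
    fn expressions_before(&self) -> Vec<Expr> {
        match self {
            LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()],
            LogicalPlan::TableScan { .. } => vec![],
        }
    }

    /// Proposed behaviour: also return the filters pushed into the scan, so
    /// any rule that walks expressions (such as type coercion) can rewrite them.
    fn expressions_after(&self) -> Vec<Expr> {
        match self {
            LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()],
            LogicalPlan::TableScan { filters } => filters.clone(),
        }
    }
}

/// Collect the expressions a rule would visit in the whole tree, using the given accessor.
fn visible_exprs(plan: &LogicalPlan, accessor: fn(&LogicalPlan) -> Vec<Expr>) -> Vec<Expr> {
    let mut out = accessor(plan);
    if let LogicalPlan::Filter { input, .. } = plan {
        out.extend(visible_exprs(input, accessor));
    }
    out
}
```

For a Filter over a TableScan that holds a pushed-down copy of the same predicate, `visible_exprs` finds one expression with `expressions_before` and two with `expressions_after`.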

@Dandandan
Contributor

That seems to be a bug in expressions; I think it should also return the expressions in partial_filters.

@liukun4515
Contributor Author

liukun4515 commented Sep 13, 2022

> That seems to be a bug in expressions; I think it should also return the expressions in partial_filters.

I can try that in a follow-up PR. In this PR (#3459), I just change the position of the type coercion rule.
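As a toy illustration of the reordering, the Rust sketch below runs a fixed list of rules in order. All names (`Plan`, `Expr`, `push_down`, `coerce_filter_only`, `optimize`, `scan_filters`) are hypothetical, coercion is simplified to casting the right-hand side of `=` to the left-hand side's type, and nothing here reflects DataFusion's real rule list. When the coercion pass runs before push-down, the predicate is already cast by the time it is copied into the scan, so the Filter and the scan filters agree.

```rust
use std::fmt;

// Hypothetical toy model; none of these names are DataFusion's real API.
#[derive(Debug, Clone, PartialEq)]
enum DataType { Int64, Float32 }

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String, DataType),
    LitInt64(i64),
    Cast(Box<Expr>, DataType),
    Eq(Box<Expr>, Box<Expr>),
}

// Render expressions roughly the way EXPLAIN prints them in this issue.
impl fmt::Display for Expr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Expr::Column(name, _) => write!(f, "#{}", name),
            Expr::LitInt64(v) => write!(f, "Int64({})", v),
            Expr::Cast(e, t) => write!(f, "CAST({} AS {:?})", e, t),
            Expr::Eq(l, r) => write!(f, "{} = {}", l, r),
        }
    }
}

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Filter { predicate: Expr, input: Box<Plan> },
    TableScan { filters: Vec<Expr> },
}

/// Result type of an expression; `None` for the boolean `=`, which is not modelled.
fn data_type(e: &Expr) -> Option<DataType> {
    match e {
        Expr::Column(_, t) | Expr::Cast(_, t) => Some(t.clone()),
        Expr::LitInt64(_) => Some(DataType::Int64),
        Expr::Eq(..) => None,
    }
}

/// Simplified coercion: if the two sides of `=` differ in type, cast the
/// right-hand side to the left-hand side's type (assumes the column is on the left).
fn coerce(e: Expr) -> Expr {
    match e {
        Expr::Eq(l, r) => {
            let (l, r) = (coerce(*l), coerce(*r));
            match (data_type(&l), data_type(&r)) {
                (Some(lt), Some(rt)) if lt != rt => {
                    Expr::Eq(Box::new(l), Box::new(Expr::Cast(Box::new(r), lt)))
                }
                _ => Expr::Eq(Box::new(l), Box::new(r)),
            }
        }
        other => other,
    }
}

/// Toy filter push-down: copy the Filter predicate into the scan's `filters`.
fn push_down(plan: Plan) -> Plan {
    match plan {
        Plan::Filter { predicate, input } => match *input {
            Plan::TableScan { mut filters } => {
                filters.push(predicate.clone());
                Plan::Filter { predicate, input: Box::new(Plan::TableScan { filters }) }
            }
            other => Plan::Filter { predicate, input: Box::new(push_down(other)) },
        },
        scan => scan,
    }
}

/// Toy coercion pass that only rewrites Filter predicates.
fn coerce_filter_only(plan: Plan) -> Plan {
    match plan {
        Plan::Filter { predicate, input } => Plan::Filter {
            predicate: coerce(predicate),
            input: Box::new(coerce_filter_only(*input)),
        },
        scan => scan,
    }
}

/// Apply the rules strictly in the given order, like a fixed optimizer rule list.
fn optimize(plan: Plan, rules: &[fn(Plan) -> Plan]) -> Plan {
    rules.iter().fold(plan, |p, &rule| rule(p))
}

/// The filters held by the scan, rendered as strings.
fn scan_filters(plan: &Plan) -> Vec<String> {
    match plan {
        Plan::Filter { input, .. } => scan_filters(input),
        Plan::TableScan { filters } => filters.iter().map(|e| e.to_string()).collect(),
    }
}
```

The trade-off recorded in the optimizer comment earlier in this thread still applies: type coercion had been placed after filter push-down so that CAST filters are not pushed to Parquet until #3289 is resolved.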
