chore(planner): improve infer filter #16361

Merged: 19 commits merged on Sep 10, 2024

@Dousir9 (Member) commented Sep 1, 2024

I hereby agree to the terms of the CLA available at: https://docs.databend.com/dev/policies/cla/

Summary

Change how ScalarExpr::BoundColumnRef values are compared so that equality is decided by (table_index, column_index). This lets the planner recognize more equivalent columns, which improves predicate move-around and removes unnecessary data shuffling (Exchange operators).
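As an illustration only, here is a minimal sketch of equality keyed on (table_index, column_index). It is not the Databend implementation: the struct layout and the `display_name` field are simplified assumptions made for this example. The motivation can be seen in the plans in this PR, where the same column #0 is printed both as `numbers.number` and as `t.number`.

```rust
use std::hash::{Hash, Hasher};

/// Hypothetical, simplified stand-in for a bound column reference.
/// The real type in the planner carries more information.
#[derive(Debug, Clone)]
pub struct BoundColumnRef {
    pub table_index: Option<usize>,
    pub column_index: usize,
    /// Illustrative metadata (e.g. an alias) that must not affect equality.
    pub display_name: String,
}

impl PartialEq for BoundColumnRef {
    fn eq(&self, other: &Self) -> bool {
        // Only the (table_index, column_index) pair identifies the column.
        (self.table_index, self.column_index) == (other.table_index, other.column_index)
    }
}

impl Eq for BoundColumnRef {}

impl Hash for BoundColumnRef {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Hash exactly the fields used by `eq` so Eq and Hash stay consistent.
        self.table_index.hash(state);
        self.column_index.hash(state);
    }
}
```

With this definition, two references that differ only in `display_name` compare equal and collapse to a single entry in a `HashSet`.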

Example-1

create or replace table t1 as select * from numbers(5);
create or replace table t2(number uint64);
create or replace stream t2_s ON TABLE t2;
insert into t2(number) select * from numbers(10);
delete from t2 where number % 3 = 0;
insert into t2 select * from numbers(8);
set disable_join_reorder = 1;
explain select * from (select t1.number, t2.number from t1 right outer join (SELECT number FROM t2_s QUALIFY row_number() OVER ( PARTITION BY number ORDER BY number DESC ) = 1) AS t2 ON t1.number = t2.number) as tt(a, b) where a is null;

Before: there is an Exchange operator on the build side of the hash join

Sort
├── output columns: [t1.number (#0), t2.number (#1)]
├── sort keys: [number ASC NULLS LAST]
├── estimated rows: 0.01
└── Exchange
    ├── output columns: [t1.number (#0), t2.number (#1), #_order_col]
    ├── exchange type: Merge
    └── Sort
        ├── output columns: [t1.number (#0), t2.number (#1), #_order_col]
        ├── sort keys: [number ASC NULLS LAST]
        ├── estimated rows: 0.01
        └── HashJoin
            ├── output columns: [t1.number (#0), t2.number (#1)]
            ├── join type: RIGHT OUTER
            ├── build keys: [t2.number (#1)]
            ├── probe keys: [t1.number (#0)]
            ├── filters: []
            ├── estimated rows: 0.01
            ├── Exchange(Build)
            │   ├── output columns: [t2.number (#1)]
            │   ├── exchange type: Hash(t2.number (#1))
            │   └── Filter
            │       ├── output columns: [t2.number (#1)]
            │       ├── filters: [row_number() OVER ( PARTITION BY number ORDER BY number DESC ) (#2) = 1]
            │       ├── estimated rows: 0.01
            │       └── Window
            │           ├── output columns: [t2.number (#1), row_number() OVER ( PARTITION BY number ORDER BY number DESC ) (#2)]
            │           ├── aggregate function: [row_number]
            │           ├── partition by: [number]
            │           ├── order by: [number]
            │           ├── frame: [Range: Preceding(None) ~ CurrentRow]
            │           └── WindowPartition
            │               ├── output columns: [t2.number (#1)]
            │               ├── hash keys: [number]
            │               ├── estimated rows: 14.00
            │               └── Exchange
            │                   ├── output columns: [t2.number (#1)]
            │                   ├── exchange type: Hash(t2.number (#1))
            │                   └── TableScan
            │                       ├── table: default.tpch_test.t2
            │                       ├── output columns: [number (#1)]
            │                       ├── read rows: 14
            │                       ├── read size: < 1 KiB
            │                       ├── partitions total: 2
            │                       ├── partitions scanned: 2
            │                       ├── pruning stats: [segments: <range pruning: 2 to 2>, blocks: <range pruning: 2 to 2>]
            │                       ├── push downs: [filters: [], limit: NONE]
            │                       └── estimated rows: 14.00
            └── Exchange(Probe)
                ├── output columns: [t1.number (#0)]
                ├── exchange type: Hash(CAST(t1.number (#0) AS UInt64 NULL))
                └── TableScan
                    ├── table: default.tpch_test.t1
                    ├── output columns: [number (#0)]
                    ├── read rows: 5
                    ├── read size: < 1 KiB
                    ├── partitions total: 1
                    ├── partitions scanned: 1
                    ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
                    ├── push downs: [filters: [], limit: NONE]
                    └── estimated rows: 5.00

Now: the hash join build side reuses the data shuffle from the WindowPartition

Sort
├── output columns: [t1.number (#0), t2.number (#1)]
├── sort keys: [number ASC NULLS LAST]
├── estimated rows: 0.01
└── Exchange
    ├── output columns: [t1.number (#0), t2.number (#1), #_order_col]
    ├── exchange type: Merge
    └── Sort
        ├── output columns: [t1.number (#0), t2.number (#1), #_order_col]
        ├── sort keys: [number ASC NULLS LAST]
        ├── estimated rows: 0.01
        └── HashJoin
            ├── output columns: [t1.number (#0), t2.number (#1)]
            ├── join type: RIGHT OUTER
            ├── build keys: [t2.number (#1)]
            ├── probe keys: [t1.number (#0)]
            ├── filters: []
            ├── estimated rows: 0.01
            ├── Filter(Build)
            │   ├── output columns: [t2.number (#1)]
            │   ├── filters: [row_number() OVER ( PARTITION BY number ORDER BY number DESC ) (#6) = 1]
            │   ├── estimated rows: 0.01
            │   └── Window
            │       ├── output columns: [t2.number (#1), row_number() OVER ( PARTITION BY number ORDER BY number DESC ) (#6)]
            │       ├── aggregate function: [row_number]
            │       ├── partition by: [number]
            │       ├── order by: [number]
            │       ├── frame: [Range: Preceding(None) ~ CurrentRow]
            │       └── WindowPartition
            │           ├── output columns: [t2.number (#1)]
            │           ├── hash keys: [number]
            │           ├── estimated rows: 14.00
            │           └── Exchange
            │               ├── output columns: [t2.number (#1)]
            │               ├── exchange type: Hash(t2.number (#1))
            │               └── TableScan
            │                   ├── table: default.default.t2
            │                   ├── output columns: [number (#1)]
            │                   ├── read rows: 14
            │                   ├── read size: < 1 KiB
            │                   ├── partitions total: 2
            │                   ├── partitions scanned: 2
            │                   ├── pruning stats: [segments: <range pruning: 2 to 2>, blocks: <range pruning: 2 to 2>]
            │                   ├── push downs: [filters: [], limit: NONE]
            │                   └── estimated rows: 14.00
            └── Exchange(Probe)
                ├── output columns: [t1.number (#0)]
                ├── exchange type: Hash(t1.number (#0))
                └── TableScan
                    ├── table: default.default.t1
                    ├── output columns: [number (#0)]
                    ├── read rows: 5
                    ├── read size: < 1 KiB
                    ├── partitions total: 1
                    ├── partitions scanned: 1
                    ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
                    ├── push downs: [filters: [], limit: NONE]
                    └── estimated rows: 5.00
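One way to read the difference between the two plans: the join build side requires a Hash(t2.number (#1)) distribution, and the Exchange under WindowPartition already provides it, so no second Exchange is needed once the two hash keys compare equal. The sketch below shows such a check under stated assumptions; `ColumnKey`, `Distribution`, and `needs_exchange` are hypothetical names for this example, not the optimizer's actual API.

```rust
/// Hypothetical column identity: (table_index, column_index).
pub type ColumnKey = (usize, usize);

/// Hypothetical, simplified physical distribution property.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Distribution {
    /// No particular distribution.
    Any,
    /// Rows are hash-partitioned on these columns.
    Hash(Vec<ColumnKey>),
}

/// Returns true when an extra Exchange is required to turn `provided`
/// into `required`.
pub fn needs_exchange(provided: &Distribution, required: &Distribution) -> bool {
    match (provided, required) {
        // Nothing is required, so any input is acceptable.
        (_, Distribution::Any) => false,
        // Same hash keys: the existing shuffle can be reused.
        (Distribution::Hash(p), Distribution::Hash(r)) => p != r,
        // A hash distribution is required but the input has none.
        (Distribution::Any, Distribution::Hash(_)) => true,
    }
}
```

The check is only as good as the key comparison: if the same column compared unequal because of a differing name, `needs_exchange` would return true and the redundant Exchange(Build) would stay in the plan.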

Example-2

EXPLAIN SELECT a, sum(number - 1) OVER ( PARTITION BY number % 3 ) FROM (SELECT number, rank() OVER ( PARTITION BY number % 3 ORDER BY number ) AS a FROM numbers(50)) AS t(number)

Before: there are two WindowPartition operators

Exchange
├── output columns: [numbers.number (#0), rank() OVER ( PARTITION BY number % 3 ORDER BY number ) (#2), sum_arg_0 (#3), sum_part_0 (#4), sum(number - 1) OVER ( PARTITION BY number % 3 ) (#5)]
├── exchange type: Merge
└── Window
    ├── output columns: [numbers.number (#0), rank() OVER ( PARTITION BY number % 3 ORDER BY number ) (#2), sum_arg_0 (#3), sum_part_0 (#4), sum(number - 1) OVER ( PARTITION BY number % 3 ) (#5)]
    ├── aggregate function: [sum(sum_arg_0)]
    ├── partition by: [sum_part_0]
    ├── order by: []
    ├── frame: [Range: Preceding(None) ~ Following(None)]
    └── WindowPartition
        ├── output columns: [numbers.number (#0), rank() OVER ( PARTITION BY number % 3 ORDER BY number ) (#2), sum_arg_0 (#3), sum_part_0 (#4)]
        ├── hash keys: [sum_part_0]
        ├── estimated rows: 50.00
        └── Exchange
            ├── output columns: [numbers.number (#0), rank() OVER ( PARTITION BY number % 3 ORDER BY number ) (#2), sum_arg_0 (#3), sum_part_0 (#4)]
            ├── exchange type: Hash(t.number (#0) % 3)
            └── EvalScalar
                ├── output columns: [numbers.number (#0), rank() OVER ( PARTITION BY number % 3 ORDER BY number ) (#2), sum_arg_0 (#3), sum_part_0 (#4)]
                ├── expressions: [t.number (#0) - 1, t.number (#0) % 3]
                ├── estimated rows: 50.00
                └── Window
                    ├── output columns: [numbers.number (#0), rank_part_0 (#1), rank() OVER ( PARTITION BY number % 3 ORDER BY number ) (#2)]
                    ├── aggregate function: [rank]
                    ├── partition by: [rank_part_0]
                    ├── order by: [number]
                    ├── frame: [Range: Preceding(None) ~ CurrentRow]
                    └── WindowPartition
                        ├── output columns: [numbers.number (#0), rank_part_0 (#1)]
                        ├── hash keys: [rank_part_0]
                        ├── estimated rows: 50.00
                        └── Exchange
                            ├── output columns: [numbers.number (#0), rank_part_0 (#1)]
                            ├── exchange type: Hash(numbers.number (#0) % 3)
                            └── EvalScalar
                                ├── output columns: [numbers.number (#0), rank_part_0 (#1)]
                                ├── expressions: [numbers.number (#0) % 3]
                                ├── estimated rows: 50.00
                                └── TableScan
                                    ├── table: default.system.numbers
                                    ├── output columns: [number (#0)]
                                    ├── read rows: 50
                                    ├── read size: < 1 KiB
                                    ├── partitions total: 1
                                    ├── partitions scanned: 1
                                    ├── push downs: [filters: [], limit: NONE]
                                    └── estimated rows: 50.00

Now: only one WindowPartition operator

Exchange
├── output columns: [numbers.number (#0), rank_part_0 (#1), rank() OVER ( PARTITION BY number % 3 ORDER BY number ) (#2), sum_arg_0 (#3), rank_part_0 (#1), sum(number - 1) OVER ( PARTITION BY number % 3 ) (#4)]
├── exchange type: Merge
└── Window
    ├── output columns: [numbers.number (#0), rank_part_0 (#1), rank() OVER ( PARTITION BY number % 3 ORDER BY number ) (#2), sum_arg_0 (#3), rank_part_0 (#1), sum(number - 1) OVER ( PARTITION BY number % 3 ) (#4)]
    ├── aggregate function: [sum(sum_arg_0)]
    ├── partition by: [rank_part_0]
    ├── order by: []
    ├── frame: [Range: Preceding(None) ~ Following(None)]
    └── EvalScalar
        ├── output columns: [numbers.number (#0), rank_part_0 (#1), rank() OVER ( PARTITION BY number % 3 ORDER BY number ) (#2), sum_arg_0 (#3), rank_part_0 (#1)]
        ├── expressions: [t.number (#0) - 1, t.number (#0) % 3]
        ├── estimated rows: 50.00
        └── Window
            ├── output columns: [numbers.number (#0), rank_part_0 (#1), rank() OVER ( PARTITION BY number % 3 ORDER BY number ) (#2)]
            ├── aggregate function: [rank]
            ├── partition by: [rank_part_0]
            ├── order by: [number]
            ├── frame: [Range: Preceding(None) ~ CurrentRow]
            └── WindowPartition
                ├── output columns: [numbers.number (#0), rank_part_0 (#1)]
                ├── hash keys: [rank_part_0]
                ├── estimated rows: 50.00
                └── Exchange
                    ├── output columns: [numbers.number (#0), rank_part_0 (#1)]
                    ├── exchange type: Hash(numbers.number (#0) % 3)
                    └── EvalScalar
                        ├── output columns: [numbers.number (#0), rank_part_0 (#1)]
                        ├── expressions: [numbers.number (#0) % 3]
                        ├── estimated rows: 50.00
                        └── TableScan
                            ├── table: default.system.numbers
                            ├── output columns: [number (#0)]
                            ├── read rows: 50
                            ├── read size: < 1 KiB
                            ├── partitions total: 1
                            ├── partitions scanned: 1
                            ├── push downs: [filters: [], limit: NONE]
                            └── estimated rows: 50.00
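In the plan above, the second window partitions by rank_part_0 (#1) instead of a separate sum_part_0 column, because both stand for the same expression, number % 3. A minimal sketch of that kind of reuse follows, assuming a hypothetical `Expr` type whose column variant is identified only by (table_index, column_index) and a hypothetical `DerivedColumns` registry; neither is taken from the Databend code base.

```rust
use std::collections::HashMap;

/// Hypothetical scalar expression. Columns carry no name, only indexes,
/// so `numbers.number (#0) % 3` and `t.number (#0) % 3` are the same value.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Expr {
    Column(usize, usize),
    Literal(u64),
    Mod(Box<Expr>, Box<Expr>),
}

/// Hypothetical registry of derived columns, keyed by the expression
/// that computes them.
pub struct DerivedColumns {
    by_expr: HashMap<Expr, usize>,
    next_index: usize,
}

impl DerivedColumns {
    pub fn new(first_free_index: usize) -> Self {
        Self { by_expr: HashMap::new(), next_index: first_free_index }
    }

    /// Returns the column index for `expr`, allocating a new index only
    /// when no equal expression has been registered before.
    pub fn get_or_create(&mut self, expr: Expr) -> usize {
        if let Some(&idx) = self.by_expr.get(&expr) {
            return idx;
        }
        let idx = self.next_index;
        self.next_index += 1;
        self.by_expr.insert(expr, idx);
        idx
    }
}
```

Registering number % 3 twice yields the same column index both times, so both windows share one partition key and the second WindowPartition and its Exchange become unnecessary.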

Closes: #16360

Tests

  • Unit Test
  • Logic Test
  • Benchmark Test
  • No Test - Explain why

Type of change

  • Bug Fix (non-breaking change which fixes an issue)
  • New Feature (non-breaking change which adds functionality)
  • Breaking Change (fix or feature that could cause existing functionality not to work as expected)
  • Documentation Update
  • Refactoring
  • Performance Improvement
  • Other (please describe):

@github-actions (bot) added the pr-chore label on Sep 1, 2024
@Dousir9 marked this pull request as ready for review on September 9, 2024 12:54
@dosubot (bot) added the size:XL label on Sep 9, 2024
@dosubot (bot) added the A-planner label on Sep 9, 2024
@sundy-li enabled auto-merge on September 10, 2024 01:01
@sundy-li added this pull request to the merge queue on Sep 10, 2024
@dosubot (bot) added the lgtm label on Sep 10, 2024
@BohuTANG removed this pull request from the merge queue due to a manual request on Sep 10, 2024
@BohuTANG merged commit c558b45 into databendlabs:main on Sep 10, 2024
95 checks passed
Labels

  • A-planner (Area: planner/optimizer)
  • lgtm (This PR has been approved by a maintainer)
  • pr-chore (this PR only has small changes that no need to record, like coding styles.)
  • size:XL (This PR changes 500-999 lines, ignoring generated files.)
Development

Successfully merging this pull request may close these issues.

bug: Join filter pushdown incomplete
3 participants