[refactor](Nereids) refactor column pruning (apache#17579)
This PR refactors column pruning to be driven by a plan visitor. The benefits:
1. It is easy to add column-pruning support for a new plan: implement the `OutputPrunable` interface if the plan has an output field, or do nothing if it does not. There is no need to add a new rule like `PruneXxxChildColumns`; only a few plans, such as LogicalSetOperation and Aggregate, need to override the visit function with special logic (see the sketch after the plan comparison below).
2. Some plans can now shrink their output fields, which skips useless operations and improves performance.

example:
```sql
select id 
from (
  select id, sum(age)
  from student
  group by id
)a
```

We should prune the useless `sum(age)` from the aggregate.
Before the refactor:
```
LogicalProject ( distinct=false, projects=[id#0], excepts=[], canEliminate=true )
+--LogicalSubQueryAlias ( qualifier=[a] )
   +--LogicalAggregate ( groupByExpr=[id#0], outputExpr=[id#0, sum(age#2) AS `sum(age)`#4], hasRepeat=false )
      +--LogicalProject ( distinct=false, projects=[id#0, age#2], excepts=[], canEliminate=true )
         +--LogicalOlapScan ( qualified=default_cluster:test.student, indexName=<index_not_selected>, selectedIndexId=10007, preAgg=ON )
```

After the refactor:
```
LogicalProject ( distinct=false, projects=[id#0], excepts=[], canEliminate=true )
+--LogicalSubQueryAlias ( qualifier=[a] )
   +--LogicalAggregate ( groupByExpr=[id#0], outputExpr=[id#0], hasRepeat=false )
      +--LogicalProject ( distinct=false, projects=[id#0], excepts=[], canEliminate=true )
         +--LogicalOlapScan ( qualified=default_cluster:test.student, indexName=<index_not_selected>, selectedIndexId=10007, preAgg=ON )
```
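
For context, here is a rough, hypothetical sketch of the idea (the `Plan`, `OutputPrunable`, `outputs`, `pruneOutputs`, and `inputSlots` names below are simplified illustrations, not the actual Doris signatures): the required slots flow top-down, any plan that implements `OutputPrunable` drops the outputs its parent never reads, and a plain recursion stands in for the rewrite visitor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified sketch of the visitor-style pruning described above.
// Names and signatures are illustrative only, not the real Doris classes.
interface Plan {
    List<Plan> children();
    Plan withChildren(List<Plan> newChildren);
    // The columns this plan reads from its children (group-by keys, projections, ...).
    Set<String> inputSlots();
}

// A plan that owns an output list opts into pruning by implementing this,
// instead of requiring a dedicated PruneXxxChildColumns rule.
interface OutputPrunable extends Plan {
    List<String> outputs();
    Plan pruneOutputs(List<String> retainedOutputs);
}

final class ColumnPruningSketch {
    /** requiredSlots: the columns the parent actually reads from this plan. */
    static Plan prune(Plan plan, Set<String> requiredSlots) {
        Plan current = plan;
        if (plan instanceof OutputPrunable) {
            OutputPrunable prunable = (OutputPrunable) plan;
            List<String> retained = prunable.outputs().stream()
                    .filter(requiredSlots::contains)
                    .collect(Collectors.toList());
            // Shrink the output list when the parent does not read all of it,
            // e.g. drop `sum(age)` when only `id` is required.
            if (retained.size() < prunable.outputs().size()) {
                current = prunable.pruneOutputs(retained);
            }
        }
        // Top-down recursion: each child only needs the slots the (pruned) parent reads.
        Set<String> childRequirement = current.inputSlots();
        List<Plan> newChildren = new ArrayList<>();
        for (Plan child : current.children()) {
            newChildren.add(prune(child, childRequirement));
        }
        return current.withChildren(newChildren);
    }
}
```

In the real rule, only a few plans such as LogicalSetOperation and Aggregate need to override the visit logic, because their outputs must stay consistent with child outputs or group-by keys.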
924060929 authored Mar 24, 2023
1 parent 678314d commit d3e7f12
Showing 44 changed files with 628 additions and 583 deletions.
@@ -844,7 +844,7 @@ public void updateMaterializedSlots() {
// why output and intermediate may have different materialized slots?
// because some slot is materialized by materializeSrcExpr method directly
// in that case, only output slots is materialized
// assume output tuple has correct marterialized infomation
// assume output tuple has correct materialized information
// we update intermediate tuple and materializedSlots based on output tuple
materializedSlots.clear();
ArrayList<SlotDescriptor> outputSlots = outputTupleDesc.getSlots();
@@ -1671,7 +1671,11 @@ public PlanFragment visitPhysicalGenerate(PhysicalGenerate<? extends Plan> gener
.map(e -> ExpressionTranslator.translate(e, context))
.collect(Collectors.toCollection(ArrayList::new));
TupleDescriptor tupleDescriptor = generateTupleDesc(generate.getGeneratorOutput(), null, context);
List<SlotId> outputSlotIds = Stream.concat(currentFragment.getPlanRoot().getTupleIds().stream(),
List<TupleId> childOutputTupleIds = currentFragment.getPlanRoot().getOutputTupleIds();
if (childOutputTupleIds == null || childOutputTupleIds.isEmpty()) {
childOutputTupleIds = currentFragment.getPlanRoot().getTupleIds();
}
List<SlotId> outputSlotIds = Stream.concat(childOutputTupleIds.stream(),
Stream.of(tupleDescriptor.getId()))
.map(id -> context.getTupleDesc(id).getSlots())
.flatMap(List::stream)
@@ -19,6 +19,7 @@

import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.jobs.RewriteJob;
import org.apache.doris.nereids.rules.RuleFactory;
import org.apache.doris.nereids.rules.RuleSet;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.analysis.AdjustAggregateNullableForEmptySet;
@@ -56,13 +57,16 @@
import org.apache.doris.nereids.rules.rewrite.logical.MergeProjects;
import org.apache.doris.nereids.rules.rewrite.logical.MergeSetOperations;
import org.apache.doris.nereids.rules.rewrite.logical.NormalizeAggregate;
import org.apache.doris.nereids.rules.rewrite.logical.NormalizeSort;
import org.apache.doris.nereids.rules.rewrite.logical.PruneOlapScanPartition;
import org.apache.doris.nereids.rules.rewrite.logical.PruneOlapScanTablet;
import org.apache.doris.nereids.rules.rewrite.logical.PushFilterInsideJoin;
import org.apache.doris.nereids.rules.rewrite.logical.PushdownLimit;
import org.apache.doris.nereids.rules.rewrite.logical.ReorderJoin;
import org.apache.doris.nereids.rules.rewrite.logical.SplitLimit;

import com.google.common.collect.ImmutableList;

import java.util.List;

/**
@@ -89,7 +93,7 @@ public class NereidsRewriter extends BatchRewriteJob {

// ExtractSingleTableExpressionFromDisjunction conflict to InPredicateToEqualToRule
// in the ExpressionNormalization, so must invoke in another job, or else run into
// deep loop
// dead loop
topDown(
new ExtractSingleTableExpressionFromDisjunction()
)
@@ -117,10 +121,11 @@ public class NereidsRewriter extends BatchRewriteJob {

// The rule modification needs to be done after the subquery is unnested,
// because for scalarSubQuery, the connection condition is stored in apply in the analyzer phase,
// but when normalizeAggregate is performed, the members in apply cannot be obtained,
// but when normalizeAggregate/normalizeSort is performed, the members in apply cannot be obtained,
// resulting in inconsistent output results and results in apply
topDown(
new NormalizeAggregate()
new NormalizeAggregate(),
new NormalizeSort()
),

topDown(
@@ -138,71 +143,86 @@
),

topic("Rewrite join",
// infer not null filter, then push down filter, and then reorder join(cross join to inner join)
topDown(
new InferFilterNotNull(),
new InferJoinNotNull()
),
// ReorderJoin depends PUSH_DOWN_FILTERS
// the PUSH_DOWN_FILTERS depends on lots of rules, e.g. merge project, eliminate outer,
// sometimes transform the bottom plan make some rules usable which can apply to the top plan,
// but top-down traverse can not cover this case in one iteration, so bottom-up is more
// efficient because it can find the new plans and apply transform wherever it is
bottomUp(RuleSet.PUSH_DOWN_FILTERS),

topDown(
new MergeFilters(),
new ReorderJoin(),
new PushFilterInsideJoin(),
new FindHashConditionForJoin(),
new ConvertInnerOrCrossJoin(),
new EliminateNullAwareLeftAntiJoin()
),
topDown(
new EliminateDedupJoinCondition()
)
// infer not null filter, then push down filter, and then reorder join(cross join to inner join)
topDown(
new InferFilterNotNull(),
new InferJoinNotNull()
),
// ReorderJoin depends PUSH_DOWN_FILTERS
// the PUSH_DOWN_FILTERS depends on lots of rules, e.g. merge project, eliminate outer,
// sometimes transform the bottom plan make some rules usable which can apply to the top plan,
// but top-down traverse can not cover this case in one iteration, so bottom-up is more
// efficient because it can find the new plans and apply transform wherever it is
bottomUp(RuleSet.PUSH_DOWN_FILTERS),

topDown(
new MergeFilters(),
new ReorderJoin(),
new PushFilterInsideJoin(),
new FindHashConditionForJoin(),
new ConvertInnerOrCrossJoin(),
new EliminateNullAwareLeftAntiJoin()
),
topDown(
new EliminateDedupJoinCondition()
)
),

topic("Column pruning and infer predicate",
topDown(new ColumnPruning()),
custom(RuleType.COLUMN_PRUNING, () -> new ColumnPruning()),

custom(RuleType.INFER_PREDICATES, () -> new InferPredicates()),
custom(RuleType.INFER_PREDICATES, () -> new InferPredicates()),

// column pruning create new project, so we should use PUSH_DOWN_FILTERS
// to change filter-project to project-filter
bottomUp(RuleSet.PUSH_DOWN_FILTERS),
// column pruning create new project, so we should use PUSH_DOWN_FILTERS
// to change filter-project to project-filter
bottomUp(RuleSet.PUSH_DOWN_FILTERS),

// after eliminate outer join in the PUSH_DOWN_FILTERS, we can infer more predicate and push down
custom(RuleType.INFER_PREDICATES, () -> new InferPredicates()),
// after eliminate outer join in the PUSH_DOWN_FILTERS, we can infer more predicate and push down
custom(RuleType.INFER_PREDICATES, () -> new InferPredicates()),

bottomUp(RuleSet.PUSH_DOWN_FILTERS),
bottomUp(RuleSet.PUSH_DOWN_FILTERS),

// after eliminate outer join, we can move some filters to join.otherJoinConjuncts,
// this can help to translate plan to backend
topDown(
new PushFilterInsideJoin()
)
// after eliminate outer join, we can move some filters to join.otherJoinConjuncts,
// this can help to translate plan to backend
topDown(
new PushFilterInsideJoin()
)
),

// this rule should invoke after ColumnPruning
custom(RuleType.ELIMINATE_UNNECESSARY_PROJECT, () -> new EliminateUnnecessaryProject()),

// we need to execute this rule at the end of rewrite
// to avoid two consecutive same project appear when we do optimization.
topic("Others optimization", topDown(
topic("Others optimization",
bottomUp(ImmutableList.<RuleFactory>builder().addAll(ImmutableList.of(
new EliminateNotNull(),
new EliminateLimit(),
new EliminateFilter(),
new PruneOlapScanPartition(),
new SelectMaterializedIndexWithAggregate(),
new SelectMaterializedIndexWithoutAggregate(),
new PruneOlapScanTablet(),
new EliminateAggregate(),
new MergeSetOperations(),
new PushdownLimit(),
new SplitLimit(),
new BuildAggForUnion()
)),
// after eliminate filter, the project maybe can push down again,
// so we add push down rules
)).addAll(RuleSet.PUSH_DOWN_FILTERS).build())
),

// TODO: I think these rules should be implementation rules, and generate alternative physical plans.
topic("Table/MV/Physical optimization",
topDown(
// TODO: the logical plan should not contains any phase information,
// we should refactor like AggregateStrategies, e.g. LimitStrategies,
// generate one PhysicalLimit if current distribution is gather or two
// PhysicalLimits with gather exchange
new SplitLimit(),

new SelectMaterializedIndexWithAggregate(),
new SelectMaterializedIndexWithoutAggregate(),
new PruneOlapScanTablet(),
new PruneOlapScanPartition()
)
),

// this rule batch must keep at the end of rewrite to do some plan check
topic("Final rewrite and check", bottomUp(
@@ -88,6 +88,7 @@ public enum RuleType {

// rewrite rules
NORMALIZE_AGGREGATE(RuleTypeClass.REWRITE),
NORMALIZE_SORT(RuleTypeClass.REWRITE),
NORMALIZE_REPEAT(RuleTypeClass.REWRITE),
EXTRACT_AND_NORMALIZE_WINDOW_EXPRESSIONS(RuleTypeClass.REWRITE),
CHECK_AND_STANDARDIZE_WINDOW_FUNCTION_AND_FRAME(RuleTypeClass.REWRITE),
@@ -128,14 +129,7 @@ public enum RuleType {
PUSHDOWN_PROJECT_THROUGH_LIMIT(RuleTypeClass.REWRITE),
PUSHDOWN_ALIAS_THROUGH_JOIN(RuleTypeClass.REWRITE),
PUSHDOWN_FILTER_THROUGH_SET_OPERATION(RuleTypeClass.REWRITE),
// column prune rules,
COLUMN_PRUNE_AGGREGATION_CHILD(RuleTypeClass.REWRITE),
COLUMN_PRUNE_FILTER_CHILD(RuleTypeClass.REWRITE),
PRUNE_ONE_ROW_RELATION_COLUMN(RuleTypeClass.REWRITE),
COLUMN_PRUNE_SORT_CHILD(RuleTypeClass.REWRITE),
COLUMN_PRUNE_SORT(RuleTypeClass.REWRITE),
COLUMN_PRUNE_JOIN_CHILD(RuleTypeClass.REWRITE),
COLUMN_PRUNE_REPEAT_CHILD(RuleTypeClass.REWRITE),
COLUMN_PRUNING(RuleTypeClass.REWRITE),
// expression of plan rewrite
REWRITE_ONE_ROW_RELATION_EXPRESSION(RuleTypeClass.REWRITE),
REWRITE_PROJECT_EXPRESSION(RuleTypeClass.REWRITE),