Skip to content

Commit

Permalink
[improvement](mtmv) Not roll up when aggregate rewrite if roll up gro…
Browse files Browse the repository at this point in the history
…up by expr is uniform (#38387)

## Proposed changes

Not roll up when aggregate rewrite if roll up group by expr is uniform
Such as mv name is mv3_0, and def is:
```sql
CREATE MATERIALIZED VIEW mv3_0
        BUILD IMMEDIATE REFRESH COMPLETE ON MANUAL
        DISTRIBUTED BY RANDOM BUCKETS 2
        PROPERTIES ('replication_num' = '1') 
        AS
select 
  o_orderdate, 
  o_shippriority, 
  o_comment, 
  sum(o_totalprice) as sum_total, 
  max(o_totalprice) as max_total, 
  min(o_totalprice) as min_total, 
  count(*) as count_all 
from 
  orders 
group by 
  o_orderdate, 
  o_shippriority, 
  o_comment;
```

query sql is as following:
```sql
select 
  o_comment, 
  sum(o_totalprice), 
  max(o_totalprice), 
  min(o_totalprice), 
  count(*) 
from 
  orders 
where 
  o_orderdate = '2023-12-09' 
  and o_shippriority = 1 
group by 
  o_comment;
```
after rewrite the plan is as following, not need to add aggregate
```
PhysicalResultSink
--filter((mv3_0.o_orderdate = '2023-12-09') and (mv3_0.o_shippriority = 1))
----PhysicalOlapScan[mv3_0]
```
  • Loading branch information
seawinde authored and dataroaring committed Aug 6, 2024
1 parent eb17f8a commit d73e5f2
Show file tree
Hide file tree
Showing 5 changed files with 774 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.jobs.executor.Rewriter;
import org.apache.doris.nereids.properties.DataTrait;
import org.apache.doris.nereids.rules.analysis.NormalizeRepeat;
import org.apache.doris.nereids.rules.exploration.mv.AbstractMaterializedViewAggregateRule.AggregateExpressionRewriteContext.ExpressionRewriteMode;
import org.apache.doris.nereids.rules.exploration.mv.StructInfo.PlanCheckContext;
Expand All @@ -45,6 +46,8 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Repeat;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
import org.apache.doris.nereids.trees.plans.visitor.ExpressionLineageReplacer;
Expand Down Expand Up @@ -113,7 +116,7 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
boolean queryContainsGroupSets = queryAggregate.getSourceRepeat().isPresent();
// If group by expression between query and view is equals, try to rewrite expression directly
if (!queryContainsGroupSets && isGroupByEquals(queryTopPlanAndAggPair, viewTopPlanAndAggPair,
viewToQuerySlotMapping, queryStructInfo, viewStructInfo, materializationContext,
viewToQuerySlotMapping, queryStructInfo, viewStructInfo, tempRewritedPlan, materializationContext,
cascadesContext)) {
List<Expression> rewrittenQueryExpressions = rewriteExpression(queryTopPlan.getOutput(),
queryTopPlan,
Expand Down Expand Up @@ -324,18 +327,21 @@ private boolean isGroupByEquals(Pair<Plan, LogicalAggregate<Plan>> queryTopPlanA
SlotMapping viewToQuerySlotMapping,
StructInfo queryStructInfo,
StructInfo viewStructInfo,
Plan tempRewrittenPlan,
MaterializationContext materializationContext,
CascadesContext cascadesContext) {

if (materializationContext instanceof SyncMaterializationContext) {
// For data correctness, should always add aggregate node if rewritten by sync materialized view
return false;
}
Plan queryTopPlan = queryTopPlanAndAggPair.key();
Plan viewTopPlan = viewTopPlanAndAggPair.key();
LogicalAggregate<Plan> queryAggregate = queryTopPlanAndAggPair.value();
LogicalAggregate<Plan> viewAggregate = viewTopPlanAndAggPair.value();

Set<Expression> queryGroupShuttledExpression = new HashSet<>();
for (Expression queryExpression : ExpressionUtils.shuttleExpressionWithLineage(
queryAggregate.getGroupByExpressions(), queryTopPlan, queryStructInfo.getTableBitSet())) {
queryGroupShuttledExpression.add(queryExpression);
}
Set<Expression> queryGroupShuttledExpression = new HashSet<>(ExpressionUtils.shuttleExpressionWithLineage(
queryAggregate.getGroupByExpressions(), queryTopPlan, queryStructInfo.getTableBitSet()));

// try to eliminate group by dimension by function dependency if group by expression is not in query
Map<Expression, Expression> viewShuttledExpressionQueryBasedToGroupByExpressionMap = new HashMap<>();
Expand All @@ -355,22 +361,112 @@ private boolean isGroupByEquals(Pair<Plan, LogicalAggregate<Plan>> queryTopPlanA
viewGroupExpressionQueryBased
);
}
if (queryGroupShuttledExpression.equals(viewShuttledExpressionQueryBasedToGroupByExpressionMap.values())) {
if (queryGroupShuttledExpression.equals(viewShuttledExpressionQueryBasedToGroupByExpressionMap.keySet())) {
// return true, if equals directly
return true;
}

boolean isGroupByEquals = false;
// check is equals by group by eliminate
isGroupByEquals |= isGroupByEqualsAfterGroupByEliminate(queryGroupShuttledExpression,
viewShuttledExpressionQueryBasedToGroupByExpressionMap,
groupByExpressionToViewShuttledExpressionQueryBasedMap,
viewAggregate,
cascadesContext);
// check is equals by equal filter eliminate
Optional<LogicalFilter<Plan>> filterOptional = tempRewrittenPlan.collectFirst(LogicalFilter.class::isInstance);
if (!filterOptional.isPresent()) {
return isGroupByEquals;
}
isGroupByEquals |= isGroupByEqualsAfterEqualFilterEliminate(
(LogicalPlan) tempRewrittenPlan,
queryGroupShuttledExpression,
viewShuttledExpressionQueryBasedToGroupByExpressionMap,
materializationContext);
return isGroupByEquals;
}

/**
* Check group by is equals by equal filter eliminate
* For example query is select a, b, c from t1 where a = 1 and d = 'xx' group by a, b, c;
* mv is select a, b, c, d from t1 group by a, b, c, d;
* the group by expression between query and view is equals after equal filter eliminate
* should not aggregate roll up
* */
private static boolean isGroupByEqualsAfterEqualFilterEliminate(
LogicalPlan tempRewrittenPlan,
Set<Expression> queryGroupShuttledExpression,
Map<Expression, Expression> viewShuttledExprQueryBasedToViewGroupByExprMap,
MaterializationContext materializationContext) {

Map<Expression, Expression> viewShuttledExprToScanExprMapping =
materializationContext.getShuttledExprToScanExprMapping().flattenMap().get(0);
Set<Expression> viewShuttledExprQueryBasedSet = viewShuttledExprQueryBasedToViewGroupByExprMap.keySet();
// view group by expr can not cover query group by expr
if (!viewShuttledExprQueryBasedSet.containsAll(queryGroupShuttledExpression)) {
return false;
}
Set<Expression> viewShouldUniformExpressionSet = new HashSet<>();
// calc the group by expr which is needed to roll up and should be uniform
for (Map.Entry<Expression, Expression> expressionEntry :
viewShuttledExprQueryBasedToViewGroupByExprMap.entrySet()) {
if (queryGroupShuttledExpression.contains(expressionEntry.getKey())) {
// the group expr which query has, do not require uniform
continue;
}
viewShouldUniformExpressionSet.add(expressionEntry.getValue());
}

DataTrait dataTrait = tempRewrittenPlan.computeDataTrait();
for (Expression shouldUniformExpr : viewShouldUniformExpressionSet) {
Expression viewScanExpression = viewShuttledExprToScanExprMapping.get(shouldUniformExpr);
if (viewScanExpression == null) {
return false;
}
if (!(viewScanExpression instanceof Slot)) {
return false;
}
if (!dataTrait.isUniform((Slot) viewScanExpression)) {
return false;
}
}
return true;
}

/**
* Check group by is equal or not after group by eliminate by functional dependency
* Such as query group by expression is (l_orderdate#1, l_supperkey#2)
* materialized view is group by expression is (l_orderdate#4, l_supperkey#5, l_partkey#6)
* materialized view expression mapping is
* {l_orderdate#4:l_orderdate#10, l_supperkey#5:l_supperkey#11, l_partkey#6:l_partkey#12}
* 1. viewShuttledExpressionQueryBasedToGroupByExpressionMap
* is {l_orderdate#1:l_orderdate#10, l_supperkey#2:l_supperkey#11}
* groupByExpressionToViewShuttledExpressionQueryBasedMap
* is {l_orderdate#10:l_orderdate#1, l_supperkey#11:l_supperkey#2:}
* 2. construct projects query used by view group expressions
* projects (l_orderdate#10, l_supperkey#11)
* 3. try to eliminate materialized view group expression
* projects (l_orderdate#10, l_supperkey#11)
* viewAggregate
* 4. check the viewAggregate group by expression is equals queryAggregate expression or not
*/
private static boolean isGroupByEqualsAfterGroupByEliminate(Set<Expression> queryGroupShuttledExpression,
Map<Expression, Expression> viewShuttledExpressionQueryBasedToGroupByExpressionMap,
Map<Expression, Expression> groupByExpressionToViewShuttledExpressionQueryBasedMap,
LogicalAggregate<Plan> viewAggregate,
CascadesContext cascadesContext) {
List<NamedExpression> projects = new ArrayList<>();
// construct projects query used by view group expressions
for (Expression expression : queryGroupShuttledExpression) {
if (!viewShuttledExpressionQueryBasedToGroupByExpressionMap.containsKey(expression)) {
// query group expression is not in view group by expression
Expression chosenExpression = viewShuttledExpressionQueryBasedToGroupByExpressionMap.get(expression);
if (chosenExpression == null) {
return false;
}
Expression chosenExpression = viewShuttledExpressionQueryBasedToGroupByExpressionMap.get(expression);
projects.add(chosenExpression instanceof NamedExpression
? (NamedExpression) chosenExpression : new Alias(chosenExpression));
}
LogicalProject<LogicalAggregate<Plan>> project = new LogicalProject<>(projects, viewAggregate);
// try to eliminate group by expression which is not in query group by expression
// try to eliminate view group by expression which is not in query group by expression
Plan rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
childContext -> {
Rewriter.getCteChildrenRewriter(childContext,
Expand All @@ -383,20 +479,21 @@ private boolean isGroupByEquals(Pair<Plan, LogicalAggregate<Plan>> queryTopPlanA
if (!aggreagateOptional.isPresent()) {
return false;
}
// check result after view group by eliminate by functional dependency
List<Expression> viewEliminatedGroupByExpressions = aggreagateOptional.get().getGroupByExpressions();
if (viewEliminatedGroupByExpressions.size() != queryGroupShuttledExpression.size()) {
return false;
}
Set<Expression> viewGroupShuttledExpressionQueryBased = new HashSet<>();
for (Expression viewExpression : aggreagateOptional.get().getGroupByExpressions()) {
if (!groupByExpressionToViewShuttledExpressionQueryBasedMap.containsKey(viewExpression)) {
Expression viewExpressionQueryBased =
groupByExpressionToViewShuttledExpressionQueryBasedMap.get(viewExpression);
if (viewExpressionQueryBased == null) {
return false;
}
viewGroupShuttledExpressionQueryBased.add(
groupByExpressionToViewShuttledExpressionQueryBasedMap.get(viewExpression));
viewGroupShuttledExpressionQueryBased.add(viewExpressionQueryBased);
}
return materializationContext instanceof SyncMaterializationContext ? false
: queryGroupShuttledExpression.equals(viewGroupShuttledExpressionQueryBased);
return queryGroupShuttledExpression.equals(viewGroupShuttledExpressionQueryBased);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !query1_0_before --
2023-12-10 46.00 33.50 2 0

-- !shape1_0_after --
PhysicalResultSink
--hashAgg[GLOBAL]
----hashAgg[LOCAL]
------filter((mv1_0.o_custkey = 1) and (mv1_0.o_orderkey = 3))
--------PhysicalOlapScan[mv1_0]

-- !query1_0_after --
2023-12-10 46.00 33.50 2 0

-- !query2_0_before --
2 2 2 1.0 1.0 1 1

-- !shape2_0_after --
PhysicalResultSink
--hashAgg[DISTINCT_LOCAL]
----hashAgg[GLOBAL]
------hashAgg[LOCAL]
--------filter((mv2_0.o_orderkey = 1) and (mv2_0.o_orderstatus = 'o'))
----------PhysicalOlapScan[mv2_0]

-- !query2_0_after --
2 2 2 1.0 1.0 1 1

-- !query3_0_before --
yy 11.50 11.50 11.50 1

-- !shape3_0_after --
PhysicalResultSink
--filter((mv3_0.o_orderdate = '2023-12-09') and (mv3_0.o_shippriority = 1))
----PhysicalOlapScan[mv3_0]

-- !query3_0_after --
yy 11.50 11.50 11.50 1

-- !query3_1_before --
mi 56.20 56.20 56.20 1

-- !shape3_1_after --
PhysicalResultSink
--hashAgg[GLOBAL]
----hashAgg[LOCAL]
------filter((orders.o_orderdate = '2023-12-12') and (orders.o_shippriority = 2) and (orders.o_totalprice = 56.20))
--------PhysicalOlapScan[orders]

-- !query3_1_after --
mi 56.20 56.20 56.20 1

-- !query4_0_before --
yy 11.50 11.50 11.50 1

-- !query4_0_after --
yy 11.50 11.50 11.50 1

-- !query5_0_before --
3 2023-12-12 57.40 56.20 2 0

-- !shape5_0_after --
PhysicalResultSink
--hashAgg[GLOBAL]
----hashAgg[LOCAL]
------filter((mv5_0.l_partkey = 2) and (mv5_0.l_shipdate = '2023-12-12'))
--------PhysicalOlapScan[mv5_0]

-- !query5_0_after --
3 2023-12-12 57.40 56.20 2 0

-- !query6_0_before --
2 2 2 2 1.0 1.0 1 1

-- !shape6_0_after --
PhysicalResultSink
--hashAgg[DISTINCT_LOCAL]
----hashAgg[GLOBAL]
------hashAgg[LOCAL]
--------filter((mv6_0.o_orderkey = 1) and (mv6_0.o_orderstatus = 'o'))
----------PhysicalOlapScan[mv6_0]

-- !query6_0_after --
2 2 2 2 1.0 1.0 1 1

-- !query7_0_before --
yy 4 11.50 11.50 11.50 1

-- !shape7_0_after --
PhysicalResultSink
--filter((mv7_0.o_orderdate = '2023-12-09') and (mv7_0.o_shippriority = 1))
----PhysicalOlapScan[mv7_0]

-- !query7_0_after --
yy 4 11.50 11.50 11.50 1

-- !query7_1_before --
yy 4 11.50 11.50 11.50 1

-- !shape7_1_after --
PhysicalResultSink
--hashAgg[GLOBAL]
----hashAgg[LOCAL]
------hashJoin[INNER_JOIN] hashCondition=((t1.l_orderkey = orders.o_orderkey) and (t1.l_shipdate = orders.o_orderdate)) otherCondition=()
--------filter((t1.l_shipdate = '2023-12-09'))
----------PhysicalOlapScan[lineitem]
--------filter((orders.o_orderdate = '2023-12-09') and (orders.o_shippriority = 1) and (orders.o_totalprice = 11.50))
----------PhysicalOlapScan[orders]

-- !query7_1_after --
yy 4 11.50 11.50 11.50 1

-- !query8_0_before --
yy 4 11.50 11.50 11.50 1

-- !shape8_0_after --
PhysicalResultSink
--hashAgg[GLOBAL]
----hashAgg[LOCAL]
------filter((mv8_0.o_orderdate = '2023-12-09'))
--------PhysicalOlapScan[mv8_0]

-- !query8_0_after --
yy 4 11.50 11.50 11.50 1

Loading

0 comments on commit d73e5f2

Please sign in to comment.