Skip to content

Commit

Permalink
[fix](mtmv) Fix track partition column fail when date_trunc in group …
Browse files Browse the repository at this point in the history
…by (#36175)

This is brought by #35562

After the PR above, creating a partition materialized view as
follows would fail with the message:
Unable to find a suitable base table for partitioning

CREATE MATERIALIZED VIEW mvName
BUILD IMMEDIATE REFRESH AUTO ON MANUAL
PARTITION BY (date_trunc(month_alias, 'month'))
DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES (
  'replication_num' = '1'
)
AS
SELECT date_trunc(`k2`,'day') AS month_alias, k3, count(*) 
FROM tableName GROUP BY date_trunc(`k2`,'day'), k3;

This PR adds support for creating a partition materialized view when `date_trunc`
appears in the group by clause.
  • Loading branch information
seawinde authored Jun 21, 2024
1 parent faa2c17 commit 4c8e66b
Show file tree
Hide file tree
Showing 4 changed files with 369 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;

import java.util.ArrayList;
import java.util.BitSet;
Expand Down Expand Up @@ -312,58 +313,12 @@ private static final class MaterializedViewIncrementChecker extends

@Override
public Void visitLogicalProject(LogicalProject<? extends Plan> project, IncrementCheckerContext context) {
NamedExpression mvPartitionColumn = context.getMvPartitionColumn();
List<Slot> output = project.getOutput();
if (context.getMvPartitionColumn().isColumnFromTable()) {
return visit(project, context);
}
for (Slot projectSlot : output) {
if (!projectSlot.equals(mvPartitionColumn.toSlot())) {
continue;
}
if (projectSlot.isColumnFromTable()) {
context.setMvPartitionColumn(projectSlot);
} else {
// should be only use date_trunc
Expression shuttledExpression =
ExpressionUtils.shuttleExpressionWithLineage(projectSlot, project, new BitSet());
// merge date_trunc
shuttledExpression = new ExpressionNormalization().rewrite(shuttledExpression,
new ExpressionRewriteContext(context.getCascadesContext()));

List<Expression> expressions = shuttledExpression.collectToList(Expression.class::isInstance);
for (Expression expression : expressions) {
if (SUPPORT_EXPRESSION_TYPES.stream().noneMatch(
supportExpression -> supportExpression.isAssignableFrom(expression.getClass()))) {
context.addFailReason(
String.format("partition column use invalid implicit expression, invalid "
+ "expression is %s", expression));
return null;
}
}
List<DateTrunc> dataTruncExpressions =
shuttledExpression.collectToList(DateTrunc.class::isInstance);
if (dataTruncExpressions.size() != 1) {
// mv time unit level is little then query
context.addFailReason("partition column time unit level should be "
+ "greater than sql select column");
return null;
}
Optional<Slot> columnExpr =
shuttledExpression.getArgument(0).collectFirst(Slot.class::isInstance);
if (!columnExpr.isPresent() || !columnExpr.get().isColumnFromTable()) {
context.addFailReason(String.format("partition reference column should be direct column "
+ "rather then expression except date_trunc, columnExpr is %s", columnExpr));
return null;
}
context.setPartitionExpression(shuttledExpression);
context.setMvPartitionColumn(columnExpr.get());
}
return visit(project, context);
boolean isValid = checkPartition(output, project, context);
if (!isValid) {
return null;
}
context.addFailReason(String.format("partition reference column should be direct column "
+ "rather then expression except date_trunc, current project is %s", project));
return null;
return visit(project, context);
}

@Override
Expand Down Expand Up @@ -465,18 +420,8 @@ public Void visitLogicalAggregate(LogicalAggregate<? extends Plan> aggregate,
context.addFailReason("group by sets is empty, doesn't contain the target partition");
return null;
}
Set<Column> originalGroupbyExprSet = new HashSet<>();
groupByExprSet.forEach(groupExpr -> {
if (groupExpr instanceof SlotReference && groupExpr.isColumnFromTable()) {
originalGroupbyExprSet.add(((SlotReference) groupExpr).getColumn().get());
}
});
SlotReference contextPartitionColumn = getContextPartitionColumn(context);
if (contextPartitionColumn == null) {
return null;
}
if (!originalGroupbyExprSet.contains(contextPartitionColumn.getColumn().get())) {
context.addFailReason("group by sets doesn't contain the target partition");
boolean isValid = checkPartition(groupByExprSet, aggregate, context);
if (!isValid) {
return null;
}
return visit(aggregate, context);
Expand Down Expand Up @@ -508,6 +453,8 @@ public Void visit(Plan plan, IncrementCheckerContext context) {
|| plan instanceof LogicalWindow) {
return super.visit(plan, context);
}
context.addFailReason(String.format("Unsupported plan operate in track partition, "
+ "the invalid plan node is %s", plan.getClass().getSimpleName()));
return null;
}

Expand Down Expand Up @@ -543,6 +490,99 @@ private SlotReference getContextPartitionColumn(IncrementCheckerContext context)
}
return (SlotReference) context.getMvPartitionColumn();
}

/**
* Given a partition named expression and expressionsToCheck, check the partition is valid
* example 1:
* partition expression is date_trunc(date_alias#25, 'hour') AS `date_trunc(date_alias, 'hour')`#30
* expressionsToCheck is date_trunc(date_alias, 'hour')#30
* expressionsToCheck is the slot to partition expression, but they are expression
* example 2:
* partition expression is L_SHIPDATE#10
* expressionsToCheck isL_SHIPDATE#10
* both of them are slot
* example 3:
* partition expression is date_trunc(L_SHIPDATE#10, 'hour')#30
* expressionsToCheck is L_SHIPDATE#10
* all above should check successfully
* */
private static boolean checkPartition(Collection<? extends Expression> expressionsToCheck, Plan plan,
IncrementCheckerContext context) {
NamedExpression partitionColumn = context.getMvPartitionColumn();
for (Expression projectSlot : expressionsToCheck) {
if (projectSlot.isColumnFromTable() && projectSlot.equals(partitionColumn.toSlot())) {
continue;
}
// check the expression which use partition column
Expression expressionToCheck =
ExpressionUtils.shuttleExpressionWithLineage(projectSlot, plan, new BitSet());
// merge date_trunc
expressionToCheck = new ExpressionNormalization().rewrite(expressionToCheck,
new ExpressionRewriteContext(context.getCascadesContext()));

Expression partitionExpression = context.getPartitionExpression().isPresent()
? context.getPartitionExpression().get() :
ExpressionUtils.shuttleExpressionWithLineage(partitionColumn, plan, new BitSet());
// merge date_trunc
partitionExpression = new ExpressionNormalization().rewrite(partitionExpression,
new ExpressionRewriteContext(context.getCascadesContext()));

Set<SlotReference> expressionToCheckColumns =
expressionToCheck.collectToSet(SlotReference.class::isInstance);
Set<SlotReference> partitionColumns =
partitionExpression.collectToSet(SlotReference.class::isInstance);
if (Sets.intersection(expressionToCheckColumns, partitionColumns).isEmpty()
|| expressionToCheckColumns.isEmpty() || partitionColumns.isEmpty()) {
// this expression doesn't use partition column
continue;
}
if (expressionToCheckColumns.size() > 1 || partitionColumns.size() > 1) {
context.addFailReason(
String.format("partition expression use more than one slot reference, invalid "
+ "expressionToCheckColumns is %s, partitionColumnDateColumns is %s",
expressionToCheckColumns, partitionColumns));
return false;
}
List<Expression> expressions = expressionToCheck.collectToList(Expression.class::isInstance);
for (Expression expression : expressions) {
if (SUPPORT_EXPRESSION_TYPES.stream().noneMatch(
supportExpression -> supportExpression.isAssignableFrom(expression.getClass()))) {
context.addFailReason(
String.format("column to check use invalid implicit expression, invalid "
+ "expression is %s", expression));
return false;
}
}
List<Expression> partitionExpressions = partitionExpression.collectToList(
Expression.class::isInstance);
for (Expression expression : partitionExpressions) {
if (SUPPORT_EXPRESSION_TYPES.stream().noneMatch(
supportExpression -> supportExpression.isAssignableFrom(expression.getClass()))) {
context.addFailReason(
String.format("partition column use invalid implicit expression, invalid "
+ "expression is %s", expression));
return false;
}
}
List<DateTrunc> expressionToCheckDataTruncList =
expressionToCheck.collectToList(DateTrunc.class::isInstance);
List<DateTrunc> partitionColumnDateTrucList =
partitionExpression.collectToList(DateTrunc.class::isInstance);
if (expressionToCheckDataTruncList.size() > 1 || partitionColumnDateTrucList.size() > 1) {
// mv time unit level is little then query
context.addFailReason("partition column time unit level should be "
+ "greater than sql select column");
return false;
}
if (!partitionColumn.isColumnFromTable()) {
context.setMvPartitionColumn(partitionColumns.iterator().next());
}
if (!context.getPartitionExpression().isPresent()) {
context.setPartitionExpression(partitionExpression);
}
}
return true;
}
}

private static final class IncrementCheckerContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,99 @@ public void testPartitionDateTruncShouldNotTrack() {
});
}

@Test
public void testPartitionDateTruncShouldTrack() {
PlanChecker.from(connectContext)
.checkExplain("SELECT date_trunc(t1.L_SHIPDATE, 'day') as date_alias, t2.O_ORDERDATE, t1.L_QUANTITY, t2.O_ORDERSTATUS, "
+ "count(distinct case when t1.L_SUPPKEY > 0 then t2.O_ORDERSTATUS else null end) as cnt_1 "
+ "from "
+ " (select * from "
+ " lineitem "
+ " where L_SHIPDATE in ('2017-01-30')) t1 "
+ "left join "
+ " (select * from "
+ " orders "
+ " where O_ORDERDATE in ('2017-01-30')) t2 "
+ "on t1.L_ORDERKEY = t2.O_ORDERKEY "
+ "group by "
+ "t1.L_SHIPDATE, "
+ "t2.O_ORDERDATE, "
+ "t1.L_QUANTITY, "
+ "t2.O_ORDERSTATUS;",
nereidsPlanner -> {
Plan rewrittenPlan = nereidsPlanner.getRewrittenPlan();
RelatedTableInfo relatedTableInfo =
MaterializedViewUtils.getRelatedTableInfo("date_alias", "month",
rewrittenPlan, nereidsPlanner.getCascadesContext());
checkRelatedTableInfo(relatedTableInfo,
"lineitem",
"L_SHIPDATE",
true);
});
}

@Test
public void testPartitionDateTruncInGroupByShouldTrack() {
PlanChecker.from(connectContext)
.checkExplain("SELECT date_trunc(t1.L_SHIPDATE, 'day') as date_alias, t2.O_ORDERDATE, t1.L_QUANTITY, t2.O_ORDERSTATUS, "
+ "count(distinct case when t1.L_SUPPKEY > 0 then t2.O_ORDERSTATUS else null end) as cnt_1 "
+ "from "
+ " (select * from "
+ " lineitem "
+ " where L_SHIPDATE in ('2017-01-30')) t1 "
+ "left join "
+ " (select * from "
+ " orders "
+ " where O_ORDERDATE in ('2017-01-30')) t2 "
+ "on t1.L_ORDERKEY = t2.O_ORDERKEY "
+ "group by "
+ "date_alias, "
+ "t2.O_ORDERDATE, "
+ "t1.L_QUANTITY, "
+ "t2.O_ORDERSTATUS;",
nereidsPlanner -> {
Plan rewrittenPlan = nereidsPlanner.getRewrittenPlan();
RelatedTableInfo relatedTableInfo =
MaterializedViewUtils.getRelatedTableInfo("date_alias", "month",
rewrittenPlan, nereidsPlanner.getCascadesContext());
checkRelatedTableInfo(relatedTableInfo,
"lineitem",
"L_SHIPDATE",
true);
});
}

@Test
public void testPartitionDateTruncExpressionInGroupByShouldTrack() {
PlanChecker.from(connectContext)
.checkExplain("SELECT date_trunc(t1.L_SHIPDATE, 'day') as date_alias, t2.O_ORDERDATE, t1.L_QUANTITY, t2.O_ORDERSTATUS, "
+ "count(distinct case when t1.L_SUPPKEY > 0 then t2.O_ORDERSTATUS else null end) as cnt_1 "
+ "from "
+ " (select * from "
+ " lineitem "
+ " where L_SHIPDATE in ('2017-01-30')) t1 "
+ "left join "
+ " (select * from "
+ " orders "
+ " where O_ORDERDATE in ('2017-01-30')) t2 "
+ "on t1.L_ORDERKEY = t2.O_ORDERKEY "
+ "group by "
+ "date_trunc(t1.L_SHIPDATE, 'day'), "
+ "t2.O_ORDERDATE, "
+ "t1.L_QUANTITY, "
+ "t2.O_ORDERSTATUS;",
nereidsPlanner -> {
Plan rewrittenPlan = nereidsPlanner.getRewrittenPlan();
RelatedTableInfo relatedTableInfo =
MaterializedViewUtils.getRelatedTableInfo("date_alias", "month",
rewrittenPlan, nereidsPlanner.getCascadesContext());
checkRelatedTableInfo(relatedTableInfo,
"lineitem",
"L_SHIPDATE",
true);
});
}

@Test
public void getRelatedTableInfoWhenMultiBaseTablePartition() {
PlanChecker.from(connectContext)
Expand Down
60 changes: 45 additions & 15 deletions regression-test/data/mtmv_p0/test_rollup_partition_mtmv.out
Original file line number Diff line number Diff line change
@@ -1,18 +1,33 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !date_list_month --
1 2020-01-01
2 2020-01-02
3 2020-02-01
1 2020-01-01 2020-01-01
2 2020-01-02 2020-01-02
3 2020-02-01 2020-02-01

-- !date_list_month_partition_by_column --
2020-01-01 1 2020-01-01
2020-01-01 2 2020-01-02
2020-02-01 3 2020-02-01
2020-01-01 1 2020-01-01 2020-01-01
2020-01-01 2 2020-01-02 2020-01-02
2020-02-01 3 2020-02-01 2020-02-01

-- !date_list_month_level --
2020-01-01 1 2020-01-01
2020-01-02 2 2020-01-02
2020-02-01 3 2020-02-01
2020-01-01 1 2020-01-01 2020-01-01
2020-01-02 2 2020-01-02 2020-01-02
2020-02-01 3 2020-02-01 2020-02-01

-- !date_list_month_level_agg --
2020-01-01 1 1
2020-01-02 2 1
2020-02-01 3 1

-- !date_list_month_level_agg_multi --
2020-01-01 2020-01-01 1
2020-01-02 2020-01-02 1
2020-02-01 2020-02-01 1

-- !date_list_month_level_agg --
2020-01-01 1
2020-01-02 1
2020-02-01 1

-- !date_list_year_partition_by_column --

Expand All @@ -22,17 +37,32 @@
3 2020==02==01

-- !date_range_month --
1 2020-01-01
2 2020-01-02
3 2020-02-01
1 2020-01-01 2020-01-01
2 2020-01-02 2020-01-02
3 2020-02-01 2020-02-01

-- !date_range_month_partition_by_column --
2020-01-01 1 2020-01-01
2020-01-01 2 2020-01-02
2020-02-01 3 2020-02-01
2020-01-01 1 2020-01-01 2020-01-01
2020-01-01 2 2020-01-02 2020-01-02
2020-02-01 3 2020-02-01 2020-02-01

-- !date_range_month_level --
2020-01-01
2020-01-02
2020-02-01

-- !date_range_month_level_agg --
2020-01-01 1 1
2020-01-02 2 1
2020-02-01 3 1

-- !date_range_month_level_agg_multi --
2020-01-01 1 1
2020-01-02 2 1
2020-02-01 3 1

-- !date_range_month_level_agg_direct --
2020-01-01 1
2020-01-02 1
2020-02-01 1

Loading

0 comments on commit 4c8e66b

Please sign in to comment.