[fix](mtmv) Fix data wrong if base table add new partition when query rewrite by partition rolled up mv (#36414)

This issue was introduced by #35562.

When the mv is a partition roll-up mv (rolled up by date_trunc) and the base table adds a new partition, a query that is successfully rewritten by this partition mv loses the data of the new partition. This PR fixes that problem. For example:

The mv definition is:

CREATE MATERIALIZED VIEW roll_up_mv
BUILD IMMEDIATE REFRESH AUTO ON MANUAL
partition by (date_trunc(`col1`, 'month'))
DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ('replication_num' = '1')
AS
select date_trunc(`l_shipdate`, 'day') as col1, l_shipdate, o_orderdate, l_partkey,
   l_suppkey, sum(o_totalprice) as sum_total
   from lineitem
left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate
   group by
   col1,
   l_shipdate,
   o_orderdate,
   l_partkey,
   l_suppkey;

If we run this insert command:

insert into lineitem values
    (1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-11-21', '2023-11-21', '2023-11-21', 'a', 'b', 'yyyyyyyyy');

and then run the following query, the result will not contain the data of the 2023-11-21 partition:

select date_trunc(`l_shipdate`, 'day') as col1, l_shipdate, o_orderdate, l_partkey,
   l_suppkey, sum(o_totalprice) as sum_total
   from lineitem
left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate
   group by
   col1,
   l_shipdate,
   o_orderdate,
   l_partkey,
   l_suppkey;
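
With this fix, when the query is still rewritten by roll_up_mv, the rewriter removes the partitions the mv cannot serve and unions the mv with the base tables filtered down to the uncovered partitions. Conceptually the rewritten plan behaves like the sketch below; the l_shipdate bound and the exact plan shape are illustrative, not the literal optimizer output:

-- conceptual union rewrite, assuming only the new 2023-11 base-table partition is missing from the mv
select col1, l_shipdate, o_orderdate, l_partkey, l_suppkey, sum_total
   from roll_up_mv  -- data the mv can still serve
union all
select date_trunc(`l_shipdate`, 'day') as col1, l_shipdate, o_orderdate, l_partkey,
   l_suppkey, sum(o_totalprice) as sum_total
   from lineitem
left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate
   where l_shipdate = '2023-11-21'  -- only the new partition, read from the base tables
   group by
   col1,
   l_shipdate,
   o_orderdate,
   l_partkey,
   l_suppkey;
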
seawinde authored Jun 21, 2024
1 parent 86fc14e commit 5e009b5
Showing 5 changed files with 277 additions and 39 deletions.
@@ -274,23 +274,38 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
}
if (invalidPartitions == null) {
// if mv can not offer any partition for query, query rewrite bail out to avoid cycle run
materializationContext.recordFailReason(queryStructInfo,
"mv can not offer any partition for query",
() -> String.format("mv partition info %s",
((AsyncMaterializationContext) materializationContext).getMtmv()
.getMvPartitionInfo()));
return rewriteResults;
}
boolean partitionNeedUnion = needUnionRewrite(invalidPartitions, cascadesContext);
final Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>> finalInvalidPartitions =
invalidPartitions;
if (partitionNeedUnion) {
MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv();
Plan originPlanWithFilter = StructInfo.addFilterOnTableScan(queryPlan, invalidPartitions.value(),
mtmv.getMvPartitionInfo().getPartitionCol(), cascadesContext);
if (finalInvalidPartitions.value().isEmpty() || originPlanWithFilter == null) {
Pair<Plan, Boolean> planAndNeedAddFilterPair =
StructInfo.addFilterOnTableScan(queryPlan, invalidPartitions.value(),
mtmv.getMvPartitionInfo().getRelatedCol(), cascadesContext);
if (planAndNeedAddFilterPair == null) {
materializationContext.recordFailReason(queryStructInfo,
"Add filter to base table fail when union rewrite",
() -> String.format("invalidPartitions are %s, queryPlan is %s, partition column is %s",
invalidPartitions, queryPlan.treeString(),
mtmv.getMvPartitionInfo().getPartitionCol()));
continue;
}
if (finalInvalidPartitions.value().isEmpty() || !planAndNeedAddFilterPair.value()) {
// if invalid base table filter is empty or doesn't need to add filter on base table,
// only need remove mv invalid partition
rewrittenPlan = rewrittenPlan.accept(new PartitionRemover(), invalidPartitions.key());
} else {
// For rewrittenPlan which contains materialized view should remove invalid partition ids
List<Plan> children = Lists.newArrayList(
rewrittenPlan.accept(new PartitionRemover(), invalidPartitions.key()),
originPlanWithFilter);
planAndNeedAddFilterPair.key());
// Union query materialized view and source table
rewrittenPlan = new LogicalUnion(Qualifier.ALL,
queryPlan.getOutput().stream().map(NamedExpression.class::cast)
@@ -452,6 +467,7 @@ protected Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>>
// If related base table create partitions or mv is created with ttl, need base table union
Sets.difference(queryUsedBaseTablePartitionNameSet, mvValidBaseTablePartitionNameSet)
.copyInto(baseTableNeedUnionPartitionNameSet);
// Construct result map
Map<BaseTableInfo, Set<String>> mvPartitionNeedRemoveNameMap = new HashMap<>();
if (!mvNeedRemovePartitionNameSet.isEmpty()) {
mvPartitionNeedRemoveNameMap.put(new BaseTableInfo(mtmv), mvNeedRemovePartitionNameSet);
@@ -728,26 +728,32 @@ public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation,

/**
* Add filter on table scan according to table filter map
*
* @return Pair(Plan, Boolean): the key is the plan with the filter added, the value indicates whether
* the filter actually needs to be added.
* Returns null if adding the filter fails.
*/
public static Plan addFilterOnTableScan(Plan queryPlan, Map<BaseTableInfo, Set<String>> partitionOnOriginPlan,
String partitionColumn,
CascadesContext parentCascadesContext) {
public static Pair<Plan, Boolean> addFilterOnTableScan(Plan queryPlan, Map<BaseTableInfo,
Set<String>> partitionOnOriginPlan, String partitionColumn, CascadesContext parentCascadesContext) {
// Firstly, construct the filter from the invalid partitions; this filter should be added on the origin plan
PredicateAddContext predicateAddContext = new PredicateAddContext(partitionOnOriginPlan, partitionColumn);
Plan queryPlanWithUnionFilter = queryPlan.accept(new PredicateAdder(),
predicateAddContext);
if (!predicateAddContext.isAddSuccess()) {
if (!predicateAddContext.isHandleSuccess()) {
return null;
}
if (!predicateAddContext.isNeedAddFilter()) {
return Pair.of(queryPlan, false);
}
// Deep copy the plan to avoid the plan output is the same with the later union output, this may cause
// exec by mistake
queryPlanWithUnionFilter = new LogicalPlanDeepCopier().deepCopy(
(LogicalPlan) queryPlanWithUnionFilter, new DeepCopierContext());
// rbo rewrite after adding filter on origin plan
return MaterializedViewUtils.rewriteByRules(parentCascadesContext, context -> {
return Pair.of(MaterializedViewUtils.rewriteByRules(parentCascadesContext, context -> {
Rewriter.getWholeTreeRewriter(context).execute();
return context.getRewritePlan();
}, queryPlanWithUnionFilter, queryPlan);
}, queryPlanWithUnionFilter, queryPlan), true);
}

/**
@@ -29,6 +29,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator;
@@ -49,7 +50,6 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
@@ -227,7 +227,6 @@ public Plan visitUnboundRelation(UnboundRelation unboundRelation, PredicateAddCo
unboundRelation.getNameParts());
TableIf table = RelationUtil.getTable(tableQualifier, Env.getCurrentEnv());
if (predicates.getPredicates().containsKey(table)) {
predicates.setAddSuccess(true);
return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.getPredicates().get(table))),
unboundRelation);
}
@@ -262,48 +261,55 @@ public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation,
if (predicates.isEmpty()) {
return catalogRelation;
}
TableIf table = catalogRelation.getTable();
if (predicates.getPredicates() != null) {
TableIf table = catalogRelation.getTable();
if (predicates.getPredicates().containsKey(table)) {
predicates.setAddSuccess(true);
return new LogicalFilter<>(
ImmutableSet.of(ExpressionUtils.or(predicates.getPredicates().get(table))),
catalogRelation);
}
}
if (predicates.getPartition() != null && predicates.getPartitionName() != null) {
if (!(catalogRelation instanceof LogicalOlapScan)) {
if (!(table instanceof MTMVRelatedTableIf)) {
return catalogRelation;
}
for (Map.Entry<BaseTableInfo, Set<String>> filterTableEntry : predicates.getPartition().entrySet()) {
LogicalOlapScan olapScan = (LogicalOlapScan) catalogRelation;
OlapTable targetTable = olapScan.getTable();
if (!Objects.equals(new BaseTableInfo(targetTable), filterTableEntry.getKey())) {
if (!Objects.equals(new BaseTableInfo(table), filterTableEntry.getKey())) {
continue;
}
Slot partitionSlot = null;
for (Slot slot : olapScan.getOutput()) {
for (Slot slot : catalogRelation.getOutput()) {
if (((SlotReference) slot).getName().equals(predicates.getPartitionName())) {
partitionSlot = slot;
break;
}
}
if (partitionSlot == null) {
predicates.setHandleSuccess(false);
return catalogRelation;
}
// if the partition has no data, do not add a filter
Set<PartitionItem> partitionHasDataItems = new HashSet<>();
MTMVRelatedTableIf targetTable = (MTMVRelatedTableIf) table;
for (String partitionName : filterTableEntry.getValue()) {
Partition partition = targetTable.getPartition(partitionName);
if (!targetTable.selectNonEmptyPartitionIds(Lists.newArrayList(partition.getId())).isEmpty()) {
// Add filter only when partition has filter
partitionHasDataItems.add(targetTable.getPartitionInfo().getItem(partition.getId()));
if (!(targetTable instanceof OlapTable)) {
// checking whether a partition has data is only supported for olap tables
break;
}
if (!((OlapTable) targetTable).selectNonEmptyPartitionIds(
Lists.newArrayList(partition.getId())).isEmpty()) {
// Add filter only when partition has data
partitionHasDataItems.add(
((OlapTable) targetTable).getPartitionInfo().getItem(partition.getId()));
}
}
if (partitionHasDataItems.isEmpty()) {
predicates.setNeedAddFilter(false);
}
if (!partitionHasDataItems.isEmpty()) {
Set<Expression> partitionExpressions =
constructPredicates(partitionHasDataItems, partitionSlot);
predicates.setAddSuccess(true);
return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(partitionExpressions)),
catalogRelation);
}
@@ -322,7 +328,9 @@ public static class PredicateAddContext {
private final Map<TableIf, Set<Expression>> predicates;
private final Map<BaseTableInfo, Set<String>> partition;
private final String partitionName;
private boolean addSuccess = false;
private boolean handleSuccess = true;
// when adding a filter by partition, if the partition has no data the filter is not needed and this should be false
private boolean needAddFilter = true;

public PredicateAddContext(Map<TableIf, Set<Expression>> predicates) {
this(predicates, null, null);
@@ -356,12 +364,20 @@ public boolean isEmpty() {
return predicates == null && partition == null;
}

public boolean isAddSuccess() {
return addSuccess;
public boolean isHandleSuccess() {
return handleSuccess;
}

public void setHandleSuccess(boolean handleSuccess) {
this.handleSuccess = handleSuccess;
}

public boolean isNeedAddFilter() {
return needAddFilter;
}

public void setAddSuccess(boolean addSuccess) {
this.addSuccess = addSuccess;
public void setNeedAddFilter(boolean needAddFilter) {
this.needAddFilter = needAddFilter;
}
}
}
regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out (42 additions, 0 deletions)
@@ -63,3 +63,45 @@
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_17_0_before --
2023-10-18 2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2023-10-19 2 3 99.50
2023-10-21 2023-10-21 \N 2 3 \N
2023-11-21 2023-11-21 \N 2 3 \N

-- !query_17_0_after --
2023-10-18 2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2023-10-19 2 3 99.50
2023-10-21 2023-10-21 \N 2 3 \N
2023-11-21 2023-11-21 \N 2 3 \N

-- !query_18_0_before --
2023-10-18 2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2023-10-19 2 3 99.50

-- !query_18_0_after --
2023-10-18 2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2023-10-19 2 3 99.50

-- !query_19_0_before --
2023-10-18 2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2023-10-19 2 3 99.50
2023-10-21 2023-10-21 \N 2 3 \N
2023-11-21 2023-11-21 \N 2 3 \N
2023-11-22 2023-11-22 \N 2 3 \N

-- !query_19_0_after --
2023-10-18 2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2023-10-19 2 3 99.50
2023-10-21 2023-10-21 \N 2 3 \N
2023-11-21 2023-11-21 \N 2 3 \N
2023-11-22 2023-11-22 \N 2 3 \N

-- !query_20_0_before --
2023-10-18 2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2023-10-19 2 3 99.50

-- !query_20_0_after --
2023-10-18 2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2023-10-19 2 3 99.50

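For reference, a minimal manual check of the scenario from the commit message, assuming the lineitem, orders, and roll_up_mv objects defined above exist and roll_up_mv has not been refreshed after the insert. After the fix, the 2023-11-21 group must still show up; the variant below narrows the repro query to that date so the check is easy to read:

-- hedged manual check, not part of the regression suite shown above
select date_trunc(`l_shipdate`, 'day') as col1, l_shipdate, o_orderdate, l_partkey,
   l_suppkey, sum(o_totalprice) as sum_total
   from lineitem
left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate
   where l_shipdate = '2023-11-21'
   group by
   col1,
   l_shipdate,
   o_orderdate,
   l_partkey,
   l_suppkey;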