diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java index 74db83d7863810..5cc577626a1008 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java @@ -274,6 +274,11 @@ protected List doRewrite(StructInfo queryStructInfo, CascadesContext casca } if (invalidPartitions == null) { // if mv can not offer any partition for query, query rewrite bail out to avoid cycle run + materializationContext.recordFailReason(queryStructInfo, + "mv can not offer any partition for query", + () -> String.format("mv partition info %s", + ((AsyncMaterializationContext) materializationContext).getMtmv() + .getMvPartitionInfo())); return rewriteResults; } boolean partitionNeedUnion = needUnionRewrite(invalidPartitions, cascadesContext); @@ -281,16 +286,26 @@ protected List doRewrite(StructInfo queryStructInfo, CascadesContext casca invalidPartitions; if (partitionNeedUnion) { MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv(); - Plan originPlanWithFilter = StructInfo.addFilterOnTableScan(queryPlan, invalidPartitions.value(), - mtmv.getMvPartitionInfo().getPartitionCol(), cascadesContext); - if (finalInvalidPartitions.value().isEmpty() || originPlanWithFilter == null) { + Pair planAndNeedAddFilterPair = + StructInfo.addFilterOnTableScan(queryPlan, invalidPartitions.value(), + mtmv.getMvPartitionInfo().getRelatedCol(), cascadesContext); + if (planAndNeedAddFilterPair == null) { + materializationContext.recordFailReason(queryStructInfo, + "Add filter to base table fail when union rewrite", + () -> String.format("invalidPartitions are %s, queryPlan is %s, partition column is %s", + invalidPartitions, queryPlan.treeString(), + 
mtmv.getMvPartitionInfo().getPartitionCol())); + continue; + } + if (finalInvalidPartitions.value().isEmpty() || !planAndNeedAddFilterPair.value()) { + // if invalid base table filter is empty or doesn't need to add filter on base table, // only need remove mv invalid partition rewrittenPlan = rewrittenPlan.accept(new PartitionRemover(), invalidPartitions.key()); } else { // For rewrittenPlan which contains materialized view should remove invalid partition ids List children = Lists.newArrayList( rewrittenPlan.accept(new PartitionRemover(), invalidPartitions.key()), - originPlanWithFilter); + planAndNeedAddFilterPair.key()); // Union query materialized view and source table rewrittenPlan = new LogicalUnion(Qualifier.ALL, queryPlan.getOutput().stream().map(NamedExpression.class::cast) @@ -452,6 +467,7 @@ protected Pair>, Map>> // If related base table create partitions or mv is created with ttl, need base table union Sets.difference(queryUsedBaseTablePartitionNameSet, mvValidBaseTablePartitionNameSet) .copyInto(baseTableNeedUnionPartitionNameSet); + // Construct result map Map> mvPartitionNeedRemoveNameMap = new HashMap<>(); if (!mvNeedRemovePartitionNameSet.isEmpty()) { mvPartitionNeedRemoveNameMap.put(new BaseTableInfo(mtmv), mvNeedRemovePartitionNameSet); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java index 4dcb74571192fe..d8fcf4a2c5378a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java @@ -728,26 +728,32 @@ public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation, /** * Add filter on table scan according to table filter map + * + * @return Pair(Plan, Boolean) first is the added filter plan, value is the identifier that represent whether + * need to add filter. 
+ * return null if add filter fail. */ - public static Plan addFilterOnTableScan(Plan queryPlan, Map> partitionOnOriginPlan, - String partitionColumn, - CascadesContext parentCascadesContext) { + public static Pair addFilterOnTableScan(Plan queryPlan, Map> partitionOnOriginPlan, String partitionColumn, CascadesContext parentCascadesContext) { // Firstly, construct filter form invalid partition, this filter should be added on origin plan PredicateAddContext predicateAddContext = new PredicateAddContext(partitionOnOriginPlan, partitionColumn); Plan queryPlanWithUnionFilter = queryPlan.accept(new PredicateAdder(), predicateAddContext); - if (!predicateAddContext.isAddSuccess()) { + if (!predicateAddContext.isHandleSuccess()) { return null; } + if (!predicateAddContext.isNeedAddFilter()) { + return Pair.of(queryPlan, false); + } // Deep copy the plan to avoid the plan output is the same with the later union output, this may cause // exec by mistake queryPlanWithUnionFilter = new LogicalPlanDeepCopier().deepCopy( (LogicalPlan) queryPlanWithUnionFilter, new DeepCopierContext()); // rbo rewrite after adding filter on origin plan - return MaterializedViewUtils.rewriteByRules(parentCascadesContext, context -> { + return Pair.of(MaterializedViewUtils.rewriteByRules(parentCascadesContext, context -> { Rewriter.getWholeTreeRewriter(context).execute(); return context.getRewritePlan(); - }, queryPlanWithUnionFilter, queryPlan); + }, queryPlanWithUnionFilter, queryPlan), true); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java index a2b03e04f420ef..ba1a054752b55a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -29,6 +29,7 @@ 
import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; import org.apache.doris.mtmv.BaseTableInfo; +import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.nereids.analyzer.UnboundRelation; import org.apache.doris.nereids.analyzer.UnboundSlot; import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator; @@ -49,7 +50,6 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalCTE; import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; -import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalSink; import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias; @@ -227,7 +227,6 @@ public Plan visitUnboundRelation(UnboundRelation unboundRelation, PredicateAddCo unboundRelation.getNameParts()); TableIf table = RelationUtil.getTable(tableQualifier, Env.getCurrentEnv()); if (predicates.getPredicates().containsKey(table)) { - predicates.setAddSuccess(true); return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.getPredicates().get(table))), unboundRelation); } @@ -262,48 +261,55 @@ public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation, if (predicates.isEmpty()) { return catalogRelation; } + TableIf table = catalogRelation.getTable(); if (predicates.getPredicates() != null) { - TableIf table = catalogRelation.getTable(); if (predicates.getPredicates().containsKey(table)) { - predicates.setAddSuccess(true); return new LogicalFilter<>( ImmutableSet.of(ExpressionUtils.or(predicates.getPredicates().get(table))), catalogRelation); } } if (predicates.getPartition() != null && predicates.getPartitionName() != null) { - if (!(catalogRelation instanceof LogicalOlapScan)) { + if (!(table instanceof MTMVRelatedTableIf)) { return catalogRelation; 
} for (Map.Entry> filterTableEntry : predicates.getPartition().entrySet()) { - LogicalOlapScan olapScan = (LogicalOlapScan) catalogRelation; - OlapTable targetTable = olapScan.getTable(); - if (!Objects.equals(new BaseTableInfo(targetTable), filterTableEntry.getKey())) { + if (!Objects.equals(new BaseTableInfo(table), filterTableEntry.getKey())) { continue; } Slot partitionSlot = null; - for (Slot slot : olapScan.getOutput()) { + for (Slot slot : catalogRelation.getOutput()) { if (((SlotReference) slot).getName().equals(predicates.getPartitionName())) { partitionSlot = slot; break; } } if (partitionSlot == null) { + predicates.setHandleSuccess(false); return catalogRelation; } // if partition has no data, doesn't add filter Set partitionHasDataItems = new HashSet<>(); + MTMVRelatedTableIf targetTable = (MTMVRelatedTableIf) table; for (String partitionName : filterTableEntry.getValue()) { Partition partition = targetTable.getPartition(partitionName); - if (!targetTable.selectNonEmptyPartitionIds(Lists.newArrayList(partition.getId())).isEmpty()) { - // Add filter only when partition has filter - partitionHasDataItems.add(targetTable.getPartitionInfo().getItem(partition.getId())); + if (!(targetTable instanceof OlapTable)) { + // checking whether a partition has data is only supported for olap tables + break; + } + if (!((OlapTable) targetTable).selectNonEmptyPartitionIds( + Lists.newArrayList(partition.getId())).isEmpty()) { + // Add filter only when partition has data + partitionHasDataItems.add( + ((OlapTable) targetTable).getPartitionInfo().getItem(partition.getId())); } } + if (partitionHasDataItems.isEmpty()) { + predicates.setNeedAddFilter(false); + } if (!partitionHasDataItems.isEmpty()) { Set partitionExpressions = constructPredicates(partitionHasDataItems, partitionSlot); - predicates.setAddSuccess(true); return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(partitionExpressions)), catalogRelation); } @@ -322,7 +328,9 @@ public static class PredicateAddContext 
{ private final Map> predicates; private final Map> partition; private final String partitionName; - private boolean addSuccess = false; + private boolean handleSuccess = true; + // when add filter by partition, if partition has no data, doesn't need to add filter. should be false + private boolean needAddFilter = true; public PredicateAddContext(Map> predicates) { this(predicates, null, null); @@ -356,12 +364,20 @@ public boolean isEmpty() { return predicates == null && partition == null; } - public boolean isAddSuccess() { - return addSuccess; + public boolean isHandleSuccess() { + return handleSuccess; + } + + public void setHandleSuccess(boolean handleSuccess) { + this.handleSuccess = handleSuccess; + } + + public boolean isNeedAddFilter() { + return needAddFilter; } - public void setAddSuccess(boolean addSuccess) { - this.addSuccess = addSuccess; + public void setNeedAddFilter(boolean needAddFilter) { + this.needAddFilter = needAddFilter; } } } diff --git a/regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out b/regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out index 1aec66cf42f557..bf22739583a3f0 100644 --- a/regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out +++ b/regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out @@ -63,3 +63,45 @@ 2023-10-18 2023-10-18 2 3 109.20 2023-10-19 2023-10-19 2 3 99.50 +-- !query_17_0_before -- +2023-10-18 2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2023-10-19 2 3 99.50 +2023-10-21 2023-10-21 \N 2 3 \N +2023-11-21 2023-11-21 \N 2 3 \N + +-- !query_17_0_after -- +2023-10-18 2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2023-10-19 2 3 99.50 +2023-10-21 2023-10-21 \N 2 3 \N +2023-11-21 2023-11-21 \N 2 3 \N + +-- !query_18_0_before -- +2023-10-18 2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2023-10-19 2 3 99.50 + +-- !query_18_0_after -- +2023-10-18 2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2023-10-19 2 3 99.50 + +-- 
!query_19_0_before -- +2023-10-18 2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2023-10-19 2 3 99.50 +2023-10-21 2023-10-21 \N 2 3 \N +2023-11-21 2023-11-21 \N 2 3 \N +2023-11-22 2023-11-22 \N 2 3 \N + +-- !query_19_0_after -- +2023-10-18 2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2023-10-19 2 3 99.50 +2023-10-21 2023-10-21 \N 2 3 \N +2023-11-21 2023-11-21 \N 2 3 \N +2023-11-22 2023-11-22 \N 2 3 \N + +-- !query_20_0_before -- +2023-10-18 2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2023-10-19 2 3 99.50 + +-- !query_20_0_after -- +2023-10-18 2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2023-10-19 2 3 99.50 + diff --git a/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy b/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy index 198d98086660e8..9808f578d64da6 100644 --- a/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy +++ b/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy @@ -40,7 +40,7 @@ suite("partition_mv_rewrite") { ) DUPLICATE KEY(o_orderkey, o_custkey) PARTITION BY RANGE(o_orderdate)( - FROM ('2023-10-16') TO ('2023-11-01') INTERVAL 1 DAY + FROM ('2023-10-16') TO ('2023-11-30') INTERVAL 1 DAY ) DISTRIBUTED BY HASH(o_orderkey) BUCKETS 3 PROPERTIES ( @@ -74,7 +74,7 @@ suite("partition_mv_rewrite") { ) DUPLICATE KEY(l_orderkey, l_partkey, l_suppkey, l_linenumber) PARTITION BY RANGE(l_shipdate) - (FROM ('2023-10-16') TO ('2023-11-01') INTERVAL 1 DAY) + (FROM ('2023-10-16') TO ('2023-11-30') INTERVAL 1 DAY) DISTRIBUTED BY HASH(l_orderkey) BUCKETS 3 PROPERTIES ( "replication_num" = "1" @@ -132,10 +132,12 @@ suite("partition_mv_rewrite") { l_suppkey; """ - sql """DROP MATERIALIZED VIEW IF EXISTS mv_10086""" - sql """DROP TABLE IF EXISTS mv_10086""" + + def mv_name = "mv_10086" + sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name}""" + sql """DROP TABLE IF EXISTS ${mv_name}""" sql""" - CREATE MATERIALIZED VIEW mv_10086 + CREATE 
MATERIALIZED VIEW ${mv_name} BUILD IMMEDIATE REFRESH AUTO ON MANUAL partition by(l_shipdate) DISTRIBUTED BY RANDOM BUCKETS 2 @@ -144,10 +146,7 @@ suite("partition_mv_rewrite") { ${mv_def_sql} """ - def mv_name = "mv_10086" - - def job_name = getJobName(db, mv_name); - waitingMTMVTaskFinished(job_name) + waitingMTMVTaskFinished(getJobName(db, mv_name)) explain { sql("${all_partition_sql}") @@ -185,7 +184,6 @@ suite("partition_mv_rewrite") { } order_qt_query_4_0_after "${partition_sql}" - // base table add partition sql "REFRESH MATERIALIZED VIEW ${mv_name} AUTO" waitingMTMVTaskFinished(getJobName(db, mv_name)) @@ -217,7 +215,6 @@ suite("partition_mv_rewrite") { } order_qt_query_8_0_after "${partition_sql}" - // base table delete partition test sql "REFRESH MATERIALIZED VIEW ${mv_name} AUTO" waitingMTMVTaskFinished(getJobName(db, mv_name)) @@ -371,4 +368,165 @@ suite("partition_mv_rewrite") { order_qt_query_16_0_after "${ttl_partition_sql}" sql """ DROP MATERIALIZED VIEW IF EXISTS ${ttl_mv_name}""" + + + // date roll up mv + def roll_up_mv_def_sql = """ + select date_trunc(`l_shipdate`, 'day') as col1, l_shipdate, o_orderdate, l_partkey, + l_suppkey, sum(o_totalprice) as sum_total + from lineitem + left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate + group by + col1, + l_shipdate, + o_orderdate, + l_partkey, + l_suppkey; + """ + + def roll_up_all_partition_sql = """ + select date_trunc(`l_shipdate`, 'day') as col1, l_shipdate, o_orderdate, l_partkey, + l_suppkey, sum(o_totalprice) as sum_total + from lineitem + left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate + group by + col1, + l_shipdate, + o_orderdate, + l_partkey, + l_suppkey; + """ + + def roll_up_partition_sql = """ + select date_trunc(`l_shipdate`, 'day') as col1, l_shipdate, o_orderdate, l_partkey, + l_suppkey, sum(o_totalprice) as sum_total + from lineitem + left join orders on lineitem.l_orderkey = orders.o_orderkey and 
l_shipdate = o_orderdate + where (l_shipdate>= '2023-10-18' and l_shipdate <= '2023-10-19') + group by + col1, + l_shipdate, + o_orderdate, + l_partkey, + l_suppkey; + """ + + sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name}""" + sql """DROP TABLE IF EXISTS ${mv_name}""" + sql""" + CREATE MATERIALIZED VIEW ${mv_name} + BUILD IMMEDIATE REFRESH AUTO ON MANUAL + partition by (date_trunc(`col1`, 'month')) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + ${roll_up_mv_def_sql} + """ + waitingMTMVTaskFinished(getJobName(db, mv_name)) + + explain { + sql("${roll_up_all_partition_sql}") + contains("${mv_name}(${mv_name})") + } + explain { + sql("${roll_up_partition_sql}") + contains("${mv_name}(${mv_name})") + } + // base table add partition + sql """ + insert into lineitem values + (1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-11-21', '2023-11-21', '2023-11-21', 'a', 'b', 'yyyyyyyyy'); + """ + + // enable union rewrite + sql "SET enable_materialized_view_rewrite=false" + order_qt_query_17_0_before "${roll_up_all_partition_sql}" + sql "SET enable_materialized_view_rewrite=true" + explain { + sql("${roll_up_all_partition_sql}") + // should rewrite successfully when union rewrite enable if base table add new partition + contains("${mv_name}(${mv_name})") + } + order_qt_query_17_0_after "${roll_up_all_partition_sql}" + + sql "SET enable_materialized_view_rewrite=false" + order_qt_query_18_0_before "${roll_up_partition_sql}" + sql "SET enable_materialized_view_rewrite=true" + explain { + sql("${roll_up_partition_sql}") + // should rewrite successfully when union rewrite enable if doesn't query new partition + contains("${mv_name}(${mv_name})") + } + order_qt_query_18_0_after "${roll_up_partition_sql}" + + + def check_rewrite_but_not_chose = { query_sql, mv_name_param -> + explain { + sql("${query_sql}") + check {result -> + def splitResult = result.split("MaterializedViewRewriteFail") + splitResult.length == 2 ? 
splitResult[0].contains(mv_name_param) : false + } + } + } + + + // base table partition add data + sql "REFRESH MATERIALIZED VIEW ${mv_name} AUTO" + waitingMTMVTaskFinished(getJobName(db, mv_name)) + sql """ + insert into lineitem values + (1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-11-21', '2023-11-21', '2023-11-21', 'd', 'd', 'yyyyyyyyy'), + (1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-11-22', '2023-11-22', '2023-11-22', 'd', 'd', 'yyyyyyyyy'); + """ + + // enable union rewrite + sql "SET enable_materialized_view_rewrite=false" + order_qt_query_19_0_before "${roll_up_all_partition_sql}" + sql "SET enable_materialized_view_rewrite=true" + explain { + sql("${roll_up_all_partition_sql}") + // should rewrite successfully when union rewrite enable if base table add new partition + contains("${mv_name}(${mv_name})") + } + order_qt_query_19_0_after "${roll_up_all_partition_sql}" + + sql "SET enable_materialized_view_rewrite=false" + order_qt_query_20_0_before "${roll_up_partition_sql}" + sql "SET enable_materialized_view_rewrite=true" + explain { + sql("${roll_up_partition_sql}") + // should rewrite successfully when union rewrite enable if doesn't query new partition + contains("${mv_name}(${mv_name})") + } + order_qt_query_20_0_after "${roll_up_partition_sql}" + + + // base table delete partition + sql "REFRESH MATERIALIZED VIEW ${mv_name} AUTO" + waitingMTMVTaskFinished(getJobName(db, mv_name)) + sql """ ALTER TABLE lineitem DROP PARTITION IF EXISTS p_20231121 FORCE; + """ + + // enable union rewrite +// this depends on getting correct partitions when base table delete partition, tmp comment +// sql "SET enable_materialized_view_rewrite=false" +// order_qt_query_21_0_before "${roll_up_all_partition_sql}" +// sql "SET enable_materialized_view_rewrite=true" +// explain { +// sql("${roll_up_all_partition_sql}") +// // should rewrite successfully when union rewrite enable if base table add new partition +// contains("${mv_name}(${mv_name})") +// } +// 
order_qt_query_21_0_after "${roll_up_all_partition_sql}" +// +// sql "SET enable_materialized_view_rewrite=false" +// order_qt_query_22_0_before "${roll_up_partition_sql}" +// sql "SET enable_materialized_view_rewrite=true" +// explain { +// sql("${roll_up_partition_sql}") +// // should rewrite successfully when union rewrite enable if doesn't query new partition +// contains("${mv_name}(${mv_name})") +// } +// order_qt_query_22_0_after "${roll_up_partition_sql}" }