Skip to content

Commit

Permalink
[fix](mtmv) second-level MTMV always refreshes all partitions by mistake (
Browse files Browse the repository at this point in the history
…#38698)

Given:
- create mv2 as select * from mv1;
- create mv1 as select * from t1;
- t1 has 2 partitions: p1, p2

When data is inserted into partition p1 of t1:

mv1 will refresh p1

mv2 will refresh both p1 and p2, although only p1 needs to be refreshed

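Fix:
mv2 should not check whether its data is in sync with t1; it should only be compared against its direct (one-level) base tables, so callers now use getBaseTablesOneLevel() instead of getBaseTables()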
  • Loading branch information
zddr authored and dataroaring committed Aug 2, 2024
1 parent 259ec25 commit 7ec1a8e
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void run() throws JobException {
.subList(start, end > needRefreshPartitions.size() ? needRefreshPartitions.size() : end));
// need get names before exec
Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVPartitionUtil
.generatePartitionSnapshots(mtmv, relation.getBaseTables(), execPartitionNames,
.generatePartitionSnapshots(mtmv, relation.getBaseTablesOneLevel(), execPartitionNames,
partitionMappings);
exec(ctx, execPartitionNames, tableWithPartKey);
completedPartitions.addAll(execPartitionNames);
Expand Down Expand Up @@ -288,7 +288,7 @@ public void before() throws JobException {
* @throws DdlException
*/
private void refreshHmsTable() throws AnalysisException, DdlException {
for (BaseTableInfo tableInfo : relation.getBaseTables()) {
for (BaseTableInfo tableInfo : relation.getBaseTablesOneLevel()) {
TableIf tableIf = MTMVUtil.getTable(tableInfo);
if (tableIf instanceof HMSExternalTable) {
HMSExternalTable hmsTable = (HMSExternalTable) tableIf;
Expand Down Expand Up @@ -450,7 +450,8 @@ public List<String> calculateNeedRefreshPartitions(Map<String, Set<String>> part
// check if data is fresh
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the tableId
boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv, relation.getBaseTables(), mtmv.getExcludedTriggerTables(),
boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv, relation.getBaseTablesOneLevel(),
mtmv.getExcludedTriggerTables(),
partitionMappings);
if (fresh) {
return Lists.newArrayList();
Expand All @@ -461,7 +462,8 @@ public List<String> calculateNeedRefreshPartitions(Map<String, Set<String>> part
}
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the tableId
return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTables(), partitionMappings);
return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTablesOneLevel(),
partitionMappings);
}

public MTMVTaskContext getTaskContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ public static boolean isMTMVSync(MTMV mtmv) {
return false;
}
try {
return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), Sets.newHashSet(), mtmv.calculatePartitionMappings());
return isMTMVSync(mtmv, mtmvRelation.getBaseTablesOneLevel(), Sets.newHashSet(),
mtmv.calculatePartitionMappings());
} catch (AnalysisException e) {
LOG.warn("isMTMVSync failed: ", e);
return false;
Expand Down Expand Up @@ -254,7 +255,7 @@ private static List<String> getPartitionUnSyncTables(MTMV mtmv, String partition
Set<String> relatedPartitionNames)
throws AnalysisException {
List<String> res = Lists.newArrayList();
for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) {
for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTablesOneLevel()) {
TableIf table = MTMVUtil.getTable(baseTableInfo);
if (!(table instanceof MTMVRelatedTableIf)) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, Conne
partitionMappings = mtmv.calculatePartitionMappings();
}
if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv, partition.getName(),
partitionMappings.get(partition.getName()), mtmvRelation.getBaseTables(),
partitionMappings.get(partition.getName()), mtmvRelation.getBaseTablesOneLevel(),
Sets.newHashSet())) {
res.add(partition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static MTMV getMTMV(long dbId, long mtmvId) throws DdlException, MetaNotF
* @return
*/
public static boolean mtmvContainsExternalTable(MTMV mtmv) {
Set<BaseTableInfo> baseTables = mtmv.getRelation().getBaseTables();
Set<BaseTableInfo> baseTables = mtmv.getRelation().getBaseTablesOneLevel();
for (BaseTableInfo baseTableInfo : baseTables) {
if (baseTableInfo.getCtlId() != InternalCatalog.INTERNAL_CATALOG_ID) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc
minTimes = 0;
result = true;

relation.getBaseTables();
relation.getBaseTablesOneLevel();
minTimes = 0;
result = baseTables;

Expand Down
6 changes: 6 additions & 0 deletions regression-test/data/mtmv_p0/test_multi_level_mtmv.out
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
-- !mv2 --
1 1

-- !mv1_should_one_partition --
["p_2"]

-- !mv2_should_one_partition --
["p_2"]

-- !status1 --
multi_level_mtmv1 SCHEMA_CHANGE SUCCESS

Expand Down
31 changes: 25 additions & 6 deletions regression-test/suites/mtmv_p0/test_multi_level_mtmv.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ suite("test_multi_level_mtmv") {
k1 int,
k2 int
)
PARTITION BY LIST(`k1`)
(
PARTITION `p1` VALUES IN ('1'),
PARTITION `p2` VALUES IN ('2')
)
DISTRIBUTED BY HASH(k1) BUCKETS 10
PROPERTIES (
"replication_num" = "1"
Expand All @@ -40,34 +45,48 @@ suite("test_multi_level_mtmv") {

sql """
CREATE MATERIALIZED VIEW ${mv1}
BUILD DEFERRED REFRESH COMPLETE ON MANUAL
BUILD DEFERRED REFRESH AUTO ON MANUAL
partition by(k1)
DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ('replication_num' = '1')
AS
SELECT * FROM ${tableName};
"""
def jobName1 = getJobName("regression_test_mtmv_p0", mv1);
sql """
REFRESH MATERIALIZED VIEW ${mv1} AUTO
"""
waitingMTMVTaskFinished(jobName1)
waitingMTMVTaskFinishedByMvName(mv1)
order_qt_mv1 "select * from ${mv1}"

sql """
CREATE MATERIALIZED VIEW ${mv2}
BUILD DEFERRED REFRESH COMPLETE ON MANUAL
BUILD DEFERRED REFRESH AUTO ON MANUAL
partition by(k1)
DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ('replication_num' = '1')
AS
SELECT * FROM ${mv1};
"""
def jobName2 = getJobName("regression_test_mtmv_p0", mv2);
sql """
REFRESH MATERIALIZED VIEW ${mv2} AUTO
"""
waitingMTMVTaskFinished(jobName2)
waitingMTMVTaskFinishedByMvName(mv2)
order_qt_mv2 "select * from ${mv2}"

sql """
INSERT INTO ${tableName} VALUES(2,2);
"""
sql """
REFRESH MATERIALIZED VIEW ${mv1} AUTO
"""
waitingMTMVTaskFinishedByMvName(mv1)
order_qt_mv1_should_one_partition "select NeedRefreshPartitions from tasks('type'='mv') where MvName = '${mv1}' order by CreateTime desc limit 1"
sql """
REFRESH MATERIALIZED VIEW ${mv2} AUTO
"""
waitingMTMVTaskFinishedByMvName(mv2)
order_qt_mv2_should_one_partition "select NeedRefreshPartitions from tasks('type'='mv') where MvName = '${mv2}' order by CreateTime desc limit 1"

// drop table
sql """
drop table ${tableName}
Expand Down

0 comments on commit 7ec1a8e

Please sign in to comment.