Skip to content

Commit

Permalink
Merge branch 'master' into 20231214_set_user_prop
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Dec 17, 2023
2 parents 8867435 + b06f3ed commit d627def
Show file tree
Hide file tree
Showing 43 changed files with 1,461 additions and 281 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ statement
(REFRESH refreshMethod? refreshTrigger?)?
(KEY keys=identifierList)?
(COMMENT STRING_LITERAL)?
(PARTITION BY LEFT_PAREN partitionKey = identifier RIGHT_PAREN)?
(DISTRIBUTED BY (HASH hashKeys=identifierList | RANDOM) (BUCKETS (INTEGER_VALUE | AUTO))?)?
propertyClause?
AS query #createMTMV
| REFRESH MATERIALIZED VIEW mvName=multipartIdentifier #refreshMTMV
| REFRESH MATERIALIZED VIEW mvName=multipartIdentifier (partitionSpec | COMPLETE)? #refreshMTMV
| ALTER MATERIALIZED VIEW mvName=multipartIdentifier ((RENAME newName=identifier)
| (REFRESH (refreshMethod | refreshTrigger | refreshMethod refreshTrigger))
| (SET LEFT_PAREN fileProperties=propertyItemList RIGHT_PAREN)) #alterMTMV
Expand Down Expand Up @@ -158,15 +159,11 @@ refreshTrigger
;

refreshSchedule
: EVERY INTEGER_VALUE mvRefreshUnit (STARTS STRING_LITERAL)?
;

mvRefreshUnit
: SECOND | MINUTE | HOUR | DAY | WEEK
: EVERY INTEGER_VALUE refreshUnit = identifier (STARTS STRING_LITERAL)?
;

refreshMethod
: COMPLETE
: COMPLETE | AUTO
;

identifierOrText
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Index;
import org.apache.doris.mtmv.EnvInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVRefreshInfo;
import org.apache.doris.mtmv.MTMVRelation;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -31,17 +33,21 @@ public class CreateMTMVStmt extends CreateTableStmt {
private final String querySql;
private final EnvInfo envInfo;
private Map<String, String> mvProperties;
private MTMVPartitionInfo mvPartitionInfo;
private MTMVRelation relation;

public CreateMTMVStmt(boolean ifNotExists, TableName mvName, List<Column> columns,
MTMVRefreshInfo refreshInfo, KeysDesc keyDesc, DistributionDesc distributionDesc,
Map<String, String> properties, Map<String, String> mvProperties, String querySql, String comment,
EnvInfo envInfo) {
super(ifNotExists, false, mvName, columns, new ArrayList<Index>(), DEFAULT_ENGINE_NAME, keyDesc, null,
EnvInfo envInfo, PartitionDesc partitionDesc, MTMVPartitionInfo mvPartitionInfo, MTMVRelation relation) {
super(ifNotExists, false, mvName, columns, new ArrayList<Index>(), DEFAULT_ENGINE_NAME, keyDesc, partitionDesc,
distributionDesc, properties, null, comment, null, null);
this.refreshInfo = refreshInfo;
this.querySql = querySql;
this.envInfo = envInfo;
this.mvProperties = mvProperties;
this.mvPartitionInfo = mvPartitionInfo;
this.relation = relation;
}

public MTMVRefreshInfo getRefreshInfo() {
Expand All @@ -59,4 +65,12 @@ public EnvInfo getEnvInfo() {
public Map<String, String> getMvProperties() {
return mvProperties;
}

public MTMVPartitionInfo getMvPartitionInfo() {
return mvPartitionInfo;
}

public MTMVRelation getRelation() {
return relation;
}
}
56 changes: 49 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,25 @@
package org.apache.doris.catalog;

import org.apache.doris.catalog.OlapTableFactory.MTMVParams;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.EnvInfo;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVJobInfo;
import org.apache.doris.mtmv.MTMVJobManager;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
import org.apache.doris.mtmv.MTMVRefreshInfo;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.mtmv.MVCache;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -41,6 +45,7 @@
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;


Expand All @@ -62,8 +67,10 @@ public class MTMV extends OlapTable {
private Map<String, String> mvProperties;
@SerializedName("r")
private MTMVRelation relation;
// Should update after every fresh
private MVCache mvCache;
@SerializedName("mpi")
private MTMVPartitionInfo mvPartitionInfo;
// Should update after every fresh, not persist
private MTMVCache cache;

// For deserialization
public MTMV() {
Expand All @@ -87,6 +94,8 @@ public MTMV() {
this.status = new MTMVStatus();
this.jobInfo = new MTMVJobInfo(MTMVJobManager.MTMV_JOB_PREFIX + params.tableId);
this.mvProperties = params.mvProperties;
this.mvPartitionInfo = params.mvPartitionInfo;
this.relation = params.relation;
mvRwLock = new ReentrantReadWriteLock(true);
}

Expand Down Expand Up @@ -119,12 +128,12 @@ public MTMVRelation getRelation() {
return relation;
}

public MVCache getMvCache() {
return mvCache;
public MTMVCache getCache() {
return cache;
}

public void setMvCache(MVCache mvCache) {
this.mvCache = mvCache;
public void setCache(MTMVCache cache) {
this.cache = cache;
}

public MTMVRefreshInfo alterRefreshInfo(MTMVRefreshInfo newRefreshInfo) {
Expand All @@ -148,6 +157,12 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation) {
this.status.setSchemaChangeDetail(null);
this.status.setRefreshState(MTMVRefreshState.SUCCESS);
this.relation = relation;
try {
this.cache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this));
} catch (Throwable e) {
this.cache = null;
LOG.warn("generate cache failed", e);
}
} else {
this.status.setRefreshState(MTMVRefreshState.FAIL);
}
Expand All @@ -170,10 +185,36 @@ public long getGracePeriod() {
}
}

public Set<String> getExcludedTriggerTables() {
if (!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) {
return Sets.newHashSet();
}
String[] split = mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES).split(",");
return Sets.newHashSet(split);
}

public MTMVCache getOrGenerateCache() throws AnalysisException {
if (cache == null) {
writeMvLock();
try {
if (cache == null) {
this.cache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this));
}
} finally {
writeMvUnlock();
}
}
return cache;
}

public Map<String, String> getMvProperties() {
return mvProperties;
}

public MTMVPartitionInfo getMvPartitionInfo() {
return mvPartitionInfo;
}

public void readMvLock() {
this.mvRwLock.readLock().lock();
}
Expand Down Expand Up @@ -207,6 +248,7 @@ public void readFields(DataInput in) throws IOException {
jobInfo = materializedView.jobInfo;
mvProperties = materializedView.mvProperties;
relation = materializedView.relation;
mvPartitionInfo = materializedView.mvPartitionInfo;
}

}
22 changes: 22 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,17 @@ public Partition getPartition(String partitionName) {
return getPartition(partitionName, true);
}

public Partition getPartitionOrAnalysisException(String partitionName) throws AnalysisException {
Partition partition = getPartition(partitionName, false);
if (partition == null) {
partition = getPartition(partitionName, true);
}
if (partition == null) {
throw new AnalysisException("partition not found: " + partitionName);
}
return partition;
}

// get partition by name
public Partition getPartition(String partitionName, boolean isTempPartition) {
if (isTempPartition) {
Expand All @@ -965,6 +976,17 @@ public Partition getPartition(long partitionId) {
return partition;
}

public Partition getPartitionOrAnalysisException(long partitionId) throws AnalysisException {
Partition partition = idToPartition.get(partitionId);
if (partition == null) {
partition = tempPartitions.getPartition(partitionId);
}
if (partition == null) {
throw new AnalysisException("partition not found: " + partitionId);
}
return partition;
}

// select the non-empty partition ids belonging to this table.
//
// ATTN: partitions not belonging to this table will be filtered.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.doris.analysis.DdlStmt;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.mtmv.EnvInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVRefreshInfo;
import org.apache.doris.mtmv.MTMVRelation;

import com.google.common.base.Preconditions;

Expand All @@ -49,6 +51,8 @@ public static class MTMVParams extends BuildParams {
public EnvInfo envInfo;
public String querySql;
public Map<String, String> mvProperties;
public MTMVPartitionInfo mvPartitionInfo;
public MTMVRelation relation;
}

private BuildParams params;
Expand Down Expand Up @@ -158,6 +162,22 @@ private OlapTableFactory withEnvInfo(EnvInfo envInfo) {
return this;
}

private OlapTableFactory withMvPartitionInfo(MTMVPartitionInfo mvPartitionInfo) {
Preconditions.checkState(params instanceof MTMVParams, "Invalid argument for "
+ params.getClass().getSimpleName());
MTMVParams mtmvParams = (MTMVParams) params;
mtmvParams.mvPartitionInfo = mvPartitionInfo;
return this;
}

private OlapTableFactory withMvRelation(MTMVRelation relation) {
Preconditions.checkState(params instanceof MTMVParams, "Invalid argument for "
+ params.getClass().getSimpleName());
MTMVParams mtmvParams = (MTMVParams) params;
mtmvParams.relation = relation;
return this;
}

public OlapTableFactory withExtraParams(DdlStmt stmt) {
boolean isMaterializedView = stmt instanceof CreateMTMVStmt;
if (!isMaterializedView) {
Expand All @@ -168,6 +188,8 @@ public OlapTableFactory withExtraParams(DdlStmt stmt) {
return withRefreshInfo(createMTMVStmt.getRefreshInfo())
.withQuerySql(createMTMVStmt.getQuerySql())
.withMvProperties(createMTMVStmt.getMvProperties())
.withMvPartitionInfo(createMTMVStmt.getMvPartitionInfo())
.withMvRelation(createMTMVStmt.getRelation())
.withEnvInfo(createMTMVStmt.getEnvInfo());
}
}
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,18 @@ public long getVisibleVersionTime() {
return visibleVersionTime;
}

/**
* if visibleVersion is 1, do not return creation time but 0
*
* @return
*/
public long getVisibleVersionTimeIgnoreInit() {
if (visibleVersion == 1) {
return 0L;
}
return visibleVersionTime;
}

// The method updateVisibleVersionAndVersionHash is called when fe restart, the visibleVersionTime is updated
private void setVisibleVersion(long visibleVersion) {
this.visibleVersion = visibleVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ public PartitionItem getItem(long partitionId) {
return item;
}

public PartitionItem getItemOrAnalysisException(long partitionId) throws AnalysisException {
PartitionItem item = idToItem.get(partitionId);
if (item == null) {
item = idToTempItem.get(partitionId);
}
if (item == null) {
throw new AnalysisException("PartitionItem not found: " + partitionId);
}
return item;
}

public void setItem(long partitionId, boolean isTemp, PartitionItem item) {
setItemInternal(partitionId, isTemp, item);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
Expand All @@ -37,17 +38,20 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.OrderByPair;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.mtmv.MTMVUtil;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -68,7 +72,7 @@ public class PartitionsProcDir implements ProcDirInterface {
.add("State").add("PartitionKey").add("Range").add("DistributionKey")
.add("Buckets").add("ReplicationNum").add("StorageMedium").add("CooldownTime").add("RemoteStoragePolicy")
.add("LastConsistencyCheckTime").add("DataSize").add("IsInMemory").add("ReplicaAllocation")
.add("IsMutable")
.add("IsMutable").add("SyncWithBaseTables").add("UnsyncTables")
.build();

private Database db;
Expand Down Expand Up @@ -303,6 +307,20 @@ private List<List<Comparable>> getPartitionInfos() {
partitionInfo.add(tblPartitionInfo.getReplicaAllocation(partitionId).toCreateStmt());

partitionInfo.add(tblPartitionInfo.getIsMutable(partitionId));
if (olapTable instanceof MTMV) {
try {
List<String> partitionUnSyncTables = MTMVUtil
.getPartitionUnSyncTables((MTMV) olapTable, partitionId);
partitionInfo.add(CollectionUtils.isEmpty(partitionUnSyncTables));
partitionInfo.add(partitionUnSyncTables.toString());
} catch (AnalysisException e) {
partitionInfo.add(false);
partitionInfo.add(e.getMessage());
}
} else {
partitionInfo.add(true);
partitionInfo.add(FeConstants.null_string);
}

partitionInfos.add(partitionInfo);
}
Expand Down
Loading

0 comments on commit d627def

Please sign in to comment.