Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify partition's version name to what it means #334

Merged
merged 3 commits into from
Nov 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/alter/RollupHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ private void processAddRollup(AddRollupClause alterClause, Database db, OlapTabl
// has to set failed verison and version hash here, because there will be no load after rollup
// so that if not set here, last failed version will not be set
rollupReplica.updateVersionInfo(rollupReplica.getVersion(), rollupReplica.getVersionHash(),
partition.getCurrentVersion(), partition.getCurrentVersionHash(),
partition.getCommittedVersion(), partition.getCommittedVersionHash(),
rollupReplica.getLastSuccessVersion(), rollupReplica.getLastSuccessVersionHash());
if (isRestore) {
rollupReplica.setState(ReplicaState.NORMAL);
Expand Down
8 changes: 4 additions & 4 deletions fe/src/main/java/org/apache/doris/alter/RollupJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public synchronized void updateRollupReplicaInfo(long partitionId, long indexId,
throw new MetaNotFoundException("cannot find replica in tablet[" + tabletId + "], backend[" + backendId
+ "]");
}
replica.updateInfo(version, versionHash, dataSize, rowCount);
replica.updateVersionInfo(version, versionHash, dataSize, rowCount);
LOG.debug("rollup replica[{}] info updated. schemaHash:{}", replica.getId(), schemaHash);
}

Expand Down Expand Up @@ -587,7 +587,7 @@ public synchronized void handleFinishedReplica(AgentTask task, TTabletInfo finis
// yiguolei: not check version here because the replica's first version will be set by rollup job
// the version is not set now
// the finish task thread doesn't own db lock here, maybe a bug?
rollupReplica.updateInfo(version, versionHash, dataSize, rowCount);
rollupReplica.updateVersionInfo(version, versionHash, dataSize, rowCount);

setReplicaFinished(partitionId, rollupReplicaId);
rollupReplica.setState(ReplicaState.NORMAL);
Expand Down Expand Up @@ -735,7 +735,7 @@ public int tryFinishJob() {
// 3. add rollup finished version to base index
MaterializedIndex baseIndex = partition.getIndex(baseIndexId);
if (baseIndex != null) {
baseIndex.setRollupIndexInfo(rollupIndexId, partition.getCommittedVersion());
baseIndex.setRollupIndexInfo(rollupIndexId, partition.getVisibleVersion());
}
Preconditions.checkState(partition.getState() == PartitionState.ROLLUP);
partition.setState(PartitionState.NORMAL);
Expand Down Expand Up @@ -840,7 +840,7 @@ public void replayFinishing(Database db) {

MaterializedIndex baseIndex = partition.getIndex(baseIndexId);
if (baseIndex != null) {
baseIndex.setRollupIndexInfo(rollupIndexId, partition.getCommittedVersion());
baseIndex.setRollupIndexInfo(rollupIndexId, partition.getVisibleVersion());
}

partition.createRollupIndex(rollupIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ public void handleFinishedReplica(AgentTask task, TTabletInfo finishTabletInfo,
long dataSize = finishTabletInfo.getData_size();
long rowCount = finishTabletInfo.getRow_count();
// do not need check version > replica.getVersion, because the new replica's version is first set by sc
replica.updateInfo(version, versionHash, dataSize, rowCount);
replica.updateVersionInfo(version, versionHash, dataSize, rowCount);
} finally {
db.writeUnlock();
}
Expand Down
35 changes: 17 additions & 18 deletions fe/src/main/java/org/apache/doris/backup/BackupJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@

package org.apache.doris.backup;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.base.Strings;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.doris.analysis.TableRef;
import org.apache.doris.backup.Status.ErrCode;
import org.apache.doris.catalog.BrokerMgr.BrokerAddress;
Expand All @@ -40,17 +50,6 @@
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTaskType;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.base.Strings;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -377,33 +376,33 @@ private void prepareAndSendSnapshotTask() {

// snapshot partitions
for (Partition partition : partitions) {
long committedVersion = partition.getCommittedVersion();
long committedVersionHash = partition.getCommittedVersionHash();
long visibleVersion = partition.getVisibleVersion();
long visibleVersionHash = partition.getVisibleVersionHash();
List<MaterializedIndex> indexes = partition.getMaterializedIndices();
for (MaterializedIndex index : indexes) {
int schemaHash = tbl.getSchemaHashByIndexId(index.getId());
List<Tablet> tablets = index.getTablets();
for (Tablet tablet : tablets) {
Replica replica = chooseReplica(tablet, committedVersion, committedVersionHash);
Replica replica = chooseReplica(tablet, visibleVersion, visibleVersionHash);
if (replica == null) {
status = new Status(ErrCode.COMMON_ERROR,
"faild to choose replica to make snapshot for tablet " + tablet.getId()
+ ". committed version: " + committedVersion
+ ", committed version hash: " + committedVersionHash);
+ ". visible version: " + visibleVersion
+ ", visible version hash: " + visibleVersionHash);
return;
}
SnapshotTask task = new SnapshotTask(null, replica.getBackendId(), tablet.getId(),
jobId, dbId, tbl.getId(), partition.getId(),
index.getId(), tablet.getId(),
committedVersion, committedVersionHash,
visibleVersion, visibleVersionHash,
schemaHash, timeoutMs, false /* not restore task */);
batchTask.addTask(task);
unfinishedTaskIds.add(tablet.getId());
}
}

LOG.info("snapshot for partition {}, version: {}, version hash: {}",
partition.getId(), committedVersion, committedVersionHash);
partition.getId(), visibleVersion, visibleVersionHash);
}
}

Expand Down
4 changes: 2 additions & 2 deletions fe/src/main/java/org/apache/doris/backup/BackupJobInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ public static BackupJobInfo fromCatalog(long backupTime, String label, String db
BackupPartitionInfo partitionInfo = new BackupPartitionInfo();
partitionInfo.id = partition.getId();
partitionInfo.name = partition.getName();
partitionInfo.version = partition.getCommittedVersion();
partitionInfo.versionHash = partition.getCommittedVersionHash();
partitionInfo.version = partition.getVisibleVersion();
partitionInfo.versionHash = partition.getVisibleVersionHash();
tableInfo.partitions.put(partitionInfo.name, partitionInfo);
// indexes
for (MaterializedIndex index : partition.getMaterializedIndices()) {
Expand Down
8 changes: 4 additions & 4 deletions fe/src/main/java/org/apache/doris/backup/BackupJob_D.java
Original file line number Diff line number Diff line change
Expand Up @@ -365,17 +365,17 @@ private void getMeta(Database db, Map<String, List<? extends Writable>> pathToWr

// save version info
partitionIdToVersionInfo.put(partitionId,
new Pair<Long, Long>(partition.getCommittedVersion(),
partition.getCommittedVersionHash()));
new Pair<Long, Long>(partition.getVisibleVersion(),
partition.getVisibleVersionHash()));
}
} else {
Preconditions.checkState(partitionIds.size() == 1);
for (Long partitionId : partitionIds) {
Partition partition = olapTable.getPartition(partitionId);
// save version info
partitionIdToVersionInfo.put(partitionId,
new Pair<Long, Long>(partition.getCommittedVersion(),
partition.getCommittedVersionHash()));
new Pair<Long, Long>(partition.getVisibleVersion(),
partition.getVisibleVersionHash()));
}
}
} // end for tables
Expand Down
16 changes: 8 additions & 8 deletions fe/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ private void checkAndPrepareMeta() {
SnapshotTask task = new SnapshotTask(null, replica.getBackendId(), signature,
jobId, db.getId(),
tbl.getId(), part.getId(), index.getId(), tablet.getId(),
part.getCommittedVersion(), part.getCommittedVersionHash(),
part.getVisibleVersion(), part.getVisibleVersionHash(),
tbl.getSchemaHashByIndexId(index.getId()), timeoutMs,
true /* is restore task*/);
batchTask.addTask(task);
Expand Down Expand Up @@ -763,8 +763,8 @@ private Partition resetPartitionForRestore(OlapTable localTbl, OlapTable remoteT
}

// save version info for creating replicas
long committedVersion = remotePart.getCommittedVersion();
long committedVersionHash = remotePart.getCommittedVersionHash();
long visibleVersion = remotePart.getVisibleVersion();
long visibleVersionHash = remotePart.getVisibleVersionHash();

// tablets
for (MaterializedIndex remoteIdx : remotePart.getMaterializedIndices()) {
Expand All @@ -789,7 +789,7 @@ private Partition resetPartitionForRestore(OlapTable localTbl, OlapTable remoteT
for (Long beId : beIds) {
long newReplicaId = catalog.getNextId();
Replica newReplica = new Replica(newReplicaId, beId, ReplicaState.NORMAL,
committedVersion, committedVersionHash);
visibleVersion, visibleVersionHash);
newTablet.addReplica(newReplica, true /* is restore */);
}
}
Expand Down Expand Up @@ -1141,15 +1141,15 @@ private Status allTabletCommitted(boolean isReplay) {
}

// update partition committed version
part.updateCommitVersionAndVersionHash(entry.getValue().first, entry.getValue().second);
part.updateVisibleVersionAndVersionHash(entry.getValue().first, entry.getValue().second);

// we also need to update the replica version of these overwritten restored partitions
for (MaterializedIndex idx : part.getMaterializedIndices()) {
for (Tablet tablet : idx.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
if (!replica.checkVersionCatchUp(part.getCommittedVersion(),
part.getCommittedVersionHash())) {
replica.updateInfo(part.getCommittedVersion(), part.getCommittedVersionHash(),
if (!replica.checkVersionCatchUp(part.getVisibleVersion(),
part.getVisibleVersionHash())) {
replica.updateVersionInfo(part.getVisibleVersion(), part.getVisibleVersionHash(),
replica.getDataSize(), replica.getRowCount());
}
}
Expand Down
10 changes: 5 additions & 5 deletions fe/src/main/java/org/apache/doris/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -3097,10 +3097,10 @@ private Partition createPartitionWithIndices(String clusterName, long dbId, long

// version and version hash
if (versionInfo != null) {
partition.updateCommitVersionAndVersionHash(versionInfo.first, versionInfo.second);
partition.updateVisibleVersionAndVersionHash(versionInfo.first, versionInfo.second);
}
long version = partition.getCommittedVersion();
long versionHash = partition.getCommittedVersionHash();
long version = partition.getVisibleVersion();
long versionHash = partition.getVisibleVersionHash();

for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
long indexId = entry.getKey();
Expand Down Expand Up @@ -3640,7 +3640,7 @@ public static void getDdlStmt(Table table, List<String> createTableStmt, List<St
Preconditions.checkState(partitionId.size() == 1);
partition = olapTable.getPartition(partitionId.get(0));
}
sb.append(Joiner.on(",").join(partition.getCommittedVersion(), partition.getCommittedVersionHash()))
sb.append(Joiner.on(",").join(partition.getVisibleVersion(), partition.getVisibleVersionHash()))
.append("\"");
}

Expand Down Expand Up @@ -3759,7 +3759,7 @@ public static void getDdlStmt(Table table, List<String> createTableStmt, List<St
sb.append(entry.getValue().upperEndpoint().toSql());

sb.append("(\"version_info\" = \"");
sb.append(Joiner.on(",").join(partition.getCommittedVersion(), partition.getCommittedVersionHash()))
sb.append(Joiner.on(",").join(partition.getVisibleVersion(), partition.getVisibleVersionHash()))
.append("\"");
if (replicationNum > 0) {
sb.append(", \"replication_num\" = \"").append(replicationNum).append("\"");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ public synchronized long getNextId() {
return nextId++;
} else {
batchEndId = batchEndId + BATCH_ID_INTERVAL;
editLog.logSaveNextId(batchEndId);
if (editLog != null) {
// add this check just for unit test
editLog.logSaveNextId(batchEndId);
}
return nextId++;
}
}
Expand Down
6 changes: 3 additions & 3 deletions fe/src/main/java/org/apache/doris/catalog/MetadataViewer.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private static List<List<String>> getTabletStatus(String dbName, String tblName,

for (String partName : partitions) {
Partition partition = olapTable.getPartition(partName);
long committedVersion = partition.getCommittedVersion();
long visibleVersion = partition.getVisibleVersion();
short replicationNum = olapTable.getPartitionInfo().getReplicationNum(partition.getId());

for (MaterializedIndex index : partition.getMaterializedIndices()) {
Expand All @@ -92,7 +92,7 @@ private static List<List<String>> getTabletStatus(String dbName, String tblName,
if (be == null || !be.isAvailable()) {
status = ReplicaStatus.DEAD;
} else {
if (replica.getVersion() < committedVersion
if (replica.getVersion() < visibleVersion
|| replica.getLastFailedVersion() > 0) {
status = ReplicaStatus.VERSION_ERROR;
}
Expand All @@ -108,7 +108,7 @@ private static List<List<String>> getTabletStatus(String dbName, String tblName,
row.add(String.valueOf(replica.getVersion()));
row.add(String.valueOf(replica.getLastFailedVersion()));
row.add(String.valueOf(replica.getLastSuccessVersion()));
row.add(String.valueOf(committedVersion));
row.add(String.valueOf(visibleVersion));
row.add(String.valueOf(replica.getVersionCount()));
row.add(replica.getState().name());
row.add(status.name());
Expand Down
10 changes: 5 additions & 5 deletions fe/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ public Status resetIdsForRestore(Catalog catalog, Database db, int restoreReplic
for (Long beId : beIds) {
long newReplicaId = catalog.getNextId();
Replica replica = new Replica(newReplicaId, beId, ReplicaState.NORMAL,
partition.getCommittedVersion(), partition.getCommittedVersionHash());
partition.getVisibleVersion(), partition.getVisibleVersionHash());
newTablet.addReplica(replica, true /* is restore */);
}
}
Expand Down Expand Up @@ -586,8 +586,8 @@ public AlterTableStmt toAddPartitionStmt(String dbName, String partitionName) {

Partition partition = nameToPartition.get(partitionName);
Map<String, String> properties = Maps.newHashMap();
long version = partition.getCommittedVersion();
long versionHash = partition.getCommittedVersionHash();
long version = partition.getVisibleVersion();
long versionHash = partition.getVisibleVersionHash();
properties.put(PropertyAnalyzer.PROPERTIES_VERSION_INFO, version + "," + versionHash);
properties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM,
String.valueOf(partitionInfo.getReplicationNum(partition.getId())));
Expand Down Expand Up @@ -624,8 +624,8 @@ public CreateTableStmt toCreateTableStmt(String dbName) {
// and partition version info here for non-partitioned table
Partition partition = getPartition(name);
Preconditions.checkNotNull(partition);
long version = partition.getCommittedVersion();
long versionHash = partition.getCommittedVersionHash();
long version = partition.getVisibleVersion();
long versionHash = partition.getVisibleVersionHash();
String versionProp = Joiner.on(",").join(version, versionHash);
properties.put(PropertyAnalyzer.PROPERTIES_VERSION_INFO, versionProp);
}
Expand Down
Loading