Skip to content

Commit

Permalink
[fix](publish) Fix publish failure because "task" is null (#37531
Browse files Browse the repository at this point in the history
)

## Proposed changes

```
2024-07-08 00:43:27,149 ERROR (PUBLISH_VERSION|33) [PublishVersionDaemon.runAfterCatalogReady():73] errors while publish version to all backends
java.lang.NullPointerException: Cannot invoke "org.apache.doris.task.PublishVersionTask.isFinished()" because "task" is null
at org.apache.doris.transaction.PublishVersionDaemon.lambda$tryFinishTxn$0(PublishVersionDaemon.java:163) ~[doris-fe.jar:1.2-SNAPSHOT]
at java.util.HashMap.forEach(HashMap.java:1421) ~[?:?]
at org.apache.doris.transaction.PublishVersionDaemon.tryFinishTxn(PublishVersionDaemon.java:160) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.transaction.PublishVersionDaemon.publishVersion(PublishVersionDaemon.java:96) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.transaction.PublishVersionDaemon.runAfterCatalogReady(PublishVersionDaemon.java:70) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.common.util.MasterDaemon.runOneCycle(MasterDaemon.java:58) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.common.util.Daemon.run(Daemon.java:116) ~[doris-fe.jar:1.2-SNAPSHOT]
```


1. When trying to finish one txn, catch any exception so that a failed txn
does not block the other txns.
2. Originally, committing a txn added a <be_id, null publish task>
placeholder to the publish tasks, and publishing the txn later replaced
the null placeholder with a new publish task.
This PR changes that: committing a txn now records the involved BE ids,
and publishing the txn generates the publish tasks for all involved BEs.
3. There is also a bug with `tableIdToTabletDeltaRows` in the transaction
state: it accumulates the info of all ready txns, because the variable is
declared outside `for (TransactionState transactionState : readyTransactionStates)`.
  • Loading branch information
mymeiyi authored and dataroaring committed Jul 17, 2024
1 parent 5313143 commit ed9b3e9
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,9 @@ public void setTableIdTabletsDeltaRows(Map<Long, Map<Long, Long>> tableIdToTable
/**
 * Returns the nested map stored in {@code tableIdToTabletDeltaRows}.
 *
 * <p>The field itself is returned, not a copy, so changes made by the caller
 * are visible through this object. NOTE(review): judging by the field name,
 * the outer key is presumably a table id and the inner map goes from tablet id
 * to delta row count; confirm against the code that fills it.
 *
 * @return the map held by this object
 */
public Map<Long, Map<Long, Long>> getTableIdToTabletDeltaRows() {
    return this.tableIdToTabletDeltaRows;
}

/**
 * Builds the string form of this object: the superclass representation
 * followed by {@code ", txnId="} and the value of {@code transactionId}.
 *
 * @return the superclass string with the transaction id appended
 */
@Override
public String toString() {
    final String base = super.toString();
    // %s renders the id via its own toString(), matching plain concatenation.
    return String.format("%s, txnId=%s", base, transactionId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1475,12 +1475,7 @@ protected void unprotectedPreCommitTransaction2PC(TransactionState transactionSt
}
// persist transactionState
unprotectUpsertTransactionState(transactionState, false);

// add publish version tasks. set task to null as a placeholder.
// tasks will be created when publishing version.
for (long backendId : totalInvolvedBackends) {
transactionState.addPublishVersionTask(backendId, null);
}
transactionState.setInvolvedBackends(totalInvolvedBackends);
}

private PartitionCommitInfo generatePartitionCommitInfo(OlapTable table, long partitionId, long partitionVersion) {
Expand Down Expand Up @@ -1508,12 +1503,7 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S
}
// persist transactionState
unprotectUpsertTransactionState(transactionState, false);

// add publish version tasks. set task to null as a placeholder.
// tasks will be created when publishing version.
for (long backendId : totalInvolvedBackends) {
transactionState.addPublishVersionTask(backendId, null);
}
transactionState.setInvolvedBackends(totalInvolvedBackends);
}

private void checkBeforeUnprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds) {
Expand Down Expand Up @@ -1581,9 +1571,6 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S
}
// persist transactionState
unprotectUpsertTransactionState(transactionState, false);

// add publish version tasks. set task to null as a placeholder.
// tasks will be created when publishing version.
transactionState.setInvolvedBackends(totalInvolvedBackends);
}

Expand Down Expand Up @@ -2686,7 +2673,7 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat
PublishVersionTask publishVersionTask = null;
if (publishVersionTasks != null) {
List<PublishVersionTask> matchedTasks = publishVersionTasks.stream()
.filter(t -> t.getTransactionId() == subTransactionId
.filter(t -> t != null && t.getTransactionId() == subTransactionId
&& t.getPartitionVersionInfos().stream()
.anyMatch(s -> s.getPartitionId() == partitionId))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,81 +152,93 @@ private void genPublishTask(List<Long> allBackends, TransactionState transaction
private void tryFinishTxn(List<TransactionState> readyTransactionStates,
SystemInfoService infoService, GlobalTransactionMgrIface globalTransactionMgr,
Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) {
Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows = Maps.newHashMap();
// try to finish the transaction, if failed just retry in next loop
for (TransactionState transactionState : readyTransactionStates) {
AtomicBoolean hasBackendAliveAndUnfinishedTask = new AtomicBoolean(false);
Set<Long> notFinishTaskBe = Sets.newHashSet();
transactionState.getPublishVersionTasks().forEach((key, tasks) -> {
long beId = key;
for (PublishVersionTask task : tasks) {
if (task.isFinished()) {
calculateTaskUpdateRows(tableIdToTabletDeltaRows, task);
} else {
if (infoService.checkBackendAlive(task.getBackendId())) {
hasBackendAliveAndUnfinishedTask.set(true);
}
notFinishTaskBe.add(beId);
try {
// try to finish the transaction, if failed just retry in next loop
tryFinishOneTxn(transactionState, infoService, globalTransactionMgr, partitionVisibleVersions,
backendPartitions);
} catch (Throwable t) {
LOG.error("errors while finish transaction: {}, publish tasks: {}", transactionState,
transactionState.getPublishVersionTasks(), t);
}
} // end for readyTransactionStates
}

private void tryFinishOneTxn(TransactionState transactionState, SystemInfoService infoService,
GlobalTransactionMgrIface globalTransactionMgr,
Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) {
Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows = Maps.newHashMap();
AtomicBoolean hasBackendAliveAndUnfinishedTask = new AtomicBoolean(false);
Set<Long> notFinishTaskBe = Sets.newHashSet();
transactionState.getPublishVersionTasks().forEach((key, tasks) -> {
long beId = key;
for (PublishVersionTask task : tasks) {
if (task.isFinished()) {
calculateTaskUpdateRows(tableIdToTabletDeltaRows, task);
} else {
if (infoService.checkBackendAlive(task.getBackendId())) {
hasBackendAliveAndUnfinishedTask.set(true);
}
notFinishTaskBe.add(beId);
}
});
}
});

transactionState.setTableIdToTabletDeltaRows(tableIdToTabletDeltaRows);
if (LOG.isDebugEnabled()) {
LOG.debug("notFinishTaskBe {}, trans {}", notFinishTaskBe, transactionState);
transactionState.setTableIdToTabletDeltaRows(tableIdToTabletDeltaRows);
if (LOG.isDebugEnabled()) {
LOG.debug("notFinishTaskBe {}, trans {}", notFinishTaskBe, transactionState);
}
boolean isPublishSlow = false;
long totalNum = transactionState.getPublishVersionTasks().keySet().size();
boolean allUnFinishTaskIsSlow = notFinishTaskBe.stream().allMatch(beId -> {
Backend be = infoService.getBackend(beId);
if (be == null) {
return false;
}
boolean isPublishSlow = false;
long totalNum = transactionState.getPublishVersionTasks().keySet().size();
boolean allUnFinishTaskIsSlow = notFinishTaskBe.stream().allMatch(beId -> {
Backend be = infoService.getBackend(beId);
if (be == null) {
return false;
}
return be.getPublishTaskLastTimeAccumulated() > Config.publish_version_queued_limit_number;
});
if (totalNum - notFinishTaskBe.size() > totalNum / 2 && allUnFinishTaskIsSlow) {
if (LOG.isDebugEnabled()) {
LOG.debug(" finishNum {}, txn publish tasks {}, notFinishTaskBe {}",
totalNum - notFinishTaskBe.size(), transactionState.getPublishVersionTasks().keySet(),
notFinishTaskBe);
}
isPublishSlow = true;
return be.getPublishTaskLastTimeAccumulated() > Config.publish_version_queued_limit_number;
});
if (totalNum - notFinishTaskBe.size() > totalNum / 2 && allUnFinishTaskIsSlow) {
if (LOG.isDebugEnabled()) {
LOG.debug(" finishNum {}, txn publish tasks {}, notFinishTaskBe {}",
totalNum - notFinishTaskBe.size(), transactionState.getPublishVersionTasks().keySet(),
notFinishTaskBe);
}
isPublishSlow = true;
}

boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask.get() || transactionState.isPublishTimeout()
|| isPublishSlow
|| DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks");
if (shouldFinishTxn) {
try {
// one transaction exception should not affect other transaction
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions);
} catch (Exception e) {
LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e);
}
if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) {
// if finish transaction state failed, then update publish version time, should check
// to finish after some interval
transactionState.updateSendTaskTime();
if (LOG.isDebugEnabled()) {
LOG.debug("publish version for transaction {} failed", transactionState);
}
boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask.get() || transactionState.isPublishTimeout()
|| isPublishSlow
|| DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks");
if (shouldFinishTxn) {
try {
// one transaction exception should not affect other transaction
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions);
} catch (Exception e) {
LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e);
}
if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) {
// if finish transaction state failed, then update publish version time, should check
// to finish after some interval
transactionState.updateSendTaskTime();
if (LOG.isDebugEnabled()) {
LOG.debug("publish version for transaction {} failed", transactionState);
}
}
}

if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
transactionState.getPublishVersionTasks().values().forEach(tasks -> {
for (PublishVersionTask task : tasks) {
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
}
});
transactionState.pruneAfterVisible();
if (MetricRepo.isInit) {
long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
transactionState.getPublishVersionTasks().values().forEach(tasks -> {
for (PublishVersionTask task : tasks) {
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
}
});
transactionState.pruneAfterVisible();
if (MetricRepo.isInit) {
long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
}
} // end for readyTransactionStates
}
}

// Merge task tablets update rows to tableToTabletsDelta.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ public void setErrorReplicas(Set<Long> newErrorReplicas) {
}

public void addPublishVersionTask(Long backendId, PublishVersionTask task) {
if (this.subTxnIdToTableCommitInfo.isEmpty()) {
if (this.subTxnIds == null) {
this.publishVersionTasks.put(backendId, Lists.newArrayList(task));
} else {
this.publishVersionTasks.computeIfAbsent(backendId, k -> Lists.newArrayList()).add(task);
Expand Down Expand Up @@ -822,7 +822,7 @@ public String getErrMsg() {
public void pruneAfterVisible() {
publishVersionTasks.clear();
tableIdToTabletDeltaRows.clear();
// TODO if subTransactionStates can be cleared?
involvedBackends.clear();
}

public void setSchemaForPartialUpdate(OlapTable olapTable) {
Expand Down

0 comments on commit ed9b3e9

Please sign in to comment.