Skip to content

Commit

Permalink
[fix](publish) Pick Catch exception in genPublishTask to make one fai…
Browse files Browse the repository at this point in the history
…led txn does not block the other txns (#37724) (#38042)

Pick #37724
  • Loading branch information
mymeiyi authored Jul 18, 2024
1 parent ea457aa commit 7dd01b1
Showing 1 changed file with 54 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,58 +108,68 @@ private void traverseReadyTxnAndDispatchPublishVersionTask(List<TransactionState
if (transactionState.hasSendTask()) {
continue;
}
List<PartitionCommitInfo> partitionCommitInfos = new ArrayList<>();
for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) {
partitionCommitInfos.addAll(tableCommitInfo.getIdToPartitionCommitInfo().values());

try {
beIdToBaseTabletIds.putAll(getBaseTabletIdsForEachBe(transactionState, tableCommitInfo));
} catch (MetaNotFoundException e) {
LOG.warn("exception occur when trying to get rollup tablets info", e);
}
try {
genPublishTask(allBackends, transactionState, createPublishVersionTaskTime, beIdToBaseTabletIds,
batchTask);
} catch (Throwable t) {
LOG.error("errors while generate publish task for transaction: {}", transactionState, t);
}
}
if (!batchTask.getAllTasks().isEmpty()) {
AgentTaskExecutor.submit(batchTask);
}
}

List<TPartitionVersionInfo> partitionVersionInfos = new ArrayList<>(partitionCommitInfos.size());
for (PartitionCommitInfo commitInfo : partitionCommitInfos) {
TPartitionVersionInfo versionInfo = new TPartitionVersionInfo(commitInfo.getPartitionId(),
commitInfo.getVersion(), 0);
partitionVersionInfos.add(versionInfo);
if (LOG.isDebugEnabled()) {
LOG.debug("try to publish version info partitionid [{}], version [{}]",
commitInfo.getPartitionId(),
commitInfo.getVersion());
}
}
private void genPublishTask(List<Long> allBackends, TransactionState transactionState,
long createPublishVersionTaskTime, Map<Long, Set<Long>> beIdToBaseTabletIds, AgentBatchTask batchTask) {
List<PartitionCommitInfo> partitionCommitInfos = new ArrayList<>();
for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) {
partitionCommitInfos.addAll(tableCommitInfo.getIdToPartitionCommitInfo().values());

Set<Long> publishBackends = transactionState.getPublishVersionTasks().keySet();
// public version tasks are not persisted in catalog, so publishBackends may be empty.
// so we have to try publish to all backends;
if (publishBackends.isEmpty()) {
// could not just add to it, should new a new object, or the back map will destroyed
publishBackends = Sets.newHashSet();
publishBackends.addAll(allBackends);
try {
beIdToBaseTabletIds.putAll(getBaseTabletIdsForEachBe(transactionState, tableCommitInfo));
} catch (MetaNotFoundException e) {
LOG.warn("exception occur when trying to get rollup tablets info", e);
}
}

for (long backendId : publishBackends) {
PublishVersionTask task = new PublishVersionTask(backendId,
transactionState.getTransactionId(),
transactionState.getDbId(),
partitionVersionInfos,
createPublishVersionTaskTime);
task.setBaseTabletsIds(beIdToBaseTabletIds.getOrDefault(backendId, Collections.emptySet()));
// add to AgentTaskQueue for handling finish report.
// not check return value, because the add will success
AgentTaskQueue.addTask(task);
batchTask.addTask(task);
transactionState.addPublishVersionTask(backendId, task);
List<TPartitionVersionInfo> partitionVersionInfos = new ArrayList<>(partitionCommitInfos.size());
for (PartitionCommitInfo commitInfo : partitionCommitInfos) {
TPartitionVersionInfo versionInfo = new TPartitionVersionInfo(commitInfo.getPartitionId(),
commitInfo.getVersion(), 0);
partitionVersionInfos.add(versionInfo);
if (LOG.isDebugEnabled()) {
LOG.debug("try to publish version info partitionid [{}], version [{}]",
commitInfo.getPartitionId(),
commitInfo.getVersion());
}
transactionState.setSendedTask();
LOG.info("send publish tasks for transaction: {}, db: {}", transactionState.getTransactionId(),
transactionState.getDbId());
}
if (!batchTask.getAllTasks().isEmpty()) {
AgentTaskExecutor.submit(batchTask);

Set<Long> publishBackends = transactionState.getPublishVersionTasks().keySet();
// public version tasks are not persisted in catalog, so publishBackends may be empty.
// so we have to try publish to all backends;
if (publishBackends.isEmpty()) {
// could not just add to it, should new a new object, or the back map will destroyed
publishBackends = Sets.newHashSet();
publishBackends.addAll(allBackends);
}

for (long backendId : publishBackends) {
PublishVersionTask task = new PublishVersionTask(backendId,
transactionState.getTransactionId(),
transactionState.getDbId(),
partitionVersionInfos,
createPublishVersionTaskTime);
task.setBaseTabletsIds(beIdToBaseTabletIds.getOrDefault(backendId, Collections.emptySet()));
// add to AgentTaskQueue for handling finish report.
// not check return value, because the add will success
AgentTaskQueue.addTask(task);
batchTask.addTask(task);
transactionState.addPublishVersionTask(backendId, task);
}
transactionState.setSendedTask();
LOG.info("send publish tasks for transaction: {}, db: {}", transactionState.getTransactionId(),
transactionState.getDbId());
}

private void tryFinishTxn(List<TransactionState> readyTransactionStates,
Expand Down

0 comments on commit 7dd01b1

Please sign in to comment.