Skip to content

Commit

Permalink
[HUDI-5223] Partial failover for flink (#7208)
Browse files Browse the repository at this point in the history
Before the patch, when there are partial failover within the write tasks, the write task current instant was initialized as the latest inflight instant, the write task then waits for a new instant to write with so hangs and failover continuously.

For a task recovered from failover (with attempt number greater than 0), the latest inflight instant can actually be reused, the intermediate data files can be cleaned with MARGER files post commit.
  • Loading branch information
danny0405 authored Nov 16, 2022
1 parent 7e7b3a8 commit d3f9577
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,7 @@ protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName,
break;
}

// Override the old file name,
// In rare cases, when a checkpoint was aborted and the instant time
// is reused, the merge handle generates a new file name
// with the reused instant time of last checkpoint, which is duplicate,
// use the same name file as new base file in case data loss.
oldFilePath = newFilePath;
rolloverPaths.add(oldFilePath);
rolloverPaths.add(newFilePath);
newFileName = newFileNameWithRollover(rollNumber++);
newFilePath = makeNewFilePath(partitionPath, newFileName);
LOG.warn("Duplicate write for MERGE bucket with path: " + oldFilePath + ", rolls over to new path: " + newFilePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,9 @@ public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
// -------------------------------------------------------------------------

private void restoreWriteMetadata() throws Exception {
String lastInflight = lastPendingInstant();
boolean eventSent = false;
for (WriteMetadataEvent event : this.writeMetadataState.get()) {
if (Objects.equals(lastInflight, event.getInstantTime())) {
if (Objects.equals(this.currentInstant, event.getInstantTime())) {
// Reset taskID for event
event.setTaskID(taskID);
// The checkpoint succeed but the meta does not commit,
Expand All @@ -211,6 +210,15 @@ private void restoreWriteMetadata() throws Exception {
}

private void sendBootstrapEvent() {
int attemptId = getRuntimeContext().getAttemptNumber();
if (attemptId > 0) {
// either a partial or global failover, reuses the current inflight instant
if (this.currentInstant != null) {
LOG.info("Recover task[{}] for instant [{}] with attemptId [{}]", taskID, this.currentInstant, attemptId);
this.currentInstant = null;
}
return;
}
this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID));
LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
}
Expand Down

0 comments on commit d3f9577

Please sign in to comment.