From d3f957755abf76c64ff06fac6d857cba9bdbbacf Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Wed, 16 Nov 2022 14:47:38 +0800 Subject: [PATCH] [HUDI-5223] Partial failover for flink (#7208) Before the patch, when there was a partial failover within the write tasks, the write task's current instant was initialized as the latest inflight instant; the write task then waited for a new instant to write with, so it hung and failed over continuously. For a task recovered from failover (with attempt number greater than 0), the latest inflight instant can actually be reused, and the intermediate data files can be cleaned up with MARKER files post commit. --- .../java/org/apache/hudi/io/FlinkMergeHandle.java | 8 +------- .../sink/common/AbstractStreamWriteFunction.java | 12 ++++++++++-- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java index 69121a9a0483..a44783f99e43 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java @@ -143,13 +143,7 @@ protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, break; } - // Override the old file name, - // In rare cases, when a checkpoint was aborted and the instant time - // is reused, the merge handle generates a new file name - // with the reused instant time of last checkpoint, which is duplicate, - // use the same name file as new base file in case data loss. 
- oldFilePath = newFilePath; - rolloverPaths.add(oldFilePath); + rolloverPaths.add(newFilePath); newFileName = newFileNameWithRollover(rollNumber++); newFilePath = makeNewFilePath(partitionPath, newFileName); LOG.warn("Duplicate write for MERGE bucket with path: " + oldFilePath + ", rolls over to new path: " + newFilePath); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index 674cd3588aaf..1f2394618464 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -192,10 +192,9 @@ public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) { // ------------------------------------------------------------------------- private void restoreWriteMetadata() throws Exception { - String lastInflight = lastPendingInstant(); boolean eventSent = false; for (WriteMetadataEvent event : this.writeMetadataState.get()) { - if (Objects.equals(lastInflight, event.getInstantTime())) { + if (Objects.equals(this.currentInstant, event.getInstantTime())) { // Reset taskID for event event.setTaskID(taskID); // The checkpoint succeed but the meta does not commit, @@ -211,6 +210,15 @@ private void restoreWriteMetadata() throws Exception { } private void sendBootstrapEvent() { + int attemptId = getRuntimeContext().getAttemptNumber(); + if (attemptId > 0) { + // either a partial or global failover, reuses the current inflight instant + if (this.currentInstant != null) { + LOG.info("Recover task[{}] for instant [{}] with attemptId [{}]", taskID, this.currentInstant, attemptId); + this.currentInstant = null; + } + return; + } this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID)); 
LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID); }