diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java index 69121a9a0483..a44783f99e43 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java @@ -143,13 +143,7 @@ protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, break; } - // Override the old file name, - // In rare cases, when a checkpoint was aborted and the instant time - // is reused, the merge handle generates a new file name - // with the reused instant time of last checkpoint, which is duplicate, - // use the same name file as new base file in case data loss. - oldFilePath = newFilePath; - rolloverPaths.add(oldFilePath); + rolloverPaths.add(newFilePath); newFileName = newFileNameWithRollover(rollNumber++); newFilePath = makeNewFilePath(partitionPath, newFileName); LOG.warn("Duplicate write for MERGE bucket with path: " + oldFilePath + ", rolls over to new path: " + newFilePath); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index 674cd3588aaf..1f2394618464 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -192,10 +192,9 @@ public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) { // ------------------------------------------------------------------------- private void restoreWriteMetadata() throws Exception { - String lastInflight = lastPendingInstant(); boolean eventSent = false; for (WriteMetadataEvent event : this.writeMetadataState.get()) { - if (Objects.equals(lastInflight, event.getInstantTime())) { + if (Objects.equals(this.currentInstant, event.getInstantTime())) { // Reset taskID for event event.setTaskID(taskID); // The checkpoint succeed but the meta does not commit, @@ -211,6 +210,15 @@ private void restoreWriteMetadata() throws Exception { } private void sendBootstrapEvent() { + int attemptId = getRuntimeContext().getAttemptNumber(); + if (attemptId > 0) { + // either a partial or global failover, reuses the current inflight instant + if (this.currentInstant != null) { + LOG.info("Recover task[{}] for instant [{}] with attemptId [{}]", taskID, this.currentInstant, attemptId); + this.currentInstant = null; + } + return; + } this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID)); LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID); }