diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index a85df2a2307d..82c6de576149 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -195,8 +195,10 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo writeStatus.getStat().setFileId(fileId); setWriteStatusPath(); - // Create Marker file - createMarkerFile(partitionPath, newFileName); + // Create Marker file, + // uses name of `newFilePath` instead of `newFileName` + // in case the sub-class may roll over the file handle name. + createMarkerFile(partitionPath, newFilePath.getName()); // Create the writer for writing the new version file fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java index 24da25b20be1..cf912f620a56 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java @@ -137,10 +137,8 @@ protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, * Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write. */ protected String newFileNameWithRollover(int rollNumber) { - // make the intermediate file as hidden - final String fileID = "." + this.fileId; return FSUtils.makeBaseFileName(instantTime, writeToken + "-" + rollNumber, - fileID, hoodieTable.getBaseFileExtension()); + this.fileId, hoodieTable.getBaseFileExtension()); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java index e1117712634c..1bff89713b7f 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java @@ -158,11 +158,17 @@ protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, * Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write. */ protected String newFileNameWithRollover(int rollNumber) { - // make the intermediate file as hidden return FSUtils.makeBaseFileName(instantTime, writeToken + "-" + rollNumber, this.fileId, hoodieTable.getBaseFileExtension()); } + @Override + protected void setWriteStatusPath() { + // if there was rollover, should set up the path as the initial new file path. + Path path = rolloverPaths.size() > 0 ? rolloverPaths.get(0) : newFilePath; + writeStatus.getStat().setPath(new Path(config.getBasePath()), path); + } + @Override public List close() { try {