Skip to content

Commit

Permalink
[HUDI-4255] Make the flink merge and replace handle intermediate file…
Browse files Browse the repository at this point in the history
… visible (apache#5866)
  • Loading branch information
danny0405 authored Jun 15, 2022
1 parent 25bbff6 commit 0811bb3
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,10 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
writeStatus.getStat().setFileId(fileId);
setWriteStatusPath();

// Create Marker file
createMarkerFile(partitionPath, newFileName);
// Create Marker file,
// uses name of `newFilePath` instead of `newFileName`
// in case the sub-class may roll over the file handle name.
createMarkerFile(partitionPath, newFilePath.getName());

// Create the writer for writing the new version file
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,8 @@ protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName,
* Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write.
*/
protected String newFileNameWithRollover(int rollNumber) {
// make the intermediate file as hidden
final String fileID = "." + this.fileId;
return FSUtils.makeBaseFileName(instantTime, writeToken + "-" + rollNumber,
fileID, hoodieTable.getBaseFileExtension());
this.fileId, hoodieTable.getBaseFileExtension());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,17 @@ protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName,
* Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write.
*/
protected String newFileNameWithRollover(int rollNumber) {
// make the intermediate file as hidden
return FSUtils.makeBaseFileName(instantTime, writeToken + "-" + rollNumber,
this.fileId, hoodieTable.getBaseFileExtension());
}

@Override
protected void setWriteStatusPath() {
// if there was rollover, should set up the path as the initial new file path.
Path path = rolloverPaths.size() > 0 ? rolloverPaths.get(0) : newFilePath;
writeStatus.getStat().setPath(new Path(config.getBasePath()), path);
}

@Override
public List<WriteStatus> close() {
try {
Expand Down

0 comments on commit 0811bb3

Please sign in to comment.