Skip to content

Commit

Permalink
[HUDI-6401] Should not throw exception when create marker file for lo…
Browse files Browse the repository at this point in the history
…g file (#9003)
  • Loading branch information
guanziyue authored Jun 19, 2023
1 parent 94adcf2 commit ea72350
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,8 @@ protected class AppendLogWriteCallback implements HoodieLogFileWriteCallback {

@Override
public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
// we use create rather than createIfNotExists because create method can trigger marker-based early conflict detection.
WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
return writeMarkers.create(partitionPath, logFileToAppend.getFileName(), IOType.APPEND,
return writeMarkers.createIfNotExists(partitionPath, logFileToAppend.getFileName(), IOType.APPEND,
config, fileId, hoodieTable.getMetaClient().getActiveTimeline()).isPresent();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,34 @@ public Option<Path> createIfNotExists(String partitionPath, String fileName, IOT
return create(partitionPath, fileName, type, true);
}

/**
* Creates a marker if the marker does not exist.
* This can invoke marker-based early conflict detection when enabled for multi-writers.
*
* @param partitionPath partition path in the table
* @param fileName file name
* @param type write IO type
* @param writeConfig Hudi write configs.
* @param fileId File ID.
* @param activeTimeline Active timeline for the write operation.
* @return the marker path.
*/
public Option<Path> createIfNotExists(String partitionPath, String fileName, IOType type, HoodieWriteConfig writeConfig,
String fileId, HoodieActiveTimeline activeTimeline) {
if (writeConfig.isEarlyConflictDetectionEnable()
&& writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
HoodieTimeline pendingCompactionTimeline = activeTimeline.filterPendingCompactionTimeline();
HoodieTimeline pendingReplaceTimeline = activeTimeline.filterPendingReplaceTimeline();
// TODO If current is compact or clustering then create marker directly without early conflict detection.
// Need to support early conflict detection between table service and common writers.
if (pendingCompactionTimeline.containsInstant(instantTime) || pendingReplaceTimeline.containsInstant(instantTime)) {
return create(partitionPath, fileName, type, true);
}
return createWithEarlyConflictDetection(partitionPath, fileName, type, false, writeConfig, fileId, activeTimeline);
}
return create(partitionPath, fileName, type, true);
}

/**
* Quietly deletes the marker directory.
*
Expand Down

0 comments on commit ea72350

Please sign in to comment.