Skip to content

Commit

Permalink
[HUDI-5153] Fix the write token name resolution of cdc log file
Browse files Browse the repository at this point in the history
  • Loading branch information
danny0405 committed Nov 4, 2022
1 parent 2281175 commit 88af560
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,34 +279,26 @@ protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, Ho

protected HoodieLogFormat.Writer createLogWriter(
Option<FileSlice> fileSlice, String baseCommitTime) throws IOException {
return createLogWriter(fileSlice, baseCommitTime, "");
return createLogWriter(fileSlice, baseCommitTime, null);
}

protected HoodieLogFormat.Writer createLogWriter(
Option<FileSlice> fileSlice, String baseCommitTime, String fileSuffix) throws IOException {
int logVersion = HoodieLogFile.LOGFILE_BASE_VERSION;
long logFileSize = 0L;
String logWriteToken = writeToken + fileSuffix;
String rolloverLogWriteToken = writeToken + fileSuffix;
if (fileSlice.isPresent()) {
Option<HoodieLogFile> latestLogFileOpt = fileSlice.get().getLatestLogFile();
if (latestLogFileOpt.isPresent()) {
HoodieLogFile latestLogFile = latestLogFileOpt.get();
logVersion = latestLogFile.getLogVersion();
logFileSize = latestLogFile.getFileSize();
logWriteToken = FSUtils.getWriteTokenFromLogPath(latestLogFile.getPath());
}
}
Option<FileSlice> fileSlice, String baseCommitTime, String suffix) throws IOException {
Option<HoodieLogFile> latestLogFile = fileSlice.isPresent()
? fileSlice.get().getLatestLogFile()
: Option.empty();

return HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
.withFileId(fileId)
.overBaseCommit(baseCommitTime)
.withLogVersion(logVersion)
.withFileSize(logFileSize)
.withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
.withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
.withSizeThreshold(config.getLogFileMaxSize())
.withFs(fs)
.withRolloverLogWriteToken(rolloverLogWriteToken)
.withLogWriteToken(logWriteToken)
.withRolloverLogWriteToken(writeToken)
.withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
.withSuffix(suffix)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@
public class FSUtils {

private static final Logger LOG = LogManager.getLogger(FSUtils.class);
// Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1
// Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1_1-0-1
// Archive log files are of this pattern - .commits_.archive.1_1-0-1
private static final Pattern LOG_FILE_PATTERN =
Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)(_(([0-9]*)-([0-9]*)-([0-9]*)(-cdc)?))?");
Pattern.compile("\\.(.+)_(.*)\\.(.+)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+))(-cdc)?)?");
private static final String LOG_FILE_PREFIX = ".";
private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;
private static final long MIN_CLEAN_TO_KEEP = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ class WriterBuilder {
private Path parentPath;
// Log File Write Token
private String logWriteToken;
// explicit file suffix
private String suffix;
// Rollover Log file write token
private String rolloverLogWriteToken;

Expand All @@ -164,6 +166,11 @@ public WriterBuilder withLogWriteToken(String logWriteToken) {
return this;
}

public WriterBuilder withSuffix(String suffix) {
this.suffix = suffix;
return this;
}

public WriterBuilder withFs(FileSystem fs) {
this.fs = fs;
return this;
Expand Down Expand Up @@ -250,6 +257,13 @@ public Writer build() throws IOException {
logWriteToken = rolloverLogWriteToken;
}

if (suffix != null) {
// a little hacky to simplify the logic: patch the write token with explicit suffix
// instead of adding a new extension
logWriteToken = logWriteToken + suffix;
rolloverLogWriteToken = rolloverLogWriteToken + suffix;
}

Path logPath = new Path(parentPath,
FSUtils.makeLogFileName(logFileId, fileExtension, instantTime, logVersion, logWriteToken));
LOG.info("HoodieLogFile on path " + logPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
Expand Down Expand Up @@ -250,6 +251,42 @@ public void tesLogFileName() {
assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(rlPath));
}

@Test
public void testCdcLogFileName() {
String partitionPath = "2022/11/04/";
String fileName = UUID.randomUUID().toString();
String logFile = FSUtils.makeLogFileName(fileName, ".log", "100", 2, "1-0-1") + HoodieCDCUtils.CDC_LOGFILE_SUFFIX;
Path path = new Path(new Path(partitionPath), logFile);

assertTrue(FSUtils.isLogFile(path));
assertEquals("log", FSUtils.getFileExtensionFromLog(path));
assertEquals(fileName, FSUtils.getFileIdFromLogPath(path));
assertEquals("100", FSUtils.getBaseCommitTimeFromLogPath(path));
assertEquals(1, FSUtils.getTaskPartitionIdFromLogPath(path));
assertEquals("1-0-1", FSUtils.getWriteTokenFromLogPath(path));
assertEquals(0, FSUtils.getStageIdFromLogPath(path));
assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(path));
assertEquals(2, FSUtils.getFileVersionFromLog(path));
}

@Test
public void testArchiveLogFileName() {
String partitionPath = "2022/11/04/";
String fileName = "commits";
String logFile = FSUtils.makeLogFileName(fileName, ".archive", "", 2, "1-0-1");
Path path = new Path(new Path(partitionPath), logFile);

assertFalse(FSUtils.isLogFile(path));
assertEquals("archive", FSUtils.getFileExtensionFromLog(path));
assertEquals(fileName, FSUtils.getFileIdFromLogPath(path));
assertEquals("", FSUtils.getBaseCommitTimeFromLogPath(path));
assertEquals(1, FSUtils.getTaskPartitionIdFromLogPath(path));
assertEquals("1-0-1", FSUtils.getWriteTokenFromLogPath(path));
assertEquals(0, FSUtils.getStageIdFromLogPath(path));
assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(path));
assertEquals(2, FSUtils.getFileVersionFromLog(path));
}

/**
* Test Log File Comparisons when log files do not have write tokens.
*/
Expand Down

0 comments on commit 88af560

Please sign in to comment.