diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 807f14ca2883c..3cdc6e82d02a4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -279,34 +279,26 @@ protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, Ho protected HoodieLogFormat.Writer createLogWriter( Option fileSlice, String baseCommitTime) throws IOException { - return createLogWriter(fileSlice, baseCommitTime, ""); + return createLogWriter(fileSlice, baseCommitTime, null); } protected HoodieLogFormat.Writer createLogWriter( - Option fileSlice, String baseCommitTime, String fileSuffix) throws IOException { - int logVersion = HoodieLogFile.LOGFILE_BASE_VERSION; - long logFileSize = 0L; - String logWriteToken = writeToken + fileSuffix; - String rolloverLogWriteToken = writeToken + fileSuffix; - if (fileSlice.isPresent()) { - Option latestLogFileOpt = fileSlice.get().getLatestLogFile(); - if (latestLogFileOpt.isPresent()) { - HoodieLogFile latestLogFile = latestLogFileOpt.get(); - logVersion = latestLogFile.getLogVersion(); - logFileSize = latestLogFile.getFileSize(); - logWriteToken = FSUtils.getWriteTokenFromLogPath(latestLogFile.getPath()); - } - } + Option fileSlice, String baseCommitTime, String suffix) throws IOException { + Option latestLogFile = fileSlice.isPresent() + ? fileSlice.get().getLatestLogFile() + : Option.empty(); + return HoodieLogFormat.newWriterBuilder() .onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath)) .withFileId(fileId) .overBaseCommit(baseCommitTime) - .withLogVersion(logVersion) - .withFileSize(logFileSize) + .withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION)) + .withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L)) .withSizeThreshold(config.getLogFileMaxSize()) .withFs(fs) - .withRolloverLogWriteToken(rolloverLogWriteToken) - .withLogWriteToken(logWriteToken) + .withRolloverLogWriteToken(writeToken) + .withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken)) + .withSuffix(suffix) .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index eb5fbe917985c..e73fbee3df0e9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -77,9 +77,10 @@ public class FSUtils { private static final Logger LOG = LogManager.getLogger(FSUtils.class); - // Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1 + // Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1_1-0-1 + // Archive log files are of this pattern - .commits_.archive.1_1-0-1 private static final Pattern LOG_FILE_PATTERN = - Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)(_(([0-9]*)-([0-9]*)-([0-9]*)(-cdc)?))?"); + Pattern.compile("\\.(.+)_(.*)\\.(.+)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+))(-cdc)?)?"); private static final String LOG_FILE_PREFIX = "."; private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10; private static final long MIN_CLEAN_TO_KEEP = 10; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java index 569b4a23b683b..0bd1f451b88b7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java @@ -141,6 +141,8 @@ class WriterBuilder { private Path parentPath; // Log File Write Token private String logWriteToken; + // explicit file suffix + private String suffix; // Rollover Log file write token private String rolloverLogWriteToken; @@ -164,6 +166,11 @@ public WriterBuilder withLogWriteToken(String logWriteToken) { return this; } + public WriterBuilder withSuffix(String suffix) { + this.suffix = suffix; + return this; + } + public WriterBuilder withFs(FileSystem fs) { this.fs = fs; return this; @@ -250,6 +257,13 @@ public Writer build() throws IOException { logWriteToken = rolloverLogWriteToken; } + if (suffix != null) { + // a little hacky to simplify the logic: patch the write token with explicit suffix + // instead of adding a new extension + logWriteToken = logWriteToken + suffix; + rolloverLogWriteToken = rolloverLogWriteToken + suffix; + } + Path logPath = new Path(parentPath, FSUtils.makeLogFileName(logFileId, fileExtension, instantTime, logVersion, logWriteToken)); LOG.info("HoodieLogFile on path " + logPath); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java index 481bb1dd452da..d8bdaca0782d9 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.cdc.HoodieCDCUtils; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieTestUtils; @@ -250,6 +251,42 @@ public void tesLogFileName() { assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(rlPath)); } + @Test + public void testCdcLogFileName() { + String partitionPath = "2022/11/04/"; + String fileName = UUID.randomUUID().toString(); + String logFile = FSUtils.makeLogFileName(fileName, ".log", "100", 2, "1-0-1") + HoodieCDCUtils.CDC_LOGFILE_SUFFIX; + Path path = new Path(new Path(partitionPath), logFile); + + assertTrue(FSUtils.isLogFile(path)); + assertEquals("log", FSUtils.getFileExtensionFromLog(path)); + assertEquals(fileName, FSUtils.getFileIdFromLogPath(path)); + assertEquals("100", FSUtils.getBaseCommitTimeFromLogPath(path)); + assertEquals(1, FSUtils.getTaskPartitionIdFromLogPath(path)); + assertEquals("1-0-1", FSUtils.getWriteTokenFromLogPath(path)); + assertEquals(0, FSUtils.getStageIdFromLogPath(path)); + assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(path)); + assertEquals(2, FSUtils.getFileVersionFromLog(path)); + } + + @Test + public void testArchiveLogFileName() { + String partitionPath = "2022/11/04/"; + String fileName = "commits"; + String logFile = FSUtils.makeLogFileName(fileName, ".archive", "", 2, "1-0-1"); + Path path = new Path(new Path(partitionPath), logFile); + + assertFalse(FSUtils.isLogFile(path)); + assertEquals("archive", FSUtils.getFileExtensionFromLog(path)); + assertEquals(fileName, FSUtils.getFileIdFromLogPath(path)); + assertEquals("", FSUtils.getBaseCommitTimeFromLogPath(path)); + assertEquals(1, FSUtils.getTaskPartitionIdFromLogPath(path)); + assertEquals("1-0-1", FSUtils.getWriteTokenFromLogPath(path)); + assertEquals(0, FSUtils.getStageIdFromLogPath(path)); + assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(path)); + assertEquals(2, FSUtils.getFileVersionFromLog(path)); + } + /** * Test Log File Comparisons when log files do not have write tokens. */