diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java
index 816a93718706d..4874fe7bb9ffe 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java
@@ -22,16 +22,18 @@
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
-import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -46,11 +48,16 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.model.HoodieCleaningPolicy.KEEP_LATEST_COMMITS;
 import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
 import static org.apache.hudi.table.TestCleaner.insertFirstBigBatchForClientCleanerTest;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
@@ -117,7 +124,7 @@ private void testInsertAndCleanByCommits(
     int maxCommits = 3; // keep upto 3 commits from the past
     HoodieWriteConfig cfg = getConfigBuilder(true)
         .withCleanConfig(HoodieCleanConfig.newBuilder()
-            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .withCleanerPolicy(KEEP_LATEST_COMMITS)
            .withAsyncClean(isAsync).retainCommits(maxCommits).build())
         .withParallelism(PARALLELISM, PARALLELISM)
         .withBulkInsertParallelism(PARALLELISM)
@@ -138,6 +145,7 @@ private void testInsertAndCleanByCommits(
       HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
       insertFirstBigBatchForClientCleanerTest(context(), metaClient, client, recordInsertGenWrappedFunction, insertFn);
 
+      Map<String, List<HoodieWriteStat>> commitWriteStatsMap = new HashMap<>();
       // Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
       for (int i = 0; i < 8; i++) {
         String newCommitTime = makeNewCommitTime();
@@ -147,42 +155,106 @@ private void testInsertAndCleanByCommits(
         List<WriteStatus> statuses = upsertFn.apply(client, jsc().parallelize(records, PARALLELISM), newCommitTime).collect();
         // Verify there are no errors
         assertNoWriteErrors(statuses);
+        commitWriteStatsMap.put(
+            newCommitTime,
+            statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()));
 
         metaClient = HoodieTableMetaClient.reload(metaClient);
-        HoodieTable table1 = HoodieSparkTable.create(cfg, context(), metaClient);
-        HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
-        HoodieInstant lastInstant = activeTimeline.lastInstant().get();
-        if (cfg.isAsyncClean()) {
-          activeTimeline = activeTimeline.findInstantsBefore(lastInstant.getTimestamp());
+        validateFilesAfterCleaning(
+            HoodieSparkTable.create(cfg, context(), metaClient),
+            commitWriteStatsMap,
+            dataGen.getPartitionPaths());
+      }
+    }
+  }
+
+  /**
+   * Validates the data files in a Hudi table based on the `KEEP_LATEST_COMMITS` cleaner policy.
+   *
+   * @param table               {@link HoodieTable} instance.
+   * @param commitWriteStatsMap The cache for the list of write stats of each commit.
+   * @param partitionPaths      List of partitions to validate.
+   */
+  private void validateFilesAfterCleaning(
+      HoodieTable table,
+      Map<String, List<HoodieWriteStat>> commitWriteStatsMap,
+      String[] partitionPaths) {
+    assertEquals(KEEP_LATEST_COMMITS, table.getConfig().getCleanerPolicy());
+    boolean isAsyncClean = table.getConfig().isAsyncClean();
+    int maxCommitsToRetain = table.getConfig().getCleanerCommitsRetained();
+    HoodieTimeline commitsTimeline = table.getCompletedCommitsTimeline();
+    HoodieInstant lastInstant = commitsTimeline.lastInstant().get();
+
+    if (isAsyncClean) {
+      commitsTimeline = commitsTimeline.findInstantsBefore(lastInstant.getTimestamp());
+    }
+    // This corresponds to the `earliestCommitToRetain` in {@code CleanPlanner::getFilesToCleanKeepingLatestCommits}
+    Option<HoodieInstant> earliestRetainedCommit = commitsTimeline.nthFromLastInstant(maxCommitsToRetain - 1);
+    // A final timeline to be used in the lambda function below
+    HoodieTimeline timeline = commitsTimeline;
+    // Mapping of <partition path, file ID> to the expected set of instant timestamps
+    Map<Pair<String, String>, Set<String>> expectedInstantTimeMap = new HashMap<>();
+    TableFileSystemView fsView = table.getFileSystemView();
+    // Remaining file groups to figure out the one version before earliestRetainedCommit
+    Set<Pair<String, String>> remainingFileGroupSet = new HashSet<>();
+
+    for (String partitionPath : partitionPaths) {
+      remainingFileGroupSet.addAll(
+          fsView.getAllFileGroups(partitionPath)
+              .map(fileGroup -> Pair.of(partitionPath, fileGroup.getFileGroupId().getFileId()))
+              .collect(Collectors.toList()));
+    }
+    // With the KEEP_LATEST_COMMITS cleaner policy, for each file group, we need to figure out
+    // the latest version before earliestCommitToRetain, which is also kept from cleaning.
+    // The timeline of commits is traversed in reverse order to achieve this.
+    for (HoodieInstant instant : commitsTimeline.getReverseOrderedInstants().collect(Collectors.toList())) {
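+      // If the write stats for this instant were not cached by the write loop above (e.g. the
+      // commit from the initial big-batch insert), read them back from the commit metadata on
+      // the timeline; a commit whose metadata cannot be read contributes no write stats.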
+      List<HoodieWriteStat> hoodieWriteStatList = commitWriteStatsMap.computeIfAbsent(instant.getTimestamp(), newInstant -> {
+        try {
+          return HoodieCommitMetadata.fromBytes(
+              timeline.getInstantDetails(
+                  timeline.filter(inst -> inst.getTimestamp().equals(newInstant))
+                      .firstInstant().get()).get(),
+              HoodieCommitMetadata.class)
+              .getWriteStats();
+        } catch (IOException e) {
+          return Collections.EMPTY_LIST;
         }
-        // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one commit before earliest
-        // commit
-        Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits);
-        Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
-        if (earliestRetainedCommit.isPresent()) {
-          acceptableCommits
-              .removeAll(activeTimeline.findInstantsInRange("000", earliestRetainedCommit.get().getTimestamp())
-                  .getInstants().collect(Collectors.toSet()));
-          acceptableCommits.add(earliestRetainedCommit.get());
+      });
+      hoodieWriteStatList.forEach(writeStat -> {
+        Pair<String, String> partitionFileIdPair = Pair.of(writeStat.getPartitionPath(), writeStat.getFileId());
+        if (remainingFileGroupSet.contains(partitionFileIdPair)) {
+          if (earliestRetainedCommit.isPresent()
+              && HoodieTimeline.compareTimestamps(
+              instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestRetainedCommit.get().getTimestamp())) {
+            remainingFileGroupSet.remove(partitionFileIdPair);
+          }
+          expectedInstantTimeMap.computeIfAbsent(partitionFileIdPair, k -> new HashSet<>())
+              .add(instant.getTimestamp());
         }
+      });
+      if (remainingFileGroupSet.isEmpty()) {
+        break;
+      }
+    }
 
-        TableFileSystemView fsView = table1.getFileSystemView();
-        // Need to ensure the following
-        for (String partitionPath : dataGen.getPartitionPaths()) {
-          List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
-          for (HoodieFileGroup fileGroup : fileGroups) {
-            Set<String> commitTimes = new HashSet<>();
-            fileGroup.getAllBaseFiles().forEach(value -> {
-              LOG.debug("Data File - " + value);
-              commitTimes.add(value.getCommitTime());
-            });
-            if (cfg.isAsyncClean()) {
-              commitTimes.remove(lastInstant.getTimestamp());
-            }
-            assertEquals(acceptableCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), commitTimes,
-                "Only contain acceptable versions of file should be present");
-          }
+    // Need to ensure the following
+    for (String partitionPath : partitionPaths) {
+      List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
+      for (HoodieFileGroup fileGroup : fileGroups) {
+        Set<String> commitTimes = new HashSet<>();
+        fileGroup.getAllBaseFiles().forEach(value -> {
+          LOG.debug("Data File - " + value);
+          commitTimes.add(value.getCommitTime());
+        });
+        if (isAsyncClean) {
+          commitTimes.remove(lastInstant.getTimestamp());
         }
+
+        assertEquals(
+            expectedInstantTimeMap.get(
+                Pair.of(partitionPath, fileGroup.getFileGroupId().getFileId())),
+            commitTimes,
+            "Only contain acceptable versions of file should be present");
       }
     }
   }
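Reviewer note (not part of the patch): the rule encoded by validateFilesAfterCleaning is that, under KEEP_LATEST_COMMITS, each file group keeps every version written at or after earliestRetainedCommit (nthFromLastInstant(maxCommitsToRetain - 1) on the completed-commits timeline, ignoring the last instant when async clean is on) plus the single latest version written strictly before it. The sketch below restates that rule in isolation; expectedRetainedCommits and commitTimesDesc are hypothetical names used only here, and plain String.compareTo stands in for HoodieTimeline.compareTimestamps with LESSER_THAN, which works because Hudi commit times are fixed-width timestamp strings.

  // Standalone sketch, for illustration only, of the retention rule the new helper verifies per file group.
  import java.util.ArrayList;
  import java.util.List;

  class KeepLatestCommitsRetentionSketch {
    /**
     * @param commitTimesDesc        commit times that wrote to one file group, newest first
     * @param earliestRetainedCommit timestamp of the earliest commit to retain
     * @return the commit times whose base files are expected to survive cleaning
     */
    static List<String> expectedRetainedCommits(List<String> commitTimesDesc, String earliestRetainedCommit) {
      List<String> retained = new ArrayList<>();
      for (String commitTime : commitTimesDesc) {
        retained.add(commitTime);
        // The first version strictly older than earliestRetainedCommit is the "one extra"
        // version that is kept; anything older than that may be cleaned, so stop here.
        if (commitTime.compareTo(earliestRetainedCommit) < 0) {
          break;
        }
      }
      return retained;
    }
  }

For example, with synchronous cleaning, commits 001 through 008 completed on the timeline, and maxCommitsToRetain = 3, earliestRetainedCommit resolves to 006; a file group rewritten by every commit is then expected to keep the versions from 008, 007, 006, and 005, which is the same set the helper accumulates in expectedInstantTimeMap.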