From ff817d9009cca6d3c337e728b16d289897cd4466 Mon Sep 17 00:00:00 2001
From: Y Ethan Guo
Date: Fri, 9 Dec 2022 14:03:47 -0800
Subject: [PATCH] [HUDI-5358] Fix flaky tests in TestCleanerInsertAndCleanByCommits (#7420)

This PR fixes the validation logic inside the cleaner tests in
TestCleanerInsertAndCleanByCommits. The tests use the KEEP_LATEST_COMMITS
cleaner policy. This policy first figures out the earliest commit to retain
based on the configured number of retained commits
(hoodie.cleaner.commits.retained). Then, for each file group, one more version
before the earliest commit to retain is also kept from cleaning. The commit of
that extra version can differ among file groups.

For example, given the following commits, with x denoting a base file written
for the corresponding commit and file group,

        c1  c2  c3  c4  c5
  fg1:   x       x   x   x
  fg2:       x   x   x   x

with hoodie.cleaner.commits.retained=3, no files are cleaned. However, the
current validation logic statically picks the single commit before the
earliest commit to retain in the Hudi timeline, i.e., c2 in the above example,
for all file groups, and therefore marks the base file written by c1 for fg1
as expected to be cleaned, which does not match the KEEP_LATEST_COMMITS
cleaner policy. The validation logic is fixed to follow the expected cleaner
behavior.
---
 .../TestCleanerInsertAndCleanByCommits.java | 136 +++++++++++++-----
 1 file changed, 104 insertions(+), 32 deletions(-)
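
For reference, below is a minimal, self-contained sketch of the per-file-group
retention rule under KEEP_LATEST_COMMITS described above. The class name, the
plain String commit times, and the TreeSet-based model of a file group's base
files are illustrative only and are not Hudi APIs.

import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

// Illustrative model of KEEP_LATEST_COMMITS retention, not the Hudi CleanPlanner.
public class KeepLatestCommitsSketch {

  // Versions of one file group that survive cleaning: every version at or after
  // earliestCommitToRetain, plus the single latest version strictly before it.
  static List<String> retainedVersions(TreeSet<String> fileVersions, String earliestCommitToRetain) {
    List<String> retained = new ArrayList<>(fileVersions.tailSet(earliestCommitToRetain, true));
    String latestBefore = fileVersions.lower(earliestCommitToRetain);
    if (latestBefore != null) {
      retained.add(0, latestBefore);
    }
    return retained;
  }

  public static void main(String[] args) {
    // Base files per file group, keyed by commit time (the c1..c5 example above).
    TreeMap<String, TreeSet<String>> fileGroups = new TreeMap<>();
    fileGroups.put("fg1", new TreeSet<>(List.of("c1", "c3", "c4", "c5")));
    fileGroups.put("fg2", new TreeSet<>(List.of("c2", "c3", "c4", "c5")));

    // With hoodie.cleaner.commits.retained=3 and commits c1..c5,
    // the earliest commit to retain is c3.
    String earliestCommitToRetain = "c3";

    // Prints fg1 retains [c1, c3, c4, c5] and fg2 retains [c2, c3, c4, c5]:
    // no file is cleaned, even though c1 is older than c2 (the commit a
    // timeline-wide validation would pick for all file groups).
    fileGroups.forEach((fg, versions) ->
        System.out.println(fg + " retains " + retainedVersions(versions, earliestCommitToRetain)));
  }
}
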
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java
index 816a93718706d..4874fe7bb9ffe 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java
@@ -22,16 +22,18 @@
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
-import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -46,11 +48,16 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.model.HoodieCleaningPolicy.KEEP_LATEST_COMMITS;
 import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
 import static org.apache.hudi.table.TestCleaner.insertFirstBigBatchForClientCleanerTest;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
@@ -117,7 +124,7 @@ private void testInsertAndCleanByCommits(
     int maxCommits = 3; // keep upto 3 commits from the past
     HoodieWriteConfig cfg = getConfigBuilder(true)
         .withCleanConfig(HoodieCleanConfig.newBuilder()
-            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .withCleanerPolicy(KEEP_LATEST_COMMITS)
             .withAsyncClean(isAsync).retainCommits(maxCommits).build())
         .withParallelism(PARALLELISM, PARALLELISM)
         .withBulkInsertParallelism(PARALLELISM)
@@ -138,6 +145,7 @@ private void testInsertAndCleanByCommits(
       HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
       insertFirstBigBatchForClientCleanerTest(context(), metaClient, client, recordInsertGenWrappedFunction, insertFn);
 
+      Map<String, List<HoodieWriteStat>> commitWriteStatsMap = new HashMap<>();
       // Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
       for (int i = 0; i < 8; i++) {
         String newCommitTime = makeNewCommitTime();
@@ -147,42 +155,106 @@ private void testInsertAndCleanByCommits(
         List<WriteStatus> statuses = upsertFn.apply(client, jsc().parallelize(records, PARALLELISM), newCommitTime).collect();
         // Verify there are no errors
         assertNoWriteErrors(statuses);
+        commitWriteStatsMap.put(
+            newCommitTime,
+            statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()));
 
         metaClient = HoodieTableMetaClient.reload(metaClient);
-        HoodieTable table1 = HoodieSparkTable.create(cfg, context(), metaClient);
-        HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
-        HoodieInstant lastInstant = activeTimeline.lastInstant().get();
-        if (cfg.isAsyncClean()) {
-          activeTimeline = activeTimeline.findInstantsBefore(lastInstant.getTimestamp());
+        validateFilesAfterCleaning(
+            HoodieSparkTable.create(cfg, context(), metaClient),
+            commitWriteStatsMap,
+            dataGen.getPartitionPaths());
+      }
+    }
+  }
+
+  /**
+   * Validates the data files in a Hudi table based on the `KEEP_LATEST_COMMITS` cleaner policy.
+   *
+   * @param table               {@link HoodieTable} instance.
+   * @param commitWriteStatsMap The cache for the list of write stats of each commit.
+   * @param partitionPaths      List of partitions to validate.
+   */
+  private void validateFilesAfterCleaning(
+      HoodieTable table,
+      Map<String, List<HoodieWriteStat>> commitWriteStatsMap,
+      String[] partitionPaths) {
+    assertEquals(KEEP_LATEST_COMMITS, table.getConfig().getCleanerPolicy());
+    boolean isAsyncClean = table.getConfig().isAsyncClean();
+    int maxCommitsToRetain = table.getConfig().getCleanerCommitsRetained();
+    HoodieTimeline commitsTimeline = table.getCompletedCommitsTimeline();
+    HoodieInstant lastInstant = commitsTimeline.lastInstant().get();
+
+    if (isAsyncClean) {
+      commitsTimeline = commitsTimeline.findInstantsBefore(lastInstant.getTimestamp());
+    }
+    // This corresponds to the `earliestCommitToRetain` in {@code CleanPlanner::getFilesToCleanKeepingLatestCommits}
+    Option<HoodieInstant> earliestRetainedCommit = commitsTimeline.nthFromLastInstant(maxCommitsToRetain - 1);
+    // A final timeline to be used in Lambda function
+    HoodieTimeline timeline = commitsTimeline;
+    // Mapping of <partition path, file ID> to expected set of instant timestamps
+    Map<Pair<String, String>, Set<String>> expectedInstantTimeMap = new HashMap<>();
+    TableFileSystemView fsView = table.getFileSystemView();
+    // Remaining file groups to figure out the one version before earliestRetainedCommit
+    Set<Pair<String, String>> remainingFileGroupSet = new HashSet<>();
+
+    for (String partitionPath : partitionPaths) {
+      remainingFileGroupSet.addAll(
+          fsView.getAllFileGroups(partitionPath)
+              .map(fileGroup -> Pair.of(partitionPath, fileGroup.getFileGroupId().getFileId()))
+              .collect(Collectors.toList()));
+    }
+    // With KEEP_LATEST_COMMITS cleaner policy, for each file group, we need to figure out
+    // the latest version before earliestCommitToRetain, which is also kept from cleaning.
+    // The timeline of commits is traversed in reverse order to achieve this.
+    for (HoodieInstant instant : commitsTimeline.getReverseOrderedInstants().collect(Collectors.toList())) {
+      List<HoodieWriteStat> hoodieWriteStatList = commitWriteStatsMap.computeIfAbsent(instant.getTimestamp(), newInstant -> {
+        try {
+          return HoodieCommitMetadata.fromBytes(
+              timeline.getInstantDetails(
+                  timeline.filter(inst -> inst.getTimestamp().equals(newInstant))
+                      .firstInstant().get()).get(),
+              HoodieCommitMetadata.class)
+              .getWriteStats();
+        } catch (IOException e) {
+          return Collections.EMPTY_LIST;
         }
-        // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one commit before earliest
-        // commit
-        Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits);
-        Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
-        if (earliestRetainedCommit.isPresent()) {
-          acceptableCommits
-              .removeAll(activeTimeline.findInstantsInRange("000", earliestRetainedCommit.get().getTimestamp())
-                  .getInstants().collect(Collectors.toSet()));
-          acceptableCommits.add(earliestRetainedCommit.get());
+      });
+      hoodieWriteStatList.forEach(writeStat -> {
+        Pair<String, String> partitionFileIdPair = Pair.of(writeStat.getPartitionPath(), writeStat.getFileId());
+        if (remainingFileGroupSet.contains(partitionFileIdPair)) {
+          if (earliestRetainedCommit.isPresent()
+              && HoodieTimeline.compareTimestamps(
+              instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestRetainedCommit.get().getTimestamp())) {
+            remainingFileGroupSet.remove(partitionFileIdPair);
+          }
+          expectedInstantTimeMap.computeIfAbsent(partitionFileIdPair, k -> new HashSet<>())
+              .add(instant.getTimestamp());
         }
+      });
+      if (remainingFileGroupSet.isEmpty()) {
+        break;
+      }
+    }
 
-        TableFileSystemView fsView = table1.getFileSystemView();
-        // Need to ensure the following
-        for (String partitionPath : dataGen.getPartitionPaths()) {
-          List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
-          for (HoodieFileGroup fileGroup : fileGroups) {
-            Set<String> commitTimes = new HashSet<>();
-            fileGroup.getAllBaseFiles().forEach(value -> {
-              LOG.debug("Data File - " + value);
-              commitTimes.add(value.getCommitTime());
-            });
-            if (cfg.isAsyncClean()) {
-              commitTimes.remove(lastInstant.getTimestamp());
-            }
-            assertEquals(acceptableCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), commitTimes,
-                "Only contain acceptable versions of file should be present");
-          }
+    // Need to ensure the following
+    for (String partitionPath : partitionPaths) {
+      List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
+      for (HoodieFileGroup fileGroup : fileGroups) {
+        Set<String> commitTimes = new HashSet<>();
+        fileGroup.getAllBaseFiles().forEach(value -> {
+          LOG.debug("Data File - " + value);
+          commitTimes.add(value.getCommitTime());
+        });
+        if (isAsyncClean) {
+          commitTimes.remove(lastInstant.getTimestamp());
         }
+
+        assertEquals(
+            expectedInstantTimeMap.get(
+                Pair.of(partitionPath, fileGroup.getFileGroupId().getFileId())),
+            commitTimes,
+            "Only contain acceptable versions of file should be present");
       }
     }
   }
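
The validation added above finds the per-file-group retention boundary by
walking the commit timeline newest-to-oldest. Below is a simplified standalone
sketch of that traversal, using plain collections in place of Hudi's timeline,
write stats, and file system view; the class, method, and parameter names are
illustrative only.

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative only: mirrors the shape of validateFilesAfterCleaning with plain collections.
public class ReverseTraversalSketch {

  // For each file group, walk the commits newest-to-oldest and record every commit
  // at or after earliestCommitToRetain, plus the first (i.e. latest) commit found
  // before it; that commit also survives cleaning, so traversal of the group stops there.
  static Map<String, Set<String>> expectedInstants(
      List<String> commitsNewestFirst,
      Map<String, Set<String>> filesWrittenPerCommit, // commit time -> file group IDs written
      String earliestCommitToRetain) {
    Map<String, Set<String>> expected = new HashMap<>();
    Set<String> remaining = new HashSet<>();
    filesWrittenPerCommit.values().forEach(remaining::addAll);

    for (String commit : commitsNewestFirst) {
      for (String fileGroup : filesWrittenPerCommit.getOrDefault(commit, Set.of())) {
        if (remaining.contains(fileGroup)) {
          if (commit.compareTo(earliestCommitToRetain) < 0) {
            // Latest version before the retained window: keep it, then stop for this group.
            remaining.remove(fileGroup);
          }
          expected.computeIfAbsent(fileGroup, k -> new HashSet<>()).add(commit);
        }
      }
      if (remaining.isEmpty()) {
        break;
      }
    }
    return expected;
  }

  public static void main(String[] args) {
    Map<String, Set<String>> written = Map.of(
        "c1", Set.of("fg1"),
        "c2", Set.of("fg2"),
        "c3", Set.of("fg1", "fg2"),
        "c4", Set.of("fg1", "fg2"),
        "c5", Set.of("fg1", "fg2"));
    // Expect fg1 -> {c1, c3, c4, c5} and fg2 -> {c2, c3, c4, c5}: no file is cleaned.
    System.out.println(expectedInstants(List.of("c5", "c4", "c3", "c2", "c1"), written, "c3"));
  }
}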