Commit 172c438

[HUDI-5358] Fix flaky tests in TestCleanerInsertAndCleanByCommits (apache#7420)

This PR fixes the validation logic inside cleaner tests in TestCleanerInsertAndCleanByCommits.

In the tests, the KEEP_LATEST_COMMITS cleaner policy is used. This policy first determines the earliest commit to retain based on the configured number of retained commits (hoodie.cleaner.commits.retained). Then, for each file group, one more version before the earliest commit to retain is also kept from cleaning; the commit providing that extra version can differ across file groups. For example, given the following commits, with x denoting a base file written for the corresponding commit and file group,

        c1  c2  c3  c4  c5
fg1:     x       x   x   x
fg2:         x   x   x   x
with hoodie.cleaner.commits.retained=3, no files are cleaned.

However, the current validation logic statically picks the single commit immediately before the earliest commit to retain in the Hudi timeline, i.e., c2 in the above example, for all file groups, and therefore marks the base file written by c1 for fg1 as expected to be cleaned, which does not match the KEEP_LATEST_COMMITS cleaner policy. The validation logic is fixed to follow the expected cleaner behavior.
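To make the retention rule concrete, the following minimal Java sketch (illustrative only; it is not Hudi code, and the class and variable names are invented) computes which base-file versions KEEP_LATEST_COMMITS keeps for the example timeline, mirroring in simplified form what the cleaner planning does:

import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class KeepLatestCommitsSketch {
  public static void main(String[] args) {
    // Commits in timeline order, and the base files written per file group
    // (taken from the example above).
    List<String> timeline = Arrays.asList("c1", "c2", "c3", "c4", "c5");
    Map<String, List<String>> filesPerGroup = new LinkedHashMap<>();
    filesPerGroup.put("fg1", Arrays.asList("c1", "c3", "c4", "c5"));
    filesPerGroup.put("fg2", Arrays.asList("c2", "c3", "c4", "c5"));

    int retained = 3; // hoodie.cleaner.commits.retained
    // Earliest commit to retain: the third instant counted from the end.
    String earliestToRetain = timeline.get(timeline.size() - retained); // "c3"

    filesPerGroup.forEach((fg, versions) -> {
      Set<String> kept = new TreeSet<>();
      // Keep every version written at or after the earliest commit to retain ...
      versions.stream()
          .filter(c -> c.compareTo(earliestToRetain) >= 0)
          .forEach(kept::add);
      // ... plus the latest version strictly before it, which can differ per file group.
      versions.stream()
          .filter(c -> c.compareTo(earliestToRetain) < 0)
          .max(Comparator.naturalOrder())
          .ifPresent(kept::add);
      System.out.println(fg + " keeps " + kept);
      // Prints: fg1 keeps [c1, c3, c4, c5] and fg2 keeps [c2, c3, c4, c5]
    });
  }
}

Both file groups keep all of their base files, so a correct validation has to derive the extra retained commit per file group instead of fixing it at c2 for every group.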
yihua authored and Alexey Kudinkin committed Dec 14, 2022
1 parent f1d643e commit 172c438
Showing 1 changed file with 104 additions and 32 deletions.
@@ -22,16 +22,18 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
-import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -46,11 +48,16 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

+import static org.apache.hudi.common.model.HoodieCleaningPolicy.KEEP_LATEST_COMMITS;
import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
import static org.apache.hudi.table.TestCleaner.insertFirstBigBatchForClientCleanerTest;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
@@ -117,7 +124,7 @@ private void testInsertAndCleanByCommits(
    int maxCommits = 3; // keep upto 3 commits from the past
    HoodieWriteConfig cfg = getConfigBuilder(true)
        .withCleanConfig(HoodieCleanConfig.newBuilder()
-           .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+           .withCleanerPolicy(KEEP_LATEST_COMMITS)
            .withAsyncClean(isAsync).retainCommits(maxCommits).build())
        .withParallelism(PARALLELISM, PARALLELISM)
        .withBulkInsertParallelism(PARALLELISM)
@@ -138,6 +145,7 @@
      HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
      insertFirstBigBatchForClientCleanerTest(context(), metaClient, client, recordInsertGenWrappedFunction, insertFn);

+     Map<String, List<HoodieWriteStat>> commitWriteStatsMap = new HashMap<>();
      // Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
      for (int i = 0; i < 8; i++) {
        String newCommitTime = makeNewCommitTime();
@@ -147,42 +155,106 @@
        List<WriteStatus> statuses = upsertFn.apply(client, jsc().parallelize(records, PARALLELISM), newCommitTime).collect();
        // Verify there are no errors
        assertNoWriteErrors(statuses);
+       commitWriteStatsMap.put(
+           newCommitTime,
+           statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()));

        metaClient = HoodieTableMetaClient.reload(metaClient);
-       HoodieTable table1 = HoodieSparkTable.create(cfg, context(), metaClient);
-       HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
-       HoodieInstant lastInstant = activeTimeline.lastInstant().get();
-       if (cfg.isAsyncClean()) {
-         activeTimeline = activeTimeline.findInstantsBefore(lastInstant.getTimestamp());
-       }
-       // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one commit before earliest
-       // commit
-       Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits);
-       Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
-       if (earliestRetainedCommit.isPresent()) {
-         acceptableCommits
-             .removeAll(activeTimeline.findInstantsInRange("000", earliestRetainedCommit.get().getTimestamp())
-                 .getInstants().collect(Collectors.toSet()));
-         acceptableCommits.add(earliestRetainedCommit.get());
-       }
-
-       TableFileSystemView fsView = table1.getFileSystemView();
-       // Need to ensure the following
-       for (String partitionPath : dataGen.getPartitionPaths()) {
-         List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
-         for (HoodieFileGroup fileGroup : fileGroups) {
-           Set<String> commitTimes = new HashSet<>();
-           fileGroup.getAllBaseFiles().forEach(value -> {
-             LOG.debug("Data File - " + value);
-             commitTimes.add(value.getCommitTime());
-           });
-           if (cfg.isAsyncClean()) {
-             commitTimes.remove(lastInstant.getTimestamp());
-           }
-           assertEquals(acceptableCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), commitTimes,
-               "Only contain acceptable versions of file should be present");
-         }
-       }
+       validateFilesAfterCleaning(
+           HoodieSparkTable.create(cfg, context(), metaClient),
+           commitWriteStatsMap,
+           dataGen.getPartitionPaths());
      }
    }
  }
+
+  /**
+   * Validates the data files in a Hudi table based on the `KEEP_LATEST_COMMITS` cleaner policy.
+   *
+   * @param table {@link HoodieTable} instance.
+   * @param commitWriteStatsMap The cache for the list of write stats of each commit.
+   * @param partitionPaths List of partitions to validate.
+   */
+  private void validateFilesAfterCleaning(
+      HoodieTable table,
+      Map<String, List<HoodieWriteStat>> commitWriteStatsMap,
+      String[] partitionPaths) {
+    assertEquals(KEEP_LATEST_COMMITS, table.getConfig().getCleanerPolicy());
+    boolean isAsyncClean = table.getConfig().isAsyncClean();
+    int maxCommitsToRetain = table.getConfig().getCleanerCommitsRetained();
+    HoodieTimeline commitsTimeline = table.getCompletedCommitsTimeline();
+    HoodieInstant lastInstant = commitsTimeline.lastInstant().get();
+
+    if (isAsyncClean) {
+      commitsTimeline = commitsTimeline.findInstantsBefore(lastInstant.getTimestamp());
+    }
+    // This corresponds to the `earliestCommitToRetain` in {@code CleanPlanner::getFilesToCleanKeepingLatestCommits}
+    Option<HoodieInstant> earliestRetainedCommit = commitsTimeline.nthFromLastInstant(maxCommitsToRetain - 1);
+    // A final timeline to be used in the lambda function below
+    HoodieTimeline timeline = commitsTimeline;
+    // Mapping of <partition path, file group ID> to expected set of instant timestamps
+    Map<Pair<String, String>, Set<String>> expectedInstantTimeMap = new HashMap<>();
+    TableFileSystemView fsView = table.getFileSystemView();
+    // Remaining file groups to figure out the one version before earliestRetainedCommit
+    Set<Pair<String, String>> remainingFileGroupSet = new HashSet<>();
+
+    for (String partitionPath : partitionPaths) {
+      remainingFileGroupSet.addAll(
+          fsView.getAllFileGroups(partitionPath)
+              .map(fileGroup -> Pair.of(partitionPath, fileGroup.getFileGroupId().getFileId()))
+              .collect(Collectors.toList()));
+    }
+    // With KEEP_LATEST_COMMITS cleaner policy, for each file group, we need to figure out
+    // the latest version before earliestCommitToRetain, which is also kept from cleaning.
+    // The timeline of commits is traversed in reverse order to achieve this.
+    for (HoodieInstant instant : commitsTimeline.getReverseOrderedInstants().collect(Collectors.toList())) {
+      List<HoodieWriteStat> hoodieWriteStatList = commitWriteStatsMap.computeIfAbsent(instant.getTimestamp(), newInstant -> {
+        try {
+          return HoodieCommitMetadata.fromBytes(
+              timeline.getInstantDetails(
+                  timeline.filter(inst -> inst.getTimestamp().equals(newInstant))
+                      .firstInstant().get()).get(),
+              HoodieCommitMetadata.class)
+              .getWriteStats();
+        } catch (IOException e) {
+          return Collections.EMPTY_LIST;
+        }
+      });
+      hoodieWriteStatList.forEach(writeStat -> {
+        Pair<String, String> partitionFileIdPair = Pair.of(writeStat.getPartitionPath(), writeStat.getFileId());
+        if (remainingFileGroupSet.contains(partitionFileIdPair)) {
+          if (earliestRetainedCommit.isPresent()
+              && HoodieTimeline.compareTimestamps(
+              instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestRetainedCommit.get().getTimestamp())) {
+            remainingFileGroupSet.remove(partitionFileIdPair);
+          }
+          expectedInstantTimeMap.computeIfAbsent(partitionFileIdPair, k -> new HashSet<>())
+              .add(instant.getTimestamp());
+        }
+      });
+      if (remainingFileGroupSet.isEmpty()) {
+        break;
+      }
+    }
+
+    // Need to ensure the following
+    for (String partitionPath : partitionPaths) {
+      List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
+      for (HoodieFileGroup fileGroup : fileGroups) {
+        Set<String> commitTimes = new HashSet<>();
+        fileGroup.getAllBaseFiles().forEach(value -> {
+          LOG.debug("Data File - " + value);
+          commitTimes.add(value.getCommitTime());
+        });
+        if (isAsyncClean) {
+          commitTimes.remove(lastInstant.getTimestamp());
+        }
+
+        assertEquals(
+            expectedInstantTimeMap.get(
+                Pair.of(partitionPath, fileGroup.getFileGroupId().getFileId())),
+            commitTimes,
+            "Only contain acceptable versions of file should be present");
+      }
+    }
+  }
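As a concrete check (illustrative only; the partition path and file-group IDs below are hypothetical, and the fragment assumes the imports in the file above plus java.util.Arrays), the expectedInstantTimeMap built by the new method for the five-commit example in the commit message, with a synchronous clean, would contain:

// Expected contents of expectedInstantTimeMap for the commit-message example
// (hoodie.cleaner.commits.retained = 3, synchronous clean):
Map<Pair<String, String>, Set<String>> expected = new HashMap<>();
expected.put(Pair.of("partitionA", "fg1"), new HashSet<>(Arrays.asList("c1", "c3", "c4", "c5")));
expected.put(Pair.of("partitionA", "fg2"), new HashSet<>(Arrays.asList("c2", "c3", "c4", "c5")));
// No base file is expected to be cleaned, so each file group's observed
// commit times should equal its expected set.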