[HUDI-5358] Fix flaky tests in TestCleanerInsertAndCleanByCommits #7420

Merged
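Editor's note, before the diff: one plausible reading of why the old test was flaky. The removed validation built a single global set of acceptable commit times and asserted that every file group's base files carry exactly that set. But an upsert of randomly chosen records only rewrites the file groups it actually touches, so a given commit need not appear in every file group, and the assertion could fail depending on the batch. The replacement derives a per-file-group expectation from the recorded write stats instead. A minimal, self-contained Java model of the mismatch; all names here (WhyFlaky, fg-A, fg-B, the commit times) are illustrative, not Hudi code:

import java.util.Map;
import java.util.Set;

// Illustrative model (not Hudi code): an upsert only rewrites the file groups it
// actually touches, so two file groups can legitimately hold different commit times.
public class WhyFlaky {
  public static void main(String[] args) {
    // The old test's single expectation, applied to every file group.
    Set<String> acceptableCommits = Set.of("002", "003", "004");
    // Commit 004 happened to touch only file group "fg-A".
    Map<String, Set<String>> baseFileCommitsByFileGroup = Map.of(
        "fg-A", Set.of("002", "003", "004"),
        "fg-B", Set.of("002", "003"));
    // The old assertion demanded equality with the global set for every file group,
    // which fails for fg-B whenever the random upsert skips it.
    baseFileCommitsByFileGroup.forEach((fileGroup, commits) ->
        System.out.println(fileGroup + " matches global set: " + commits.equals(acceptableCommits)));
  }
}

Running it prints true for fg-A and false for fg-B, which is the shape of the intermittent failure.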
@@ -22,16 +22,18 @@
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
-import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -46,11 +48,16 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.model.HoodieCleaningPolicy.KEEP_LATEST_COMMITS;
 import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
 import static org.apache.hudi.table.TestCleaner.insertFirstBigBatchForClientCleanerTest;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
@@ -117,7 +124,7 @@ private void testInsertAndCleanByCommits(
     int maxCommits = 3; // keep upto 3 commits from the past
     HoodieWriteConfig cfg = getConfigBuilder(true)
         .withCleanConfig(HoodieCleanConfig.newBuilder()
-            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .withCleanerPolicy(KEEP_LATEST_COMMITS)
             .withAsyncClean(isAsync).retainCommits(maxCommits).build())
         .withParallelism(PARALLELISM, PARALLELISM)
         .withBulkInsertParallelism(PARALLELISM)
@@ -138,6 +145,7 @@ private void testInsertAndCleanByCommits(
       HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
       insertFirstBigBatchForClientCleanerTest(context(), metaClient, client, recordInsertGenWrappedFunction, insertFn);
 
+      Map<String, List<HoodieWriteStat>> commitWriteStatsMap = new HashMap<>();
       // Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
       for (int i = 0; i < 8; i++) {
         String newCommitTime = makeNewCommitTime();
@@ -147,42 +155,106 @@
         List<WriteStatus> statuses = upsertFn.apply(client, jsc().parallelize(records, PARALLELISM), newCommitTime).collect();
         // Verify there are no errors
         assertNoWriteErrors(statuses);
+        commitWriteStatsMap.put(
+            newCommitTime,
+            statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()));
 
         metaClient = HoodieTableMetaClient.reload(metaClient);
-        HoodieTable table1 = HoodieSparkTable.create(cfg, context(), metaClient);
-        HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
-        HoodieInstant lastInstant = activeTimeline.lastInstant().get();
-        if (cfg.isAsyncClean()) {
-          activeTimeline = activeTimeline.findInstantsBefore(lastInstant.getTimestamp());
-        }
-        // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one commit before earliest
-        // commit
-        Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits);
-        Set<HoodieInstant> acceptableCommits = activeTimeline.getInstantsAsStream().collect(Collectors.toSet());
-        if (earliestRetainedCommit.isPresent()) {
-          acceptableCommits
-              .removeAll(activeTimeline.findInstantsInRange("000", earliestRetainedCommit.get().getTimestamp())
-                  .getInstantsAsStream().collect(Collectors.toSet()));
-          acceptableCommits.add(earliestRetainedCommit.get());
-        }
-
-        TableFileSystemView fsView = table1.getFileSystemView();
-        // Need to ensure the following
-        for (String partitionPath : dataGen.getPartitionPaths()) {
-          List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
-          for (HoodieFileGroup fileGroup : fileGroups) {
-            Set<String> commitTimes = new HashSet<>();
-            fileGroup.getAllBaseFiles().forEach(value -> {
-              LOG.debug("Data File - " + value);
-              commitTimes.add(value.getCommitTime());
-            });
-            if (cfg.isAsyncClean()) {
-              commitTimes.remove(lastInstant.getTimestamp());
-            }
-            assertEquals(acceptableCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), commitTimes,
-                "Only contain acceptable versions of file should be present");
-          }
-        }
+        validateFilesAfterCleaning(
+            HoodieSparkTable.create(cfg, context(), metaClient),
+            commitWriteStatsMap,
+            dataGen.getPartitionPaths());
       }
     }
   }
 
+  /**
+   * Validates the data files in a Hudi table based on the `KEEP_LATEST_COMMITS` cleaner policy.
+   *
+   * @param table               {@link HoodieTable} instance.
+   * @param commitWriteStatsMap The cache for the list of write stats of each commit.
+   * @param partitionPaths      List of partitions to validate.
+   */
+  private void validateFilesAfterCleaning(
+      HoodieTable table,
+      Map<String, List<HoodieWriteStat>> commitWriteStatsMap,
+      String[] partitionPaths) {
+    assertEquals(KEEP_LATEST_COMMITS, table.getConfig().getCleanerPolicy());
+    boolean isAsyncClean = table.getConfig().isAsyncClean();
+    int maxCommitsToRetain = table.getConfig().getCleanerCommitsRetained();
+    HoodieTimeline commitsTimeline = table.getCompletedCommitsTimeline();
+    HoodieInstant lastInstant = commitsTimeline.lastInstant().get();
+
+    if (isAsyncClean) {
+      commitsTimeline = commitsTimeline.findInstantsBefore(lastInstant.getTimestamp());
+    }
+    // This corresponds to the `earliestCommitToRetain` in {@code CleanPlanner::getFilesToCleanKeepingLatestCommits}
+    Option<HoodieInstant> earliestRetainedCommit = commitsTimeline.nthFromLastInstant(maxCommitsToRetain - 1);
+    // A final timeline to be used in Lambda function
+    HoodieTimeline timeline = commitsTimeline;
+    // Mapping of <partition path, file group ID> to expected set of instant timestamps
+    Map<Pair<String, String>, Set<String>> expectedInstantTimeMap = new HashMap<>();
+    TableFileSystemView fsView = table.getFileSystemView();
+    // Remaining file groups to figure out the one version before earliestRetainedCommit
+    Set<Pair<String, String>> remainingFileGroupSet = new HashSet<>();
+
+    for (String partitionPath : partitionPaths) {
+      remainingFileGroupSet.addAll(
+          fsView.getAllFileGroups(partitionPath)
+              .map(fileGroup -> Pair.of(partitionPath, fileGroup.getFileGroupId().getFileId()))
+              .collect(Collectors.toList()));
+    }
+    // With KEEP_LATEST_COMMITS cleaner policy, for each file group, we need to figure out
+    // the latest version before earliestCommitToRetain, which is also kept from cleaning.
+    // The timeline of commits is traversed in reverse order to achieve this.
+    for (HoodieInstant instant : commitsTimeline.getReverseOrderedInstants().collect(Collectors.toList())) {
+      List<HoodieWriteStat> hoodieWriteStatList = commitWriteStatsMap.computeIfAbsent(instant.getTimestamp(), newInstant -> {
+        try {
+          return HoodieCommitMetadata.fromBytes(
+              timeline.getInstantDetails(
+                  timeline.filter(inst -> inst.getTimestamp().equals(newInstant))
+                      .firstInstant().get()).get(),
+              HoodieCommitMetadata.class)
+              .getWriteStats();
+        } catch (IOException e) {
+          return Collections.EMPTY_LIST;
+        }
+      });
+      hoodieWriteStatList.forEach(writeStat -> {
+        Pair<String, String> partitionFileIdPair = Pair.of(writeStat.getPartitionPath(), writeStat.getFileId());
+        if (remainingFileGroupSet.contains(partitionFileIdPair)) {
+          if (earliestRetainedCommit.isPresent()
+              && HoodieTimeline.compareTimestamps(
+              instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestRetainedCommit.get().getTimestamp())) {
+            remainingFileGroupSet.remove(partitionFileIdPair);
+          }
+          expectedInstantTimeMap.computeIfAbsent(partitionFileIdPair, k -> new HashSet<>())
+              .add(instant.getTimestamp());
+        }
+      });
+      if (remainingFileGroupSet.isEmpty()) {
+        break;
+      }
+    }
+
+    // Need to ensure the following
+    for (String partitionPath : partitionPaths) {
+      List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
+      for (HoodieFileGroup fileGroup : fileGroups) {
+        Set<String> commitTimes = new HashSet<>();
+        fileGroup.getAllBaseFiles().forEach(value -> {
+          LOG.debug("Data File - " + value);
+          commitTimes.add(value.getCommitTime());
+        });
+        if (isAsyncClean) {
+          commitTimes.remove(lastInstant.getTimestamp());
+        }
+
+        assertEquals(
+            expectedInstantTimeMap.get(
                Pair.of(partitionPath, fileGroup.getFileGroupId().getFileId())),
+            commitTimes,
+            "Only contain acceptable versions of file should be present");
+      }
+    }
+  }
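For reviewers who want the retention rule in isolation: the new assertion encodes the KEEP_LATEST_COMMITS contract that, per file group, every version written at or after earliestCommitToRetain survives cleaning, plus the single latest version written before it (see CleanPlanner#getFilesToCleanKeepingLatestCommits). A minimal sketch of that rule, independent of Hudi's APIs; the class and method names here are hypothetical:

import java.util.ArrayList;
import java.util.List;

// Minimal sketch (not Hudi API) of the KEEP_LATEST_COMMITS rule the new test encodes.
public class KeepLatestCommitsSketch {

  // versionsAsc: the file group's versions as zero-padded commit timestamps, ascending.
  // Returns the versions the cleaner must leave on storage.
  static List<String> retainedVersions(List<String> versionsAsc, String earliestCommitToRetain) {
    List<String> retained = new ArrayList<>();
    String latestBeforeWindow = null;
    for (String commitTime : versionsAsc) {
      if (commitTime.compareTo(earliestCommitToRetain) < 0) {
        // Keep overwriting: ends up as the latest version before the retention window.
        latestBeforeWindow = commitTime;
      } else {
        retained.add(commitTime);
      }
    }
    if (latestBeforeWindow != null) {
      retained.add(0, latestBeforeWindow);
    }
    return retained;
  }

  public static void main(String[] args) {
    // Five commits, maxCommitsToRetain = 3, so earliestCommitToRetain = "003"
    // (the third instant from the end, matching nthFromLastInstant(maxCommitsToRetain - 1)).
    System.out.println(retainedVersions(List.of("001", "002", "003", "004", "005"), "003"));
    // Prints [002, 003, 004, 005]: the retention window plus one version before it.
  }
}

This is the per-file-group set that expectedInstantTimeMap accumulates while walking the timeline in reverse; the reverse traversal simply finds latestBeforeWindow without scanning older commits once every remaining file group has its extra version.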