[HUDI-4878] Fix incremental cleaner use case #6498

Closed
@@ -42,6 +42,7 @@
 import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
@@ -134,37 +135,73 @@ public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant)
     switch (config.getCleanerPolicy()) {
       case KEEP_LATEST_COMMITS:
       case KEEP_LATEST_BY_HOURS:
-        return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
       case KEEP_LATEST_FILE_VERSIONS:
-        return getPartitionPathsForFullCleaning();
+        return getPartitionPathsForClean(earliestRetainedInstant, config.getCleanerPolicy());
       default:
         throw new IllegalStateException("Unknown Cleaner Policy");
     }
   }
 
   /**
-   * Return partition paths for cleaning by commits mode.
-   * @param instantToRetain Earliest Instant to retain
-   * @return list of partitions
+   * Return partition paths for cleaning.
+   * If incremental cleaning mode is enabled:
+   * In case of cleaning based on KEEP_LATEST_COMMITS and KEEP_LATEST_BY_HOURS, the cleaner looks at the range of commits
+   * between the commit retained during the last clean and the earliest commit to retain for the current clean.
+   * In case of cleaning based on KEEP_LATEST_FILE_VERSIONS, the cleaner looks at the range of commits between the last
+   * completed commit during the last clean and the latest completed commit in the timeline.
+   * If incremental cleaning is not enabled, all partition paths are obtained via a full file listing.
+   * @param instantToRetain Earliest instant to retain.
+   * @param cleaningPolicy Cleaning policy used.
+   * @return list of partitions to be used for cleaning.
+   * @throws IOException
    */
-  private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain) throws IOException {
-    if (!instantToRetain.isPresent()) {
-      LOG.info("No earliest commit to retain. No need to scan partitions !!");
-      return Collections.emptyList();
-    }
-
-    if (config.incrementalCleanerModeEnabled()) {
-      Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
-      if (lastClean.isPresent()) {
-        if (hoodieTable.getActiveTimeline().isEmpty(lastClean.get())) {
-          hoodieTable.getActiveTimeline().deleteEmptyInstantIfExists(lastClean.get());
-        } else {
-          HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
-              .deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
-          if ((cleanMetadata.getEarliestCommitToRetain() != null)
-              && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
-            return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
-          }
-        }
-      }
-    }
+  private List<String> getPartitionPathsForClean(Option<HoodieInstant> instantToRetain, HoodieCleaningPolicy cleaningPolicy) throws IOException {
+    if (cleaningPolicy != HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
+      if (!instantToRetain.isPresent()) {
+        LOG.info("No earliest commit to retain. No need to scan partitions !!");
+        return Collections.emptyList();
+      }
+
+      if (config.incrementalCleanerModeEnabled()) {
+        Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
+        if (lastClean.isPresent()) {
+          if (hoodieTable.getActiveTimeline().isEmpty(lastClean.get())) {
+            hoodieTable.getActiveTimeline().deleteEmptyInstantIfExists(lastClean.get());
+          } else {
+            HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
+                .deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
+            if ((cleanMetadata.getEarliestCommitToRetain() != null)
+                && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
+              return getPartitionPathsForIncrementalCleaning(cleanMetadata.getEarliestCommitToRetain(), instantToRetain);
+            }
+          }
+        }
+      }
+    } else {
+      // FILE_VERSIONS_RETAINED
+      if (config.incrementalCleanerModeEnabled()) {
+        // find the last completed commit from the last clean and mark that as the starting commit for this clean
+        Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
+        if (lastClean.isPresent()) {
+          if (hoodieTable.getActiveTimeline().isEmpty(lastClean.get())) {
+            hoodieTable.getActiveTimeline().deleteEmptyInstantIfExists(lastClean.get());
+          } else {
+            try {
+              HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
+                  .deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
+              if (!StringUtils.isNullOrEmpty(cleanMetadata.getLastCompletedCommitTimestamp())) {
+                HoodieCleanMetadata finalCleanMetadata = cleanMetadata;
+                Option<HoodieInstant> lastCompletedInstantDuringLastClean = hoodieTable.getActiveTimeline()
+                    .filterCompletedInstants()
+                    .filter(entry -> entry.getTimestamp().equals(finalCleanMetadata.getLastCompletedCommitTimestamp()))
+                    .firstInstant();
+                if (lastCompletedInstantDuringLastClean.isPresent()) {
+                  return getPartitionPathsForIncrementalCleaning(lastCompletedInstantDuringLastClean.get().getTimestamp(), Option.empty());
+                }
+              }
+            } catch (IOException e) {
+              LOG.error("Failed to parse clean commit metadata for " + lastClean.get().toString());
+            }
+          }
+        }
+      }
+    }
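Editor's note: to make the two incremental windows described in the new Javadoc concrete, here is a minimal, self-contained sketch. It is not part of the PR: the class name, the timestamps, and the plain `String.compareTo` stand-in for `HoodieTimeline.compareTimestamps` are all illustrative assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: which commit window each policy scans under incremental cleaning.
public class IncrementalCleanWindowsSketch {
  public static void main(String[] args) {
    // Completed commit timestamps on the timeline, oldest first.
    List<String> completedCommits = Arrays.asList("001", "002", "003", "004", "005");

    // KEEP_LATEST_COMMITS / KEEP_LATEST_BY_HOURS: scan
    // [earliest commit retained by the last clean, earliest commit to retain now).
    String earliestRetainedByLastClean = "002";
    String earliestToRetainNow = "004";
    List<String> commitsWindow = completedCommits.stream()
        .filter(ts -> ts.compareTo(earliestRetainedByLastClean) >= 0
            && ts.compareTo(earliestToRetainNow) < 0)
        .collect(Collectors.toList());
    System.out.println(commitsWindow); // [002, 003]

    // KEEP_LATEST_FILE_VERSIONS (the case added by this PR): scan from the last
    // completed commit seen by the last clean, with no upper bound.
    String lastCompletedAtLastClean = "003";
    List<String> fileVersionsWindow = completedCommits.stream()
        .filter(ts -> ts.compareTo(lastCompletedAtLastClean) >= 0)
        .collect(Collectors.toList());
    System.out.println(fileVersionsWindow); // [003, 004, 005]
  }
}
```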
@@ -174,34 +211,39 @@ private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain) throws IOException {
 
   /**
    * Use Incremental Mode for finding partition paths.
-   * @param cleanMetadata
+   * @param previousInstantRetained previous instant retained.
    * @param newInstantToRetain
    * @return
    */
-  private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
+  private List<String> getPartitionPathsForIncrementalCleaning(String previousInstantRetained,
                                                                Option<HoodieInstant> newInstantToRetain) {
     LOG.info("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
-        + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+        + "since last cleaned at " + previousInstantRetained
        + ". New Instant to retain : " + newInstantToRetain);
-    return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(
-        instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS,
-            cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
-            HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
+
+    Stream<HoodieInstant> filteredInstantsGreaterThan = hoodieTable.getCompletedCommitsTimeline().getInstants().filter(
+        instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS,
+            previousInstantRetained));
+    Stream<HoodieInstant> filteredInstants = newInstantToRetain.isPresent()
+        ? filteredInstantsGreaterThan.filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(),
+            HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())) : filteredInstantsGreaterThan;
+
+    return filteredInstants.flatMap(instant -> {
       try {
         if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
           HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(
               hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
           return Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(), replaceCommitMetadata.getPartitionToWriteStats().keySet().stream());
         } else {
           HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
               .fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
                   HoodieCommitMetadata.class);
           return commitMetadata.getPartitionToWriteStats().keySet().stream();
         }
       } catch (IOException e) {
         throw new HoodieIOException(e.getMessage(), e);
       }
     }).distinct().collect(Collectors.toList());
   }
 
   /**
@@ -210,6 +252,7 @@ private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
    */
   private List<String> getPartitionPathsForFullCleaning() {
     // Go to brute force mode of scanning all partitions
+    LOG.info("Getting partition paths for full cleaning");
     try {
       // Because the partition of BaseTableMetadata has been deleted,
       // all partition information can only be obtained from FileSystemBackedTableMetadata.
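Editor's note on the `Option.empty()` upper bound introduced above: the rewritten filter composes lazily, applying the `LESSER_THAN` bound only when a new instant to retain exists. A generic sketch of that pattern, under stated assumptions (hypothetical names, `java.util.Optional` standing in for Hudi's `Option`):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OptionalUpperBoundSketch {
  // Keep timestamps >= start; apply the exclusive end only when one is supplied.
  static List<String> window(List<String> timestamps, String start, Optional<String> endExclusive) {
    Stream<String> atLeastStart = timestamps.stream().filter(ts -> ts.compareTo(start) >= 0);
    Stream<String> bounded = endExclusive
        .map(end -> atLeastStart.filter(ts -> ts.compareTo(end) < 0))
        .orElse(atLeastStart);
    return bounded.collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> ts = Arrays.asList("001", "002", "003", "004");
    System.out.println(window(ts, "002", Optional.of("004"))); // [002, 003]
    System.out.println(window(ts, "002", Optional.empty()));   // [002, 003, 004]
  }
}
```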
@@ -47,6 +47,7 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -263,16 +264,14 @@ public void testKeepLatestCommits(
     assertTrue(testTable.baseFileExists(p0, "00000000000007", file4P0C3));
   }
 
-  /**
-   * Test Hudi COW Table Cleaner - Keep the latest file versions policy.
-   */
-  @Test
-  public void testKeepLatestFileVersions() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testKeepLatestFileVersions(boolean incrementalCleaningEnabled) throws Exception {
     HoodieWriteConfig config =
         HoodieWriteConfig.newBuilder().withPath(basePath)
             .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
             .withCleanConfig(HoodieCleanConfig.newBuilder()
-                .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
+                .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).withIncrementalCleaningMode(incrementalCleaningEnabled).build())
             .build();
 
     HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
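Editor's note: the test change above uses the JUnit 5 idiom where `@ParameterizedTest` with `@ValueSource(booleans = {true, false})` runs the test body once per value, so this single method now covers the cleaner both with and without incremental mode. A standalone toy example of the mechanism (hypothetical class and method names; the annotations are real JUnit 5 API):

```java
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class ValueSourceDemoTest {
  // Executed twice: once with incremental = true, once with incremental = false.
  @ParameterizedTest
  @ValueSource(booleans = {true, false})
  void bothModesProduceAPlan(boolean incremental) {
    // A stand-in for "build a cleaner plan under either mode".
    String plan = incremental ? "scan-changed-partitions" : "scan-all-partitions";
    assertTrue(plan.startsWith("scan-"));
  }
}
```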