Skip to content

Commit

Permalink
addressing feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan committed Sep 26, 2022
1 parent 319c261 commit 9dc742b
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,26 @@ public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetai
case KEEP_LATEST_COMMITS:
case KEEP_LATEST_BY_HOURS:
case KEEP_LATEST_FILE_VERSIONS:
return getPartitionPathsForCleanByCommits(earliestRetainedInstant, config.getCleanerPolicy());
return getPartitionPathsForClean(earliestRetainedInstant, config.getCleanerPolicy());
default:
throw new IllegalStateException("Unknown Cleaner Policy");
}
}

/**
* Return partition paths for cleaning by commits mode.
* @param instantToRetain Earliest Instant to retain
* @return list of partitions
* Return partition paths for cleaning.
* If incremental cleaning mode is enabled:
* In case of cleaning based on LATEST_COMMITS and LATEST_HOURS, the cleaner looks at the range of commits between the commit retained during the last clean
* and the earliest commit to retain for the current clean.
* In case of cleaning based on LATEST_FILE_VERSIONS, the cleaner looks at the range of commits from the lastCompletedCommit of the last clean up to the latest
* completed commit in the timeline.
* If incremental cleaning is not enabled, all partition paths are listed based on file listing.
* @param instantToRetain Earliest Instant to retain.
* @param cleaningPolicy cleaning policy used.
* @return list of partitions to be used for cleaning.
* @throws IOException
*/
private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain, HoodieCleaningPolicy cleaningPolicy) throws IOException {
private List<String> getPartitionPathsForClean(Option<HoodieInstant> instantToRetain, HoodieCleaningPolicy cleaningPolicy) throws IOException {
if (cleaningPolicy != HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
if (!instantToRetain.isPresent()) {
LOG.info("No earliest commit to retain. No need to scan partitions !!");
Expand All @@ -173,20 +180,21 @@ private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> in
} else {
// FILE_VERSIONS_RETAINED
if (config.incrementalCleanerModeEnabled()) {
// find the last completed commit from last clean and mark that as earliest commit to retain.
// find the last completed commit from last clean and mark that as starting commit to clean
Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
if (lastClean.isPresent()) {
if (hoodieTable.getActiveTimeline().isEmpty(lastClean.get())) {
hoodieTable.getActiveTimeline().deleteEmptyInstantIfExists(lastClean.get());
} else {
HoodieCleanMetadata cleanMetadata = null;
try {
cleanMetadata = TimelineMetadataUtils
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
if (!StringUtils.isNullOrEmpty(cleanMetadata.getLastCompletedCommitTimestamp())) {
HoodieCleanMetadata finalCleanMetadata = cleanMetadata;
Option<HoodieInstant> lastCompletedInstantDuringLastClean = hoodieTable.getActiveTimeline().filterCompletedInstants()
.filter(entry -> entry.getTimestamp().equals(finalCleanMetadata.getLastCompletedCommitTimestamp())).firstInstant();
Option<HoodieInstant> lastCompletedInstantDuringLastClean = hoodieTable.getActiveTimeline()
.filterCompletedInstants()
.filter(entry -> entry.getTimestamp().equals(finalCleanMetadata.getLastCompletedCommitTimestamp()))
.firstInstant();
if (lastCompletedInstantDuringLastClean.isPresent()) {
return getPartitionPathsForIncrementalCleaning(lastCompletedInstantDuringLastClean.get().getTimestamp(), Option.empty());
}
Expand Down Expand Up @@ -244,6 +252,7 @@ private List<String> getPartitionPathsForIncrementalCleaning(String previousInst
*/
private List<String> getPartitionPathsForFullCleaning() {
// Go to brute force mode of scanning all partitions
LOG.info("Getting partition paths for full cleaning");
try {
// Because the partition of BaseTableMetadata has been deleted,
// all partition information can only be obtained from FileSystemBackedTableMetadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
Expand Down Expand Up @@ -263,11 +264,9 @@ public void testKeepLatestCommits(
assertTrue(testTable.baseFileExists(p0, "00000000000007", file4P0C3));
}

/**
* Test Hudi COW Table Cleaner - Keep the latest file versions policy.
*/
@Test
public void testKeepLatestFileVersions() throws Exception {
@ParameterizedTest
@ValueSource(Boolean.class)
public void testKeepLatestFileVersions(boolean ) throws Exception {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
Expand Down

0 comments on commit 9dc742b

Please sign in to comment.