Skip to content

Commit

Permalink
Addressing feedback
Browse files (browse the repository at this point in the history)
  • Loading branch information
nsivabalan committed Sep 22, 2022
1 parent 60beb89 commit 058f2ae
Showing 1 changed file with 57 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
Expand Down Expand Up @@ -135,7 +136,7 @@ public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetai
case KEEP_LATEST_COMMITS:
case KEEP_LATEST_BY_HOURS:
case KEEP_LATEST_FILE_VERSIONS:
return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
return getPartitionPathsForCleanByCommits(earliestRetainedInstant, config.getCleanerPolicy());
default:
throw new IllegalStateException("Unknown Cleaner Policy");
}
Expand All @@ -147,23 +148,52 @@ public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetai
* @return list of partitions
* @throws IOException
*/
private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain) throws IOException {
if (!instantToRetain.isPresent()) {
LOG.info("No earliest commit to retain. No need to scan partitions !!");
return Collections.emptyList();
}
private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain, HoodieCleaningPolicy cleaningPolicy) throws IOException {
if (cleaningPolicy != HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
if (!instantToRetain.isPresent()) {
LOG.info("No earliest commit to retain. No need to scan partitions !!");
return Collections.emptyList();
}

if (config.incrementalCleanerModeEnabled()) {
Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
if (lastClean.isPresent()) {
if (hoodieTable.getActiveTimeline().isEmpty(lastClean.get())) {
hoodieTable.getActiveTimeline().deleteEmptyInstantIfExists(lastClean.get());
} else {
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
if (config.incrementalCleanerModeEnabled()) {
Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
if (lastClean.isPresent()) {
if (hoodieTable.getActiveTimeline().isEmpty(lastClean.get())) {
hoodieTable.getActiveTimeline().deleteEmptyInstantIfExists(lastClean.get());
} else {
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
if ((cleanMetadata.getEarliestCommitToRetain() != null)
&& (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
return getPartitionPathsForIncrementalCleaning(cleanMetadata.getEarliestCommitToRetain(), instantToRetain);
}
}
}
}
} else {
// KEEP_LATEST_FILE_VERSIONS policy
if (config.incrementalCleanerModeEnabled()) {
// find the last completed commit from last clean and mark that as earliest commit to retain.
Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
if (lastClean.isPresent()) {
if (hoodieTable.getActiveTimeline().isEmpty(lastClean.get())) {
hoodieTable.getActiveTimeline().deleteEmptyInstantIfExists(lastClean.get());
} else {
HoodieCleanMetadata cleanMetadata = null;
try {
cleanMetadata = TimelineMetadataUtils
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
if ((cleanMetadata.getEarliestCommitToRetain() != null)
&& (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
if (!StringUtils.isNullOrEmpty(cleanMetadata.getLastCompletedCommitTimestamp())) {
HoodieCleanMetadata finalCleanMetadata = cleanMetadata;
Option<HoodieInstant> lastCompletedInstantDuringLastClean = hoodieTable.getActiveTimeline().filterCompletedInstants()
.filter(entry -> entry.getTimestamp().equals(finalCleanMetadata.getLastCompletedCommitTimestamp())).firstInstant();
if (lastCompletedInstantDuringLastClean.isPresent()) {
return getPartitionPathsForIncrementalCleaning(lastCompletedInstantDuringLastClean.get().getTimestamp(), Option.empty());
}
}
} catch (IOException e) {
LOG.error("Failed to parse clean commit metadata for " + lastClean.get().toString());
}
}
}
}
Expand All @@ -173,19 +203,24 @@ private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> in

/**
* Use Incremental Mode for finding partition paths.
* @param cleanMetadata
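* @param previousInstantRetained timestamp used as the inclusive lower bound; only completed instants at or after it are considered.
* @param newInstantToRetain optional exclusive upper bound; when present, only instants strictly earlier than its timestamp are considered, and when empty no upper bound is applied.
* @return partition paths derived from the instants in that range (presumably the partitions those instants touched; confirm against the full method body).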
*/
private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
private List<String> getPartitionPathsForIncrementalCleaning(String previousInstantRetained,
Option<HoodieInstant> newInstantToRetain) {
LOG.info("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
+ "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+ "since last cleaned at " + previousInstantRetained
+ ". New Instant to retain : " + newInstantToRetain);
return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(

Stream<HoodieInstant> filteredInstantsGreaterThan = hoodieTable.getCompletedCommitsTimeline().getInstants().filter(
instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS,
cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
previousInstantRetained));
Stream<HoodieInstant> filteredInstants = newInstantToRetain.isPresent() ?
filteredInstantsGreaterThan.filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())) : filteredInstantsGreaterThan;

return filteredInstants.flatMap(instant -> {
try {
if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(
Expand Down Expand Up @@ -510,8 +545,6 @@ public Option<HoodieInstant> getEarliestCommitToRetain() {
String earliestTimeToRetain = HoodieActiveTimeline.formatDate(Date.from(currentDateTime.minusHours(hoursRetained).toInstant()));
earliestCommitToRetain = Option.fromJavaOptional(commitTimeline.getInstants().filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(),
HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestTimeToRetain)).findFirst());
} else if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
earliestCommitToRetain = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
}
return earliestCommitToRetain;
}
Expand Down

0 comments on commit 058f2ae

Please sign in to comment.