Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-4736] Fix inflight clean action preventing clean service to continue when multiple cleans are not allowed #6536

Merged
merged 2 commits into from
Sep 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,6 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));

HoodieCleanMetadata metadata = null;
HoodieTable table = createTable(config, hadoopConf);
if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
LOG.info("Cleaner started");
Expand All @@ -866,15 +865,16 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline
scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
table.getMetaClient().reloadActiveTimeline();
}
}

metadata = table.clean(context, cleanInstantTime, skipLocking);
if (timerContext != null && metadata != null) {
long durationMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
+ " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
+ " cleanerElapsedMs" + durationMs);
}
// Proceeds to execute any requested or inflight clean instances in the timeline
HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime, skipLocking);
if (timerContext != null && metadata != null) {
long durationMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
+ " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
+ " cleanerElapsedMs" + durationMs);
}
return metadata;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void testBulkInsertPreppedAndCleanByVersions() throws Exception {


/**
* Tests no more than 1 clean is scheduled/executed if HoodieCompactionConfig.allowMultipleCleanSchedule config is disabled.
* Tests no more than 1 clean is scheduled if hoodie.clean.allow.multiple config is set to false.
*/
@Test
public void testMultiClean() {
Expand Down Expand Up @@ -299,22 +299,27 @@ public void testMultiClean() {
client.startCommitWithTime(newCommitTime);
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();

// Initiate another clean. The previous leftover clean will be attempted first, followed by another clean
// due to the commit above.
// Try to schedule another clean
String newCleanInstantTime = "00" + index++;
HoodieCleanMetadata cleanMetadata = client.clean(newCleanInstantTime);
// subsequent clean should not be triggered since allowMultipleCleanSchedules is set to false
assertNull(cleanMetadata);

// let the old clean complete
table = HoodieSparkTable.create(writeConfig, context);
cleanMetadata = table.clean(context, cleanInstantTime, false);
// When hoodie.clean.allow.multiple is set to false, a new clean action should not be scheduled.
// The existing requested clean should complete execution.
assertNotNull(cleanMetadata);
assertTrue(metaClient.reloadActiveTimeline().getCleanerTimeline()
.filterCompletedInstants().containsInstant(cleanInstantTime));
assertFalse(metaClient.getActiveTimeline().getCleanerTimeline()
.containsInstant(newCleanInstantTime));

// 1 file cleaned
assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 1);
assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getFailedDeleteFiles().size(), 0);
assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns().size(), 1);

// any new clean should go ahead
// Now that there is no requested or inflight clean instant, a new clean action can be scheduled
cleanMetadata = client.clean(newCleanInstantTime);
// subsequent clean should not be triggered since allowMultipleCleanSchedules is set to false
assertNotNull(cleanMetadata);
assertTrue(metaClient.reloadActiveTimeline().getCleanerTimeline()
.containsInstant(newCleanInstantTime));

// 1 file cleaned
assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 1);
Expand Down