Skip to content

Commit

Permalink
[HUDI-4736] Fix inflight clean action preventing clean service to con…
Browse files Browse the repository at this point in the history
…tinue when multiple cleans are not allowed (#6536)
  • Loading branch information
yihua authored and yuzhaojing committed Sep 22, 2022
1 parent 6572693 commit 7db20e9
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,6 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));

HoodieCleanMetadata metadata = null;
HoodieTable table = createTable(config, hadoopConf);
if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
LOG.info("Cleaner started");
Expand All @@ -866,15 +865,16 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline
scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
table.getMetaClient().reloadActiveTimeline();
}
}

metadata = table.clean(context, cleanInstantTime, skipLocking);
if (timerContext != null && metadata != null) {
long durationMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
+ " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
+ " cleanerElapsedMs" + durationMs);
}
// Proceeds to execute any requested or inflight clean instances in the timeline
HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime, skipLocking);
if (timerContext != null && metadata != null) {
long durationMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
+ " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
+ " cleanerElapsedMs" + durationMs);
}
return metadata;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void testBulkInsertPreppedAndCleanByVersions() throws Exception {


/**
* Tests no more than 1 clean is scheduled/executed if HoodieCompactionConfig.allowMultipleCleanSchedule config is disabled.
* Tests no more than 1 clean is scheduled if hoodie.clean.allow.multiple config is set to false.
*/
@Test
public void testMultiClean() {
Expand Down Expand Up @@ -299,22 +299,27 @@ public void testMultiClean() {
client.startCommitWithTime(newCommitTime);
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();

// Initiate another clean. The previous leftover clean will be attempted first, followed by another clean
// due to the commit above.
// Try to schedule another clean
String newCleanInstantTime = "00" + index++;
HoodieCleanMetadata cleanMetadata = client.clean(newCleanInstantTime);
// subsequent clean should not be triggered since allowMultipleCleanSchedules is set to false
assertNull(cleanMetadata);

// let the old clean complete
table = HoodieSparkTable.create(writeConfig, context);
cleanMetadata = table.clean(context, cleanInstantTime, false);
// When hoodie.clean.allow.multiple is set to false, a new clean action should not be scheduled.
// The existing requested clean should complete execution.
assertNotNull(cleanMetadata);
assertTrue(metaClient.reloadActiveTimeline().getCleanerTimeline()
.filterCompletedInstants().containsInstant(cleanInstantTime));
assertFalse(metaClient.getActiveTimeline().getCleanerTimeline()
.containsInstant(newCleanInstantTime));

// 1 file cleaned
assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 1);
assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getFailedDeleteFiles().size(), 0);
assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns().size(), 1);

// any new clean should go ahead
// Now that there is no requested or inflight clean instant, a new clean action can be scheduled
cleanMetadata = client.clean(newCleanInstantTime);
// subsequent clean should not be triggered since allowMultipleCleanSchedules is set to false
assertNotNull(cleanMetadata);
assertTrue(metaClient.reloadActiveTimeline().getCleanerTimeline()
.containsInstant(newCleanInstantTime));

// 1 file cleaned
assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 1);
Expand Down

0 comments on commit 7db20e9

Please sign in to comment.