From 8bfa22df43cc23e5bd89438c6e012e915257b324 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Mon, 29 Aug 2022 15:14:29 -0700 Subject: [PATCH 1/2] [HUDI-4736] Fix inflight clean action preventing clean service to continue when multiple cleans are not allowed --- .../hudi/client/BaseHoodieWriteClient.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index e309dd07d4cd0..8c8cf67d618b9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -857,7 +857,6 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(), HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking)); - HoodieCleanMetadata metadata = null; HoodieTable table = createTable(config, hadoopConf); if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) { LOG.info("Cleaner started"); @@ -866,15 +865,16 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN); table.getMetaClient().reloadActiveTimeline(); } + } - metadata = table.clean(context, cleanInstantTime, skipLocking); - if (timerContext != null && metadata != null) { - long durationMs = metrics.getDurationInMs(timerContext.stop()); - metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted()); - LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files" - + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain() - + " cleanerElapsedMs" + durationMs); - } + // Proceeds to execute any requested or inflight clean instances in the timeline + HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime, skipLocking); + if (timerContext != null && metadata != null) { + long durationMs = metrics.getDurationInMs(timerContext.stop()); + metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted()); + LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files" + + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain() + + " cleanerElapsedMs" + durationMs); } return metadata; } From 150990a2838ad3bf7e657063248922db078c3b25 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Fri, 16 Sep 2022 17:50:29 -0700 Subject: [PATCH 2/2] Fix the test around hoodie.clean.allow.multiple --- .../org/apache/hudi/table/TestCleaner.java | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index c12b3bd294e76..0f40c28508e7f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -255,7 +255,7 @@ public void testBulkInsertPreppedAndCleanByVersions() throws Exception { /** - * Tests no more than 1 clean is scheduled/executed if HoodieCompactionConfig.allowMultipleCleanSchedule config is disabled. + * Tests no more than 1 clean is scheduled if hoodie.clean.allow.multiple config is set to false. */ @Test public void testMultiClean() { @@ -299,22 +299,27 @@ public void testMultiClean() { client.startCommitWithTime(newCommitTime); client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); - // Initiate another clean. The previous leftover clean will be attempted first, followed by another clean - // due to the commit above. + // Try to schedule another clean String newCleanInstantTime = "00" + index++; HoodieCleanMetadata cleanMetadata = client.clean(newCleanInstantTime); - // subsequent clean should not be triggered since allowMultipleCleanSchedules is set to false - assertNull(cleanMetadata); - - // let the old clean complete - table = HoodieSparkTable.create(writeConfig, context); - cleanMetadata = table.clean(context, cleanInstantTime, false); + // When hoodie.clean.allow.multiple is set to false, a new clean action should not be scheduled. + // The existing requested clean should complete execution. assertNotNull(cleanMetadata); + assertTrue(metaClient.reloadActiveTimeline().getCleanerTimeline() + .filterCompletedInstants().containsInstant(cleanInstantTime)); + assertFalse(metaClient.getActiveTimeline().getCleanerTimeline() + .containsInstant(newCleanInstantTime)); + + // 1 file cleaned + assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 1); + assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getFailedDeleteFiles().size(), 0); + assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns().size(), 1); - // any new clean should go ahead + // Now that there is no requested or inflight clean instant, a new clean action can be scheduled cleanMetadata = client.clean(newCleanInstantTime); - // subsequent clean should not be triggered since allowMultipleCleanSchedules is set to false assertNotNull(cleanMetadata); + assertTrue(metaClient.reloadActiveTimeline().getCleanerTimeline() + .containsInstant(newCleanInstantTime)); // 1 file cleaned assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 1);