diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index f837d08afc02..48d510c545b0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -248,17 +248,17 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(
       while (fileSliceIterator.hasNext() && keepVersions > 0) {
         // Skip this most recent version
+        fileSliceIterator.next();
+        keepVersions--;
+      }
+      // Delete the remaining files
+      while (fileSliceIterator.hasNext()) {
         FileSlice nextSlice = fileSliceIterator.next();
         Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
         if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
           // do not clean up a savepoint data file
           continue;
         }
-        keepVersions--;
-      }
-      // Delete the remaining files
-      while (fileSliceIterator.hasNext()) {
-        FileSlice nextSlice = fileSliceIterator.next();
         deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
       }
     }
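
The behavioral change above is easiest to see in isolation. Below is a minimal, self-contained sketch of the two loop orderings, using plain strings in place of Hudi's FileSlice/CleanFileInfo machinery; KeepLatestVersionsSketch, filesToDelete, and filesToDeletePreFix are illustrative names, not Hudi APIs.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

class KeepLatestVersionsSketch {

  // Post-fix ordering: skip the newest keepVersions slices unconditionally,
  // then delete the rest, sparing savepointed files.
  static List<String> filesToDelete(List<String> slicesNewestFirst, Set<String> savepointed, int keepVersions) {
    List<String> deletePaths = new ArrayList<>();
    Iterator<String> it = slicesNewestFirst.iterator();
    while (it.hasNext() && keepVersions > 0) {
      // Skip this most recent version
      it.next();
      keepVersions--;
    }
    // Delete the remaining files
    while (it.hasNext()) {
      String next = it.next();
      if (savepointed.contains(next)) {
        continue; // do not clean up a savepoint data file
      }
      deletePaths.add(next);
    }
    return deletePaths;
  }

  // Pre-fix ordering, for contrast: the savepoint check sat in the skip loop,
  // where it only distorted the keep count, while the delete loop removed
  // everything left over, savepointed or not.
  static List<String> filesToDeletePreFix(List<String> slicesNewestFirst, Set<String> savepointed, int keepVersions) {
    List<String> deletePaths = new ArrayList<>();
    Iterator<String> it = slicesNewestFirst.iterator();
    while (it.hasNext() && keepVersions > 0) {
      String next = it.next();
      if (savepointed.contains(next)) {
        continue; // skipped the decrement, but gave no protection below
      }
      keepVersions--;
    }
    while (it.hasNext()) {
      deletePaths.add(it.next());
    }
    return deletePaths;
  }

  public static void main(String[] args) {
    // File versions for one file group, newest first; "f1_002" is savepointed
    // and two versions are retained, mirroring the test below.
    List<String> slices = List.of("f1_004", "f1_003", "f1_002", "f1_001");
    Set<String> savepointed = Set.of("f1_002");
    System.out.println(filesToDelete(slices, savepointed, 2));       // [f1_001]
    System.out.println(filesToDeletePreFix(slices, savepointed, 2)); // [f1_002, f1_001]
  }
}
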
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index 6952a96a4d5d..92eeac85535c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -178,6 +178,104 @@ public void testSavepointAndRollback() throws Exception {
     }
   }
 
+  /**
+   * Test case for rollback-savepoint with KEEP_LATEST_FILE_VERSIONS policy.
+   */
+  @Test
+  public void testSavepointAndRollbackWithKeepLatestFileVersionPolicy() throws Exception {
+    HoodieWriteConfig cfg = getConfigBuilder().withCleanConfig(HoodieCleanConfig.newBuilder()
+        .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(2).build()).build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
+
+      /**
+       * Write 1 (only inserts)
+       */
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+      List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+
+      /**
+       * Write 2 (updates)
+       */
+      newCommitTime = "002";
+      client.startCommitWithTime(newCommitTime);
+
+      records = dataGen.generateUpdates(newCommitTime, records);
+      statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      // Verify there are no errors
+      assertNoWriteErrors(statuses);
+
+      client.savepoint("hoodie-unit-test", "test");
+
+      /**
+       * Write 3 (updates)
+       */
+      newCommitTime = "003";
+      client.startCommitWithTime(newCommitTime);
+
+      records = dataGen.generateUpdates(newCommitTime, records);
+      statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      // Verify there are no errors
+      assertNoWriteErrors(statuses);
+
+      HoodieWriteConfig config = getConfig();
+      List<String> partitionPaths =
+          FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieSparkTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
+      final BaseFileOnlyView view1 = table.getBaseFileOnlyView();
+
+      List<HoodieBaseFile> dataFiles = partitionPaths.stream().flatMap(s -> {
+        return view1.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("003"));
+      }).collect(Collectors.toList());
+      assertEquals(3, dataFiles.size(), "The data files for commit 003 should be present");
+
+      dataFiles = partitionPaths.stream().flatMap(s -> {
+        return view1.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("002"));
+      }).collect(Collectors.toList());
+      assertEquals(3, dataFiles.size(), "The data files for commit 002 should be present");
+
+      /**
+       * Write 4 (updates)
+       */
+      newCommitTime = "004";
+      client.startCommitWithTime(newCommitTime);
+
+      records = dataGen.generateUpdates(newCommitTime, records);
+      statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      // Verify there are no errors
+      assertNoWriteErrors(statuses);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      table = HoodieSparkTable.create(getConfig(), context, metaClient);
+      final BaseFileOnlyView view2 = table.getBaseFileOnlyView();
+
+      dataFiles = partitionPaths.stream().flatMap(s -> view2.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"))).collect(Collectors.toList());
+      assertEquals(3, dataFiles.size(), "The data files for commit 004 should be present");
+
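+      // With retainFileVersions(2), commits 003 and 004 are now the two retained
+      // file versions, so the base files of the savepointed commit 002 are only
+      // still on storage because the cleaner skips savepointed files while
+      // deleting; the restore to savepoint "002" below relies on exactly that.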
+      // rollback to savepoint 002
+      HoodieInstant savepoint = table.getCompletedSavepointTimeline().getInstants().findFirst().get();
+      client.restoreToSavepoint(savepoint.getTimestamp());
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      table = HoodieSparkTable.create(getConfig(), context, metaClient);
+      final BaseFileOnlyView view3 = table.getBaseFileOnlyView();
+      dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("002"))).collect(Collectors.toList());
+      assertEquals(3, dataFiles.size(), "The data files for commit 002 should be available");
+
+      dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("003"))).collect(Collectors.toList());
+      assertEquals(0, dataFiles.size(), "The data files for commit 003 should be rolled back");
+
+      dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"))).collect(Collectors.toList());
+      assertEquals(0, dataFiles.size(), "The data files for commit 004 should be rolled back");
+    }
+  }
+
   /**
    * Test Cases for effects of rolling back completed/inflight commits.
    */
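
Taken together, the test exercises the fix end to end: the savepoint at commit 002 plus retainFileVersions(2) pushes the savepointed files out of the keep window once commit 004 lands, so they survive cleaning only under the reordered loops, and restoreToSavepoint("002") can then assert that the 002 base files are present while 003 and 004 are rolled back. Under the pre-fix ordering (see the sketch above), the 002 base files would already have been cleaned, which is the regression this test guards against.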