[HUDI-4515] Fix savepoints will be cleaned in keeping latest versions policy (#6267)
Zouxxyy authored Aug 24, 2022
1 parent 1879efa commit ca8a57a
Showing 2 changed files with 103 additions and 5 deletions.
@@ -248,17 +248,17 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(
       while (fileSliceIterator.hasNext() && keepVersions > 0) {
         // Skip this most recent version
+        fileSliceIterator.next();
+        keepVersions--;
+      }
+      // Delete the remaining files
+      while (fileSliceIterator.hasNext()) {
         FileSlice nextSlice = fileSliceIterator.next();
         Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
         if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
           // do not clean up a savepoint data file
           continue;
         }
-        keepVersions--;
-      }
-      // Delete the remaining files
-      while (fileSliceIterator.hasNext()) {
-        FileSlice nextSlice = fileSliceIterator.next();
         deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
       }
     }
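Before this change, the savepoint check sat in the first loop, which only walks over the newest keepVersions file slices: a savepointed slice older than the retained window was never examined again and fell through to the unconditional deletion loop. The change moves the check into the deletion loop, so a savepointed data file can never enter deletePaths. A minimal, self-contained sketch of the two placements (plain Java, not Hudi code; the class, the filesToDelete() harness, and the commit-time strings are illustrative assumptions):

public class KeepLatestVersionsSketch {
  // Simulates one file group whose base-file versions are iterated newest-first,
  // mirroring fileSliceIterator in getFilesToCleanKeepingLatestVersions above.
  static java.util.List<String> filesToDelete(java.util.List<String> versions,
                                              java.util.Set<String> savepointed,
                                              int keepVersions, boolean checkInSkipLoop) {
    java.util.List<String> deletePaths = new java.util.ArrayList<>();
    java.util.Iterator<String> it = versions.iterator();
    // First loop: consume the newest versions that the policy retains.
    while (it.hasNext() && keepVersions > 0) {
      String slice = it.next();
      if (checkInSkipLoop && savepointed.contains(slice)) {
        continue; // pre-fix placement: savepointed slice does not use up the budget
      }
      keepVersions--;
    }
    // Second loop: everything left over is a deletion candidate.
    while (it.hasNext()) {
      String slice = it.next();
      if (!checkInSkipLoop && savepointed.contains(slice)) {
        continue; // post-fix placement: a savepointed data file is never deleted
      }
      deletePaths.add(slice);
    }
    return deletePaths;
  }

  public static void main(String[] args) {
    java.util.List<String> versions = java.util.List.of("004", "003", "002", "001"); // newest first
    java.util.Set<String> savepointed = java.util.Set.of("002"); // savepoint on commit 002
    System.out.println("pre-fix deletes:  " + filesToDelete(versions, savepointed, 2, true));
    // -> [002, 001]: the savepointed 002 file is cleaned (the HUDI-4515 bug)
    System.out.println("post-fix deletes: " + filesToDelete(versions, savepointed, 2, false));
    // -> [001]: the savepointed file survives
  }
}

With a budget of two retained versions and a savepoint on commit 002, the pre-fix placement deletes the savepointed files as soon as two newer versions exist; that is exactly the scenario the new test below reproduces.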
@@ -178,6 +178,104 @@ public void testSavepointAndRollback() throws Exception {
     }
   }
 
+  /**
+   * Test case for rollback to savepoint with the KEEP_LATEST_FILE_VERSIONS policy.
+   */
+  @Test
+  public void testSavepointAndRollbackWithKeepLatestFileVersionPolicy() throws Exception {
+    HoodieWriteConfig cfg = getConfigBuilder().withCleanConfig(HoodieCleanConfig.newBuilder()
+        .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(2).build()).build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
+
+      /**
+       * Write 1 (only inserts)
+       */
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+      List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+
+      /**
+       * Write 2 (updates)
+       */
+      newCommitTime = "002";
+      client.startCommitWithTime(newCommitTime);
+
+      records = dataGen.generateUpdates(newCommitTime, records);
+      statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      // Verify there are no errors
+      assertNoWriteErrors(statuses);
+
+      client.savepoint("hoodie-unit-test", "test");
+
+      /**
+       * Write 3 (updates)
+       */
+      newCommitTime = "003";
+      client.startCommitWithTime(newCommitTime);
+
+      records = dataGen.generateUpdates(newCommitTime, records);
+      statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      // Verify there are no errors
+      assertNoWriteErrors(statuses);
+      HoodieWriteConfig config = getConfig();
+      List<String> partitionPaths =
+          FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieSparkTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
+      final BaseFileOnlyView view1 = table.getBaseFileOnlyView();
+
+      List<HoodieBaseFile> dataFiles = partitionPaths.stream().flatMap(s -> {
+        return view1.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("003"));
+      }).collect(Collectors.toList());
+      assertEquals(3, dataFiles.size(), "The data files for commit 003 should be present");
+
+      dataFiles = partitionPaths.stream().flatMap(s -> {
+        return view1.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("002"));
+      }).collect(Collectors.toList());
+      assertEquals(3, dataFiles.size(), "The data files for commit 002 should be present");
+
+      /**
+       * Write 4 (updates)
+       */
+      newCommitTime = "004";
+      client.startCommitWithTime(newCommitTime);
+
+      records = dataGen.generateUpdates(newCommitTime, records);
+      statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      // Verify there are no errors
+      assertNoWriteErrors(statuses);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      table = HoodieSparkTable.create(getConfig(), context, metaClient);
+      final BaseFileOnlyView view2 = table.getBaseFileOnlyView();
+
+      dataFiles = partitionPaths.stream().flatMap(s -> view2.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"))).collect(Collectors.toList());
+      assertEquals(3, dataFiles.size(), "The data files for commit 004 should be present");
+
+      // rollback to savepoint 002
+      HoodieInstant savepoint = table.getCompletedSavepointTimeline().getInstants().findFirst().get();
+      client.restoreToSavepoint(savepoint.getTimestamp());
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      table = HoodieSparkTable.create(getConfig(), context, metaClient);
+      final BaseFileOnlyView view3 = table.getBaseFileOnlyView();
+      dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("002"))).collect(Collectors.toList());
+      assertEquals(3, dataFiles.size(), "The data files for commit 002 should be available");
+
+      dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("003"))).collect(Collectors.toList());
+      assertEquals(0, dataFiles.size(), "The data files for commit 003 should be rolled back");
+
+      dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"))).collect(Collectors.toList());
+      assertEquals(0, dataFiles.size(), "The data files for commit 004 should be rolled back");
+    }
+  }
+
   /**
    * Test Cases for effects of rolling back completed/inflight commits.
    */
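Tying the test back to the sketch above: it writes commits 001 through 004 with retainFileVersions(2) and savepoints commit 002, so by commit 004 the savepointed files sit outside the two retained versions. The assertions verify that those files nevertheless survive cleaning, and that restoring to the savepoint succeeds with the 003 and 004 files rolled back.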
