Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-4515] Fix savepoints will be cleaned in keeping latest versions policy #6267

Merged
merged 2 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,17 +248,17 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(

while (fileSliceIterator.hasNext() && keepVersions > 0) {
// Skip this most recent version
fileSliceIterator.next();
keepVersions--;
}
// Delete the remaining files
while (fileSliceIterator.hasNext()) {
nsivabalan marked this conversation as resolved.
Show resolved Hide resolved
FileSlice nextSlice = fileSliceIterator.next();
Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
// do not clean up a savepoint data file
continue;
}
keepVersions--;
}
// Delete the remaining files
while (fileSliceIterator.hasNext()) {
FileSlice nextSlice = fileSliceIterator.next();
deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,104 @@ public void testSavepointAndRollback() throws Exception {
}
}

/**
* Test case for rollback-savepoint with KEEP_LATEST_FILE_VERSIONS policy.
*/
@Test
public void testSavepointAndRollbackWithKeepLatestFileVersionPolicy() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder().withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(2).build()).build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);

/**
* Write 1 (only inserts)
*/
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);

List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);

List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses);

/**
* Write 2 (updates)
*/
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);

records = dataGen.generateUpdates(newCommitTime, records);
statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);

client.savepoint("hoodie-unit-test", "test");

/**
* Write 3 (updates)
*/
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);

records = dataGen.generateUpdates(newCommitTime, records);
statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
HoodieWriteConfig config = getConfig();
List<String> partitionPaths =
FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieSparkTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
final BaseFileOnlyView view1 = table.getBaseFileOnlyView();

List<HoodieBaseFile> dataFiles = partitionPaths.stream().flatMap(s -> {
return view1.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("003"));
}).collect(Collectors.toList());
assertEquals(3, dataFiles.size(), "The data files for commit 003 should be present");

dataFiles = partitionPaths.stream().flatMap(s -> {
return view1.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("002"));
}).collect(Collectors.toList());
assertEquals(3, dataFiles.size(), "The data files for commit 002 should be present");

/**
* Write 4 (updates)
*/
newCommitTime = "004";
client.startCommitWithTime(newCommitTime);

records = dataGen.generateUpdates(newCommitTime, records);
statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);

metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieSparkTable.create(getConfig(), context, metaClient);
final BaseFileOnlyView view2 = table.getBaseFileOnlyView();

dataFiles = partitionPaths.stream().flatMap(s -> view2.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"))).collect(Collectors.toList());
assertEquals(3, dataFiles.size(), "The data files for commit 004 should be present");

// rollback to savepoint 002
HoodieInstant savepoint = table.getCompletedSavepointTimeline().getInstants().findFirst().get();
client.restoreToSavepoint(savepoint.getTimestamp());

metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieSparkTable.create(getConfig(), context, metaClient);
final BaseFileOnlyView view3 = table.getBaseFileOnlyView();
dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("002"))).collect(Collectors.toList());
assertEquals(3, dataFiles.size(), "The data files for commit 002 be available");

dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("003"))).collect(Collectors.toList());
assertEquals(0, dataFiles.size(), "The data files for commit 003 should be rolled back");

dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"))).collect(Collectors.toList());
assertEquals(0, dataFiles.size(), "The data files for commit 004 should be rolled back");
}
}

/**
* Test Cases for effects of rolling back completed/inflight commits.
*/
Expand Down