Skip to content

Commit

Permalink
[HUDI-4515] test case for rollback-savepoint with KEEP_LATEST_FILE_VE…
Browse files Browse the repository at this point in the history
…RSIONS policy
  • Loading branch information
Zouxxyy committed Aug 5, 2022
1 parent 9b6f580 commit 43899bb
Showing 1 changed file with 98 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,104 @@ public void testSavepointAndRollback() throws Exception {
}
}

/**
* Test case for rollback-savepoint with KEEP_LATEST_FILE_VERSIONS policy.
*/
@Test
public void testSavepointAndRollbackWithKeepLatestFileVersionPolicy() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder().withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(2).build()).build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);

/**
* Write 1 (only inserts)
*/
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);

List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);

List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses);

/**
* Write 2 (updates)
*/
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);

records = dataGen.generateUpdates(newCommitTime, records);
statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);

client.savepoint("hoodie-unit-test", "test");

/**
* Write 3 (updates)
*/
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);

records = dataGen.generateUpdates(newCommitTime, records);
statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
HoodieWriteConfig config = getConfig();
List<String> partitionPaths =
FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieSparkTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
final BaseFileOnlyView view1 = table.getBaseFileOnlyView();

List<HoodieBaseFile> dataFiles = partitionPaths.stream().flatMap(s -> {
return view1.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("003"));
}).collect(Collectors.toList());
assertEquals(3, dataFiles.size(), "The data files for commit 003 should be present");

dataFiles = partitionPaths.stream().flatMap(s -> {
return view1.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("002"));
}).collect(Collectors.toList());
assertEquals(3, dataFiles.size(), "The data files for commit 002 should be present");

/**
* Write 4 (updates)
*/
newCommitTime = "004";
client.startCommitWithTime(newCommitTime);

records = dataGen.generateUpdates(newCommitTime, records);
statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);

metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieSparkTable.create(getConfig(), context, metaClient);
final BaseFileOnlyView view2 = table.getBaseFileOnlyView();

dataFiles = partitionPaths.stream().flatMap(s -> view2.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"))).collect(Collectors.toList());
assertEquals(3, dataFiles.size(), "The data files for commit 004 should be present");

// rollback to savepoint 002
HoodieInstant savepoint = table.getCompletedSavepointTimeline().getInstants().findFirst().get();
client.restoreToSavepoint(savepoint.getTimestamp());

metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieSparkTable.create(getConfig(), context, metaClient);
final BaseFileOnlyView view3 = table.getBaseFileOnlyView();
dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("002"))).collect(Collectors.toList());
assertEquals(3, dataFiles.size(), "The data files for commit 002 be available");

dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("003"))).collect(Collectors.toList());
assertEquals(0, dataFiles.size(), "The data files for commit 003 should be rolled back");

dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"))).collect(Collectors.toList());
assertEquals(0, dataFiles.size(), "The data files for commit 004 should be rolled back");
}
}

/**
* Test Cases for effects of rolling back completed/inflight commits.
*/
Expand Down

0 comments on commit 43899bb

Please sign in to comment.