[HUDI-3959] Rename class name for spark rdd reader
simonssu committed Apr 25, 2022
1 parent d994c58 commit 9cc5ae5
Showing 31 changed files with 175 additions and 174 deletions.
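The change set is a mechanical rename: org.apache.hudi.client.HoodieReadClient becomes org.apache.hudi.client.SparkRDDReadClient (aligning it with the existing SparkRDDWriteClient), and the test-base helpers getHoodieWriteClient/getHoodieReadClient become getSparkRDDWriteClient/getSparkRDDReadClient. A minimal before/after sketch of caller code; the engine context parameter and table path are illustrative, not taken from this diff:

// Before this commit:
// import org.apache.hudi.client.HoodieReadClient;
// HoodieReadClient readClient = new HoodieReadClient(context, "/tmp/hoodie/trips");

// After this commit: same constructor signature, new class name.
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;

public class RenameMigrationSketch {
  static SparkRDDReadClient openReadClient(HoodieSparkEngineContext context) {
    return new SparkRDDReadClient(context, "/tmp/hoodie/trips"); // illustrative base path
  }
}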
@@ -19,7 +19,7 @@

package org.apache.hudi.cli.functional;

-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
@@ -107,7 +107,7 @@ public synchronized void runBeforeEach() {
if (!initialized) {
SparkConf sparkConf = conf();
SparkRDDWriteClient.registerClasses(sparkConf);
-HoodieReadClient.addHoodieSupport(sparkConf);
+SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
jsc = new JavaSparkContext(spark.sparkContext());
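Both clients participate in Spark session setup the same way; only the reader's class name changed here. A self-contained sketch of the bootstrap pattern this test uses, with an illustrative app name and master (the test itself derives its SparkConf from conf()):

import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class HudiSparkBootstrapSketch {
  public static SparkSession buildSession() {
    // App name and master are illustrative, not from this diff.
    SparkConf sparkConf = new SparkConf().setAppName("hudi-functional-tests").setMaster("local[2]");
    SparkRDDWriteClient.registerClasses(sparkConf);  // register Hudi classes for Spark serialization
    SparkRDDReadClient.addHoodieSupport(sparkConf);  // formerly HoodieReadClient.addHoodieSupport
    return SparkSession.builder().config(sparkConf).getOrCreate();
  }
}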
@@ -59,7 +59,7 @@
/**
* Provides an RDD based API for accessing/filtering Hoodie tables, based on keys.
*/
-public class HoodieReadClient<T extends HoodieRecordPayload<T>> implements Serializable {
+public class SparkRDDReadClient<T extends HoodieRecordPayload<T>> implements Serializable {

private static final long serialVersionUID = 1L;

@@ -76,7 +76,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload<T>> implements Seria
/**
* @param basePath path to Hoodie table
*/
-public HoodieReadClient(HoodieSparkEngineContext context, String basePath) {
+public SparkRDDReadClient(HoodieSparkEngineContext context, String basePath) {
this(context, HoodieWriteConfig.newBuilder().withPath(basePath)
// by default we use HoodieBloomIndex
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build());
@@ -87,15 +87,15 @@ public HoodieReadClient(HoodieSparkEngineContext context, String basePath) {
* @param basePath
* @param sqlContext
*/
-public HoodieReadClient(HoodieSparkEngineContext context, String basePath, SQLContext sqlContext) {
+public SparkRDDReadClient(HoodieSparkEngineContext context, String basePath, SQLContext sqlContext) {
this(context, basePath);
this.sqlContextOpt = Option.of(sqlContext);
}

/**
* @param clientConfig instance of HoodieWriteConfig
*/
-public HoodieReadClient(HoodieSparkEngineContext context, HoodieWriteConfig clientConfig) {
+public SparkRDDReadClient(HoodieSparkEngineContext context, HoodieWriteConfig clientConfig) {
this.context = context;
this.hadoopConf = context.getHadoopConf().get();
final String basePath = clientConfig.getBasePath();
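The class body is unchanged apart from the name, so existing read paths keep working. A minimal sketch of the key-based filtering this client provides, assuming a table already exists at basePath and using raw generics as the tests below do:

import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.spark.api.java.JavaRDD;

public class ReadClientUsageSketch {
  static JavaRDD<HoodieRecord> filterOutExisting(HoodieSparkEngineContext context, String basePath,
      JavaRDD<HoodieRecord> incoming) {
    // Renamed from HoodieReadClient; constructor and methods are otherwise unchanged.
    SparkRDDReadClient readClient = new SparkRDDReadClient(context, basePath);
    // filterExists keeps only records whose keys are not already present in the table;
    // tagLocation (exercised in the tests below) instead annotates records with their locations.
    return readClient.filterExists(incoming);
  }
}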
@@ -82,7 +82,7 @@ public class TestClientRollback extends HoodieClientTestBase {
public void testSavepointAndRollback() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()).build();
-try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+try (SparkRDDWriteClient client = getSparkRDDWriteClient(cfg)) {
HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);

/**
@@ -236,7 +236,7 @@ public void testRollbackCommit() throws Exception {
testTable.doWriteOperation(commitTime3, WriteOperationType.INSERT, Collections.emptyList(), partitionToFilesNameLengthMap3,
false, true);

-try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
+try (SparkRDDWriteClient client = getSparkRDDWriteClient(config)) {

// Rollback commit3
client.rollback(commitTime3);
@@ -346,7 +346,7 @@ public void testFailedRollbackCommit(
.addInflightCommit(commitTime3)
.withBaseFilesInPartitions(partitionAndFileId3);

-try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
+try (SparkRDDWriteClient client = getSparkRDDWriteClient(config)) {

// Rollback commit3
client.rollback(commitTime3);
@@ -458,7 +458,7 @@ public void testAutoRollbackInflightCommit() throws Exception {
false, true);

final String commitTime4 = "20160506030621";
-try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
+try (SparkRDDWriteClient client = getSparkRDDWriteClient(config)) {
client.startCommitWithTime(commitTime4);
// Check results, nothing changed
assertTrue(testTable.commitExists(commitTime1));
@@ -474,7 +474,7 @@ public void testAutoRollbackInflightCommit() throws Exception {
.withRollbackUsingMarkers(false)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
final String commitTime5 = "20160506030631";
-try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
+try (SparkRDDWriteClient client = getSparkRDDWriteClient(config)) {
client.startCommitWithTime(commitTime5);
assertTrue(testTable.commitExists(commitTime1));
assertFalse(testTable.inflightCommitExists(commitTime2));
@@ -547,7 +547,7 @@ public void testRollbackWithRequestedRollbackPlan(boolean enableMetadataTable, b
.addInflightCommit(commitTime3)
.withBaseFilesInPartitions(partitionAndFileId3);

-try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
+try (SparkRDDWriteClient client = getSparkRDDWriteClient(config)) {
if (isRollbackPlanCorrupted) {
// Add a corrupted requested rollback plan
FileCreateUtils.createRequestedRollbackFile(metaClient.getBasePath(), rollbackInstantTime, new byte[] {0, 1, 2});
@@ -110,12 +110,12 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E
.build()).withAutoCommit(false).withProperties(properties).build();

// Create the first commit
-createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", "001", 200, true);
+createCommitWithInserts(writeConfig, getSparkRDDWriteClient(writeConfig), "000", "001", 200, true);

final int threadCount = 2;
final ExecutorService executors = Executors.newFixedThreadPool(2);
-final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
-final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig);
+final SparkRDDWriteClient client1 = getSparkRDDWriteClient(writeConfig);
+final SparkRDDWriteClient client2 = getSparkRDDWriteClient(writeConfig);

final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
final AtomicBoolean writer1Completed = new AtomicBoolean(false);
@@ -210,7 +210,7 @@ public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType table
.build();

// Create the first commit
-SparkRDDWriteClient<?> client = getHoodieWriteClient(cfg);
+SparkRDDWriteClient<?> client = getSparkRDDWriteClient(cfg);
createCommitWithInsertsForPartition(cfg, client, "000", "001", 100, "2016/03/01");

int numConcurrentWriters = 5;
@@ -222,7 +222,7 @@ public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType table
String partition = "2016/03/0" + (loop + 2);
futures.add(executors.submit(() -> {
try {
-SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg);
+SparkRDDWriteClient<?> writeClient = getSparkRDDWriteClient(cfg);
createCommitWithInsertsForPartition(cfg, writeClient, "001", newCommitTime, 100, partition);
} catch (Exception e) {
throw new RuntimeException(e);
@@ -280,7 +280,7 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t
Set<String> validInstants = new HashSet<>();
// Create the first commit with inserts
HoodieWriteConfig cfg = writeConfigBuilder.build();
-SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);
createCommitWithInserts(cfg, client, "000", "001", 200, true);
validInstants.add("001");
// Create 2 commits with upserts
@@ -301,9 +301,9 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t
.withInlineClusteringNumCommits(1)
.build())
.build();
-final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
-final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
-final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
+final SparkRDDWriteClient client1 = getSparkRDDWriteClient(cfg2);
+final SparkRDDWriteClient client2 = getSparkRDDWriteClient(cfg);
+final SparkRDDWriteClient client3 = getSparkRDDWriteClient(cfg);

// Create upserts, schedule cleaning, schedule compaction in parallel
Future future1 = executors.submit(() -> {
@@ -416,24 +416,24 @@ public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType)
.build();

// Create the first commit
-createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200, true);
+createCommitWithInserts(cfg, getSparkRDDWriteClient(cfg), "000", "001", 200, true);
// Start another inflight commit
String newCommitTime = "003";
int numRecords = 100;
-SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+SparkRDDWriteClient client1 = getSparkRDDWriteClient(cfg);
String commitTimeBetweenPrevAndNew = "002";
JavaRDD<WriteStatus> result1 = updateBatch(cfg, client1, newCommitTime, "001",
Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", numRecords, SparkRDDWriteClient::upsert, false, false,
numRecords, 200, 2);
// Start and finish another commit while the previous writer for commit 003 is running
newCommitTime = "004";
-SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+SparkRDDWriteClient client2 = getSparkRDDWriteClient(cfg);
JavaRDD<WriteStatus> result2 = updateBatch(cfg2, client2, newCommitTime, "001",
Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", numRecords, SparkRDDWriteClient::upsert, false, false,
numRecords, 200, 2);
client2.commit(newCommitTime, result2);
// Schedule and run clustering while previous writer for commit 003 is running
-SparkRDDWriteClient client3 = getHoodieWriteClient(cfg3);
+SparkRDDWriteClient client3 = getSparkRDDWriteClient(cfg3);
// schedule clustering
Option<String> clusterInstant = client3.scheduleTableService(Option.empty(), TableServiceType.CLUSTER);
assertTrue(clusterInstant.isPresent());
@@ -464,12 +464,12 @@ public void testHoodieClientMultiWriterAutoCommitForConflict() throws Exception
HoodieWriteConfig cfg2 = writeConfigBuilder.build();

// Create the first commit
-createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 5000, false);
+createCommitWithInserts(cfg, getSparkRDDWriteClient(cfg), "000", "001", 5000, false);
// Start another inflight commit
String newCommitTime1 = "003";
String newCommitTime2 = "004";
-SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
-SparkRDDWriteClient client2 = getHoodieWriteClient(cfg2);
+SparkRDDWriteClient client1 = getSparkRDDWriteClient(cfg);
+SparkRDDWriteClient client2 = getSparkRDDWriteClient(cfg2);

List<HoodieRecord> updates1 = dataGen.generateUpdates(newCommitTime1, 5000);
List<HoodieRecord> updates2 = dataGen.generateUpdates(newCommitTime2, 5000);
@@ -547,12 +547,12 @@ public void testHoodieClientMultiWriterAutoCommitNonConflict() throws Exception
HoodieWriteConfig cfg2 = writeConfigBuilder.build();

// Create the first commit
-createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200, false);
+createCommitWithInserts(cfg, getSparkRDDWriteClient(cfg), "000", "001", 200, false);
// Start another inflight commit
String newCommitTime1 = "003";
String newCommitTime2 = "004";
-SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
-SparkRDDWriteClient client2 = getHoodieWriteClient(cfg2);
+SparkRDDWriteClient client1 = getSparkRDDWriteClient(cfg);
+SparkRDDWriteClient client2 = getSparkRDDWriteClient(cfg2);

List<HoodieRecord> updates1 = dataGen.generateInserts(newCommitTime1, 200);
List<HoodieRecord> updates2 = dataGen.generateInserts(newCommitTime2, 200);
@@ -85,7 +85,7 @@ public void testReadFilterExistAfterBulkInsertPrepped() throws Exception {

@Test
public void testReadROViewFailsWithoutSqlContext() {
-HoodieReadClient readClient = new HoodieReadClient(context, getConfig());
+SparkRDDReadClient readClient = new SparkRDDReadClient(context, getConfig());
JavaRDD<HoodieKey> recordsRDD = jsc.parallelize(new ArrayList<>(), 1);
assertThrows(IllegalStateException.class, () -> {
readClient.readROView(recordsRDD, 1);
@@ -102,8 +102,8 @@ public void testReadROViewFailsWithoutSqlContext() {
*/
private void testReadFilterExist(HoodieWriteConfig config,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn) throws Exception {
-try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
-HoodieReadClient readClient = getHoodieReadClient(config.getBasePath());
+try (SparkRDDWriteClient writeClient = getSparkRDDWriteClient(config);) {
+SparkRDDReadClient readClient = getSparkRDDReadClient(config.getBasePath());
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
Expand All @@ -119,7 +119,7 @@ private void testReadFilterExist(HoodieWriteConfig config,
// Verify there are no errors
assertNoWriteErrors(statuses);

-HoodieReadClient anotherReadClient = getHoodieReadClient(config.getBasePath());
+SparkRDDReadClient anotherReadClient = getSparkRDDReadClient(config.getBasePath());
filteredRDD = anotherReadClient.filterExists(recordsRDD);
List<HoodieRecord> result = filteredRDD.collect();
// Check results
Expand Down Expand Up @@ -199,7 +199,7 @@ private void testTagLocation(HoodieWriteConfig hoodieWriteConfig,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> updateFn, boolean isPrepped)
throws Exception {
-try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);) {
+try (SparkRDDWriteClient client = getSparkRDDWriteClient(hoodieWriteConfig);) {
// Write 1 (only inserts)
String newCommitTime = "001";
String initCommitTime = "000";
Expand All @@ -212,7 +212,7 @@ private void testTagLocation(HoodieWriteConfig hoodieWriteConfig,
jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
.map(record -> new HoodieAvroRecord(record.getKey(), null)).collect(Collectors.toList()));
// Should have 100 records in table (check using Index), all in locations marked at commit
-HoodieReadClient readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath());
+SparkRDDReadClient readClient = getSparkRDDReadClient(hoodieWriteConfig.getBasePath());
List<HoodieRecord> taggedRecords = readClient.tagLocation(recordRDD).collect();
checkTaggedRecords(taggedRecords, newCommitTime);

Expand All @@ -228,7 +228,7 @@ private void testTagLocation(HoodieWriteConfig hoodieWriteConfig,
jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
.map(record -> new HoodieAvroRecord(record.getKey(), null)).collect(Collectors.toList()));
// Index should be able to locate all updates in correct locations.
-readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath());
+readClient = getSparkRDDReadClient(hoodieWriteConfig.getBasePath());
taggedRecords = readClient.tagLocation(recordRDD).collect();
checkTaggedRecords(taggedRecords, newCommitTime);
}
@@ -96,8 +96,8 @@ public void readLocalWriteHDFS() throws Exception {
.initTable(hadoopConf, tablePath);


-try (SparkRDDWriteClient hdfsWriteClient = getHoodieWriteClient(cfg);
-SparkRDDWriteClient localWriteClient = getHoodieWriteClient(localConfig)) {
+try (SparkRDDWriteClient hdfsWriteClient = getSparkRDDWriteClient(cfg);
+SparkRDDWriteClient localWriteClient = getSparkRDDWriteClient(localConfig)) {

// Write generated data to hdfs (only inserts)
String readCommitTime = hdfsWriteClient.startCommit();