From 9cc5ae59c5c932763b91b0dd79fd34c504b6ed6b Mon Sep 17 00:00:00 2001
From: simonssu
Date: Sun, 24 Apr 2022 10:56:01 +0800
Subject: [PATCH] [HUDI-3959] Rename class name for spark rdd reader

---
 .../functional/CLIFunctionalTestHarness.java  |  4 +-
 ...eadClient.java => SparkRDDReadClient.java} |  8 +--
 .../hudi/client/TestClientRollback.java       | 12 ++--
 .../client/TestHoodieClientMultiWriter.java   | 38 +++++------
 .../hudi/client/TestHoodieReadClient.java     | 14 ++--
 .../org/apache/hudi/client/TestMultiFS.java   |  4 +-
 .../hudi/client/TestTableSchemaEvolution.java | 18 +++---
 .../functional/TestHoodieBackedMetadata.java  |  8 +--
 .../TestHoodieClientOnCopyOnWriteStorage.java | 64 +++++++++----------
 .../client/functional/TestHoodieIndex.java    |  4 +-
 .../index/bloom/TestHoodieBloomIndex.java     |  2 +-
 .../bloom/TestHoodieGlobalBloomIndex.java     |  2 +-
 .../apache/hudi/io/TestHoodieMergeHandle.java |  5 +-
 .../org/apache/hudi/table/TestCleaner.java    | 10 +--
 .../table/TestHoodieMergeOnReadTable.java     |  4 +-
 .../commit/TestCopyOnWriteActionExecutor.java |  6 +-
 .../action/compact/CompactionTestBase.java    |  4 +-
 .../action/compact/TestAsyncCompaction.java   | 38 +++++------
 .../action/compact/TestHoodieCompactor.java   |  6 +-
 .../action/compact/TestInlineCompaction.java  | 40 ++++++------
 .../HoodieClientRollbackTestBase.java         |  4 +-
 ...TestMergeOnReadRollbackActionExecutor.java |  4 +-
 .../table/upgrade/TestUpgradeDowngrade.java   | 12 ++--
 .../hudi/testutils/FunctionalTestHarness.java |  4 +-
 .../testutils/HoodieClientTestHarness.java    | 10 +--
 .../hudi/testutils/HoodieClientTestUtils.java |  4 +-
 .../SparkClientFunctionalTestHarness.java     |  4 +-
 .../quickstart/TestHoodieSparkQuickstart.java |  4 +-
 .../java/org/apache/hudi/DataSourceUtils.java |  4 +-
 .../hudi/utilities/TestHoodieIndexer.java     |  4 +-
 .../hudi/utilities/TestHoodieRepairTool.java  |  4 +-
 31 files changed, 175 insertions(+), 174 deletions(-)
 rename hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/{HoodieReadClient.java => SparkRDDReadClient.java} (96%)

diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java b/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java
index 7a12a6692a2bc..a8f27c3d6be36 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java
@@ -19,7 +19,7 @@
 
 package org.apache.hudi.cli.functional;
 
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
@@ -107,7 +107,7 @@ public synchronized void runBeforeEach() {
     if (!initialized) {
       SparkConf sparkConf = conf();
       SparkRDDWriteClient.registerClasses(sparkConf);
-      HoodieReadClient.addHoodieSupport(sparkConf);
+      SparkRDDReadClient.addHoodieSupport(sparkConf);
       spark = SparkSession.builder().config(sparkConf).getOrCreate();
       sqlContext = spark.sqlContext();
       jsc = new JavaSparkContext(spark.sparkContext());
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDReadClient.java
similarity index 96%
rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDReadClient.java
index 37a78a4be54c0..6dc9dde2bbec8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDReadClient.java
@@ -59,7 +59,7 @@
 /**
  * Provides an RDD based API for accessing/filtering Hoodie tables, based on keys.
  */
-public class HoodieReadClient> implements Serializable {
+public class SparkRDDReadClient> implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
@@ -76,7 +76,7 @@ public class HoodieReadClient> implements Seria
   /**
    * @param basePath path to Hoodie table
    */
-  public HoodieReadClient(HoodieSparkEngineContext context, String basePath) {
+  public SparkRDDReadClient(HoodieSparkEngineContext context, String basePath) {
     this(context, HoodieWriteConfig.newBuilder().withPath(basePath)
         // by default we use HoodieBloomIndex
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build());
@@ -87,7 +87,7 @@ public HoodieReadClient(HoodieSparkEngineContext context, String basePath) {
    * @param basePath
    * @param sqlContext
    */
-  public HoodieReadClient(HoodieSparkEngineContext context, String basePath, SQLContext sqlContext) {
+  public SparkRDDReadClient(HoodieSparkEngineContext context, String basePath, SQLContext sqlContext) {
     this(context, basePath);
     this.sqlContextOpt = Option.of(sqlContext);
   }
@@ -95,7 +95,7 @@ public HoodieReadClient(HoodieSparkEngineContext context, String basePath, SQLCo
   /**
    * @param clientConfig instance of HoodieWriteConfig
   */
-  public HoodieReadClient(HoodieSparkEngineContext context, HoodieWriteConfig clientConfig) {
+  public SparkRDDReadClient(HoodieSparkEngineContext context, HoodieWriteConfig clientConfig) {
     this.context = context;
     this.hadoopConf = context.getHadoopConf().get();
     final String basePath = clientConfig.getBasePath();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index f6315eec7d211..0a4f3024f968e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -82,7 +82,7 @@ public class TestClientRollback extends HoodieClientTestBase {
   public void testSavepointAndRollback() throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder()
         .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()).build();
-    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+    try (SparkRDDWriteClient client = getSparkRDDWriteClient(cfg)) {
       HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
 
       /**
@@ -236,7 +236,7 @@ public void testRollbackCommit() throws Exception {
     testTable.doWriteOperation(commitTime3, WriteOperationType.INSERT, Collections.emptyList(),
         partitionToFilesNameLengthMap3, false, true);
 
-    try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
+    try (SparkRDDWriteClient client = getSparkRDDWriteClient(config)) {
 
       // Rollback commit3
       client.rollback(commitTime3);
@@ -346,7 +346,7 @@ public void testFailedRollbackCommit(
         .addInflightCommit(commitTime3)
         .withBaseFilesInPartitions(partitionAndFileId3);

-    try
(SparkRDDWriteClient client = getHoodieWriteClient(config)) { + try (SparkRDDWriteClient client = getSparkRDDWriteClient(config)) { // Rollback commit3 client.rollback(commitTime3); @@ -458,7 +458,7 @@ public void testAutoRollbackInflightCommit() throws Exception { false, true); final String commitTime4 = "20160506030621"; - try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + try (SparkRDDWriteClient client = getSparkRDDWriteClient(config)) { client.startCommitWithTime(commitTime4); // Check results, nothing changed assertTrue(testTable.commitExists(commitTime1)); @@ -474,7 +474,7 @@ public void testAutoRollbackInflightCommit() throws Exception { .withRollbackUsingMarkers(false) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); final String commitTime5 = "20160506030631"; - try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + try (SparkRDDWriteClient client = getSparkRDDWriteClient(config)) { client.startCommitWithTime(commitTime5); assertTrue(testTable.commitExists(commitTime1)); assertFalse(testTable.inflightCommitExists(commitTime2)); @@ -547,7 +547,7 @@ public void testRollbackWithRequestedRollbackPlan(boolean enableMetadataTable, b .addInflightCommit(commitTime3) .withBaseFilesInPartitions(partitionAndFileId3); - try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + try (SparkRDDWriteClient client = getSparkRDDWriteClient(config)) { if (isRollbackPlanCorrupted) { // Add a corrupted requested rollback plan FileCreateUtils.createRequestedRollbackFile(metaClient.getBasePath(), rollbackInstantTime, new byte[] {0, 1, 2}); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index 3aeca0f275891..06820080ba1f5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -110,12 +110,12 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E .build()).withAutoCommit(false).withProperties(properties).build(); // Create the first commit - createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", "001", 200, true); + createCommitWithInserts(writeConfig, getSparkRDDWriteClient(writeConfig), "000", "001", 200, true); final int threadCount = 2; final ExecutorService executors = Executors.newFixedThreadPool(2); - final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig); - final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig); + final SparkRDDWriteClient client1 = getSparkRDDWriteClient(writeConfig); + final SparkRDDWriteClient client2 = getSparkRDDWriteClient(writeConfig); final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount); final AtomicBoolean writer1Completed = new AtomicBoolean(false); @@ -210,7 +210,7 @@ public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType table .build(); // Create the first commit - SparkRDDWriteClient client = getHoodieWriteClient(cfg); + SparkRDDWriteClient client = getSparkRDDWriteClient(cfg); createCommitWithInsertsForPartition(cfg, client, "000", "001", 100, "2016/03/01"); int numConcurrentWriters = 5; @@ -222,7 +222,7 @@ public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType table String partition = "2016/03/0" + 
(loop + 2); futures.add(executors.submit(() -> { try { - SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg); + SparkRDDWriteClient writeClient = getSparkRDDWriteClient(cfg); createCommitWithInsertsForPartition(cfg, writeClient, "001", newCommitTime, 100, partition); } catch (Exception e) { throw new RuntimeException(e); @@ -280,7 +280,7 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t Set validInstants = new HashSet<>(); // Create the first commit with inserts HoodieWriteConfig cfg = writeConfigBuilder.build(); - SparkRDDWriteClient client = getHoodieWriteClient(cfg); + SparkRDDWriteClient client = getSparkRDDWriteClient(cfg); createCommitWithInserts(cfg, client, "000", "001", 200, true); validInstants.add("001"); // Create 2 commits with upserts @@ -301,9 +301,9 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t .withInlineClusteringNumCommits(1) .build()) .build(); - final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2); - final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg); - final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg); + final SparkRDDWriteClient client1 = getSparkRDDWriteClient(cfg2); + final SparkRDDWriteClient client2 = getSparkRDDWriteClient(cfg); + final SparkRDDWriteClient client3 = getSparkRDDWriteClient(cfg); // Create upserts, schedule cleaning, schedule compaction in parallel Future future1 = executors.submit(() -> { @@ -416,24 +416,24 @@ public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType) .build(); // Create the first commit - createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200, true); + createCommitWithInserts(cfg, getSparkRDDWriteClient(cfg), "000", "001", 200, true); // Start another inflight commit String newCommitTime = "003"; int numRecords = 100; - SparkRDDWriteClient client1 = getHoodieWriteClient(cfg); + SparkRDDWriteClient client1 = getSparkRDDWriteClient(cfg); String commitTimeBetweenPrevAndNew = "002"; JavaRDD result1 = updateBatch(cfg, client1, newCommitTime, "001", Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", numRecords, SparkRDDWriteClient::upsert, false, false, numRecords, 200, 2); // Start and finish another commit while the previous writer for commit 003 is running newCommitTime = "004"; - SparkRDDWriteClient client2 = getHoodieWriteClient(cfg); + SparkRDDWriteClient client2 = getSparkRDDWriteClient(cfg); JavaRDD result2 = updateBatch(cfg2, client2, newCommitTime, "001", Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", numRecords, SparkRDDWriteClient::upsert, false, false, numRecords, 200, 2); client2.commit(newCommitTime, result2); // Schedule and run clustering while previous writer for commit 003 is running - SparkRDDWriteClient client3 = getHoodieWriteClient(cfg3); + SparkRDDWriteClient client3 = getSparkRDDWriteClient(cfg3); // schedule clustering Option clusterInstant = client3.scheduleTableService(Option.empty(), TableServiceType.CLUSTER); assertTrue(clusterInstant.isPresent()); @@ -464,12 +464,12 @@ public void testHoodieClientMultiWriterAutoCommitForConflict() throws Exception HoodieWriteConfig cfg2 = writeConfigBuilder.build(); // Create the first commit - createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 5000, false); + createCommitWithInserts(cfg, getSparkRDDWriteClient(cfg), "000", "001", 5000, false); // Start another inflight commit String newCommitTime1 = "003"; String newCommitTime2 = "004"; - SparkRDDWriteClient client1 = 
getHoodieWriteClient(cfg); - SparkRDDWriteClient client2 = getHoodieWriteClient(cfg2); + SparkRDDWriteClient client1 = getSparkRDDWriteClient(cfg); + SparkRDDWriteClient client2 = getSparkRDDWriteClient(cfg2); List updates1 = dataGen.generateUpdates(newCommitTime1, 5000); List updates2 = dataGen.generateUpdates(newCommitTime2, 5000); @@ -547,12 +547,12 @@ public void testHoodieClientMultiWriterAutoCommitNonConflict() throws Exception HoodieWriteConfig cfg2 = writeConfigBuilder.build(); // Create the first commit - createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200, false); + createCommitWithInserts(cfg, getSparkRDDWriteClient(cfg), "000", "001", 200, false); // Start another inflight commit String newCommitTime1 = "003"; String newCommitTime2 = "004"; - SparkRDDWriteClient client1 = getHoodieWriteClient(cfg); - SparkRDDWriteClient client2 = getHoodieWriteClient(cfg2); + SparkRDDWriteClient client1 = getSparkRDDWriteClient(cfg); + SparkRDDWriteClient client2 = getSparkRDDWriteClient(cfg2); List updates1 = dataGen.generateInserts(newCommitTime1, 200); List updates2 = dataGen.generateInserts(newCommitTime2, 200); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java index 872a4a4215ffc..c171304bd2d3d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java @@ -85,7 +85,7 @@ public void testReadFilterExistAfterBulkInsertPrepped() throws Exception { @Test public void testReadROViewFailsWithoutSqlContext() { - HoodieReadClient readClient = new HoodieReadClient(context, getConfig()); + SparkRDDReadClient readClient = new SparkRDDReadClient(context, getConfig()); JavaRDD recordsRDD = jsc.parallelize(new ArrayList<>(), 1); assertThrows(IllegalStateException.class, () -> { readClient.readROView(recordsRDD, 1); @@ -102,8 +102,8 @@ public void testReadROViewFailsWithoutSqlContext() { */ private void testReadFilterExist(HoodieWriteConfig config, Function3, SparkRDDWriteClient, JavaRDD, String> writeFn) throws Exception { - try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) { - HoodieReadClient readClient = getHoodieReadClient(config.getBasePath()); + try (SparkRDDWriteClient writeClient = getSparkRDDWriteClient(config);) { + SparkRDDReadClient readClient = getSparkRDDReadClient(config.getBasePath()); String newCommitTime = writeClient.startCommit(); List records = dataGen.generateInserts(newCommitTime, 100); JavaRDD recordsRDD = jsc.parallelize(records, 1); @@ -119,7 +119,7 @@ private void testReadFilterExist(HoodieWriteConfig config, // Verify there are no errors assertNoWriteErrors(statuses); - HoodieReadClient anotherReadClient = getHoodieReadClient(config.getBasePath()); + SparkRDDReadClient anotherReadClient = getSparkRDDReadClient(config.getBasePath()); filteredRDD = anotherReadClient.filterExists(recordsRDD); List result = filteredRDD.collect(); // Check results @@ -199,7 +199,7 @@ private void testTagLocation(HoodieWriteConfig hoodieWriteConfig, Function3, SparkRDDWriteClient, JavaRDD, String> insertFn, Function3, SparkRDDWriteClient, JavaRDD, String> updateFn, boolean isPrepped) throws Exception { - try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);) { + try (SparkRDDWriteClient client = getSparkRDDWriteClient(hoodieWriteConfig);) { // 
Write 1 (only inserts) String newCommitTime = "001"; String initCommitTime = "000"; @@ -212,7 +212,7 @@ private void testTagLocation(HoodieWriteConfig hoodieWriteConfig, jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream) .map(record -> new HoodieAvroRecord(record.getKey(), null)).collect(Collectors.toList())); // Should have 100 records in table (check using Index), all in locations marked at commit - HoodieReadClient readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath()); + SparkRDDReadClient readClient = getSparkRDDReadClient(hoodieWriteConfig.getBasePath()); List taggedRecords = readClient.tagLocation(recordRDD).collect(); checkTaggedRecords(taggedRecords, newCommitTime); @@ -228,7 +228,7 @@ private void testTagLocation(HoodieWriteConfig hoodieWriteConfig, jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream) .map(record -> new HoodieAvroRecord(record.getKey(), null)).collect(Collectors.toList())); // Index should be able to locate all updates in correct locations. - readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath()); + readClient = getSparkRDDReadClient(hoodieWriteConfig.getBasePath()); taggedRecords = readClient.tagLocation(recordRDD).collect(); checkTaggedRecords(taggedRecords, newCommitTime); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java index df0fed027cec1..ecfab44299bdc 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java @@ -96,8 +96,8 @@ public void readLocalWriteHDFS() throws Exception { .initTable(hadoopConf, tablePath); - try (SparkRDDWriteClient hdfsWriteClient = getHoodieWriteClient(cfg); - SparkRDDWriteClient localWriteClient = getHoodieWriteClient(localConfig)) { + try (SparkRDDWriteClient hdfsWriteClient = getSparkRDDWriteClient(cfg); + SparkRDDWriteClient localWriteClient = getSparkRDDWriteClient(localConfig)) { // Write generated data to hdfs (only inserts) String readCommitTime = hdfsWriteClient.startCommit(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java index 1cb7bcbfc4fcb..440f142b2ec9f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java @@ -172,7 +172,7 @@ public void testMORTable() throws Exception { .initTable(metaClient.getHadoopConf(), metaClient.getBasePath()); HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA); - SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig); + SparkRDDWriteClient client = getSparkRDDWriteClient(hoodieWriteConfig); // Initial inserts with TRIP_EXAMPLE_SCHEMA int numRecords = 10; @@ -201,7 +201,7 @@ public void testMORTable() throws Exception { // Insert with evolved schema is not allowed HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_DEVOLVED); - client = getHoodieWriteClient(hoodieDevolvedWriteConfig); + client = getSparkRDDWriteClient(hoodieDevolvedWriteConfig); final List failedRecords = generateInsertsWithSchema("004", numRecords, 
TRIP_EXAMPLE_SCHEMA_DEVOLVED); try { // We cannot use insertBatch directly here because we want to insert records @@ -230,7 +230,7 @@ public void testMORTable() throws Exception { // Insert with an evolved scheme is allowed HoodieWriteConfig hoodieEvolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED); - client = getHoodieWriteClient(hoodieEvolvedWriteConfig); + client = getSparkRDDWriteClient(hoodieEvolvedWriteConfig); // We cannot use insertBatch directly here because we want to insert records // with an evolved schema and insertBatch inserts records using the TRIP_EXAMPLE_SCHEMA. @@ -252,7 +252,7 @@ public void testMORTable() throws Exception { // Now even the original schema cannot be used for updates as it is devolved in relation to the // current schema of the dataset. - client = getHoodieWriteClient(hoodieWriteConfig); + client = getSparkRDDWriteClient(hoodieWriteConfig); try { updateBatch(hoodieWriteConfig, client, "007", "006", Option.empty(), initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, false, 0, 0, 0); @@ -295,7 +295,7 @@ public void testMORTable() throws Exception { checkLatestDeltaCommit("004"); // Updates with original schema are now allowed - client = getHoodieWriteClient(hoodieWriteConfig); + client = getSparkRDDWriteClient(hoodieWriteConfig); updateBatch(hoodieWriteConfig, client, "008", "004", Option.empty(), initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, false, 0, 0, 0); // new commit @@ -318,7 +318,7 @@ public void testCopyOnWriteTable() throws Exception { .initTable(metaClient.getHadoopConf(), metaClient.getBasePath()); HoodieWriteConfig hoodieWriteConfig = getWriteConfigBuilder(TRIP_EXAMPLE_SCHEMA).withRollbackUsingMarkers(false).build(); - SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig); + SparkRDDWriteClient client = getSparkRDDWriteClient(hoodieWriteConfig); // Initial inserts with TRIP_EXAMPLE_SCHEMA int numRecords = 10; @@ -342,7 +342,7 @@ public void testCopyOnWriteTable() throws Exception { // Insert with devolved schema is not allowed HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_DEVOLVED); - client = getHoodieWriteClient(hoodieDevolvedWriteConfig); + client = getSparkRDDWriteClient(hoodieDevolvedWriteConfig); final List failedRecords = generateInsertsWithSchema("004", numRecords, TRIP_EXAMPLE_SCHEMA_DEVOLVED); try { // We cannot use insertBatch directly here because we want to insert records @@ -372,7 +372,7 @@ public void testCopyOnWriteTable() throws Exception { // Insert with evolved scheme is allowed HoodieWriteConfig hoodieEvolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED); - client = getHoodieWriteClient(hoodieEvolvedWriteConfig); + client = getSparkRDDWriteClient(hoodieEvolvedWriteConfig); final List evolvedRecords = generateInsertsWithSchema("004", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED); // We cannot use insertBatch directly here because we want to insert records // with a evolved schema. @@ -391,7 +391,7 @@ public void testCopyOnWriteTable() throws Exception { // Now even the original schema cannot be used for updates as it is devolved // in relation to the current schema of the dataset. 
- client = getHoodieWriteClient(hoodieWriteConfig); + client = getSparkRDDWriteClient(hoodieWriteConfig); try { updateBatch(hoodieWriteConfig, client, "006", "005", Option.empty(), initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, true, diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 34f470eb1b64a..039ff3c5dc58d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -496,7 +496,7 @@ public void testMetadataTableArchival() throws Exception { assertEquals(4, metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants()); // start the timeline server for MARKERS cleaning up - getHoodieWriteClient(writeConfig); + getSparkRDDWriteClient(writeConfig); // trigger a regular write operation. data set timeline archival should kick in. doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT); archiveDataTable(writeConfig, HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build()); @@ -1720,7 +1720,7 @@ public void testReattemptOfFailedClusteringCommit() throws Exception { init(tableType); context = new HoodieSparkEngineContext(jsc); HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false); - SparkRDDWriteClient client = getHoodieWriteClient(config); + SparkRDDWriteClient client = getSparkRDDWriteClient(config); // Write 1 (Bulk insert) String newCommitTime = "0000001"; @@ -1748,7 +1748,7 @@ public void testReattemptOfFailedClusteringCommit() throws Exception { .withClusteringConfig(clusteringConfig).build(); // trigger clustering - SparkRDDWriteClient newClient = getHoodieWriteClient(newWriteConfig); + SparkRDDWriteClient newClient = getSparkRDDWriteClient(newWriteConfig); String clusteringCommitTime = newClient.scheduleClustering(Option.empty()).get().toString(); HoodieWriteMetadata> clusterMetadata = newClient.cluster(clusteringCommitTime, true); @@ -2127,7 +2127,7 @@ public void testDeletePartitions() throws Exception { .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()) .build(); - try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { + try (SparkRDDWriteClient client = getSparkRDDWriteClient(cfg)) { String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); client.startCommitWithTime(newCommitTime); List records = dataGen.generateInserts(newCommitTime, 10); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index a6a37030e8a69..e964b078c3b31 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -289,7 +289,7 @@ private void testAutoCommit(Function3, SparkRDDWriteClient, // Set autoCommit false HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withAutoCommit(false); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); - try (SparkRDDWriteClient client 
= getHoodieWriteClient(cfgBuilder.build());) { + try (SparkRDDWriteClient client = getSparkRDDWriteClient(cfgBuilder.build());) { String prevCommitTime = "000"; String newCommitTime = "001"; @@ -314,7 +314,7 @@ public void testPreCommitValidatorsOnInsert() throws Exception { .build(); HoodieWriteConfig config = getConfigBuilder().withAutoCommit(true) .withPreCommitValidatorConfig(validatorConfig).build(); - try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + try (SparkRDDWriteClient client = getSparkRDDWriteClient(config)) { Function3, SparkRDDWriteClient, JavaRDD, String> writeFn = (writeClient, recordRDD, instantTime) -> writeClient.bulkInsert(recordRDD, instantTime, Option.empty()); String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); @@ -334,7 +334,7 @@ public void testPreCommitValidationFailureOnInsert() throws Exception { .build(); HoodieWriteConfig config = getConfigBuilder().withPreCommitValidatorConfig(validatorConfig).build(); String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + try (SparkRDDWriteClient client = getSparkRDDWriteClient(config)) { Function3, SparkRDDWriteClient, JavaRDD, String> writeFn = (writeClient, recordRDD, instantTime) -> writeClient.bulkInsert(recordRDD, instantTime, Option.empty()); JavaRDD result = insertFirstBatch(config, client, newCommitTime, @@ -397,7 +397,7 @@ public void testPreCommitValidationWithMultipleInflights() throws Exception { } private void insertWithConfig(HoodieWriteConfig config, int numRecords, String instant) throws Exception { - try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + try (SparkRDDWriteClient client = getSparkRDDWriteClient(config)) { Function3, SparkRDDWriteClient, JavaRDD, String> writeFn = (writeClient, recordRDD, instantTime) -> writeClient.bulkInsert(recordRDD, instantTime, Option.empty()); JavaRDD result = insertFirstBatch(config, client, instant, @@ -479,7 +479,7 @@ private void testDeduplication( .combineInput(true, true); addConfigsForPopulateMetaFields(configBuilder, populateMetaFields); - try (SparkRDDWriteClient client = getHoodieWriteClient(configBuilder.build());) { + try (SparkRDDWriteClient client = getSparkRDDWriteClient(configBuilder.build());) { client.startCommitWithTime(newCommitTime); List statuses = writeFn.apply(client, recordList, newCommitTime).collect(); assertNoWriteErrors(statuses); @@ -551,7 +551,7 @@ private void testUpsertsInternal(HoodieWriteConfig config, .setPopulateMetaFields(config.populateMetaFields()) .initTable(metaClient.getHadoopConf(), metaClient.getBasePath()); - SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig); + SparkRDDWriteClient client = getSparkRDDWriteClient(hoodieWriteConfig); // Write 1 (only inserts) String newCommitTime = "001"; @@ -581,7 +581,7 @@ private void testUpsertsInternal(HoodieWriteConfig config, // Now simulate an upgrade and perform a restore operation HoodieWriteConfig newConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion( TimelineLayoutVersion.CURR_VERSION).build(); - client = getHoodieWriteClient(newConfig); + client = getSparkRDDWriteClient(newConfig); client.savepoint("004", "user1","comment1"); @@ -713,7 +713,7 @@ private void testHoodieConcatHandle(HoodieWriteConfig config, boolean isPrepped) .setTimelineLayoutVersion(VERSION_0) .initTable(metaClient.getHadoopConf(), metaClient.getBasePath()); - SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig); + 
SparkRDDWriteClient client = getSparkRDDWriteClient(hoodieWriteConfig); // Write 1 (only inserts) String newCommitTime = "001"; @@ -760,7 +760,7 @@ private void testHoodieConcatHandleOnDupInserts(HoodieWriteConfig config, boolea .withMergeAllowDuplicateOnInserts(true) .build(); - SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig); + SparkRDDWriteClient client = getSparkRDDWriteClient(hoodieWriteConfig); // Write 1 (only inserts) String initCommitTime = "000"; @@ -786,7 +786,7 @@ private void testHoodieConcatHandleOnDupInserts(HoodieWriteConfig config, boolea @Test public void testBulkInsertWithCustomPartitioner() { HoodieWriteConfig config = getConfigBuilder().withRollbackUsingMarkers(true).build(); - try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + try (SparkRDDWriteClient client = getSparkRDDWriteClient(config)) { final String commitTime1 = "001"; client.startCommitWithTime(commitTime1); List inserts1 = dataGen.generateInserts(commitTime1, 100); @@ -805,7 +805,7 @@ public void testBulkInsertWithCustomPartitioner() { public void testDeletes(boolean populateMetaFields) throws Exception { HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); - SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build()); + SparkRDDWriteClient client = getSparkRDDWriteClient(cfgBuilder.build()); /** * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records */ @@ -856,7 +856,7 @@ public void testDeletes(boolean populateMetaFields) throws Exception { public void testDeletesForInsertsInSameBatch(boolean populateMetaFields) throws Exception { HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); - SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build()); + SparkRDDWriteClient client = getSparkRDDWriteClient(cfgBuilder.build()); /** * Write 200 inserts and issue deletes to a subset(50) of inserts. */ @@ -923,7 +923,7 @@ private void testUpsertsUpdatePartitionPath(IndexType indexType, HoodieWriteConf // Set rollback to LAZY so no inflights are deleted hoodieWriteConfig.getProps().put(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieFailedWritesCleaningPolicy.LAZY.name()); - SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig); + SparkRDDWriteClient client = getSparkRDDWriteClient(hoodieWriteConfig); // Write 1 String newCommitTime = "001"; @@ -1077,7 +1077,7 @@ public void testUpdateRejectForClustering() throws IOException { props.setProperty(ASYNC_CLUSTERING_ENABLE.key(), "true"); HoodieWriteConfig config = getSmallInsertWriteConfig(100, TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150), true, props); - SparkRDDWriteClient client = getHoodieWriteClient(config); + SparkRDDWriteClient client = getSparkRDDWriteClient(config); HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, metaClient); //1. 
insert to generate 2 file group @@ -1135,7 +1135,7 @@ public void testSmallInsertHandlingForUpserts() throws Exception { TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150)); dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); - SparkRDDWriteClient client = getHoodieWriteClient(config); + SparkRDDWriteClient client = getSparkRDDWriteClient(config); BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient); // Inserts => will write file1 @@ -1248,7 +1248,7 @@ public void testSmallInsertHandlingForInserts(boolean mergeAllowDuplicateInserts HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, false, mergeAllowDuplicateInserts); // hold upto 200 records max dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); - SparkRDDWriteClient client = getHoodieWriteClient(config); + SparkRDDWriteClient client = getSparkRDDWriteClient(config); BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient); // Inserts => will write file1 @@ -1331,7 +1331,7 @@ public void testDeletesWithDeleteApi() throws Exception { TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150)); dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); - SparkRDDWriteClient client = getHoodieWriteClient(config); + SparkRDDWriteClient client = getSparkRDDWriteClient(config); // Inserts => will write file1 String commitTime1 = "001"; @@ -1409,7 +1409,7 @@ public void testRolblackOfRegularCommitWithPendingReplaceCommitInTimeline() thro // trigger another partial commit, followed by valid commit. rollback of partial commit should succeed. HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withAutoCommit(false); - SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build()); + SparkRDDWriteClient client = getSparkRDDWriteClient(cfgBuilder.build()); String commitTime1 = HoodieActiveTimeline.createNewInstantTime(); List records1 = dataGen.generateInserts(commitTime1, 200); client.startCommitWithTime(commitTime1); @@ -1447,7 +1447,7 @@ public void testInlineScheduleClustering(boolean scheduleInlineClustering) throw HoodieWriteConfig config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(false) .withClusteringConfig(clusteringConfig) .withProps(getPropertiesForKeyGen()).build(); - SparkRDDWriteClient client = getHoodieWriteClient(config); + SparkRDDWriteClient client = getSparkRDDWriteClient(config); dataGen = new HoodieTestDataGenerator(new String[] {"2015/03/16"}); String commitTime1 = HoodieActiveTimeline.createNewInstantTime(); List records1 = dataGen.generateInserts(commitTime1, 200); @@ -1512,7 +1512,7 @@ public void testPendingClusteringRollback() throws Exception { HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(EAGER); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); HoodieWriteConfig config = cfgBuilder.build(); - SparkRDDWriteClient client = getHoodieWriteClient(config); + SparkRDDWriteClient client = getSparkRDDWriteClient(config); dataGen = new HoodieTestDataGenerator(); String commitTime = HoodieActiveTimeline.createNewInstantTime(); allRecords.addAll(dataGen.generateInserts(commitTime, 200)); @@ -1575,7 +1575,7 @@ public void testInflightClusteringRollbackWhenUpdatesAllowed(boolean rollbackPen addConfigsForPopulateMetaFields(cfgBuilder, true); cfgBuilder.withClusteringConfig(clusteringConfig); HoodieWriteConfig config = cfgBuilder.build(); - SparkRDDWriteClient client = getHoodieWriteClient(config); + SparkRDDWriteClient client = getSparkRDDWriteClient(config); 
String commitTime = HoodieActiveTimeline.createNewInstantTime(); allRecords.addAll(dataGen.generateUpdates(commitTime, 200)); writeAndVerifyBatch(client, allRecords, commitTime, true); @@ -1660,7 +1660,7 @@ private Pair, List>, Set> tes // create config to not update small files. HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false, populateMetaFields, populateMetaFields ? new Properties() : getPropertiesForKeyGen()); - SparkRDDWriteClient client = getHoodieWriteClient(config); + SparkRDDWriteClient client = getSparkRDDWriteClient(config); dataGen = new HoodieTestDataGenerator(new String[] {"2015/03/16"}); String commitTime1 = HoodieActiveTimeline.createNewInstantTime(); List records1 = dataGen.generateInserts(commitTime1, 200); @@ -1720,7 +1720,7 @@ private HoodieWriteMetadata> performClustering(HoodieCluste .withClusteringConfig(clusteringConfig).build(); // create client with new config. - SparkRDDWriteClient client = getHoodieWriteClient(config); + SparkRDDWriteClient client = getSparkRDDWriteClient(config); String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString(); HoodieWriteMetadata> clusterMetadata = client.cluster(clusteringCommitTime, completeClustering); if (config.isPreserveHoodieCommitMetadataForClustering() && config.populateMetaFields()) { @@ -1778,7 +1778,7 @@ private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount, int HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150), populateMetaFields, populateMetaFields ? new Properties() : getPropertiesForKeyGen()); - SparkRDDWriteClient client = getHoodieWriteClient(config); + SparkRDDWriteClient client = getSparkRDDWriteClient(config); dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); // Do Inserts @@ -1865,7 +1865,7 @@ private void verifyDeletePartitionsHandling(int batch1RecordsCount, int batch2Re HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150), populateMetaFields, populateMetaFields ? new Properties() : getPropertiesForKeyGen()); - SparkRDDWriteClient client = getHoodieWriteClient(config); + SparkRDDWriteClient client = getSparkRDDWriteClient(config); dataGen = new HoodieTestDataGenerator(); // Do Inserts for DEFAULT_FIRST_PARTITION_PATH @@ -2048,7 +2048,7 @@ public void testDeletesWithoutInserts(boolean populateMetaFields) { TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150), populateMetaFields, populateMetaFields ? 
new Properties() : getPropertiesForKeyGen()); dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); - SparkRDDWriteClient client = getHoodieWriteClient(config); + SparkRDDWriteClient client = getSparkRDDWriteClient(config); // delete non existent keys String commitTime1 = "001"; @@ -2071,7 +2071,7 @@ public void testCommitWritesRelativePaths(boolean populateMetaFields) throws Exc HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withAutoCommit(false); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); - try (SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build());) { + try (SparkRDDWriteClient client = getSparkRDDWriteClient(cfgBuilder.build());) { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build(); HoodieSparkTable table = HoodieSparkTable.create(cfgBuilder.build(), context, metaClient); @@ -2118,7 +2118,7 @@ public void testMetadataStatsOnCommit(boolean populateMetaFields) throws Excepti HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withAutoCommit(false); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); HoodieWriteConfig cfg = cfgBuilder.build(); - SparkRDDWriteClient client = getHoodieWriteClient(cfg); + SparkRDDWriteClient client = getSparkRDDWriteClient(cfg); String instantTime0 = "000"; client.startCommitWithTime(instantTime0); @@ -2184,7 +2184,7 @@ public void testConsistencyCheckDuringFinalize(boolean enableOptimisticConsisten String instantTime = "000"; HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder() .withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build(); - SparkRDDWriteClient client = getHoodieWriteClient(cfg); + SparkRDDWriteClient client = getSparkRDDWriteClient(cfg); Pair> result = testConsistencyCheck(metaClient, instantTime, enableOptimisticConsistencyGuard); // Delete orphan marker and commit should succeed @@ -2223,7 +2223,7 @@ private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollb .withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard) .withOptimisticConsistencyGuardSleepTimeMs(1).build()) .withProperties(properties).build(); - SparkRDDWriteClient client = getHoodieWriteClient(cfg); + SparkRDDWriteClient client = getSparkRDDWriteClient(cfg); testConsistencyCheck(metaClient, instantTime, enableOptimisticConsistencyGuard); if (!enableOptimisticConsistencyGuard) { @@ -2470,7 +2470,7 @@ private Pair> testConsistencyCheck(HoodieTableMetaCli .withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard) .withOptimisticConsistencyGuardSleepTimeMs(1).build()) .build()); - SparkRDDWriteClient client = getHoodieWriteClient(cfg); + SparkRDDWriteClient client = getSparkRDDWriteClient(cfg); client.startCommitWithTime(instantTime); JavaRDD writeRecords = jsc.parallelize(dataGen.generateInserts(instantTime, 200), 1); @@ -2519,7 +2519,7 @@ public void testMultiOperationsPerCommit(boolean populateMetaFields) throws IOEx .withAllowMultiWriteOnSameInstant(true); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); HoodieWriteConfig cfg = cfgBuilder.build(); - SparkRDDWriteClient client = getHoodieWriteClient(cfg); + SparkRDDWriteClient client = getSparkRDDWriteClient(cfg); String firstInstantTime = "0000"; client.startCommitWithTime(firstInstantTime); int numRecords = 200; diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java index 024cf1ff50acc..d93d5e7eb30f2 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java @@ -143,7 +143,7 @@ private void setUp(IndexType indexType, boolean populateMetaFields, boolean roll .withLayoutConfig(HoodieLayoutConfig.newBuilder().fromProperties(indexBuilder.build().getProps()) .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()) .build(); - writeClient = getHoodieWriteClient(config); + writeClient = getSparkRDDWriteClient(config); this.index = writeClient.getIndex(); } @@ -496,7 +496,7 @@ public void testSimpleGlobalIndexTagLocationWhenShouldUpdatePartitionPath() thro .withMetadataIndexColumnStats(true) .build()) .build(); - writeClient = getHoodieWriteClient(config); + writeClient = getSparkRDDWriteClient(config); index = writeClient.getIndex(); HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java index 4421bd4d65442..f5673509eb2f0 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java @@ -102,7 +102,7 @@ public void setUp() throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) .withIndexConfig(indexBuilder.build()) .build(); - writeClient = getHoodieWriteClient(config); + writeClient = getSparkRDDWriteClient(config); } @AfterEach diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java index 3ad8952feea84..2edf19f0a5954 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java @@ -78,7 +78,7 @@ public void setUp() throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) .withIndexConfig(indexBuilder.build()) .build(); - writeClient = getHoodieWriteClient(config); + writeClient = getSparkRDDWriteClient(config); } @AfterEach diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java index 72749160e6bd0..1ee1cc6041f1b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java @@ -95,7 +95,8 @@ public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap HoodieWriteConfig cfg = getConfigBuilder() .withProperties(properties) .build(); - try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { + try (SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);) { + FileSystem fs = FSUtils.getFs(basePath, 
hadoopConf); /** * Write 1 (only inserts) This will do a bulk insert of 44 records of which there are 2 records repeated 21 times @@ -262,7 +263,7 @@ public void testHoodieMergeHandleWriteStatMetrics(ExternalSpillableMap.DiskMapTy HoodieWriteConfig config = getConfigBuilder() .withProperties(properties) .build(); - try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) { + try (SparkRDDWriteClient writeClient = getSparkRDDWriteClient(config);) { String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index b8545b0f63809..d9db7ca42d7c4 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -339,7 +339,7 @@ private void testInsertAndCleanByVersions( .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1) .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) .build(); - try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { + try (SparkRDDWriteClient client = getSparkRDDWriteClient(cfg)) { final Function2, String, Integer> recordInsertGenWrappedFunction = generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts); @@ -508,7 +508,7 @@ private void testInsertAndCleanByCommits( .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1) .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) .build(); - SparkRDDWriteClient client = getHoodieWriteClient(cfg); + SparkRDDWriteClient client = getSparkRDDWriteClient(cfg); final Function2, String, Integer> recordInsertGenWrappedFunction = generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts); @@ -585,7 +585,7 @@ private void testFailedInsertAndCleanByCommits( .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1) .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) .build(); - SparkRDDWriteClient client = getHoodieWriteClient(cfg); + SparkRDDWriteClient client = getSparkRDDWriteClient(cfg); final Function2, String, Integer> recordInsertGenWrappedFunction = generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts); @@ -656,7 +656,7 @@ protected List runCleaner( protected List runCleaner( HoodieWriteConfig config, boolean simulateRetryFailure, boolean simulateMetadataFailure, Integer firstCommitSequence, boolean needInstantInHudiFormat) throws IOException { - SparkRDDWriteClient writeClient = getHoodieWriteClient(config); + SparkRDDWriteClient writeClient = getSparkRDDWriteClient(config); String cleanInstantTs = needInstantInHudiFormat ? 
        makeNewCommitTime(firstCommitSequence, "%014d") : makeNewCommitTime(firstCommitSequence, "%09d");
    HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs);
@@ -1323,7 +1323,7 @@ private void testInsertAndCleanFailedWritesByVersions(
        .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
        .build();
-    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+    try (SparkRDDWriteClient client = getSparkRDDWriteClient(cfg)) {
      final Function2, String, Integer> recordInsertGenWrappedFunction =
          generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index b9f025223b7df..bffb90f375c06 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -18,7 +18,7 @@
 package org.apache.hudi.table;
 
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.FileSlice;
@@ -226,7 +226,7 @@ public void testLogFileCountsAfterCompaction(boolean preserveCommitMeta) throws
      List updatedRecords = dataGen.generateUpdates(newCommitTime, records);
      JavaRDD updatedRecordsRDD = jsc().parallelize(updatedRecords, 1);
-      HoodieReadClient readClient = new HoodieReadClient(context(), config);
+      SparkRDDReadClient readClient = new SparkRDDReadClient(context(), config);
      JavaRDD updatedTaggedRecordsRDD = readClient.tagLocation(updatedRecordsRDD);
      writeClient.startCommitWithTime(newCommitTime);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 8114daa30f763..86f77d6cd3a44 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -165,7 +165,7 @@ public void testUpdateRecords(HoodieIndex.IndexType indexType) throws Exception
    HoodieWriteConfig config = makeHoodieClientConfigBuilder()
        .withProps(makeIndexConfig(indexType)).build();
    String firstCommitTime = makeNewCommitTime();
-    SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
+    SparkRDDWriteClient writeClient = getSparkRDDWriteClient(config);
    writeClient.startCommitWithTime(firstCommitTime);
    metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -484,7 +484,7 @@ public void testBulkInsertRecords(String bulkInsertMode) throws Exception {
        .withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
        .withBulkInsertParallelism(2).withBulkInsertSortMode(bulkInsertMode).build();
    String instantTime = makeNewCommitTime();
-    SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
+    SparkRDDWriteClient writeClient = getSparkRDDWriteClient(config);
    writeClient.startCommitWithTime(instantTime);
    metaClient = HoodieTableMetaClient.reload(metaClient);
    HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, metaClient);
@@ -524,7 +524,7 @@ public void testPartitionMetafileFormat(boolean partitionMetafileUseBaseFormat)
    }
    String instantTime = makeNewCommitTime();
-    SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
+    SparkRDDWriteClient writeClient = getSparkRDDWriteClient(config);
    writeClient.startCommitWithTime(instantTime);
    // Insert new records
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
index c3f4395b5a81a..a571a6f4732ea 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -20,7 +20,7 @@
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -106,7 +106,7 @@ protected void validateDeltaCommit(String latestDeltaCommit, final Map
-  protected List runNextDeltaCommits(SparkRDDWriteClient client, final HoodieReadClient readClient, List deltaInstants,
+  protected List runNextDeltaCommits(SparkRDDWriteClient client, final SparkRDDReadClient readClient, List deltaInstants,
      List records, HoodieWriteConfig cfg, boolean insertFirst, List expPendingCompactionInstants) throws Exception {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 87d8613303347..c377e6863358e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -18,7 +18,7 @@
 package org.apache.hudi.table.action.compact;
 
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -59,8 +59,8 @@ private HoodieWriteConfig getConfig(Boolean autoCommit) {
  public void testRollbackForInflightCompaction() throws Exception {
    // Rollback inflight compaction
    HoodieWriteConfig cfg = getConfig(false);
-    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
-      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+    try (SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);) {
+      SparkRDDReadClient readClient = getSparkRDDReadClient(cfg.getBasePath());
      String firstInstantTime = "001";
      String secondInstantTime = "004";
      String compactionInstantTime = "005";
@@ -119,8 +119,8 @@ public void testRollbackInflightIngestionWithPendingCompaction() throws Exceptio
    int numRecs = 2000;
-    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
-      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+    try (SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);) {
+      SparkRDDReadClient readClient = getSparkRDDReadClient(cfg.getBasePath());
      List records = dataGen.generateInserts(firstInstantTime, numRecs);
      records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList<>());
@@ -161,8 +161,8 @@ public void testRollbackInflightIngestionWithPendingCompaction() throws Exceptio
  public void testInflightCompaction() throws Exception {
    // There is inflight compaction. Subsequent compaction run must work correctly
    HoodieWriteConfig cfg = getConfig(true);
-    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
-      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+    try (SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);) {
+      SparkRDDReadClient readClient = getSparkRDDReadClient(cfg.getBasePath());
      String firstInstantTime = "001";
      String secondInstantTime = "004";
      String compactionInstantTime = "005";
@@ -194,8 +194,8 @@ public void testInflightCompaction() throws Exception {
  public void testScheduleIngestionBeforePendingCompaction() throws Exception {
    // Case: Failure case. Latest pending compaction instant time must be earlier than this instant time
    HoodieWriteConfig cfg = getConfig(false);
-    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
-    HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+    SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);
+    SparkRDDReadClient readClient = getSparkRDDReadClient(cfg.getBasePath());
    String firstInstantTime = "001";
    String secondInstantTime = "004";
@@ -225,8 +225,8 @@ public void testScheduleCompactionAfterPendingIngestion() throws Exception {
    // Case: Failure case. Earliest ingestion inflight instant time must be later than compaction time
    HoodieWriteConfig cfg = getConfig(false);
-    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
-    HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+    SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);
+    SparkRDDReadClient readClient = getSparkRDDReadClient(cfg.getBasePath());
    String firstInstantTime = "001";
    String secondInstantTime = "004";
@@ -257,8 +257,8 @@ public void testScheduleCompactionWithOlderOrSameTimestamp() throws Exception {
    // Case: Failure case. Earliest ingestion inflight instant time must be later than compaction time
    HoodieWriteConfig cfg = getConfig(false);
-    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
-    HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+    SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);
+    SparkRDDReadClient readClient = getSparkRDDReadClient(cfg.getBasePath());
    final String firstInstantTime = "001";
    final String secondInstantTime = "004";
@@ -292,8 +292,8 @@ public void testScheduleCompactionWithOlderOrSameTimestamp() throws Exception {
  public void testCompactionAfterTwoDeltaCommits() throws Exception {
    // No Delta Commits after compaction request
    HoodieWriteConfig cfg = getConfig(true);
-    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
-      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+    try (SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);) {
+      SparkRDDReadClient readClient = getSparkRDDReadClient(cfg.getBasePath());
      String firstInstantTime = "001";
      String secondInstantTime = "004";
      String compactionInstantTime = "005";
@@ -313,8 +313,8 @@ public void testCompactionAfterTwoDeltaCommits() throws Exception {
  public void testInterleavedCompaction() throws Exception {
    // Case: Two delta commits before and after compaction schedule
    HoodieWriteConfig cfg = getConfig(true);
-    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
-      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+    try (SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);) {
+      SparkRDDReadClient readClient = getSparkRDDReadClient(cfg.getBasePath());
      String firstInstantTime = "001";
      String secondInstantTime = "004";
      String compactionInstantTime = "005";
@@ -341,8 +341,8 @@ public void testInterleavedCompaction() throws Exception {
  public void testCompactionOnReplacedFiles() throws Exception {
    // Schedule a compaction. Replace those file groups and ensure compaction completes successfully.
    HoodieWriteConfig cfg = getConfig(true);
-    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
-      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+    try (SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);) {
+      SparkRDDReadClient readClient = getSparkRDDReadClient(cfg.getBasePath());
      String firstInstantTime = "001";
      String secondInstantTime = "004";
      String compactionInstantTime = "005";
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 59174a9371a58..49011d27252f9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -117,7 +117,7 @@ public void testCompactionEmpty() throws Exception {
    HoodieWriteConfig config = getConfig();
    metaClient = HoodieTableMetaClient.reload(metaClient);
    HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
-    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
+    try (SparkRDDWriteClient writeClient = getSparkRDDWriteClient(config);) {
      String newCommitTime = writeClient.startCommit();
      List records = dataGen.generateInserts(newCommitTime, 100);
@@ -133,7 +133,7 @@ public void testCompactionEmpty() throws Exception {
  @Test
  public void testScheduleCompactionWithInflightInstant() {
    HoodieWriteConfig config = getConfig();
-    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+    try (SparkRDDWriteClient writeClient = getSparkRDDWriteClient(config)) {
      // insert 100 records.
      String newCommitTime = "100";
      writeClient.startCommitWithTime(newCommitTime);
@@ -160,7 +160,7 @@ public void testWriteStatusContentsAfterCompaction() throws Exception {
    HoodieWriteConfig config = getConfigBuilder()
        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
        .build();
-    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+    try (SparkRDDWriteClient writeClient = getSparkRDDWriteClient(config)) {
      String newCommitTime = "100";
      writeClient.startCommitWithTime(newCommitTime);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
index 7f1046ba90ce4..ab3217bb61a86 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -18,7 +18,7 @@
 package org.apache.hudi.table.action.compact;
 
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -56,9 +56,9 @@ private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits, int
  public void testCompactionIsNotScheduledEarly() throws Exception {
    // Given: make two commits
    HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 60, CompactionTriggerStrategy.NUM_COMMITS);
-    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
+    try (SparkRDDWriteClient writeClient = getSparkRDDWriteClient(cfg)) {
      List records = dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 100);
-      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      SparkRDDReadClient readClient = getSparkRDDReadClient(cfg.getBasePath());
      List instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
@@ -74,9 +74,9 @@ public void testSuccessfulCompactionBasedOnNumCommits() throws Exception {
    HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 60, CompactionTriggerStrategy.NUM_COMMITS);
    List instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
-    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
+    try (SparkRDDWriteClient writeClient = getSparkRDDWriteClient(cfg)) {
      List records = dataGen.generateInserts(instants.get(0), 100);
-      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      SparkRDDReadClient readClient = getSparkRDDReadClient(cfg.getBasePath());
      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
      // third commit, that will trigger compaction
@@ -98,10 +98,10 @@ public void testSuccessfulCompactionBasedOnTime() throws Exception {
    // Given: make one commit
    HoodieWriteConfig cfg = getConfigForInlineCompaction(5, 10, CompactionTriggerStrategy.TIME_ELAPSED);
-    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
+    try (SparkRDDWriteClient writeClient = getSparkRDDWriteClient(cfg)) {
      String instantTime = HoodieActiveTimeline.createNewInstantTime();
      List records = dataGen.generateInserts(instantTime, 10);
-      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      SparkRDDReadClient readClient = getSparkRDDReadClient(cfg.getBasePath());
      runNextDeltaCommits(writeClient, readClient, Arrays.asList(instantTime), records, cfg, true, new ArrayList<>());
      // after 10s, that will trigger compaction
@@ -120,9 +120,9 @@ public void testSuccessfulCompactionBasedOnNumOrTime() throws Exception {
    // Given: make three commits
    HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 60, CompactionTriggerStrategy.NUM_OR_TIME);
-    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
+    try (SparkRDDWriteClient writeClient = getSparkRDDWriteClient(cfg)) {
      List records = dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
-      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      SparkRDDReadClient readClient = getSparkRDDReadClient(cfg.getBasePath());
      List instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
      // Then: trigger the compaction because reach 3 commits.
@@ -146,9 +146,9 @@ public void testSuccessfulCompactionBasedOnNumOrTime() throws Exception {
  public void testSuccessfulCompactionBasedOnNumAndTime() throws Exception {
    // Given: make three commits
    HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20, CompactionTriggerStrategy.NUM_AND_TIME);
-    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
+    try (SparkRDDWriteClient writeClient = getSparkRDDWriteClient(cfg)) {
      List records = dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
-      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      SparkRDDReadClient readClient = getSparkRDDReadClient(cfg.getBasePath());
      List instants = IntStream.range(0, 3).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
@@ -175,9 +175,9 @@ public void testCompactionRetryOnFailureBasedOnNumCommits() throws Exception {
        .build();
    List instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
    String instantTime2;
-    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
+    try (SparkRDDWriteClient writeClient = getSparkRDDWriteClient(cfg)) {
      List records = dataGen.generateInserts(instants.get(0), 100);
-      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      SparkRDDReadClient readClient = getSparkRDDReadClient(cfg.getBasePath());
      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
      // Schedule compaction instant2, make it in-flight (simulates inline compaction failing)
      instantTime2 = HoodieActiveTimeline.createNewInstantTime();
@@ -188,7 +188,7 @@ public void testCompactionRetryOnFailureBasedOnNumCommits() throws Exception {
    // When: a third commit happens
    HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(2, 60, CompactionTriggerStrategy.NUM_COMMITS);
    String instantTime3 = HoodieActiveTimeline.createNewInstantTime();
-    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(inlineCfg)) {
+    try (SparkRDDWriteClient writeClient = getSparkRDDWriteClient(inlineCfg)) {
      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
      createNextDeltaCommit(instantTime3, dataGen.generateUpdates(instantTime3, 100), writeClient, metaClient, inlineCfg, false);
    }
@@ -210,9 +210,9 @@ public void testCompactionRetryOnFailureBasedOnTime() throws Exception {
        .build();
    String instantTime;
    List instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
-    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
+    try (SparkRDDWriteClient writeClient = getSparkRDDWriteClient(cfg)) {
      List records = dataGen.generateInserts(instants.get(0), 100);
-      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      SparkRDDReadClient readClient = getSparkRDDReadClient(cfg.getBasePath());
      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
      // Schedule compaction instantTime, make it in-flight (simulates inline compaction failing)
      instantTime = HoodieActiveTimeline.createNewInstantTime(10000);
@@ -223,7 +223,7 @@ public void testCompactionRetryOnFailureBasedOnTime() throws Exception {
    // When: commit happens after 10s
    HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(5, 10, CompactionTriggerStrategy.TIME_ELAPSED);
    String instantTime2;
-    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(inlineCfg)) {
+    try (SparkRDDWriteClient writeClient = getSparkRDDWriteClient(inlineCfg)) {
      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
      instantTime2 = HoodieActiveTimeline.createNewInstantTime();
      createNextDeltaCommit(instantTime2, dataGen.generateUpdates(instantTime2, 10), writeClient, metaClient, inlineCfg, false);
@@ -247,9 +247,9 @@ public void testCompactionRetryOnFailureBasedOnNumAndTime() throws Exception {
        .build();
    String instantTime;
    List instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
-    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
+    try (SparkRDDWriteClient writeClient = getSparkRDDWriteClient(cfg)) {
      List records = dataGen.generateInserts(instants.get(0), 10);
-      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      SparkRDDReadClient readClient = getSparkRDDReadClient(cfg.getBasePath());
      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
      // Schedule compaction instantTime, make it in-flight (simulates inline compaction failing)
      instantTime = HoodieActiveTimeline.createNewInstantTime();
@@ -260,7 +260,7 @@ public void testCompactionRetryOnFailureBasedOnNumAndTime() throws Exception {
    // When: a third commit happens
    HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(3, 20, CompactionTriggerStrategy.NUM_OR_TIME);
    String instantTime2;
-    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(inlineCfg)) {
+    try (SparkRDDWriteClient writeClient = getSparkRDDWriteClient(inlineCfg)) {
      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
      instantTime2 = HoodieActiveTimeline.createNewInstantTime();
      createNextDeltaCommit(instantTime2, dataGen.generateUpdates(instantTime2, 10), writeClient, metaClient, inlineCfg, false);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
index 33a1c58a3a991..f59549cd36ce1 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
@@ -54,7 +54,7 @@ protected void twoUpsertCommitDataWithTwoPartitions(List firstPartiti
    dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
    //1. prepare data
    HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
-    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);
    /**
     * Write 1 (only inserts)
     */
@@ -108,7 +108,7 @@ protected void insertOverwriteCommitDataWithTwoPartitions(List firstP
    //just generate two partitions
    dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
    HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
-    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);
    /**
     * Write 1 (upsert)
     */
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index d8ce6612a443a..9a119387854c7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -178,7 +178,7 @@ public void testRollbackForCanIndexLogFile() throws IOException {
    //1. prepare data
    new HoodieTestDataGenerator().writePartitionMetadata(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH}, basePath);
-    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);
    // Write 1 (only inserts)
    String newCommitTime = "001";
    client.startCommitWithTime(newCommitTime);
@@ -305,7 +305,7 @@ public void testRollbackWhenFirstCommitFail() throws Exception {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withRollbackUsingMarkers(false)
        .withPath(basePath).build();
-    try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
+    try (SparkRDDWriteClient client = getSparkRDDWriteClient(config)) {
      client.startCommitWithTime("001");
      client.insert(jsc.emptyRDD(), "001");
      client.rollback("001");
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 79f20b9f85c75..219e143f7211a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -161,7 +161,7 @@ public void testUpgradeZeroToOneInternal(boolean induceResiduesFromPrevUpgrade,
      metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
    }
    HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build();
-    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);
    // prepare data. Make 2 commits, in which 2nd is not committed.
    List firstPartitionCommit2FileSlices = new ArrayList<>();
@@ -223,7 +223,7 @@ public void testUpgradeOneToTwo(HoodieTableType tableType) throws IOException {
      metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
    }
    HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build();
-    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);
    // Write inserts
    doInsert(client);
@@ -261,7 +261,7 @@ public void testUpgradeTwoToThree(
      cfgBuilder.withKeyGenerator(keyGeneratorClass.get());
    }
    HoodieWriteConfig cfg = cfgBuilder.build();
-    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);
    // Write inserts
    doInsert(client);
@@ -297,7 +297,7 @@ public void testUpgradeDowngradeBetweenThreeAndCurrentVersion() throws IOExcepti
    HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build();
    // write inserts
-    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);
    doInsert(client);
    // current version should have TABLE_CHECKSUM key
@@ -392,7 +392,7 @@ public void testDowngrade(boolean deletePartialMarkerFiles, HoodieTableType tabl
    }
    HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(true)
        .withMarkersType(markerType.name()).withProps(params).build();
-    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);
    if (fromVersion == HoodieTableVersion.TWO) {
      // set table configs
@@ -548,7 +548,7 @@ private List triggerCommit(String newCommitTime, HoodieTableType t
      params.put(TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    }
    HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(enableMarkedBasedRollback).withProps(params).build();
-    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    SparkRDDWriteClient client = getSparkRDDWriteClient(cfg);
    client.startCommitWithTime(newCommitTime);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
index 11c615a76555a..9d28577059404 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.testutils;
 
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -139,7 +139,7 @@ public synchronized void runBeforeEach() throws Exception {
    if (!initialized) {
      SparkConf sparkConf = conf();
      SparkRDDWriteClient.registerClasses(sparkConf);
-      HoodieReadClient.addHoodieSupport(sparkConf);
+      SparkRDDReadClient.addHoodieSupport(sparkConf);
      spark = SparkSession.builder().config(sparkConf).getOrCreate();
      sqlContext = spark.sqlContext();
      jsc = new JavaSparkContext(spark.sparkContext());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 4504c552c95d6..803b9fb2cb019 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -27,7 +27,7 @@
 import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -128,7 +128,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
  protected transient ExecutorService executorService;
  protected transient HoodieTableMetaClient metaClient;
  protected transient SparkRDDWriteClient writeClient;
-  protected transient HoodieReadClient readClient;
+  protected transient SparkRDDReadClient readClient;
  protected transient HoodieTableFileSystemView tableView;
  protected transient TimelineService timelineService;
@@ -453,12 +453,12 @@ private void initFileSystemWithConfiguration(Configuration configuration) {
    }
  }
-  public HoodieReadClient getHoodieReadClient(String basePath) {
-    readClient = new HoodieReadClient(context, basePath, SQLContext.getOrCreate(jsc.sc()));
+  public SparkRDDReadClient getSparkRDDReadClient(String basePath) {
+    readClient = new SparkRDDReadClient(context, basePath, SQLContext.getOrCreate(jsc.sc()));
    return readClient;
  }
-  public SparkRDDWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
+  public SparkRDDWriteClient getSparkRDDWriteClient(HoodieWriteConfig cfg) {
    if (null != writeClient) {
      writeClient.close();
      writeClient = null;
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 75d2d14221d32..8908a8bf5a901 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.testutils;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -100,7 +100,7 @@ public static SparkConf getSparkConfForTest(String appName) {
      sparkConf.set("spark.eventLog.dir", evlogDir);
    }
-    return HoodieReadClient.addHoodieSupport(sparkConf);
+    return SparkRDDReadClient.addHoodieSupport(sparkConf);
  }
  private static HashMap getLatestFileIDsToFullPath(String basePath, HoodieTimeline commitTimeline,
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index f9676c6c477be..ba97a87e35583 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -26,7 +26,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -180,7 +180,7 @@ public synchronized void runBeforeEach() {
    if (!initialized) {
      SparkConf sparkConf = conf();
      SparkRDDWriteClient.registerClasses(sparkConf);
-      HoodieReadClient.addHoodieSupport(sparkConf);
+      SparkRDDReadClient.addHoodieSupport(sparkConf);
      spark = SparkSession.builder().config(sparkConf).getOrCreate();
      sqlContext = spark.sqlContext();
      jsc = new JavaSparkContext(spark.sparkContext());
diff --git a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
index 212dcc440933f..c8c17a3c10552 100644
--- a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
+++ b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
@@ -18,7 +18,7 @@
 package org.apache.hudi.examples.quickstart;
 
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.model.HoodieAvroPayload;
@@ -86,7 +86,7 @@ public synchronized void runBeforeEach() {
    if (!initialized) {
      SparkConf sparkConf = conf();
      SparkRDDWriteClient.registerClasses(sparkConf);
-      HoodieReadClient.addHoodieSupport(sparkConf);
+      SparkRDDReadClient.addHoodieSupport(sparkConf);
      spark = SparkSession.builder().config(sparkConf).getOrCreate();
      sqlContext = spark.sqlContext();
      jsc = new JavaSparkContext(spark.sparkContext());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 4042f431d7d56..842a9a6f96f22 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -18,7 +18,7 @@
 package org.apache.hudi;
 
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -253,7 +253,7 @@ public static HoodieRecord createHoodieRecord(GenericRecord gr, HoodieKey hKey,
  public static JavaRDD dropDuplicates(JavaSparkContext jssc, JavaRDD incomingHoodieRecords,
      HoodieWriteConfig writeConfig) {
    try {
-      HoodieReadClient client = new HoodieReadClient<>(new HoodieSparkEngineContext(jssc), writeConfig);
+      SparkRDDReadClient client = new SparkRDDReadClient<>(new HoodieSparkEngineContext(jssc), writeConfig);
      return client.tagLocation(incomingHoodieRecords)
          .filter(r -> !((HoodieRecord) r).isCurrentLocationKnown());
    } catch (TableNotFoundException e) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index 9312a26b4f950..03d8ad9ad80bc 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -21,7 +21,7 @@
 import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
 import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -80,7 +80,7 @@ public void init() throws IOException {
    if (!initialized) {
      SparkConf sparkConf = conf();
      SparkRDDWriteClient.registerClasses(sparkConf);
-      HoodieReadClient.addHoodieSupport(sparkConf);
+      SparkRDDReadClient.addHoodieSupport(sparkConf);
      spark = SparkSession.builder().config(sparkConf).getOrCreate();
      sqlContext = spark.sqlContext();
      jsc = new JavaSparkContext(spark.sparkContext());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java
index 8d3917f066dbd..00cf3ae8832b2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java
@@ -20,7 +20,7 @@
 package org.apache.hudi.utilities;
 
 import org.apache.hudi.HoodieTestCommitGenerator;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -94,7 +94,7 @@ public void initWithCleanState() throws IOException {
    if (!initialized) {
      SparkConf sparkConf = conf();
      SparkRDDWriteClient.registerClasses(sparkConf);
-      HoodieReadClient.addHoodieSupport(sparkConf);
+      SparkRDDReadClient.addHoodieSupport(sparkConf);
      spark = SparkSession.builder().config(sparkConf).getOrCreate();
      sqlContext = spark.sqlContext();
      jsc = new JavaSparkContext(spark.sparkContext());
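
Reviewer note (not part of the patch): for anyone trying out the renamed class, below is a minimal, hypothetical sketch of how SparkRDDReadClient is typically wired up, mirroring the test harnesses and the DataSourceUtils.dropDuplicates usage touched above. The class and method names (registerClasses, addHoodieSupport, tagLocation, isCurrentLocationKnown) come from this patch; the application name, local master, table path, payload type (HoodieAvroPayload), and empty input RDD are placeholders I introduced for illustration, and the basePath is assumed to point at an existing Hudi table (otherwise construction fails with TableNotFoundException, which DataSourceUtils guards for).

import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkRDDReadClientSketch {
  public static void main(String[] args) {
    // Same wiring the test harnesses above use: register Hudi's Kryo classes and Spark support.
    SparkConf sparkConf = new SparkConf().setAppName("spark-rdd-read-client-sketch").setMaster("local[2]");
    SparkRDDWriteClient.registerClasses(sparkConf);
    SparkRDDReadClient.addHoodieSupport(sparkConf);
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    // Hypothetical path to an existing Hudi table; mirrors the harness helper getSparkRDDReadClient(basePath).
    String basePath = "/tmp/hoodie/sample-table";
    SparkRDDReadClient<HoodieAvroPayload> readClient =
        new SparkRDDReadClient<>(new HoodieSparkEngineContext(jsc), basePath);

    // tagLocation stamps records that already exist in the table with their current location,
    // which is how dropDuplicates in DataSourceUtils filters out already-written records.
    JavaRDD<HoodieRecord<HoodieAvroPayload>> incoming = jsc.emptyRDD(); // placeholder input records
    JavaRDD<HoodieRecord<HoodieAvroPayload>> tagged = readClient.tagLocation(incoming);
    long notYetWritten = tagged.filter(r -> !r.isCurrentLocationKnown()).count();
    System.out.println("records not yet present in the table: " + notYetWritten);

    jsc.stop();
  }
}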