From ee081dda0813d6bf188b09c4a229e84dc863f728 Mon Sep 17 00:00:00 2001 From: simonsssu Date: Sun, 18 Sep 2022 06:16:52 +0800 Subject: [PATCH] [HUDI-3959] Rename class name for spark rdd reader (#5409) Co-authored-by: Y Ethan Guo --- .../functional/CLIFunctionalTestHarness.java | 4 +- .../apache/hudi/client/HoodieReadClient.java | 199 +-------------- .../hudi/client/SparkRDDReadClient.java | 234 ++++++++++++++++++ .../hudi/client/TestHoodieReadClient.java | 8 +- .../table/TestHoodieMergeOnReadTable.java | 4 +- .../action/compact/CompactionTestBase.java | 4 +- .../action/compact/TestAsyncCompaction.java | 20 +- .../action/compact/TestInlineCompaction.java | 20 +- .../hudi/testutils/FunctionalTestHarness.java | 4 +- .../testutils/HoodieClientTestHarness.java | 8 +- .../hudi/testutils/HoodieClientTestUtils.java | 4 +- .../SparkClientFunctionalTestHarness.java | 4 +- .../quickstart/TestHoodieSparkQuickstart.java | 4 +- .../java/org/apache/hudi/DataSourceUtils.java | 4 +- .../hudi/utilities/TestHoodieIndexer.java | 4 +- .../hudi/utilities/TestHoodieRepairTool.java | 4 +- 16 files changed, 290 insertions(+), 239 deletions(-) create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDReadClient.java diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java b/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java index 7a12a6692a2bc..a8f27c3d6be36 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java @@ -19,7 +19,7 @@ package org.apache.hudi.cli.functional; -import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.SparkRDDReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; @@ -107,7 +107,7 @@ public synchronized void runBeforeEach() { if (!initialized) { SparkConf sparkConf = conf(); SparkRDDWriteClient.registerClasses(sparkConf); - HoodieReadClient.addHoodieSupport(sparkConf); + SparkRDDReadClient.addHoodieSupport(sparkConf); spark = SparkSession.builder().config(sparkConf).getOrCreate(); sqlContext = spark.sqlContext(); jsc = new JavaSparkContext(spark.sparkContext()); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java index 97e54070cf4e8..7277479f64ec0 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java @@ -18,217 +18,34 @@ package org.apache.hudi.client; -import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.model.HoodieAvroRecord; -import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.util.CompactionUtils; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; -import 
org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.data.HoodieJavaRDD; -import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.index.HoodieIndex; -import org.apache.hudi.index.SparkHoodieIndexFactory; -import org.apache.hudi.table.HoodieSparkTable; -import org.apache.hudi.table.HoodieTable; -import org.apache.hadoop.conf.Configuration; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; -import org.apache.spark.sql.types.StructType; - -import java.io.Serializable; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -import scala.Tuple2; /** * Provides an RDD based API for accessing/filtering Hoodie tables, based on keys. + * + * @deprecated Use {@link SparkRDDReadClient} instead. */ -public class HoodieReadClient> implements Serializable { +@Deprecated +public class HoodieReadClient> extends SparkRDDReadClient { - private static final long serialVersionUID = 1L; - - /** - * TODO: We need to persist the index type into hoodie.properties and be able to access the index just with a simple - * base path pointing to the table. Until, then just always assume a BloomIndex - */ - private final transient HoodieIndex index; - private HoodieTable hoodieTable; - private transient Option sqlContextOpt; - private final transient HoodieSparkEngineContext context; - private final transient Configuration hadoopConf; - - /** - * @param basePath path to Hoodie table - */ public HoodieReadClient(HoodieSparkEngineContext context, String basePath) { - this(context, HoodieWriteConfig.newBuilder().withPath(basePath) - // by default we use HoodieBloomIndex - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build()); + super(context, basePath); } - /** - * @param context - * @param basePath - * @param sqlContext - */ public HoodieReadClient(HoodieSparkEngineContext context, String basePath, SQLContext sqlContext) { - this(context, basePath); - this.sqlContextOpt = Option.of(sqlContext); + super(context, basePath, sqlContext); } - /** - * Initializes the {@link HoodieReadClient} with engine context, base path, SQL context and index type.
- * - * @param context Hudi Spark engine context - * @param basePath Base path of the table - * @param sqlContext {@link SQLContext} instance - * @param indexType Hudi index type - */ public HoodieReadClient(HoodieSparkEngineContext context, String basePath, SQLContext sqlContext, HoodieIndex.IndexType indexType) { - this(context, HoodieWriteConfig.newBuilder().withPath(basePath) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()).build()); - this.sqlContextOpt = Option.of(sqlContext); + super(context, basePath, sqlContext, indexType); } - /** - * @param clientConfig instance of HoodieWriteConfig - */ public HoodieReadClient(HoodieSparkEngineContext context, HoodieWriteConfig clientConfig) { - this.context = context; - this.hadoopConf = context.getHadoopConf().get(); - final String basePath = clientConfig.getBasePath(); - // Create a Hoodie table which encapsulated the commits and files visible - HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); - this.hoodieTable = HoodieSparkTable.create(clientConfig, context, metaClient); - this.index = SparkHoodieIndexFactory.createIndex(clientConfig); - this.sqlContextOpt = Option.empty(); - } - - /** - * Adds support for accessing Hoodie built tables from SparkSQL, as you normally would. - * - * @return SparkConf object to be used to construct the SparkContext by caller - */ - public static SparkConf addHoodieSupport(SparkConf conf) { - conf.set("spark.sql.hive.convertMetastoreParquet", "false"); - return conf; - } - - private void assertSqlContext() { - if (!sqlContextOpt.isPresent()) { - throw new IllegalStateException("SQLContext must be set, when performing dataframe operations"); - } - } - - private Option convertToDataFilePath(Option> partitionPathFileIDPair) { - if (partitionPathFileIDPair.isPresent()) { - HoodieBaseFile dataFile = hoodieTable.getBaseFileOnlyView() - .getLatestBaseFile(partitionPathFileIDPair.get().getLeft(), partitionPathFileIDPair.get().getRight()).get(); - return Option.of(dataFile.getPath()); - } else { - return Option.empty(); - } - } - - /** - * Given a bunch of hoodie keys, fetches all the individual records out as a data frame. 
- * - * @return a dataframe - */ - public Dataset readROView(JavaRDD hoodieKeys, int parallelism) { - assertSqlContext(); - JavaPairRDD>> lookupResultRDD = checkExists(hoodieKeys); - JavaPairRDD> keyToFileRDD = - lookupResultRDD.mapToPair(r -> new Tuple2<>(r._1, convertToDataFilePath(r._2))); - List paths = keyToFileRDD.filter(keyFileTuple -> keyFileTuple._2().isPresent()) - .map(keyFileTuple -> keyFileTuple._2().get()).collect(); - - // record locations might be same for multiple keys, so need a unique list - Set uniquePaths = new HashSet<>(paths); - Dataset originalDF = null; - // read files based on the file extension name - if (paths.size() == 0 || paths.get(0).endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { - originalDF = sqlContextOpt.get().read().parquet(uniquePaths.toArray(new String[uniquePaths.size()])); - } else if (paths.get(0).endsWith(HoodieFileFormat.ORC.getFileExtension())) { - originalDF = sqlContextOpt.get().read().orc(uniquePaths.toArray(new String[uniquePaths.size()])); - } - StructType schema = originalDF.schema(); - JavaPairRDD keyRowRDD = originalDF.javaRDD().mapToPair(row -> { - HoodieKey key = new HoodieKey(row.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD), - row.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD)); - return new Tuple2<>(key, row); - }); - - // Now, we need to further filter out, for only rows that match the supplied hoodie keys - JavaRDD rowRDD = keyRowRDD.join(keyToFileRDD, parallelism).map(tuple -> tuple._2()._1()); - return sqlContextOpt.get().createDataFrame(rowRDD, schema); - } - - /** - * Checks if the given [Keys] exists in the hoodie table and returns [Key, Option[FullFilePath]] If the optional - * FullFilePath value is not present, then the key is not found. If the FullFilePath value is present, it is the path - * component (without scheme) of the URI underlying file - */ - public JavaPairRDD>> checkExists(JavaRDD hoodieKeys) { - return HoodieJavaRDD.getJavaRDD( - index.tagLocation(HoodieJavaRDD.of(hoodieKeys.map(k -> new HoodieAvroRecord<>(k, null))), - context, hoodieTable)) - .mapToPair(hr -> new Tuple2<>(hr.getKey(), hr.isCurrentLocationKnown() - ? Option.of(Pair.of(hr.getPartitionPath(), hr.getCurrentLocation().getFileId())) - : Option.empty()) - ); - } - - /** - * Filter out HoodieRecords that already exists in the output folder. This is useful in deduplication. - * - * @param hoodieRecords Input RDD of Hoodie records. - * @return A subset of hoodieRecords RDD, with existing records filtered out. - */ - public JavaRDD> filterExists(JavaRDD> hoodieRecords) { - JavaRDD> recordsWithLocation = tagLocation(hoodieRecords); - return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown()); - } - - /** - * Looks up the index and tags each incoming record with a location of a file that contains the row (if it is actually - * present). Input RDD should contain no duplicates if needed. - * - * @param hoodieRecords Input RDD of Hoodie records - * @return Tagged RDD of Hoodie records - */ - public JavaRDD> tagLocation(JavaRDD> hoodieRecords) throws HoodieIndexException { - return HoodieJavaRDD.getJavaRDD( - index.tagLocation(HoodieJavaRDD.of(hoodieRecords), context, hoodieTable)); - } - - /** - * Return all pending compactions with instant time for clients to decide what to compact next. 
- * - * @return - */ - public List> getPendingCompactions() { - HoodieTableMetaClient metaClient = - HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(hoodieTable.getMetaClient().getBasePath()).setLoadActiveTimelineOnLoad(true).build(); - return CompactionUtils.getAllPendingCompactionPlans(metaClient).stream() - .map( - instantWorkloadPair -> Pair.of(instantWorkloadPair.getKey().getTimestamp(), instantWorkloadPair.getValue())) - .collect(Collectors.toList()); + super(context, clientConfig); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDReadClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDReadClient.java new file mode 100644 index 0000000000000..adddabfdc0299 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDReadClient.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.index.SparkHoodieIndexFactory; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.types.StructType; + +import java.io.Serializable; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import scala.Tuple2; + +/** + * Provides an RDD based API for accessing/filtering Hoodie tables, based on keys. 
+ */ +public class SparkRDDReadClient> implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * TODO: We need to persist the index type into hoodie.properties and be able to access the index just with a simple + * base path pointing to the table. Until then, just always assume a BloomIndex + */ + private final transient HoodieIndex index; + private HoodieTable hoodieTable; + private transient Option sqlContextOpt; + private final transient HoodieSparkEngineContext context; + private final transient Configuration hadoopConf; + + /** + * @param basePath path to Hoodie table + */ + public SparkRDDReadClient(HoodieSparkEngineContext context, String basePath) { + this(context, HoodieWriteConfig.newBuilder().withPath(basePath) + // by default we use HoodieBloomIndex + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build()); + } + + /** + * @param context + * @param basePath + * @param sqlContext + */ + public SparkRDDReadClient(HoodieSparkEngineContext context, String basePath, SQLContext sqlContext) { + this(context, basePath); + this.sqlContextOpt = Option.of(sqlContext); + } + + /** + * Initializes the {@link SparkRDDReadClient} with engine context, base path, SQL context and index type. + * + * @param context Hudi Spark engine context + * @param basePath Base path of the table + * @param sqlContext {@link SQLContext} instance + * @param indexType Hudi index type + */ + public SparkRDDReadClient(HoodieSparkEngineContext context, String basePath, SQLContext sqlContext, HoodieIndex.IndexType indexType) { + this(context, HoodieWriteConfig.newBuilder().withPath(basePath) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()).build()); + this.sqlContextOpt = Option.of(sqlContext); + } + + /** + * @param clientConfig instance of HoodieWriteConfig + */ + public SparkRDDReadClient(HoodieSparkEngineContext context, HoodieWriteConfig clientConfig) { + this.context = context; + this.hadoopConf = context.getHadoopConf().get(); + final String basePath = clientConfig.getBasePath(); + // Create a Hoodie table which encapsulates the commits and files visible + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); + this.hoodieTable = HoodieSparkTable.create(clientConfig, context, metaClient); + this.index = SparkHoodieIndexFactory.createIndex(clientConfig); + this.sqlContextOpt = Option.empty(); + } + + /** + * Adds support for accessing Hoodie-built tables from Spark SQL, as you normally would. + * + * @return SparkConf object to be used to construct the SparkContext by caller + */ + public static SparkConf addHoodieSupport(SparkConf conf) { + conf.set("spark.sql.hive.convertMetastoreParquet", "false"); + return conf; + } + + private void assertSqlContext() { + if (!sqlContextOpt.isPresent()) { + throw new IllegalStateException("SQLContext must be set when performing dataframe operations"); + } + } + + private Option convertToDataFilePath(Option> partitionPathFileIDPair) { + if (partitionPathFileIDPair.isPresent()) { + HoodieBaseFile dataFile = hoodieTable.getBaseFileOnlyView() + .getLatestBaseFile(partitionPathFileIDPair.get().getLeft(), partitionPathFileIDPair.get().getRight()).get(); + return Option.of(dataFile.getPath()); + } else { + return Option.empty(); + } + } + + /** + * Given a bunch of hoodie keys, fetches all the individual records out as a data frame.
+ * + * @return a dataframe + */ + public Dataset readROView(JavaRDD hoodieKeys, int parallelism) { + assertSqlContext(); + JavaPairRDD>> lookupResultRDD = checkExists(hoodieKeys); + JavaPairRDD> keyToFileRDD = + lookupResultRDD.mapToPair(r -> new Tuple2<>(r._1, convertToDataFilePath(r._2))); + List paths = keyToFileRDD.filter(keyFileTuple -> keyFileTuple._2().isPresent()) + .map(keyFileTuple -> keyFileTuple._2().get()).collect(); + + // record locations might be the same for multiple keys, so we need a unique list + Set uniquePaths = new HashSet<>(paths); + Dataset originalDF = null; + // read files based on the file extension name + if (paths.size() == 0 || paths.get(0).endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { + originalDF = sqlContextOpt.get().read().parquet(uniquePaths.toArray(new String[uniquePaths.size()])); + } else if (paths.get(0).endsWith(HoodieFileFormat.ORC.getFileExtension())) { + originalDF = sqlContextOpt.get().read().orc(uniquePaths.toArray(new String[uniquePaths.size()])); + } + StructType schema = originalDF.schema(); + JavaPairRDD keyRowRDD = originalDF.javaRDD().mapToPair(row -> { + HoodieKey key = new HoodieKey(row.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD), + row.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD)); + return new Tuple2<>(key, row); + }); + + // Now, further filter to keep only the rows that match the supplied hoodie keys + JavaRDD rowRDD = keyRowRDD.join(keyToFileRDD, parallelism).map(tuple -> tuple._2()._1()); + return sqlContextOpt.get().createDataFrame(rowRDD, schema); + } + + /** + * Checks if the given [Keys] exist in the hoodie table and returns [Key, Option[FullFilePath]]. If the optional + * FullFilePath value is not present, then the key is not found. If the FullFilePath value is present, it is the path + * component (without scheme) of the URI of the underlying file. + */ + public JavaPairRDD>> checkExists(JavaRDD hoodieKeys) { + return HoodieJavaRDD.getJavaRDD( + index.tagLocation(HoodieJavaRDD.of(hoodieKeys.map(k -> new HoodieAvroRecord<>(k, null))), + context, hoodieTable)) + .mapToPair(hr -> new Tuple2<>(hr.getKey(), hr.isCurrentLocationKnown() + ? Option.of(Pair.of(hr.getPartitionPath(), hr.getCurrentLocation().getFileId())) + : Option.empty()) + ); + } + + /** + * Filters out HoodieRecords that already exist in the output folder. This is useful in deduplication. + * + * @param hoodieRecords Input RDD of Hoodie records. + * @return A subset of hoodieRecords RDD, with existing records filtered out. + */ + public JavaRDD> filterExists(JavaRDD> hoodieRecords) { + JavaRDD> recordsWithLocation = tagLocation(hoodieRecords); + return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown()); + } + + /** + * Looks up the index and tags each incoming record with a location of a file that contains the row (if it is actually + * present). The input RDD should be de-duplicated beforehand, if needed. + * + * @param hoodieRecords Input RDD of Hoodie records + * @return Tagged RDD of Hoodie records + */ + public JavaRDD> tagLocation(JavaRDD> hoodieRecords) throws HoodieIndexException { + return HoodieJavaRDD.getJavaRDD( + index.tagLocation(HoodieJavaRDD.of(hoodieRecords), context, hoodieTable)); + } + + /** + * Returns all pending compactions with their instant times, so clients can decide what to compact next.
+ * + * @return + */ + public List> getPendingCompactions() { + HoodieTableMetaClient metaClient = + HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(hoodieTable.getMetaClient().getBasePath()).setLoadActiveTimelineOnLoad(true).build(); + return CompactionUtils.getAllPendingCompactionPlans(metaClient).stream() + .map( + instantWorkloadPair -> Pair.of(instantWorkloadPair.getKey().getTimestamp(), instantWorkloadPair.getValue())) + .collect(Collectors.toList()); + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java index 872a4a4215ffc..5ff92fe197faf 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java @@ -85,7 +85,7 @@ public void testReadFilterExistAfterBulkInsertPrepped() throws Exception { @Test public void testReadROViewFailsWithoutSqlContext() { - HoodieReadClient readClient = new HoodieReadClient(context, getConfig()); + SparkRDDReadClient readClient = new SparkRDDReadClient(context, getConfig()); JavaRDD recordsRDD = jsc.parallelize(new ArrayList<>(), 1); assertThrows(IllegalStateException.class, () -> { readClient.readROView(recordsRDD, 1); @@ -103,7 +103,7 @@ public void testReadROViewFailsWithoutSqlContext() { private void testReadFilterExist(HoodieWriteConfig config, Function3, SparkRDDWriteClient, JavaRDD, String> writeFn) throws Exception { try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) { - HoodieReadClient readClient = getHoodieReadClient(config.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(config.getBasePath()); String newCommitTime = writeClient.startCommit(); List records = dataGen.generateInserts(newCommitTime, 100); JavaRDD recordsRDD = jsc.parallelize(records, 1); @@ -119,7 +119,7 @@ private void testReadFilterExist(HoodieWriteConfig config, // Verify there are no errors assertNoWriteErrors(statuses); - HoodieReadClient anotherReadClient = getHoodieReadClient(config.getBasePath()); + SparkRDDReadClient anotherReadClient = getHoodieReadClient(config.getBasePath()); filteredRDD = anotherReadClient.filterExists(recordsRDD); List result = filteredRDD.collect(); // Check results @@ -212,7 +212,7 @@ private void testTagLocation(HoodieWriteConfig hoodieWriteConfig, jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream) .map(record -> new HoodieAvroRecord(record.getKey(), null)).collect(Collectors.toList())); // Should have 100 records in table (check using Index), all in locations marked at commit - HoodieReadClient readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath()); List taggedRecords = readClient.tagLocation(recordRDD).collect(); checkTaggedRecords(taggedRecords, newCommitTime); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index 0b80d20b39c36..18f764c1fa25f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -18,7 +18,7 @@ package 
org.apache.hudi.table; -import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.SparkRDDReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.FileSlice; @@ -233,7 +233,7 @@ public void testLogFileCountsAfterCompaction(boolean preserveCommitMeta) throws List updatedRecords = dataGen.generateUpdates(newCommitTime, records); JavaRDD updatedRecordsRDD = jsc().parallelize(updatedRecords, 1); - HoodieReadClient readClient = new HoodieReadClient(context(), config); + SparkRDDReadClient readClient = new SparkRDDReadClient(context(), config); JavaRDD updatedTaggedRecordsRDD = readClient.tagLocation(updatedRecordsRDD); writeClient.startCommitWithTime(newCommitTime); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java index c3f4395b5a81a..a571a6f4732ea 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java @@ -20,7 +20,7 @@ import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.SparkRDDReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.HoodieMetadataConfig; @@ -106,7 +106,7 @@ protected void validateDeltaCommit(String latestDeltaCommit, final Map runNextDeltaCommits(SparkRDDWriteClient client, final HoodieReadClient readClient, List deltaInstants, + protected List runNextDeltaCommits(SparkRDDWriteClient client, final SparkRDDReadClient readClient, List deltaInstants, List records, HoodieWriteConfig cfg, boolean insertFirst, List expPendingCompactionInstants) throws Exception { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java index 87d8613303347..f673872804aff 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java @@ -18,7 +18,7 @@ package org.apache.hudi.table.action.compact; -import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.SparkRDDReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieRecord; @@ -60,7 +60,7 @@ public void testRollbackForInflightCompaction() throws Exception { // Rollback inflight compaction HoodieWriteConfig cfg = getConfig(false); try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { - HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); String firstInstantTime = "001"; String secondInstantTime = "004"; String compactionInstantTime = "005"; @@ -120,7 +120,7 @@ public void testRollbackInflightIngestionWithPendingCompaction() throws Exceptio int numRecs = 2000; try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { - 
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); List records = dataGen.generateInserts(firstInstantTime, numRecs); records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList<>()); @@ -162,7 +162,7 @@ public void testInflightCompaction() throws Exception { // There is inflight compaction. Subsequent compaction run must work correctly HoodieWriteConfig cfg = getConfig(true); try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { - HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); String firstInstantTime = "001"; String secondInstantTime = "004"; String compactionInstantTime = "005"; @@ -195,7 +195,7 @@ public void testScheduleIngestionBeforePendingCompaction() throws Exception { // Case: Failure case. Latest pending compaction instant time must be earlier than this instant time HoodieWriteConfig cfg = getConfig(false); SparkRDDWriteClient client = getHoodieWriteClient(cfg); - HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); String firstInstantTime = "001"; String secondInstantTime = "004"; @@ -226,7 +226,7 @@ public void testScheduleCompactionAfterPendingIngestion() throws Exception { HoodieWriteConfig cfg = getConfig(false); SparkRDDWriteClient client = getHoodieWriteClient(cfg); - HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); String firstInstantTime = "001"; String secondInstantTime = "004"; @@ -258,7 +258,7 @@ public void testScheduleCompactionWithOlderOrSameTimestamp() throws Exception { HoodieWriteConfig cfg = getConfig(false); SparkRDDWriteClient client = getHoodieWriteClient(cfg); - HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); final String firstInstantTime = "001"; final String secondInstantTime = "004"; @@ -293,7 +293,7 @@ public void testCompactionAfterTwoDeltaCommits() throws Exception { // No Delta Commits after compaction request HoodieWriteConfig cfg = getConfig(true); try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { - HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); String firstInstantTime = "001"; String secondInstantTime = "004"; String compactionInstantTime = "005"; @@ -314,7 +314,7 @@ public void testInterleavedCompaction() throws Exception { // Case: Two delta commits before and after compaction schedule HoodieWriteConfig cfg = getConfig(true); try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { - HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); String firstInstantTime = "001"; String secondInstantTime = "004"; String compactionInstantTime = "005"; @@ -342,7 +342,7 @@ public void testCompactionOnReplacedFiles() throws Exception { // Schedule a compaction. Replace those file groups and ensure compaction completes successfully. 
HoodieWriteConfig cfg = getConfig(true); try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { - HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); String firstInstantTime = "001"; String secondInstantTime = "004"; String compactionInstantTime = "005"; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java index 24d387ec3f070..32d2dcda95e32 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java @@ -18,7 +18,7 @@ package org.apache.hudi.table.action.compact; -import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.SparkRDDReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.HoodieRecord; @@ -73,7 +73,7 @@ public void testCompactionIsNotScheduledEarly() throws Exception { HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 60, CompactionTriggerStrategy.NUM_COMMITS); try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) { List records = dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 100); - HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); List instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList()); runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>()); HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build(); @@ -91,7 +91,7 @@ public void testSuccessfulCompactionBasedOnNumCommits() throws Exception { try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) { List records = dataGen.generateInserts(instants.get(0), 100); - HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>()); // third commit, that will trigger compaction @@ -117,7 +117,7 @@ public void testSuccessfulCompactionBasedOnNumAfterCompactionRequest() throws Ex try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) { List records = dataGen.generateInserts(instants.get(0), 100); - HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); // step 1: create and complete 4 delta commit, then create 1 compaction request after this runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>()); @@ -175,7 +175,7 @@ public void testSuccessfulCompactionBasedOnTime() throws Exception { try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) { String instantTime = HoodieActiveTimeline.createNewInstantTime(); List records = dataGen.generateInserts(instantTime, 10); - HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); runNextDeltaCommits(writeClient, 
readClient, Arrays.asList(instantTime), records, cfg, true, new ArrayList<>()); // after 10s, that will trigger compaction @@ -196,7 +196,7 @@ public void testSuccessfulCompactionBasedOnNumOrTime() throws Exception { HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 60, CompactionTriggerStrategy.NUM_OR_TIME); try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) { List records = dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10); - HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); List instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList()); runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>()); // Then: trigger the compaction because reach 3 commits. @@ -222,7 +222,7 @@ public void testSuccessfulCompactionBasedOnNumAndTime() throws Exception { HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20, CompactionTriggerStrategy.NUM_AND_TIME); try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) { List records = dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10); - HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); List instants = IntStream.range(0, 3).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList()); runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>()); HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build(); @@ -251,7 +251,7 @@ public void testCompactionRetryOnFailureBasedOnNumCommits() throws Exception { String instantTime2; try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) { List records = dataGen.generateInserts(instants.get(0), 100); - HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>()); // Schedule compaction instant2, make it in-flight (simulates inline compaction failing) instantTime2 = HoodieActiveTimeline.createNewInstantTime(); @@ -286,7 +286,7 @@ public void testCompactionRetryOnFailureBasedOnTime() throws Exception { List instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList()); try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) { List records = dataGen.generateInserts(instants.get(0), 100); - HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>()); // Schedule compaction instantTime, make it in-flight (simulates inline compaction failing) instantTime = HoodieActiveTimeline.createNewInstantTime(10000); @@ -325,7 +325,7 @@ public void testCompactionRetryOnFailureBasedOnNumAndTime() throws Exception { List instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList()); try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) { List records = dataGen.generateInserts(instants.get(0), 10); - HoodieReadClient readClient = 
getHoodieReadClient(cfg.getBasePath()); + SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>()); // Schedule compaction instantTime, make it in-flight (simulates inline compaction failing) instantTime = HoodieActiveTimeline.createNewInstantTime(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java index 11c615a76555a..9d28577059404 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java @@ -19,7 +19,7 @@ package org.apache.hudi.testutils; -import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.SparkRDDReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -139,7 +139,7 @@ public synchronized void runBeforeEach() throws Exception { if (!initialized) { SparkConf sparkConf = conf(); SparkRDDWriteClient.registerClasses(sparkConf); - HoodieReadClient.addHoodieSupport(sparkConf); + SparkRDDReadClient.addHoodieSupport(sparkConf); spark = SparkSession.builder().config(sparkConf).getOrCreate(); sqlContext = spark.sqlContext(); jsc = new JavaSparkContext(spark.sparkContext()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 564870a4ca422..e6a4d63e8c9e2 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -28,7 +28,7 @@ import org.apache.hudi.avro.model.HoodieActionInstant; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; -import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.SparkRDDReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.common.HoodieSparkEngineContext; @@ -132,7 +132,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im protected transient ExecutorService executorService; protected transient HoodieTableMetaClient metaClient; protected transient SparkRDDWriteClient writeClient; - protected transient HoodieReadClient readClient; + protected transient SparkRDDReadClient readClient; protected transient HoodieTableFileSystemView tableView; protected transient TimelineService timelineService; @@ -481,8 +481,8 @@ private void initFileSystemWithConfiguration(Configuration configuration) { } } - public HoodieReadClient getHoodieReadClient(String basePath) { - readClient = new HoodieReadClient(context, basePath, SQLContext.getOrCreate(jsc.sc())); + public SparkRDDReadClient getHoodieReadClient(String basePath) { + readClient = new SparkRDDReadClient(context, basePath, SQLContext.getOrCreate(jsc.sc())); return readClient; } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java index 3387dd24bb3b0..458af3ad9e60b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java @@ -19,7 +19,7 @@ package org.apache.hudi.testutils; import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.SparkRDDReadClient; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; @@ -100,7 +100,7 @@ public static SparkConf getSparkConfForTest(String appName) { sparkConf.set("spark.eventLog.dir", evlogDir); } - return HoodieReadClient.addHoodieSupport(sparkConf); + return SparkRDDReadClient.addHoodieSupport(sparkConf); } private static HashMap getLatestFileIDsToFullPath(String basePath, HoodieTimeline commitTimeline, diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java index ba1afbebb2529..cb7b2e6b3c43a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java @@ -26,7 +26,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hudi.AvroConversionUtils; -import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.SparkRDDReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; @@ -185,7 +185,7 @@ public synchronized void runBeforeEach() { if (!initialized) { SparkConf sparkConf = conf(); SparkRDDWriteClient.registerClasses(sparkConf); - HoodieReadClient.addHoodieSupport(sparkConf); + SparkRDDReadClient.addHoodieSupport(sparkConf); spark = SparkSession.builder().config(sparkConf).getOrCreate(); sqlContext = spark.sqlContext(); jsc = new JavaSparkContext(spark.sparkContext()); diff --git a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java index 32c51788ee85b..c23db7f8e7106 100644 --- a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java +++ b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java @@ -18,7 +18,7 @@ package org.apache.hudi.examples.quickstart; -import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.SparkRDDReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.model.HoodieAvroPayload; @@ -95,7 +95,7 @@ public synchronized void runBeforeEach() { if (!initialized) { SparkConf sparkConf = conf(); SparkRDDWriteClient.registerClasses(sparkConf); - HoodieReadClient.addHoodieSupport(sparkConf); + SparkRDDReadClient.addHoodieSupport(sparkConf); spark = SparkSession.builder().config(sparkConf).getOrCreate(); sqlContext = spark.sqlContext(); 
jsc = new JavaSparkContext(spark.sparkContext()); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java index 5d3e0bc3eb860..ee807f49dae89 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -21,7 +21,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.SparkRDDReadClient; import org.apache.hudi.client.HoodieWriteResult; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.common.HoodieSparkEngineContext; @@ -246,7 +246,7 @@ public static HoodieRecord createHoodieRecord(GenericRecord gr, HoodieKey hKey, public static JavaRDD dropDuplicates(JavaSparkContext jssc, JavaRDD incomingHoodieRecords, HoodieWriteConfig writeConfig) { try { - HoodieReadClient client = new HoodieReadClient<>(new HoodieSparkEngineContext(jssc), writeConfig); + SparkRDDReadClient client = new SparkRDDReadClient<>(new HoodieSparkEngineContext(jssc), writeConfig); return client.tagLocation(incomingHoodieRecords) .filter(r -> !((HoodieRecord) r).isCurrentLocationKnown()); } catch (TableNotFoundException e) { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java index 9c4fc076660f0..87afd56d83c06 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java @@ -21,7 +21,7 @@ import org.apache.hudi.avro.model.HoodieIndexCommitMetadata; import org.apache.hudi.avro.model.HoodieIndexPartitionInfo; -import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.SparkRDDReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; @@ -89,7 +89,7 @@ public void init() throws IOException { if (!initialized) { SparkConf sparkConf = conf(); SparkRDDWriteClient.registerClasses(sparkConf); - HoodieReadClient.addHoodieSupport(sparkConf); + SparkRDDReadClient.addHoodieSupport(sparkConf); spark = SparkSession.builder().config(sparkConf).getOrCreate(); sqlContext = spark.sqlContext(); jsc = new JavaSparkContext(spark.sparkContext()); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java index 8d3917f066dbd..00cf3ae8832b2 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java @@ -20,7 +20,7 @@ package org.apache.hudi.utilities; import org.apache.hudi.HoodieTestCommitGenerator; -import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.SparkRDDReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -94,7 +94,7 @@ public void initWithCleanState() throws IOException { if (!initialized) { SparkConf sparkConf = conf(); 
SparkRDDWriteClient.registerClasses(sparkConf); - HoodieReadClient.addHoodieSupport(sparkConf); + SparkRDDReadClient.addHoodieSupport(sparkConf); spark = SparkSession.builder().config(sparkConf).getOrCreate(); sqlContext = spark.sqlContext(); jsc = new JavaSparkContext(spark.sparkContext());
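
Usage note (not part of the patch): after this rename, callers construct SparkRDDReadClient exactly where they previously constructed HoodieReadClient; the old class survives only as a deprecated subclass that delegates to the new one. Below is a minimal, hypothetical sketch of the de-duplication path that DataSourceUtils.dropDuplicates exercises above. It uses only APIs that appear in this patch; the class name ReadClientMigrationSketch, the method dropExisting, and the jsc/basePath/incomingRecords parameters are placeholders, and HoodieAvroPayload is assumed as the payload type.

import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ReadClientMigrationSketch {

  // Keeps only the incoming records that are not already present in the Hudi table at basePath.
  public static JavaRDD<HoodieRecord<HoodieAvroPayload>> dropExisting(
      JavaSparkContext jsc, String basePath, JavaRDD<HoodieRecord<HoodieAvroPayload>> incomingRecords) {
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath).build();
    // Same construction as DataSourceUtils.dropDuplicates, just with the renamed class.
    SparkRDDReadClient<HoodieAvroPayload> readClient =
        new SparkRDDReadClient<>(new HoodieSparkEngineContext(jsc), writeConfig);
    // filterExists tags records against the index and drops those with a known location,
    // i.e. records that already exist in the table.
    return readClient.filterExists(incomingRecords);
  }
}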
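
A second hedged sketch covers the SQLContext-backed lookup path (checkExists and readROView) that the tests above drive through HoodieClientTestHarness.getHoodieReadClient. Again this is not part of the patch; ReadClientLookupSketch, lookup, and the jsc/basePath/keys parameters are placeholders, and the payload type is assumed to be HoodieAvroPayload.

import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class ReadClientLookupSketch {

  public static Dataset<Row> lookup(JavaSparkContext jsc, String basePath, JavaRDD<HoodieKey> keys) {
    // The SQLContext-aware constructor is required for readROView; without it the client
    // throws IllegalStateException from assertSqlContext().
    SparkRDDReadClient<HoodieAvroPayload> readClient =
        new SparkRDDReadClient<>(new HoodieSparkEngineContext(jsc), basePath, SQLContext.getOrCreate(jsc.sc()));
    // checkExists maps each key to Option.of(Pair.of(partitionPath, fileId)) when the key is
    // found in the table, or Option.empty() when it is not.
    JavaPairRDD<HoodieKey, Option<Pair<String, String>>> located = readClient.checkExists(keys);
    long presentKeys = located.filter(t -> t._2().isPresent()).count();
    System.out.println("Keys found in table: " + presentKeys);
    // readROView loads the base files containing the keys and returns the matching rows as a DataFrame.
    return readClient.readROView(keys, 1);
  }
}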