diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala index 01a28d056c643..523cb53ba7be4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -20,6 +20,7 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.hudi.DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX import org.apache.hudi.HoodieBaseRelation.projectReader import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.hadoop.HoodieROTablePathFilter @@ -134,7 +135,9 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, * rule; you can find more details in HUDI-3896) */ def toHadoopFsRelation: HadoopFsRelation = { - if (globPaths.isEmpty) { + val enableFileIndex = HoodieSparkConfUtils.getBooleanConfigValue( + optParams, sparkSession.sessionState.conf, ENABLE_HOODIE_FILE_INDEX.key, ENABLE_HOODIE_FILE_INDEX.defaultValue) + if (enableFileIndex && globPaths.isEmpty) { // NOTE: There are currently 2 ways partition values could be fetched: // - Source columns (producing the values used for physical partitioning) will be read // from the data file diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 0773bd850a308..40e1353a54b98 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -251,10 +251,11 @@ case class HoodieFileIndex(spark: SparkSession, override def sizeInBytes: Long = getTotalCachedFilesSize - private def isDataSkippingEnabled: Boolean = HoodieFileIndex.getBooleanConfigValue(options, spark.sessionState.conf, DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), - "false") + private def isDataSkippingEnabled: Boolean = HoodieSparkConfUtils.getBooleanConfigValue( + options, spark.sessionState.conf, DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), false) private def isMetadataTableEnabled: Boolean = metadataConfig.enabled() + private def isColumnStatsIndexEnabled: Boolean = metadataConfig.isColumnStatsIndexEnabled private def validateConfig(): Unit = { @@ -268,10 +269,6 @@ case class HoodieFileIndex(spark: SparkSession, object HoodieFileIndex extends Logging { - def getBooleanConfigValue(options: Map[String, String], sqlConf: SQLConf, configKey: String, defaultValue: String) : Boolean = { - options.getOrElse(configKey, sqlConf.getConfString(configKey, defaultValue)).toBoolean - } - object DataSkippingFailureMode extends Enumeration { val configName = "hoodie.fileIndex.dataSkippingFailureMode" @@ -305,7 +302,8 @@ object HoodieFileIndex extends Logging { // To support metadata listing via Spark SQL we allow users to pass the config via SQL Conf in spark session. Users // would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing. 
val isMetadataFilesPartitionAvailable = isFilesPartitionAvailable(metaClient) && - getBooleanConfigValue(options, sqlConf, HoodieMetadataConfig.ENABLE.key(), HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString) + HoodieSparkConfUtils.getBooleanConfigValue( + options, sqlConf, HoodieMetadataConfig.ENABLE.key(), HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS) properties.putAll(options.filter(p => p._2 != null).asJava) properties.setProperty(HoodieMetadataConfig.ENABLE.key(), String.valueOf(isMetadataFilesPartitionAvailable)) properties diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkConfUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkConfUtils.scala new file mode 100644 index 0000000000000..0d85ace0bf86a --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkConfUtils.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi + +import org.apache.spark.sql.internal.SQLConf + +/** + * Util methods for Hudi Spark and SQL configurations + */ +object HoodieSparkConfUtils { + /** + * Gets boolean config value from config properties and SQL conf. + * + * @param options Config properties. + * @param sqlConf SQL conf. + * @param configKey Config key to fetch. + * @param defaultValue Default value to return if not configured. + * @return The config value. 
+ */ + def getBooleanConfigValue(options: Map[String, String], + sqlConf: SQLConf, + configKey: String, + defaultValue: Boolean): Boolean = { + options.getOrElse(configKey, sqlConf.getConfString(configKey, defaultValue.toString)).toBoolean + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala index 375abcebfeee5..a0a060d4e2801 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala @@ -48,8 +48,8 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, CsvSource, MethodSource, ValueSource} -import java.util.Properties +import java.util.Properties import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ import scala.util.Random @@ -67,6 +67,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase with ScalaAssertionSuppor ) var queryOpts = Map( + DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.key -> "true", DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL ) @@ -223,7 +224,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase with ScalaAssertionSuppor ) val prunedPartitions = fileIndex.listFiles(Seq(partitionFilter2), Seq.empty) .map(_.values.toSeq(Seq(StringType)) - .mkString(",")) + .mkString(",")) .toList .sorted diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index b5d7f91dc8001..21b6a34c94f4e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -20,8 +20,9 @@ package org.apache.hudi.functional import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hudi.HoodieConversionUtils.toJavaOption import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs} -import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig} +import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.testutils.HoodieTestDataGenerator @@ -30,7 +31,7 @@ import org.apache.hudi.common.util import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.metrics.HoodieMetricsConfig import org.apache.hudi.exception.ExceptionUtil.getRootCause -import org.apache.hudi.exception.{HoodieException, HoodieUpsertException, SchemaCompatibilityException} +import org.apache.hudi.exception.HoodieException import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable import org.apache.hudi.keygen._ import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config @@ -38,8 +39,8 @@ import org.apache.hudi.metrics.Metrics import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.hudi.util.JFunction 
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkRecordMerger, QuickstartUtils, ScalaAssertionSupport} -import org.apache.spark.sql.{HoodieInternalRowUtils, _} -import org.apache.spark.sql.functions.{col, concat, lit, udf, when} +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ import org.apache.spark.sql.hudi.HoodieSparkSessionExtension import org.apache.spark.sql.types._ import org.joda.time.DateTime @@ -47,12 +48,10 @@ import org.joda.time.format.DateTimeFormat import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{CsvSource, EnumSource} +import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource} import java.sql.{Date, Timestamp} import java.util.function.Consumer -import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType - import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -170,9 +169,10 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport * For COW table, test the snapshot query mode and incremental query mode. */ @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) - def testPrunePartitionForTimestampBasedKeyGenerator(recordType: HoodieRecordType): Unit = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType) + @CsvSource(Array("true,AVRO", "true,SPARK", "false,AVRO", "false,SPARK")) + def testPrunePartitionForTimestampBasedKeyGenerator(enableFileIndex: Boolean, + recordType: HoodieRecordType): Unit = { + val (writeOpts, readOpts) = getWriterReaderOpts(recordType, enableFileIndex = enableFileIndex) val options = commonOpts ++ Map( "hoodie.compact.inline" -> "false", @@ -207,12 +207,16 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport val commit2Time = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp // snapshot query - val snapshotQueryRes = spark.read.format("hudi").options(readOpts).load(basePath) + val pathForReader = getPathForReader(basePath, !enableFileIndex, 3) + val snapshotQueryRes = spark.read.format("hudi").options(readOpts).load(pathForReader) // TODO(HUDI-3204) we have to revert this to pre-existing behavior from 0.10 - //assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 20) - //assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 30) - assertEquals(snapshotQueryRes.where("partition = '2022/01/01'").count, 20) - assertEquals(snapshotQueryRes.where("partition = '2022/01/02'").count, 30) + if (enableFileIndex) { + assertEquals(snapshotQueryRes.where("partition = '2022/01/01'").count, 20) + assertEquals(snapshotQueryRes.where("partition = '2022/01/02'").count, 30) + } else { + assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 20) + assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 30) + } // incremental query val incrementalQueryRes = spark.read.format("hudi") @@ -286,7 +290,7 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport assertEquals(100, snapshotDF1.count()) val records2 = deleteRecordsToStrings(dataGen.generateUniqueDeletes(20)).toList - val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2 , 2)) + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) 
inputDF2.write.format("org.apache.hudi") .options(writeOpts) @@ -802,13 +806,11 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("")).count() == 0) } - @ParameterizedTest - @CsvSource(Array( - "true,false,AVRO", "true,true,AVRO", "false,true,AVRO", "false,false,AVRO", - "true,false,SPARK", "true,true,SPARK", "false,true,SPARK", "false,false,SPARK" - )) - def testQueryCOWWithBasePathAndFileIndex(partitionEncode: Boolean, isMetadataEnabled: Boolean, recordType: HoodieRecordType): Unit = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType) + private def testPartitionPruning(enableFileIndex: Boolean, + partitionEncode: Boolean, + isMetadataEnabled: Boolean, + recordType: HoodieRecordType): Unit = { + val (writeOpts, readOpts) = getWriterReaderOpts(recordType, enableFileIndex = enableFileIndex) val N = 20 // Test query with partition prune if URL_ENCODE_PARTITIONING has enable @@ -824,11 +826,12 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath) val countIn20160315 = records1.asScala.count(record => record.getPartitionPath == "2016/03/15") + val pathForReader = getPathForReader(basePath, !enableFileIndex, if (partitionEncode) 1 else 3) // query the partition by filter val count1 = spark.read.format("hudi") .options(readOpts) .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) - .load(basePath) + .load(pathForReader) .filter("partition = '2016/03/15'") .count() assertEquals(countIn20160315, count1) @@ -862,10 +865,33 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport assertEquals(false, Metrics.isInitialized) } + @ParameterizedTest + @CsvSource(Array( + "true,false,AVRO", "true,true,AVRO", "false,true,AVRO", "false,false,AVRO", + "true,false,SPARK", "true,true,SPARK", "false,true,SPARK", "false,false,SPARK" + )) + def testQueryCOWWithBasePathAndFileIndex(partitionEncode: Boolean, isMetadataEnabled: Boolean, recordType: HoodieRecordType): Unit = { + testPartitionPruning( + enableFileIndex = true, + partitionEncode = partitionEncode, + isMetadataEnabled = isMetadataEnabled, + recordType = recordType) + } + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testPartitionPruningWithoutFileIndex(partitionEncode: Boolean): Unit = { + testPartitionPruning( + enableFileIndex = false, + partitionEncode = partitionEncode, + isMetadataEnabled = HoodieMetadataConfig.ENABLE.defaultValue, + recordType = HoodieRecordType.SPARK) + } + @Test def testSchemaNotEqualData(): Unit = { - val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true") - val schema1 = StructType(StructField("_row_key", StringType, nullable = true) :: StructField("name", StringType, nullable = true):: - StructField("timestamp", IntegerType, nullable = true):: StructField("age", StringType, nullable = true) :: StructField("partition", IntegerType, nullable = true)::Nil) + val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true") + val schema1 = StructType(StructField("_row_key", StringType, nullable = true) :: StructField("name", StringType, nullable = true) :: + StructField("timestamp", IntegerType, nullable = true) :: StructField("age", StringType, nullable = true) :: StructField("partition", IntegerType, nullable = true) :: Nil) val records = Array("{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}", 
"{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}") val inputDF = spark.read.schema(schema1.toDDL).json(spark.sparkContext.parallelize(records, 2)) @@ -875,7 +901,7 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport .save(basePath) val recordsReadDF = spark.read.format("org.apache.hudi") .load(basePath) - val resultSchema = new StructType(recordsReadDF.schema.filter(p=> !p.name.startsWith("_hoodie")).toArray) + val resultSchema = new StructType(recordsReadDF.schema.filter(p => !p.name.startsWith("_hoodie")).toArray) assertEquals(resultSchema, schema1) } @@ -892,7 +918,7 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport .option(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key, enableDropPartitionColumns) .mode(SaveMode.Overwrite) .save(basePath) - val snapshotDF1 = spark.read.format("org.apache.hudi").options(readOpts).load(basePath) + val snapshotDF1 = spark.read.format("hudi").options(readOpts).load(basePath) assertEquals(snapshotDF1.count(), 100) assertEquals(3, snapshotDF1.select("partition").distinct().count()) } @@ -961,9 +987,14 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport } @ParameterizedTest - @CsvSource(Array("true, AVRO", "false, AVRO", "true, SPARK", "false, SPARK")) - def testPartitionColumnsProperHandling(useGlobbing: Boolean, recordType: HoodieRecordType): Unit = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType) + @CsvSource(Array( + "true, true, AVRO", "true, false, AVRO", "true, true, SPARK", "true, false, SPARK", + "false, true, AVRO", "false, false, AVRO", "false, true, SPARK", "false, false, SPARK" + )) + def testPartitionColumnsProperHandling(enableFileIndex: Boolean, + useGlobbing: Boolean, + recordType: HoodieRecordType): Unit = { + val (writeOpts, readOpts) = getWriterReaderOpts(recordType, enableFileIndex = enableFileIndex) val _spark = spark import _spark.implicits._ @@ -990,20 +1021,16 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport // NOTE: We're testing here that both paths are appropriately handling // partition values, regardless of whether we're reading the table // t/h a globbed path or not - val path = if (useGlobbing) { - s"$basePath/*/*/*/*" - } else { - basePath - } + val pathForReader = getPathForReader(basePath, useGlobbing || !enableFileIndex, 3) // Case #1: Partition columns are read from the data file - val firstDF = spark.read.format("hudi").options(readOpts).load(path) + val firstDF = spark.read.format("hudi").options(readOpts).load(pathForReader) assert(firstDF.count() == 2) // data_date is the partition field. Persist to the parquet file using the origin values, and read it. 
// TODO(HUDI-3204) we have to revert this to pre-existing behavior from 0.10 - val expectedValues = if (useGlobbing) { + val expectedValues = if (useGlobbing || !enableFileIndex) { Seq("2018-09-23", "2018-09-24") } else { Seq("2018/09/23", "2018/09/24") @@ -1019,11 +1046,11 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport // // NOTE: This case is only relevant when globbing is NOT used, since when globbing is used Spark // won't be able to infer partitioning properly - if (!useGlobbing) { + if (!useGlobbing && enableFileIndex) { val secondDF = spark.read.format("hudi") .options(readOpts) .option(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key, "true") - .load(path) + .load(pathForReader) assert(secondDF.count() == 2) @@ -1131,10 +1158,26 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport assertEquals(false, Metrics.isInitialized, "Metrics should be shutdown") } - def getWriterReaderOpts(recordType: HoodieRecordType, opt: Map[String, String] = commonOpts): (Map[String, String], Map[String, String]) = { + def getWriterReaderOpts(recordType: HoodieRecordType, + opt: Map[String, String] = commonOpts, + enableFileIndex: Boolean = DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()): + (Map[String, String], Map[String, String]) = { + val fileIndexOpt: Map[String, String] = + Map(DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.key -> enableFileIndex.toString) + recordType match { - case HoodieRecordType.SPARK => (opt ++ sparkOpts, sparkOpts) - case _ => (opt, Map.empty[String, String]) + case HoodieRecordType.SPARK => (opt ++ sparkOpts, sparkOpts ++ fileIndexOpt) + case _ => (opt, fileIndexOpt) + } + } + + def getPathForReader(basePath: String, useGlobbing: Boolean, partitionPathLevel: Int): String = { + if (useGlobbing) { + // When explicitly using globbing or not using HoodieFileIndex, we fall back to the old way + // of reading Hudi table with globbed path + basePath + "/*" * (partitionPathLevel + 1) + } else { + basePath } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 35b7792f0c36c..e6e850d621205 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -20,16 +20,17 @@ package org.apache.hudi.functional import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieConversionUtils.toJavaOption -import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig} -import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType} import org.apache.hudi.client.SparkRDDWriteClient -import org.apache.hudi.common.model.{HoodieRecordPayload, OverwriteWithLatestAvroPayload} +import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig} +import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType +import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordPayload, HoodieTableType, OverwriteWithLatestAvroPayload} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.common.util import 
org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable +import org.apache.hudi.hadoop.config.HoodieRealtimeConfig import org.apache.hudi.index.HoodieIndex.IndexType import org.apache.hudi.keygen.NonpartitionedKeyGenerator import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config @@ -38,7 +39,7 @@ import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase} import org.apache.hudi.util.JFunction import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkRecordMerger, SparkDatasetMixin} import org.apache.log4j.LogManager -import org.apache.spark.sql.{HoodieInternalRowUtils, _} +import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.hudi.HoodieSparkSessionExtension import org.apache.spark.sql.types.BooleanType @@ -46,9 +47,8 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{CsvSource, EnumSource} + import java.util.function.Consumer -import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType -import org.apache.hudi.hadoop.config.HoodieRealtimeConfig import scala.collection.JavaConversions.mapAsJavaMap import scala.collection.JavaConverters._ @@ -1047,9 +1047,10 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { * For MOR table, test all the three query modes. */ @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) - def testPrunePartitionForTimestampBasedKeyGenerator(recordType: HoodieRecordType): Unit = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType) + @CsvSource(Array("true,AVRO", "true,SPARK", "false,AVRO", "false,SPARK")) + def testPrunePartitionForTimestampBasedKeyGenerator(enableFileIndex: Boolean, + recordType: HoodieRecordType): Unit = { + val (writeOpts, readOpts) = getWriterReaderOpts(recordType, enableFileIndex = enableFileIndex) val options = commonOpts ++ Map( "hoodie.compact.inline" -> "false", @@ -1091,9 +1092,10 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { .save(basePath) val commit3Time = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp + val pathForROQuery = getPathForROQuery(basePath, !enableFileIndex, 3) // snapshot query val snapshotQueryRes = spark.read.format("hudi").options(readOpts).load(basePath) - assertEquals(snapshotQueryRes.where(s"_hoodie_commit_time = '$commit1Time'").count, 50) + assertEquals(snapshotQueryRes.where(s"_hoodie_commit_time = '$commit1Time'").count, 50) assertEquals(snapshotQueryRes.where(s"_hoodie_commit_time = '$commit2Time'").count, 40) assertEquals(snapshotQueryRes.where(s"_hoodie_commit_time = '$commit3Time'").count, 20) @@ -1104,12 +1106,15 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { val readOptimizedQueryRes = spark.read.format("hudi") .options(readOpts) .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) - .load(basePath) + .load(pathForROQuery) // TODO(HUDI-3204) we have to revert this to pre-existing behavior from 0.10 - //assertEquals(readOptimizedQueryRes.where("partition = '2022-01-01'").count, 50) - //assertEquals(readOptimizedQueryRes.where("partition = '2022-01-02'").count, 60) - 
assertEquals(readOptimizedQueryRes.where("partition = '2022/01/01'").count, 50) - assertEquals(readOptimizedQueryRes.where("partition = '2022/01/02'").count, 60) + if (enableFileIndex) { + assertEquals(readOptimizedQueryRes.where("partition = '2022/01/01'").count, 50) + assertEquals(readOptimizedQueryRes.where("partition = '2022/01/02'").count, 60) + } else { + assertEquals(readOptimizedQueryRes.where("partition = '2022-01-01'").count, 50) + assertEquals(readOptimizedQueryRes.where("partition = '2022-01-02'").count, 60) + } // incremental query val incrementalQueryRes = spark.read.format("hudi") @@ -1143,8 +1148,9 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { * The read-optimized query should read `fg1_dc1.parquet` only in this case. */ @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) - def testReadOptimizedQueryAfterInflightCompactionAndCompletedDeltaCommit(recordType: HoodieRecordType): Unit = { + @CsvSource(Array("true,AVRO", "true,SPARK", "false,AVRO", "false,SPARK")) + def testReadOptimizedQueryAfterInflightCompactionAndCompletedDeltaCommit(enableFileIndex: Boolean, + recordType: HoodieRecordType): Unit = { val (tableName, tablePath) = ("hoodie_mor_ro_read_test_table", s"${basePath}_mor_test_table") val precombineField = "col3" val recordKeyField = "key" @@ -1160,8 +1166,9 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { HoodieWriteConfig.TBL_NAME.key -> tableName, "hoodie.insert.shuffle.parallelism" -> "1", "hoodie.upsert.shuffle.parallelism" -> "1") + val pathForROQuery = getPathForROQuery(tablePath, !enableFileIndex, 0) - val (writeOpts, readOpts) = getWriterReaderOpts(recordType, options) + val (writeOpts, readOpts) = getWriterReaderOpts(recordType, options, enableFileIndex) // First batch with all inserts // Deltacommit1 (DC1, completed), writing file group 1 (fg1) @@ -1227,7 +1234,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { .option( DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) - .load(tablePath) + .load(pathForROQuery) // The base file in the first file slice, i.e., fg1_dc1.parquet, should be read only assertEquals(10, roDf.count()) @@ -1236,10 +1243,26 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { roDf.where(col(recordKeyField) === 0).select(dataField).collect()(0).getLong(0)) } - def getWriterReaderOpts(recordType: HoodieRecordType, opt: Map[String, String] = commonOpts): (Map[String, String], Map[String, String]) = { + def getWriterReaderOpts(recordType: HoodieRecordType, + opt: Map[String, String] = commonOpts, + enableFileIndex: Boolean = DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()): + (Map[String, String], Map[String, String]) = { + val fileIndexOpt: Map[String, String] = + Map(DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.key -> enableFileIndex.toString) + recordType match { - case HoodieRecordType.SPARK => (opt ++ sparkOpts, sparkOpts) - case _ => (opt, Map.empty[String, String]) + case HoodieRecordType.SPARK => (opt ++ sparkOpts, sparkOpts ++ fileIndexOpt) + case _ => (opt, fileIndexOpt) + } + } + + def getPathForROQuery(basePath: String, useGlobbing: Boolean, partitionPathLevel: Int): String = { + if (useGlobbing) { + // When explicitly using globbing or not using HoodieFileIndex, we fall back to the old way + // of reading Hudi table with globbed path + basePath + "/*" * (partitionPathLevel + 1) + } else { + 
basePath } } }
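
Note for reviewers: the patch itself carries no usage example for the new `HoodieSparkConfUtils.getBooleanConfigValue`, so below is a minimal sketch of the resolution order it implements — reader options first, then the Spark SQL session conf, then the config default. Plain `Map`s stand in for the DataFrame option map and `SQLConf`, and the config key is hypothetical.

```scala
// Sketch only: mirrors the precedence in HoodieSparkConfUtils.getBooleanConfigValue
// (reader options -> Spark SQL session conf -> config default). Plain Maps stand in
// for the real option map and SQLConf; the key below is hypothetical.
object ConfPrecedenceSketch {
  def getBooleanConfigValue(options: Map[String, String],
                            sessionConf: Map[String, String],
                            configKey: String,
                            defaultValue: Boolean): Boolean =
    options.getOrElse(configKey, sessionConf.getOrElse(configKey, defaultValue.toString)).toBoolean

  def main(args: Array[String]): Unit = {
    val key = "hoodie.example.flag" // hypothetical key, for illustration only
    // A reader option wins over the session conf.
    assert(!getBooleanConfigValue(Map(key -> "false"), Map(key -> "true"), key, defaultValue = true))
    // The session conf is consulted when the reader option is absent.
    assert(getBooleanConfigValue(Map.empty, Map(key -> "true"), key, defaultValue = false))
    // The config default applies when neither is set.
    assert(getBooleanConfigValue(Map.empty, Map.empty, key, defaultValue = true))
  }
}
```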
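
The test helpers `getPathForReader` / `getPathForROQuery` fall back to a globbed path when `ENABLE_HOODIE_FILE_INDEX` is off (or globbing is requested explicitly): one `/*` segment per partition-path level, plus one for the data files. A self-contained sketch of that behavior, mirroring the helpers in the diff (not a production API):

```scala
// Sketch of the globbed-path fallback the tests use when the file index is disabled
// or globbing is explicitly requested; mirrors getPathForReader/getPathForROQuery.
object GlobPathSketch {
  def globbedPath(basePath: String, useGlobbing: Boolean, partitionPathLevel: Int): String =
    if (useGlobbing) basePath + "/*" * (partitionPathLevel + 1) // one "/*" per partition level, plus one for files
    else basePath

  def main(args: Array[String]): Unit = {
    // Three-level partition path (e.g. yyyy/mm/dd) -> four glob segments.
    assert(globbedPath("/tmp/hudi_table", useGlobbing = true, partitionPathLevel = 3) == "/tmp/hudi_table/*/*/*/*")
    // Non-partitioned table -> a single glob segment for the data files.
    assert(globbedPath("/tmp/hudi_table", useGlobbing = true, partitionPathLevel = 0) == "/tmp/hudi_table/*")
    // With HoodieFileIndex enabled, the base path is used as-is.
    assert(globbedPath("/tmp/hudi_table", useGlobbing = false, partitionPathLevel = 3) == "/tmp/hudi_table")
  }
}
```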
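
For context, a hypothetical read-side example of toggling the file index the way the tests do — per query via the reader option, or session-wide via the Spark SQL conf that the new utility also consults. The table path and session setup are assumptions (a local session with the Hudi Spark bundle on the classpath and an existing partitioned table at `basePath`); only the option key comes from the diff.

```scala
import org.apache.hudi.DataSourceReadOptions
import org.apache.spark.sql.SparkSession

// Hypothetical usage sketch, not part of the patch: disable HoodieFileIndex per query
// or for the whole session. Assumes the Hudi Spark bundle is on the classpath and a
// partitioned Hudi table already exists at `basePath`.
object FileIndexToggleExample {
  def main(args: Array[String]): Unit = {
    val basePath = "/tmp/hudi_table" // assumed table location (three partition levels)
    val spark = SparkSession.builder()
      .appName("hudi-file-index-toggle")
      .master("local[1]")
      .getOrCreate()

    // Per query: the reader option takes precedence over the session conf and the default.
    val withoutFileIndex = spark.read.format("hudi")
      .option(DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.key, "false")
      .load(basePath + "/*/*/*/*") // globbed path, as the tests use when the file index is off

    // Session-wide: getBooleanConfigValue falls back to the Spark SQL conf when the option is absent.
    spark.sql(s"SET ${DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.key}=false")
    val viaSessionConf = spark.read.format("hudi").load(basePath + "/*/*/*/*")

    println(withoutFileIndex.count() + viaSessionConf.count())
    spark.stop()
  }
}
```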