[HUDI-5104] Add feature flag to disable HoodieFileIndex and fall back to HoodieROTablePathFilter (apache#7088)

* Add the feature flag back to disable HoodieFileIndex and fall back to HoodieROTablePathFilter

* Turn off hoodie.file.index.enable by default to test CI

* Add tests for Spark datasource with the fallback to HoodieROTablePathFilter
yihua authored and fengjian committed Apr 5, 2023
1 parent 2bd7514 commit f087080
Showing 6 changed files with 187 additions and 76 deletions.
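
For context, a minimal sketch of how the new flag described above might be exercised in a Spark datasource read. The key hoodie.file.index.enable comes from this commit; the table path and the three-level glob are illustrative assumptions that would have to match the table's actual partition depth:

// Illustrative only: disable HoodieFileIndex and read through the
// HoodieROTablePathFilter fallback via a globbed path.
val df = spark.read.format("hudi")
  .option("hoodie.file.index.enable", "false")
  .load("/tmp/hudi_trips/*/*/*/*")

// The flag can also be picked up from the session SQL conf, since the reader
// resolves it through HoodieSparkConfUtils.getBooleanConfigValue (see below).
spark.sql("SET hoodie.file.index.enable=false")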
BaseFileOnlyRelation.scala
@@ -20,6 +20,7 @@ package org.apache.hudi

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX
import org.apache.hudi.HoodieBaseRelation.projectReader
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.hadoop.HoodieROTablePathFilter
@@ -134,7 +135,9 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
* rule; you can find more details in HUDI-3896)
*/
def toHadoopFsRelation: HadoopFsRelation = {
if (globPaths.isEmpty) {
val enableFileIndex = HoodieSparkConfUtils.getBooleanConfigValue(
optParams, sparkSession.sessionState.conf, ENABLE_HOODIE_FILE_INDEX.key, ENABLE_HOODIE_FILE_INDEX.defaultValue)
if (enableFileIndex && globPaths.isEmpty) {
// NOTE: There are currently 2 ways partition values could be fetched:
// - Source columns (producing the values used for physical partitioning) will be read
// from the data file
HoodieFileIndex.scala
@@ -251,10 +251,11 @@ case class HoodieFileIndex(spark: SparkSession,

override def sizeInBytes: Long = getTotalCachedFilesSize

private def isDataSkippingEnabled: Boolean = HoodieFileIndex.getBooleanConfigValue(options, spark.sessionState.conf, DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
"false")
private def isDataSkippingEnabled: Boolean = HoodieSparkConfUtils.getBooleanConfigValue(
options, spark.sessionState.conf, DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), false)

private def isMetadataTableEnabled: Boolean = metadataConfig.enabled()

private def isColumnStatsIndexEnabled: Boolean = metadataConfig.isColumnStatsIndexEnabled

private def validateConfig(): Unit = {
@@ -268,10 +269,6 @@

object HoodieFileIndex extends Logging {

def getBooleanConfigValue(options: Map[String, String], sqlConf: SQLConf, configKey: String, defaultValue: String) : Boolean = {
options.getOrElse(configKey, sqlConf.getConfString(configKey, defaultValue)).toBoolean
}

object DataSkippingFailureMode extends Enumeration {
val configName = "hoodie.fileIndex.dataSkippingFailureMode"

@@ -305,7 +302,8 @@ object HoodieFileIndex extends Logging {
// To support metadata listing via Spark SQL we allow users to pass the config via SQL Conf in spark session. Users
// would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing.
val isMetadataFilesPartitionAvailable = isFilesPartitionAvailable(metaClient) &&
getBooleanConfigValue(options, sqlConf, HoodieMetadataConfig.ENABLE.key(), HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString)
HoodieSparkConfUtils.getBooleanConfigValue(
options, sqlConf, HoodieMetadataConfig.ENABLE.key(), HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS)
properties.putAll(options.filter(p => p._2 != null).asJava)
properties.setProperty(HoodieMetadataConfig.ENABLE.key(), String.valueOf(isMetadataFilesPartitionAvailable))
properties
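
As the comment above notes, the metadata-table toggle can also come from the Spark SQL session conf rather than the datasource options; an illustrative snippet (basePath is a placeholder for the table path):

// Illustrative: enable metadata-table listing for subsequent reads in this session.
spark.sql("SET hoodie.metadata.enable=true")
val df = spark.read.format("hudi").load(basePath)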
HoodieSparkConfUtils.scala
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi

import org.apache.spark.sql.internal.SQLConf

/**
* Util methods for Hudi Spark and SQL configurations
*/
object HoodieSparkConfUtils {
/**
* Gets boolean config value from config properties and SQL conf.
*
* @param options Config properties.
* @param sqlConf SQL conf.
* @param configKey Config key to fetch.
* @param defaultValue Default value to return if not configured.
* @return The config value.
*/
def getBooleanConfigValue(options: Map[String, String],
sqlConf: SQLConf,
configKey: String,
defaultValue: Boolean): Boolean = {
options.getOrElse(configKey, sqlConf.getConfString(configKey, defaultValue.toString)).toBoolean
}
}
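
A hypothetical usage sketch of the helper above, showing the resolution order it implements: an explicit entry in the options map wins over the Spark SQL conf, which in turn wins over the supplied default (the key and values here are examples only):

val options = Map("hoodie.file.index.enable" -> "false")

// Entry in the options map takes precedence: returns false regardless of the SQL conf.
HoodieSparkConfUtils.getBooleanConfigValue(
  options, spark.sessionState.conf, "hoodie.file.index.enable", defaultValue = true)

// No options entry: falls back to the SQL conf if set there, otherwise to the default (true).
HoodieSparkConfUtils.getBooleanConfigValue(
  Map.empty[String, String], spark.sessionState.conf, "hoodie.file.index.enable", defaultValue = true)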
TestHoodieFileIndex.scala
@@ -48,8 +48,8 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, CsvSource, MethodSource, ValueSource}
import java.util.Properties

import java.util.Properties
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.util.Random
@@ -67,6 +67,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase with ScalaAssertionSupport
)

var queryOpts = Map(
DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.key -> "true",
DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL
)

@@ -223,7 +224,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase with ScalaAssertionSupport
)
val prunedPartitions = fileIndex.listFiles(Seq(partitionFilter2), Seq.empty)
.map(_.values.toSeq(Seq(StringType))
.mkString(","))
.mkString(","))
.toList
.sorted

TestCOWDataSource.scala
@@ -20,8 +20,9 @@ package org.apache.hudi.functional
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -30,29 +31,27 @@ import org.apache.hudi.common.util
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.metrics.HoodieMetricsConfig
import org.apache.hudi.exception.ExceptionUtil.getRootCause
import org.apache.hudi.exception.{HoodieException, HoodieUpsertException, SchemaCompatibilityException}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
import org.apache.hudi.keygen._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.metrics.Metrics
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.util.JFunction
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkRecordMerger, QuickstartUtils, ScalaAssertionSupport}
import org.apache.spark.sql.{HoodieInternalRowUtils, _}
import org.apache.spark.sql.functions.{col, concat, lit, udf, when}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.types._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}

import java.sql.{Date, Timestamp}
import java.util.function.Consumer
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

@@ -170,9 +169,10 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport
* For COW table, test the snapshot query mode and incremental query mode.
*/
@ParameterizedTest
@EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
def testPrunePartitionForTimestampBasedKeyGenerator(recordType: HoodieRecordType): Unit = {
val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
@CsvSource(Array("true,AVRO", "true,SPARK", "false,AVRO", "false,SPARK"))
def testPrunePartitionForTimestampBasedKeyGenerator(enableFileIndex: Boolean,
recordType: HoodieRecordType): Unit = {
val (writeOpts, readOpts) = getWriterReaderOpts(recordType, enableFileIndex = enableFileIndex)

val options = commonOpts ++ Map(
"hoodie.compact.inline" -> "false",
@@ -207,12 +207,16 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport
val commit2Time = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp

// snapshot query
val snapshotQueryRes = spark.read.format("hudi").options(readOpts).load(basePath)
val pathForReader = getPathForReader(basePath, !enableFileIndex, 3)
val snapshotQueryRes = spark.read.format("hudi").options(readOpts).load(pathForReader)
// TODO(HUDI-3204) we have to revert this to pre-existing behavior from 0.10
//assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 20)
//assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 30)
assertEquals(snapshotQueryRes.where("partition = '2022/01/01'").count, 20)
assertEquals(snapshotQueryRes.where("partition = '2022/01/02'").count, 30)
if (enableFileIndex) {
assertEquals(snapshotQueryRes.where("partition = '2022/01/01'").count, 20)
assertEquals(snapshotQueryRes.where("partition = '2022/01/02'").count, 30)
} else {
assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 20)
assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 30)
}

// incremental query
val incrementalQueryRes = spark.read.format("hudi")
@@ -286,7 +290,7 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport
assertEquals(100, snapshotDF1.count())

val records2 = deleteRecordsToStrings(dataGen.generateUniqueDeletes(20)).toList
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2 , 2))
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))

inputDF2.write.format("org.apache.hudi")
.options(writeOpts)
@@ -802,13 +806,11 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("")).count() == 0)
}

@ParameterizedTest
@CsvSource(Array(
"true,false,AVRO", "true,true,AVRO", "false,true,AVRO", "false,false,AVRO",
"true,false,SPARK", "true,true,SPARK", "false,true,SPARK", "false,false,SPARK"
))
def testQueryCOWWithBasePathAndFileIndex(partitionEncode: Boolean, isMetadataEnabled: Boolean, recordType: HoodieRecordType): Unit = {
val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
private def testPartitionPruning(enableFileIndex: Boolean,
partitionEncode: Boolean,
isMetadataEnabled: Boolean,
recordType: HoodieRecordType): Unit = {
val (writeOpts, readOpts) = getWriterReaderOpts(recordType, enableFileIndex = enableFileIndex)

val N = 20
// Test query with partition pruning when URL_ENCODE_PARTITIONING is enabled
@@ -824,11 +826,12 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport
val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)

val countIn20160315 = records1.asScala.count(record => record.getPartitionPath == "2016/03/15")
val pathForReader = getPathForReader(basePath, !enableFileIndex, if (partitionEncode) 1 else 3)
// query the partition by filter
val count1 = spark.read.format("hudi")
.options(readOpts)
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
.load(basePath)
.load(pathForReader)
.filter("partition = '2016/03/15'")
.count()
assertEquals(countIn20160315, count1)
@@ -862,10 +865,33 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport
assertEquals(false, Metrics.isInitialized)
}

@ParameterizedTest
@CsvSource(Array(
"true,false,AVRO", "true,true,AVRO", "false,true,AVRO", "false,false,AVRO",
"true,false,SPARK", "true,true,SPARK", "false,true,SPARK", "false,false,SPARK"
))
def testQueryCOWWithBasePathAndFileIndex(partitionEncode: Boolean, isMetadataEnabled: Boolean, recordType: HoodieRecordType): Unit = {
testPartitionPruning(
enableFileIndex = true,
partitionEncode = partitionEncode,
isMetadataEnabled = isMetadataEnabled,
recordType = recordType)
}

@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testPartitionPruningWithoutFileIndex(partitionEncode: Boolean): Unit = {
testPartitionPruning(
enableFileIndex = false,
partitionEncode = partitionEncode,
isMetadataEnabled = HoodieMetadataConfig.ENABLE.defaultValue,
recordType = HoodieRecordType.SPARK)
}

@Test def testSchemaNotEqualData(): Unit = {
val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true")
val schema1 = StructType(StructField("_row_key", StringType, nullable = true) :: StructField("name", StringType, nullable = true)::
StructField("timestamp", IntegerType, nullable = true):: StructField("age", StringType, nullable = true) :: StructField("partition", IntegerType, nullable = true)::Nil)
val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true")
val schema1 = StructType(StructField("_row_key", StringType, nullable = true) :: StructField("name", StringType, nullable = true) ::
StructField("timestamp", IntegerType, nullable = true) :: StructField("age", StringType, nullable = true) :: StructField("partition", IntegerType, nullable = true) :: Nil)
val records = Array("{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}",
"{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}")
val inputDF = spark.read.schema(schema1.toDDL).json(spark.sparkContext.parallelize(records, 2))
Expand All @@ -875,7 +901,7 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport
.save(basePath)
val recordsReadDF = spark.read.format("org.apache.hudi")
.load(basePath)
val resultSchema = new StructType(recordsReadDF.schema.filter(p=> !p.name.startsWith("_hoodie")).toArray)
val resultSchema = new StructType(recordsReadDF.schema.filter(p => !p.name.startsWith("_hoodie")).toArray)
assertEquals(resultSchema, schema1)
}

@@ -892,7 +918,7 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport
.option(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key, enableDropPartitionColumns)
.mode(SaveMode.Overwrite)
.save(basePath)
val snapshotDF1 = spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
val snapshotDF1 = spark.read.format("hudi").options(readOpts).load(basePath)
assertEquals(snapshotDF1.count(), 100)
assertEquals(3, snapshotDF1.select("partition").distinct().count())
}
@@ -961,9 +987,14 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport
}

@ParameterizedTest
@CsvSource(Array("true, AVRO", "false, AVRO", "true, SPARK", "false, SPARK"))
def testPartitionColumnsProperHandling(useGlobbing: Boolean, recordType: HoodieRecordType): Unit = {
val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
@CsvSource(Array(
"true, true, AVRO", "true, false, AVRO", "true, true, SPARK", "true, false, SPARK",
"false, true, AVRO", "false, false, AVRO", "false, true, SPARK", "false, false, SPARK"
))
def testPartitionColumnsProperHandling(enableFileIndex: Boolean,
useGlobbing: Boolean,
recordType: HoodieRecordType): Unit = {
val (writeOpts, readOpts) = getWriterReaderOpts(recordType, enableFileIndex = enableFileIndex)

val _spark = spark
import _spark.implicits._
@@ -990,20 +1021,16 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport
// NOTE: We're testing here that both paths are appropriately handling
// partition values, regardless of whether we're reading the table
// t/h a globbed path or not
val path = if (useGlobbing) {
s"$basePath/*/*/*/*"
} else {
basePath
}
val pathForReader = getPathForReader(basePath, useGlobbing || !enableFileIndex, 3)

// Case #1: Partition columns are read from the data file
val firstDF = spark.read.format("hudi").options(readOpts).load(path)
val firstDF = spark.read.format("hudi").options(readOpts).load(pathForReader)

assert(firstDF.count() == 2)

// data_date is the partition field. Persist to the parquet file using the origin values, and read it.
// TODO(HUDI-3204) we have to revert this to pre-existing behavior from 0.10
val expectedValues = if (useGlobbing) {
val expectedValues = if (useGlobbing || !enableFileIndex) {
Seq("2018-09-23", "2018-09-24")
} else {
Seq("2018/09/23", "2018/09/24")
@@ -1019,11 +1046,11 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport
//
// NOTE: This case is only relevant when globbing is NOT used, since when globbing is used Spark
// won't be able to infer partitioning properly
if (!useGlobbing) {
if (!useGlobbing && enableFileIndex) {
val secondDF = spark.read.format("hudi")
.options(readOpts)
.option(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key, "true")
.load(path)
.load(pathForReader)

assert(secondDF.count() == 2)

@@ -1131,10 +1158,26 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport
assertEquals(false, Metrics.isInitialized, "Metrics should be shutdown")
}

def getWriterReaderOpts(recordType: HoodieRecordType, opt: Map[String, String] = commonOpts): (Map[String, String], Map[String, String]) = {
def getWriterReaderOpts(recordType: HoodieRecordType,
opt: Map[String, String] = commonOpts,
enableFileIndex: Boolean = DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()):
(Map[String, String], Map[String, String]) = {
val fileIndexOpt: Map[String, String] =
Map(DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.key -> enableFileIndex.toString)

recordType match {
case HoodieRecordType.SPARK => (opt ++ sparkOpts, sparkOpts)
case _ => (opt, Map.empty[String, String])
case HoodieRecordType.SPARK => (opt ++ sparkOpts, sparkOpts ++ fileIndexOpt)
case _ => (opt, fileIndexOpt)
}
}

def getPathForReader(basePath: String, useGlobbing: Boolean, partitionPathLevel: Int): String = {
if (useGlobbing) {
// When explicitly using globbing or not using HoodieFileIndex, we fall back to the old way
// of reading the Hudi table with a globbed path
basePath + "/*" * (partitionPathLevel + 1)
} else {
basePath
}
}
}
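
For reference, an illustration of what the getPathForReader helper above produces (base path and partition depth are example values):

// getPathForReader("/tmp/hudi_tbl", useGlobbing = true, partitionPathLevel = 3)
//   => "/tmp/hudi_tbl/*/*/*/*"   (one glob per partition level, plus one for the data files)
// getPathForReader("/tmp/hudi_tbl", useGlobbing = false, partitionPathLevel = 3)
//   => "/tmp/hudi_tbl"           (HoodieFileIndex handles partition listing directly)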