Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-5296] Allow disable schema on read after enabling #7333

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
*/
protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = {
val schemaResolver = new TableSchemaResolver(metaClient)
val internalSchemaOpt = if (!isSchemaEvolutionEnabled) {
val internalSchemaOpt = if (!HoodieParamUtils.isSchemaEvolutionEnabledOnRead(optParams, sparkSession)) {
None
} else {
Try {
Expand Down Expand Up @@ -639,15 +639,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

private def prunePartitionColumns(dataStructSchema: StructType): StructType =
StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name)))

private def isSchemaEvolutionEnabled = {
// NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as
// t/h Spark Session configuration (for ex, for Spark SQL)
optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
}
}

object HoodieBaseRelation extends SparkAdapterSupport {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi

import org.apache.spark.sql.SparkSession

/**
* Hoodie parameter utils.
*/
object HoodieParamUtils {

def isSchemaEvolutionEnabledOnRead(optParams: Map[String, String], sparkSession: SparkSession): Boolean = {
// NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as
// t/h Spark Session configuration (for ex, for Spark SQL)
optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.{CommitUtils, StringUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME, KEYGEN_CLASS_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
Expand Down Expand Up @@ -66,7 +65,6 @@ import org.apache.spark.{SPARK_VERSION, SparkContext}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters.setAsJavaSetConverter
import scala.collection.mutable
import scala.util.matching.Regex

object HoodieSparkSqlWriter {

Expand Down Expand Up @@ -151,37 +149,41 @@ object HoodieSparkSqlWriter {
handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
// Create the table if not present
if (!tableExists) {
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT);

val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableType(tableType)
.setDatabaseName(databaseName)
.setTableName(tblName)
.setBaseFileFormat(baseFileFormat)
.setArchiveLogFolder(archiveLogFolder)
.setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME))
// we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value,
// but we are interested in what user has set, hence fetching from optParams.
.setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
.setPartitionFields(partitionColumns)
.setPopulateMetaFields(populateMetaFields)
.setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
.setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED))
.setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
.setKeyGeneratorClassProp(originKeyGeneratorClassName)
.set(timestampKeyGeneratorConfigs)
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
.setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
.initTable(sparkContext.hadoopConfiguration, path)
tableConfig = tableMetaClient.getTableConfig
}
var tableMetaClient : HoodieTableMetaClient = null
if (!tableExists) {
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT); tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableType(tableType)
.setDatabaseName(databaseName)
.setTableName(tblName)
.setBaseFileFormat(baseFileFormat)
.setArchiveLogFolder(archiveLogFolder)
.setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME))
// we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value,
// but we are interested in what user has set, hence fetching from optParams.
.setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
.setPartitionFields(partitionColumns)
.setPopulateMetaFields(populateMetaFields)
.setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
.setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED))
.setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
.setKeyGeneratorClassProp(originKeyGeneratorClassName)
.set(timestampKeyGeneratorConfigs)
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
.setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
.initTable(sparkContext.hadoopConfiguration, path)
tableConfig = tableMetaClient.getTableConfig
} else {
tableMetaClient = HoodieTableMetaClient.builder
.setConf(sparkContext.hadoopConfiguration)
.setBasePath(path)
.build()
}

val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)

Expand All @@ -191,25 +193,23 @@ object HoodieSparkSqlWriter {
classOf[Schema]))

val shouldReconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean

val latestTableSchemaOpt = getLatestTableSchema(spark, basePath, tableIdentifier, sparkContext.hadoopConfiguration)
val latestTableSchemaOpt = getLatestTableSchema(spark, basePath, tableIdentifier, sparkContext.hadoopConfiguration,
tableMetaClient)
// NOTE: We need to make sure that upon conversion of the schemas b/w Catalyst's [[StructType]] and
// Avro's [[Schema]] we're preserving corresponding "record-name" and "record-namespace" that
// play crucial role in establishing compatibility b/w schemas
val (avroRecordName, avroRecordNamespace) = latestTableSchemaOpt.map(s => (s.getName, s.getNamespace))
.getOrElse(getAvroRecordNameAndNamespace(tblName))

val sourceSchema = convertStructTypeToAvroSchema(df.schema, avroRecordName, avroRecordNamespace)
val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext).orElse {
val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
// In case we need to reconcile the schema and schema evolution is enabled,
// we will force-apply schema evolution to the writer's schema
if (shouldReconcileSchema && schemaEvolutionEnabled) {
Some(AvroInternalSchemaConverter.convert(sourceSchema))
} else {
None
}
val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext, hoodieConfig, tableMetaClient).orElse {
// In case we need to reconcile the schema and schema evolution is enabled,
// we will force-apply schema evolution to the writer's schema
if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
Some(AvroInternalSchemaConverter.convert(sourceSchema))
} else {
None
}
}

// NOTE: Target writer's schema is deduced based on
Expand Down Expand Up @@ -249,7 +249,7 @@ object HoodieSparkSqlWriter {
}

// Create a HoodieWriteClient & issue the delete.
val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext, hoodieConfig, tableMetaClient)
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path, tblName,
mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
Expand Down Expand Up @@ -291,8 +291,6 @@ object HoodieSparkSqlWriter {
client.startCommitWithTime(instantTime, commitActionType)
val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime)
(writeStatuses, client)


case _ =>
// Here all other (than DELETE, DELETE_PARTITION) write operations are handled
//
Expand Down Expand Up @@ -548,18 +546,18 @@ object HoodieSparkSqlWriter {
* @param sparkContext instance of spark context.
* @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
*/
def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[InternalSchema] = {
try {
if (FSUtils.isTableExists(basePath.toString, fs)) {
val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build()
def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext, config: HoodieConfig,
tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = {
if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
Option.empty[InternalSchema]
} else {
try {
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
val internalSchemaOpt = tableSchemaResolver.getTableInternalSchemaFromCommitMetadata
if (internalSchemaOpt.isPresent) Some(internalSchemaOpt.get()) else None
} else {
None
} catch {
case _: Exception => None
}
} catch {
case _: Exception => None
}
}

Expand All @@ -570,20 +568,11 @@ object HoodieSparkSqlWriter {
private def getLatestTableSchema(spark: SparkSession,
tableBasePath: Path,
tableId: TableIdentifier,
hadoopConf: Configuration): Option[Schema] = {
val fs = tableBasePath.getFileSystem(hadoopConf)
hadoopConf: Configuration,
tableMetaClient: HoodieTableMetaClient): Option[Schema] = {
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
val latestTableSchemaFromCommitMetadata =
if (FSUtils.isTableExists(tableBasePath.toString, fs)) {
val tableMetaClient = HoodieTableMetaClient.builder
.setConf(hadoopConf)
.setBasePath(tableBasePath.toString)
.build()
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
} else {
None
}

toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
latestTableSchemaFromCommitMetadata.orElse {
getCatalogTable(spark, tableId).map { catalogTable =>
val (structName, namespace) = getAvroRecordNameAndNamespace(tableId.table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ class IncrementalRelation(val sqlContext: SQLContext,
val (usedSchema, internalSchema) = {
log.info("Inferring schema..")
val schemaResolver = new TableSchemaResolver(metaClient)
val iSchema = if (useEndInstantSchema && !commitsToReturn.isEmpty) {
val iSchema : InternalSchema = if (!HoodieParamUtils.isSchemaEvolutionEnabledOnRead(optParams, sqlContext.sparkSession)) {
InternalSchema.getEmptyInternalSchema
} else if (useEndInstantSchema && !commitsToReturn.isEmpty) {
InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable)
} else {
schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.catalyst.TableIdentifier

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkUtils}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.functions.{arrays_zip, col}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
Expand Down Expand Up @@ -171,6 +171,58 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
}

test("Test Enable and Disable Schema on read") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
if (HoodieSparkUtils.gteqSpark3_1) {
spark.sql("set hoodie.schema.on.read.enable=true")
// Create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '$tablePath'
| tblproperties (
| type = 'cow',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)

// Insert data to the new table.
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 10.0, 1000)
)

// add column
spark.sql(s"alter table $tableName add columns(new_col string)")
val catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName))
assertResult(Seq("id", "name", "price", "ts", "new_col")) {
HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name)
}
checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
Seq(1, "a1", 10.0, 1000, null)
)
// disable schema on read.
spark.sql("set hoodie.schema.on.read.enable=false")
spark.sql(s"refresh table $tableName")
// Insert data to the new table.
spark.sql(s"insert into $tableName values(2, 'a2', 12, 2000, 'e0')")
// write should succeed. and subsequent read should succeed as well.
checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
Seq(1, "a1", 10.0, 1000, null),
Seq(2, "a2", 12.0, 2000, "e0")
)
}
}
}

test("Test Partition Table alter ") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
Expand Down