From b0a6a396451303e928f54bf98ae6f613a3c3d8e8 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 29 Nov 2022 15:08:23 -0800 Subject: [PATCH 1/4] Fixing disabling of schema on read --- .../apache/hudi/HoodieSparkSqlWriter.scala | 34 +++++++++++-------- .../org/apache/hudi/IncrementalRelation.scala | 17 ++++++++-- 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index b538d31377c7..cb95c805db99 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -35,7 +35,6 @@ import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model._ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} -import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.common.util.{CommitUtils, StringUtils} import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME, KEYGEN_CLASS_NAME} import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig} @@ -66,7 +65,6 @@ import org.apache.spark.{SPARK_VERSION, SparkContext} import scala.collection.JavaConversions._ import scala.collection.JavaConverters.setAsJavaSetConverter import scala.collection.mutable -import scala.util.matching.Regex object HoodieSparkSqlWriter { @@ -200,16 +198,20 @@ object HoodieSparkSqlWriter { .getOrElse(getAvroRecordNameAndNamespace(tblName)) val sourceSchema = convertStructTypeToAvroSchema(df.schema, avroRecordName, avroRecordNamespace) - val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext).orElse { - val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, - DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean - // In case we need to reconcile the schema and schema evolution is enabled, - // we will force-apply schema evolution to the writer's schema - if (shouldReconcileSchema && schemaEvolutionEnabled) { - Some(AvroInternalSchemaConverter.convert(sourceSchema)) - } else { - None + val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, + DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean + val internalSchemaOpt = if (schemaEvolutionEnabled) { + getLatestTableInternalSchema(fs, basePath, sparkContext).orElse { + // In case we need to reconcile the schema and schema evolution is enabled, + // we will force-apply schema evolution to the writer's schema + if (shouldReconcileSchema) { + Some(AvroInternalSchemaConverter.convert(sourceSchema)) + } else { + None + } } + } else { + None } // NOTE: Target writer's schema is deduced based on @@ -249,7 +251,13 @@ object HoodieSparkSqlWriter { } // Create a HoodieWriteClient & issue the delete. - val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext) + // Create a HoodieWriteClient & issue the delete. 
+ val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean + val internalSchemaOpt : Option[org.apache.hudi.internal.schema.InternalSchema]= if (schemaEvolutionEnabled) { + getLatestTableInternalSchema(fs, basePath, sparkContext) + } else { + None + } val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, null, path, tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key))) @@ -291,8 +299,6 @@ object HoodieSparkSqlWriter { client.startCommitWithTime(instantTime, commitActionType) val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime) (writeStatuses, client) - - case _ => // Here all other (than DELETE, DELETE_PARTITION) write operations are handled // diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala index 4c763b054ad3..7a8f1b4836e7 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -90,10 +90,14 @@ class IncrementalRelation(val sqlContext: SQLContext, val (usedSchema, internalSchema) = { log.info("Inferring schema..") val schemaResolver = new TableSchemaResolver(metaClient) - val iSchema = if (useEndInstantSchema && !commitsToReturn.isEmpty) { - InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable) + val iSchema = if (isSchemaEvolutionEnabledOnRead) { + if (useEndInstantSchema && !commitsToReturn.isEmpty) { + InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable) + } else { + schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null) + } } else { - schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null) + Option.empty.asInstanceOf[InternalSchema] } val tableSchema = if (useEndInstantSchema && iSchema.isEmptySchema) { @@ -121,6 +125,13 @@ class IncrementalRelation(val sqlContext: SQLContext, override def schema: StructType = usedSchema + private def isSchemaEvolutionEnabledOnRead = { + optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, + DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean || + sqlContext.sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, + DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean + } + override def buildScan(): RDD[Row] = { if (usedSchema == StructType(Nil)) { // if first commit in a table is an empty commit without schema, return empty RDD here From 858997f2402215c86b4740ab06fe2056b3ad1fcc Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 29 Nov 2022 15:48:05 -0800 Subject: [PATCH 2/4] Adding support to disable schema on read if need be --- .../spark/sql/hudi/TestAlterTable.scala | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala index ef6425182959..1b2859e9c666 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hudi +import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.spark.sql.catalyst.TableIdentifier @@ -168,6 +169,56 @@ class TestAlterTable extends HoodieSparkSqlTestBase { } } + test("Test Enable and Disable Schema on read") { + withTempDir { tmp => + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" + // Create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '$tablePath' + | tblproperties ( + | type = 'cow', + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + + // Insert data to the new table. + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 10.0, 1000) + ) + + // add column + spark.sql(s"alter table $tableName add columns(new_col string)") + val catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName)) + assertResult(Seq("id", "name", "price", "ts", "new_col")) { + HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name) + } + checkAnswer(s"select id, name, price, ts, new_col from $tableName")( + Seq(1, "a1", 10.0, 1000, null) + ) + // disable schema on read. + spark.sessionState.conf.setConfString(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false") + spark.sql(s"refresh table $tableName") + // Insert data to the new table. + spark.sql(s"insert into $tableName values(2, 'a2', 12, 2000, 'e0')") + // write should succeed. and subsequent read should succeed as well. 
+ checkAnswer(s"select id, name, price, ts, new_col from $tableName")( + Seq(1, "a1", 10.0, 1000, null), + Seq(2, "a2", 12.0, 2000, "e0") + ) + } + } + + test("Test Alter Rename Table") { withTempDir { tmp => Seq("cow", "mor").foreach { tableType => From c7dae19174b0bb8112990887b39ec5e487732591 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Thu, 1 Dec 2022 14:39:22 -0800 Subject: [PATCH 3/4] addressing feedback --- .../org/apache/hudi/HoodieBaseRelation.scala | 11 +- .../org/apache/hudi/HoodieParamUtils.scala | 38 ++++++ .../apache/hudi/HoodieSparkSqlWriter.scala | 119 ++++++++---------- .../org/apache/hudi/IncrementalRelation.scala | 19 +-- .../spark/sql/hudi/TestAlterTable.scala | 50 -------- .../apache/spark/sql/hudi/TestSpark3DDL.scala | 54 +++++++- 6 files changed, 148 insertions(+), 143 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieParamUtils.scala diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index afc0781eb1b0..ecc387b06aa9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -140,7 +140,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, */ protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = { val schemaResolver = new TableSchemaResolver(metaClient) - val internalSchemaOpt = if (!isSchemaEvolutionEnabled) { + val internalSchemaOpt = if (!HoodieParamUtils.isSchemaEvolutionEnabledOnRead(optParams, sparkSession)) { None } else { Try { @@ -639,15 +639,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, private def prunePartitionColumns(dataStructSchema: StructType): StructType = StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name))) - - private def isSchemaEvolutionEnabled = { - // NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as - // t/h Spark Session configuration (for ex, for Spark SQL) - optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, - DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean || - sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, - DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean - } } object HoodieBaseRelation extends SparkAdapterSupport { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieParamUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieParamUtils.scala new file mode 100644 index 000000000000..a462d9d8ea71 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieParamUtils.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi + +import org.apache.spark.sql.SparkSession + +/** + * Hoodie parameter utils. + */ +object HoodieParamUtils { + + def isSchemaEvolutionEnabledOnRead(optParams: Map[String, String], sparkSession: SparkSession): Boolean = { + // NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as + // t/h Spark Session configuration (for ex, for Spark SQL) + optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, + DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean || + sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, + DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean + } + +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index cb95c805db99..29b537a8276b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -149,37 +149,41 @@ object HoodieSparkSqlWriter { handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs) val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters)) // Create the table if not present - if (!tableExists) { - val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT) - val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER) - val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS) - val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT); - - val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder() - .setTableType(tableType) - .setDatabaseName(databaseName) - .setTableName(tblName) - .setBaseFileFormat(baseFileFormat) - .setArchiveLogFolder(archiveLogFolder) - .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME)) - // we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value, - // but we are interested in what user has set, hence fetching from optParams. 
- .setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null)) - .setPartitionFields(partitionColumns) - .setPopulateMetaFields(populateMetaFields) - .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD)) - .setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED)) - .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)) - .setKeyGeneratorClassProp(originKeyGeneratorClassName) - .set(timestampKeyGeneratorConfigs) - .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING)) - .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING)) - .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile) - .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS)) - .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))) - .initTable(sparkContext.hadoopConfiguration, path) - tableConfig = tableMetaClient.getTableConfig - } + var tableMetaClient : HoodieTableMetaClient = null + if (!tableExists) { + val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT) + val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER) + val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS) + val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT); tableMetaClient = HoodieTableMetaClient.withPropertyBuilder() + .setTableType(tableType) + .setDatabaseName(databaseName) + .setTableName(tblName) + .setBaseFileFormat(baseFileFormat) + .setArchiveLogFolder(archiveLogFolder) + .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME)) + // we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value, + // but we are interested in what user has set, hence fetching from optParams. 
+ .setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null)) + .setPartitionFields(partitionColumns) + .setPopulateMetaFields(populateMetaFields) + .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD)) + .setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED)) + .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)) + .setKeyGeneratorClassProp(originKeyGeneratorClassName) + .set(timestampKeyGeneratorConfigs) + .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING)) + .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING)) + .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile) + .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS)) + .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))) + .initTable(sparkContext.hadoopConfiguration, path) + tableConfig = tableMetaClient.getTableConfig + } else { + tableMetaClient = HoodieTableMetaClient.builder + .setConf(sparkContext.hadoopConfiguration) + .setBasePath(path) + .build() + } val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType) @@ -189,8 +193,8 @@ object HoodieSparkSqlWriter { classOf[Schema])) val shouldReconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean - - val latestTableSchemaOpt = getLatestTableSchema(spark, basePath, tableIdentifier, sparkContext.hadoopConfiguration) + val latestTableSchemaOpt = getLatestTableSchema(spark, basePath, tableIdentifier, sparkContext.hadoopConfiguration, + tableMetaClient) // NOTE: We need to make sure that upon conversion of the schemas b/w Catalyst's [[StructType]] and // Avro's [[Schema]] we're preserving corresponding "record-name" and "record-namespace" that // play crucial role in establishing compatibility b/w schemas @@ -198,10 +202,7 @@ object HoodieSparkSqlWriter { .getOrElse(getAvroRecordNameAndNamespace(tblName)) val sourceSchema = convertStructTypeToAvroSchema(df.schema, avroRecordName, avroRecordNamespace) - val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, - DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean - val internalSchemaOpt = if (schemaEvolutionEnabled) { - getLatestTableInternalSchema(fs, basePath, sparkContext).orElse { + val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext, hoodieConfig, tableMetaClient).orElse { // In case we need to reconcile the schema and schema evolution is enabled, // we will force-apply schema evolution to the writer's schema if (shouldReconcileSchema) { @@ -209,9 +210,6 @@ object HoodieSparkSqlWriter { } else { None } - } - } else { - None } // NOTE: Target writer's schema is deduced based on @@ -251,13 +249,7 @@ object HoodieSparkSqlWriter { } // Create a HoodieWriteClient & issue the delete. - // Create a HoodieWriteClient & issue the delete. 
- val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean - val internalSchemaOpt : Option[org.apache.hudi.internal.schema.InternalSchema]= if (schemaEvolutionEnabled) { - getLatestTableInternalSchema(fs, basePath, sparkContext) - } else { - None - } + val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext, hoodieConfig, tableMetaClient) val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, null, path, tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key))) @@ -554,18 +546,18 @@ object HoodieSparkSqlWriter { * @param sparkContext instance of spark context. * @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required. */ - def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[InternalSchema] = { - try { - if (FSUtils.isTableExists(basePath.toString, fs)) { - val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build() + def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext, config: HoodieConfig, + tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = { + if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) { + Option.empty[InternalSchema] + } else { + try { val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) val internalSchemaOpt = tableSchemaResolver.getTableInternalSchemaFromCommitMetadata if (internalSchemaOpt.isPresent) Some(internalSchemaOpt.get()) else None - } else { - None + } catch { + case _: Exception => None } - } catch { - case _: Exception => None } } @@ -576,20 +568,11 @@ object HoodieSparkSqlWriter { private def getLatestTableSchema(spark: SparkSession, tableBasePath: Path, tableId: TableIdentifier, - hadoopConf: Configuration): Option[Schema] = { - val fs = tableBasePath.getFileSystem(hadoopConf) + hadoopConf: Configuration, + tableMetaClient: HoodieTableMetaClient): Option[Schema] = { + val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) val latestTableSchemaFromCommitMetadata = - if (FSUtils.isTableExists(tableBasePath.toString, fs)) { - val tableMetaClient = HoodieTableMetaClient.builder - .setConf(hadoopConf) - .setBasePath(tableBasePath.toString) - .build() - val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) - toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false)) - } else { - None - } - + toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false)) latestTableSchemaFromCommitMetadata.orElse { getCatalogTable(spark, tableId).map { catalogTable => val (structName, namespace) = getAvroRecordNameAndNamespace(tableId.table) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala index 7a8f1b4836e7..bc8952ad9848 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -90,14 +90,12 @@ class IncrementalRelation(val sqlContext: SQLContext, val (usedSchema, internalSchema) = { log.info("Inferring schema..") val schemaResolver = new 
TableSchemaResolver(metaClient) - val iSchema = if (isSchemaEvolutionEnabledOnRead) { - if (useEndInstantSchema && !commitsToReturn.isEmpty) { - InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable) - } else { - schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null) - } + val iSchema : InternalSchema = if (!HoodieParamUtils.isSchemaEvolutionEnabledOnRead(optParams, sqlContext.sparkSession)) { + InternalSchema.getEmptyInternalSchema + } else if (useEndInstantSchema && !commitsToReturn.isEmpty) { + InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable) } else { - Option.empty.asInstanceOf[InternalSchema] + schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null) } val tableSchema = if (useEndInstantSchema && iSchema.isEmptySchema) { @@ -125,13 +123,6 @@ class IncrementalRelation(val sqlContext: SQLContext, override def schema: StructType = usedSchema - private def isSchemaEvolutionEnabledOnRead = { - optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, - DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean || - sqlContext.sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, - DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean - } - override def buildScan(): RDD[Row] = { if (usedSchema == StructType(Nil)) { // if first commit in a table is an empty commit without schema, return empty RDD here diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala index 1b2859e9c666..4e2821a029e0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala @@ -169,56 +169,6 @@ class TestAlterTable extends HoodieSparkSqlTestBase { } } - test("Test Enable and Disable Schema on read") { - withTempDir { tmp => - val tableName = generateTableName - val tablePath = s"${tmp.getCanonicalPath}/$tableName" - // Create table - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | ts long - |) using hudi - | location '$tablePath' - | tblproperties ( - | type = 'cow', - | primaryKey = 'id', - | preCombineField = 'ts' - | ) - """.stripMargin) - - // Insert data to the new table. - spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") - checkAnswer(s"select id, name, price, ts from $tableName")( - Seq(1, "a1", 10.0, 1000) - ) - - // add column - spark.sql(s"alter table $tableName add columns(new_col string)") - val catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName)) - assertResult(Seq("id", "name", "price", "ts", "new_col")) { - HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name) - } - checkAnswer(s"select id, name, price, ts, new_col from $tableName")( - Seq(1, "a1", 10.0, 1000, null) - ) - // disable schema on read. - spark.sessionState.conf.setConfString(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false") - spark.sql(s"refresh table $tableName") - // Insert data to the new table. - spark.sql(s"insert into $tableName values(2, 'a2', 12, 2000, 'e0')") - // write should succeed. 
and subsequent read should succeed as well. - checkAnswer(s"select id, name, price, ts, new_col from $tableName")( - Seq(1, "a1", 10.0, 1000, null), - Seq(2, "a2", 12.0, 2000, "e0") - ) - } - } - - test("Test Alter Rename Table") { withTempDir { tmp => Seq("cow", "mor").foreach { tableType => diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala index 9d955cb83103..056bca37fce4 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala @@ -22,7 +22,7 @@ import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils} +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkUtils} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.functions.{arrays_zip, col} import org.apache.spark.sql.{Row, SaveMode, SparkSession} @@ -171,6 +171,58 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { } } + test("Test Enable and Disable Schema on read") { + withTempDir { tmp => + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" + if (HoodieSparkUtils.gteqSpark3_1) { + spark.sql("set hoodie.schema.on.read.enable=true") + // Create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '$tablePath' + | tblproperties ( + | type = 'cow', + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + + // Insert data to the new table. + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 10.0, 1000) + ) + + // add column + spark.sql(s"alter table $tableName add columns(new_col string)") + val catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName)) + assertResult(Seq("id", "name", "price", "ts", "new_col")) { + HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name) + } + checkAnswer(s"select id, name, price, ts, new_col from $tableName")( + Seq(1, "a1", 10.0, 1000, null) + ) + // disable schema on read. + spark.sql("set hoodie.schema.on.read.enable=false") + spark.sql(s"refresh table $tableName") + // Insert data to the new table. + spark.sql(s"insert into $tableName values(2, 'a2', 12, 2000, 'e0')") + // write should succeed. and subsequent read should succeed as well. 
+ checkAnswer(s"select id, name, price, ts, new_col from $tableName")( + Seq(1, "a1", 10.0, 1000, null), + Seq(2, "a2", 12.0, 2000, "e0") + ) + } + } + } + test("Test Partition Table alter ") { withTempDir { tmp => Seq("cow", "mor").foreach { tableType => From 682139726e9a4f647815f79c9b15b31e36def72b Mon Sep 17 00:00:00 2001 From: sivabalan Date: Fri, 9 Dec 2022 11:51:15 -0800 Subject: [PATCH 4/4] Fixing test failures --- .../src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 29b537a8276b..f0ce0dbb056d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -205,7 +205,7 @@ object HoodieSparkSqlWriter { val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext, hoodieConfig, tableMetaClient).orElse { // In case we need to reconcile the schema and schema evolution is enabled, // we will force-apply schema evolution to the writer's schema - if (shouldReconcileSchema) { + if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) { Some(AvroInternalSchemaConverter.convert(sourceSchema)) } else { None