diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index afc0781eb1b0..ecc387b06aa9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -140,7 +140,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    */
  protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = {
    val schemaResolver = new TableSchemaResolver(metaClient)
-    val internalSchemaOpt = if (!isSchemaEvolutionEnabled) {
+    val internalSchemaOpt = if (!HoodieParamUtils.isSchemaEvolutionEnabledOnRead(optParams, sparkSession)) {
      None
    } else {
      Try {
@@ -639,15 +639,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
  private def prunePartitionColumns(dataStructSchema: StructType): StructType =
    StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name)))
-
-  private def isSchemaEvolutionEnabled = {
-    // NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as
-    //       t/h Spark Session configuration (for ex, for Spark SQL)
-    optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
-      DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
-      sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
-        DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
-  }
 }
 
 object HoodieBaseRelation extends SparkAdapterSupport {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieParamUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieParamUtils.scala
new file mode 100644
index 000000000000..a462d9d8ea71
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieParamUtils.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Hoodie parameter utils.
+ */
+object HoodieParamUtils {
+
+  def isSchemaEvolutionEnabledOnRead(optParams: Map[String, String], sparkSession: SparkSession): Boolean = {
+    // NOTE: Schema evolution could be configured both through the optional read parameters as well as
+    //       through the Spark Session configuration (e.g., for Spark SQL)
+    optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+      DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
+      sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+        DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
+  }
+
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index b538d31377c7..f0ce0dbb056d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -35,7 +35,6 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.{CommitUtils, StringUtils}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME, KEYGEN_CLASS_NAME}
 import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
@@ -66,7 +65,6 @@ import org.apache.spark.{SPARK_VERSION, SparkContext}
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters.setAsJavaSetConverter
 import scala.collection.mutable
-import scala.util.matching.Regex
 
 object HoodieSparkSqlWriter {
 
@@ -151,37 +149,41 @@ object HoodieSparkSqlWriter {
     handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
     val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
     // Create the table if not present
-    if (!tableExists) {
-      val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
-      val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
-      val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
-      val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT);
-
-      val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
-        .setTableType(tableType)
-        .setDatabaseName(databaseName)
-        .setTableName(tblName)
-        .setBaseFileFormat(baseFileFormat)
-        .setArchiveLogFolder(archiveLogFolder)
-        .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME))
-        // we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value,
-        // but we are interested in what user has set, hence fetching from optParams.
-        .setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
-        .setPartitionFields(partitionColumns)
-        .setPopulateMetaFields(populateMetaFields)
-        .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
-        .setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED))
-        .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
-        .setKeyGeneratorClassProp(originKeyGeneratorClassName)
-        .set(timestampKeyGeneratorConfigs)
-        .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
-        .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
-        .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
-        .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
-        .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
-        .initTable(sparkContext.hadoopConfiguration, path)
-      tableConfig = tableMetaClient.getTableConfig
-    }
+    var tableMetaClient : HoodieTableMetaClient = null
+    if (!tableExists) {
+      val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
+      val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
+      val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
+      val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT); tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(tableType)
+        .setDatabaseName(databaseName)
+        .setTableName(tblName)
+        .setBaseFileFormat(baseFileFormat)
+        .setArchiveLogFolder(archiveLogFolder)
+        .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME))
+        // we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value,
+        // but we are interested in what user has set, hence fetching from optParams.
+        .setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
+        .setPartitionFields(partitionColumns)
+        .setPopulateMetaFields(populateMetaFields)
+        .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
+        .setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED))
+        .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
+        .setKeyGeneratorClassProp(originKeyGeneratorClassName)
+        .set(timestampKeyGeneratorConfigs)
+        .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
+        .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
+        .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
+        .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
+        .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
+        .initTable(sparkContext.hadoopConfiguration, path)
+      tableConfig = tableMetaClient.getTableConfig
+    } else {
+      tableMetaClient = HoodieTableMetaClient.builder
+        .setConf(sparkContext.hadoopConfiguration)
+        .setBasePath(path)
+        .build()
+    }
 
     val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)
 
@@ -191,8 +193,8 @@ object HoodieSparkSqlWriter {
           classOf[Schema]))
 
     val shouldReconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
-
-    val latestTableSchemaOpt = getLatestTableSchema(spark, basePath, tableIdentifier, sparkContext.hadoopConfiguration)
+    val latestTableSchemaOpt = getLatestTableSchema(spark, basePath, tableIdentifier, sparkContext.hadoopConfiguration,
+      tableMetaClient)
     // NOTE: We need to make sure that upon conversion of the schemas b/w Catalyst's [[StructType]] and
     //       Avro's [[Schema]] we're preserving corresponding "record-name" and "record-namespace" that
     //       play crucial role in establishing compatibility b/w schemas
@@ -200,16 +202,14 @@ object HoodieSparkSqlWriter {
       .getOrElse(getAvroRecordNameAndNamespace(tblName))
 
     val sourceSchema = convertStructTypeToAvroSchema(df.schema, avroRecordName, avroRecordNamespace)
-    val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext).orElse {
-      val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
-        DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
-      // In case we need to reconcile the schema and schema evolution is enabled,
-      // we will force-apply schema evolution to the writer's schema
-      if (shouldReconcileSchema && schemaEvolutionEnabled) {
-        Some(AvroInternalSchemaConverter.convert(sourceSchema))
-      } else {
-        None
-      }
+    val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext, hoodieConfig, tableMetaClient).orElse {
+      // In case we need to reconcile the schema and schema evolution is enabled,
+      // we will force-apply schema evolution to the writer's schema
+      if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
+        Some(AvroInternalSchemaConverter.convert(sourceSchema))
+      } else {
+        None
+      }
     }
 
     // NOTE: Target writer's schema is deduced based on
@@ -249,7 +249,7 @@ object HoodieSparkSqlWriter {
         }
 
         // Create a HoodieWriteClient & issue the delete.
-        val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
+        val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext, hoodieConfig, tableMetaClient)
         val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, null, path, tblName,
           mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
 
@@ -291,8 +291,6 @@ object HoodieSparkSqlWriter {
         client.startCommitWithTime(instantTime, commitActionType)
         val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime)
         (writeStatuses, client)
-
-
       case _ =>
         // Here all other (than DELETE, DELETE_PARTITION) write operations are handled
         //
@@ -548,18 +546,18 @@ object HoodieSparkSqlWriter {
    * @param sparkContext instance of spark context.
    * @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
    */
-  def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[InternalSchema] = {
-    try {
-      if (FSUtils.isTableExists(basePath.toString, fs)) {
-        val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build()
+  def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext, config: HoodieConfig,
+                                   tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = {
+    if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
+      Option.empty[InternalSchema]
+    } else {
+      try {
         val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
         val internalSchemaOpt = tableSchemaResolver.getTableInternalSchemaFromCommitMetadata
         if (internalSchemaOpt.isPresent) Some(internalSchemaOpt.get()) else None
-      } else {
-        None
+      } catch {
+        case _: Exception => None
       }
-    } catch {
-      case _: Exception => None
     }
   }
 
@@ -570,20 +568,11 @@ object HoodieSparkSqlWriter {
   private def getLatestTableSchema(spark: SparkSession,
                                    tableBasePath: Path,
                                    tableId: TableIdentifier,
-                                   hadoopConf: Configuration): Option[Schema] = {
-    val fs = tableBasePath.getFileSystem(hadoopConf)
+                                   hadoopConf: Configuration,
+                                   tableMetaClient: HoodieTableMetaClient): Option[Schema] = {
+    val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
     val latestTableSchemaFromCommitMetadata =
-      if (FSUtils.isTableExists(tableBasePath.toString, fs)) {
-        val tableMetaClient = HoodieTableMetaClient.builder
-          .setConf(hadoopConf)
-          .setBasePath(tableBasePath.toString)
-          .build()
-        val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
-        toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
-      } else {
-        None
-      }
-
+      toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
     latestTableSchemaFromCommitMetadata.orElse {
       getCatalogTable(spark, tableId).map { catalogTable =>
         val (structName, namespace) = getAvroRecordNameAndNamespace(tableId.table)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 4c763b054ad3..bc8952ad9848 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -90,7 +90,9 @@ class IncrementalRelation(val sqlContext: SQLContext,
   val (usedSchema, internalSchema) = {
     log.info("Inferring schema..")
     val schemaResolver = new TableSchemaResolver(metaClient)
-    val iSchema = if (useEndInstantSchema && !commitsToReturn.isEmpty) {
+    val iSchema : InternalSchema = if (!HoodieParamUtils.isSchemaEvolutionEnabledOnRead(optParams, sqlContext.sparkSession)) {
+      InternalSchema.getEmptyInternalSchema
+    } else if (useEndInstantSchema && !commitsToReturn.isEmpty) {
       InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable)
     } else {
       schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
index ef6425182959..4e2821a029e0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.hudi.DataSourceReadOptions
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.spark.sql.catalyst.TableIdentifier
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 9d955cb83103..056bca37fce4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkUtils}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.functions.{arrays_zip, col}
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
@@ -171,6 +171,58 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Enable and Disable Schema on read") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      if (HoodieSparkUtils.gteqSpark3_1) {
+        spark.sql("set hoodie.schema.on.read.enable=true")
+        // Create table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | location '$tablePath'
+             | tblproperties (
+             |  type = 'cow',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             | )
+       """.stripMargin)
+
+        // Insert data to the new table.
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 10.0, 1000) + ) + + // add column + spark.sql(s"alter table $tableName add columns(new_col string)") + val catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName)) + assertResult(Seq("id", "name", "price", "ts", "new_col")) { + HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name) + } + checkAnswer(s"select id, name, price, ts, new_col from $tableName")( + Seq(1, "a1", 10.0, 1000, null) + ) + // disable schema on read. + spark.sql("set hoodie.schema.on.read.enable=false") + spark.sql(s"refresh table $tableName") + // Insert data to the new table. + spark.sql(s"insert into $tableName values(2, 'a2', 12, 2000, 'e0')") + // write should succeed. and subsequent read should succeed as well. + checkAnswer(s"select id, name, price, ts, new_col from $tableName")( + Seq(1, "a1", 10.0, 1000, null), + Seq(2, "a2", 12.0, 2000, "e0") + ) + } + } + } + test("Test Partition Table alter ") { withTempDir { tmp => Seq("cow", "mor").foreach { tableType =>