Skip to content


[HUDI-5296] Allow disable schema on read after enabling (apache#7421)
Browse files Browse the repository at this point in the history
If someone has enabled schema on read by mistake and never really renamed or dropped a column. it should be feasible to disable schema on read. This patch fixes that. essentially both on read and write path, if "" config is not set, it will fallback to regular code path. It might fail or users might miss data if any they have performed any irrevocable changes like renames. But for rest, this should work.
  • Loading branch information
nsivabalan authored and Alexey Kudinkin committed Dec 14, 2022
1 parent 2851b9a commit 904c10d
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = {
val schemaResolver = new TableSchemaResolver(metaClient)
val internalSchemaOpt = if (!isSchemaEvolutionEnabled) {
val internalSchemaOpt = if (!isSchemaEvolutionEnabledOnRead(optParams, sparkSession)) {
} else {
Try {
Expand Down Expand Up @@ -635,15 +635,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

private def prunePartitionColumns(dataStructSchema: StructType): StructType =
StructType(dataStructSchema.filterNot(f => partitionColumns.contains(

private def isSchemaEvolutionEnabled = {
// NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as
// t/h Spark Session configuration (for ex, for Spark SQL)
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||

object HoodieBaseRelation extends SparkAdapterSupport {
Expand Down Expand Up @@ -745,4 +736,13 @@ object HoodieBaseRelation extends SparkAdapterSupport {

def isSchemaEvolutionEnabledOnRead(optParams: Map[String, String], sparkSession: SparkSession): Boolean = {
// NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as
// t/h Spark Session configuration (for ex, for Spark SQL)
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.AvroConversionUtils.{convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
import org.apache.hudi.HoodieWriterUtils._
Expand Down Expand Up @@ -53,6 +54,8 @@ import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.{SPARK_VERSION, SparkContext}
Expand Down Expand Up @@ -81,6 +84,7 @@ object HoodieSparkSqlWriter {
assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
val path = optParams("path")
val basePath = new Path(path)
val spark = sqlContext.sparkSession
val sparkContext = sqlContext.sparkContext
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
Expand All @@ -96,6 +100,7 @@ object HoodieSparkSqlWriter {
val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
val tableIdentifier = TableIdentifier(tblName, if (databaseName.isEmpty) None else Some(databaseName))
s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")

Expand Down Expand Up @@ -136,15 +141,18 @@ object HoodieSparkSqlWriter {
// Handle various save modes
handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
// Create the table if not present
if (!tableExists) {
val tableMetaClient = if (tableExists) {
} else {
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT);

val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
Expand All @@ -166,8 +174,8 @@ object HoodieSparkSqlWriter {
.initTable(sparkContext.hadoopConfiguration, path)
tableConfig = tableMetaClient.getTableConfig
tableConfig = tableMetaClient.getTableConfig

val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)
val dropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
Expand Down Expand Up @@ -195,7 +203,7 @@ object HoodieSparkSqlWriter {

// Create a HoodieWriteClient & issue the delete.
val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient)
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path, tblName,
mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
Expand Down Expand Up @@ -245,10 +253,10 @@ object HoodieSparkSqlWriter {

// TODO(HUDI-4472) revisit and simplify schema handling
val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
val latestTableSchema = getLatestTableSchema(fs, basePath, sparkContext).getOrElse(sourceSchema)
val latestTableSchema = getLatestTableSchema(spark, tableIdentifier, tableMetaClient).getOrElse(sourceSchema)

val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
var internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient)

val writerSchema: Schema =
if (reconcileSchema) {
Expand Down Expand Up @@ -281,6 +289,34 @@ object HoodieSparkSqlWriter {
sparkContext.getConf.registerAvroSchemas(writerSchema)"Registered avro schema : ${writerSchema.toString(true)}")
case _ =>
// Here all other (than DELETE, DELETE_PARTITION) write operations are handled
// Convert to RDD[HoodieRecord]
val avroRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, avroRecordName, avroRecordNamespace,
// Check whether partition columns should be persisted w/in the data-files, or should
// be instead omitted from them and simply encoded into the partition path (which is Spark's
// behavior by default)
// TODO move partition columns handling down into the handlers
val shouldDropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
val dataFileSchema = if (shouldDropPartitionColumns) {
val truncatedSchema = generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema)
// NOTE: We have to register this schema w/ Kryo to make sure it's able to apply an optimization
// allowing it to avoid the need to ser/de the whole schema along _every_ Avro record
registerAvroSchemasWithKryo(sparkContext, truncatedSchema)
} else {
// NOTE: Avro's [[Schema]] can't be effectively serialized by JVM native serialization framework
// (due to containing cyclic refs), therefore we have to convert it to string before
// passing onto the Executor
val dataFileSchemaStr = dataFileSchema.toString
>>>>>>> aacfe6de80 ([HUDI-5296] Allow disable schema on read after enabling (#7421))*/

// Convert to RDD[HoodieRecord]
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
Expand Down Expand Up @@ -367,47 +403,48 @@ object HoodieSparkSqlWriter {

* get latest internalSchema from table
* @param fs instance of FileSystem.
* @param basePath base path.
* @param sparkContext instance of spark context.
* @param schema incoming record's schema.
* @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[InternalSchema] = {
try {
if (FSUtils.isTableExists(basePath.toString, fs)) {
val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build()
* get latest internalSchema from table
* @param config instance of {@link HoodieConfig}
* @param tableMetaClient instance of HoodieTableMetaClient
* @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
def getLatestTableInternalSchema(config: HoodieConfig,
tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = {
if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
} else {
try {
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
val internalSchemaOpt = tableSchemaResolver.getTableInternalSchemaFromCommitMetadata
if (internalSchemaOpt.isPresent) Some(internalSchemaOpt.get()) else None
} else {
} catch {
case _: Exception => None
} catch {
case _: Exception => None

* Checks if schema needs upgrade (if incoming record's write schema is old while table schema got evolved).
* @param fs instance of FileSystem.
* @param basePath base path.
* @param sparkContext instance of spark context.
* @param schema incoming record's schema.
* @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
def getLatestTableSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[Schema] = {
if (FSUtils.isTableExists(basePath.toString, fs)) {
val tableMetaClient = HoodieTableMetaClient.builder
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
private def registerAvroSchemasWithKryo(sparkContext: SparkContext, targetAvroSchemas: Schema*): Unit = {
sparkContext.getConf.registerAvroSchemas(targetAvroSchemas: _*)

private def getLatestTableSchema(spark: SparkSession,
tableId: TableIdentifier,
tableMetaClient: HoodieTableMetaClient): Option[Schema] = {
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
val latestTableSchemaFromCommitMetadata =
latestTableSchemaFromCommitMetadata.orElse {
getCatalogTable(spark, tableId).map { catalogTable =>
val (structName, namespace) = getAvroRecordNameAndNamespace(tableId.table)
convertStructTypeToAvroSchema(catalogTable.schema, structName, namespace)

private def getCatalogTable(spark: SparkSession, tableId: TableIdentifier): Option[CatalogTable] = {
if (spark.sessionState.catalog.tableExists(tableId)) {
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}

import org.apache.hadoop.fs.{GlobPattern, Path}
import org.apache.hudi.HoodieBaseRelation.isSchemaEvolutionEnabledOnRead
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.fs.FSUtils
Expand All @@ -48,6 +49,7 @@ import scala.collection.mutable
* Relation, that implements the Hoodie incremental view.
* Implemented for Copy_on_write storage.
* TODO: rebase w/ HoodieBaseRelation HUDI-5362
class IncrementalRelation(val sqlContext: SQLContext,
Expand Down Expand Up @@ -91,7 +93,9 @@ class IncrementalRelation(val sqlContext: SQLContext,
val (usedSchema, internalSchema) = {"Inferring schema..")
val schemaResolver = new TableSchemaResolver(metaClient)
val iSchema = if (useEndInstantSchema && !commitsToReturn.isEmpty) {
val iSchema : InternalSchema = if (!isSchemaEvolutionEnabledOnRead(optParams, sqlContext.sparkSession)) {
} else if (useEndInstantSchema && !commitsToReturn.isEmpty) {
InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkUtils}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.functions.{arrays_zip, col}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
Expand Down Expand Up @@ -171,6 +171,58 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {

test("Test Enable and Disable Schema on read") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
if (HoodieSparkUtils.gteqSpark3_1) {
// Create table
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '$tablePath'
| tblproperties (
| type = 'cow',
| primaryKey = 'id',
| preCombineField = 'ts'
| )

// Insert data to the new table.
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 10.0, 1000)

// add column
spark.sql(s"alter table $tableName add columns(new_col string)")
val catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName))
assertResult(Seq("id", "name", "price", "ts", "new_col")) {
checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
Seq(1, "a1", 10.0, 1000, null)
// disable schema on read.
spark.sql(s"refresh table $tableName")
// Insert data to the new table.
spark.sql(s"insert into $tableName values(2, 'a2', 12, 2000, 'e0')")
// write should succeed. and subsequent read should succeed as well.
checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
Seq(1, "a1", 10.0, 1000, null),
Seq(2, "a2", 12.0, 2000, "e0")

test("Test Partition Table alter ") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
Expand Down

0 comments on commit 904c10d

Please sign in to comment.