Skip to content

Commit

Permalink
[HUDI-5296] Allow disable schema on read after enabling (apache#7421)
Browse files Browse the repository at this point in the history
If someone has enabled schema on read by mistake and never really renamed or dropped a column. it should be feasible to disable schema on read. This patch fixes that. essentially both on read and write path, if "hoodie.schema.on.read.enable" config is not set, it will fallback to regular code path. It might fail or users might miss data if any they have performed any irrevocable changes like renames. But for rest, this should work.
  • Loading branch information
nsivabalan authored and fengjian committed Apr 5, 2023
1 parent 51efff3 commit bf1c21d
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
*/
protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = {
val schemaResolver = new TableSchemaResolver(metaClient)
val internalSchemaOpt = if (!isSchemaEvolutionEnabled) {
val internalSchemaOpt = if (!isSchemaEvolutionEnabledOnRead(optParams, sparkSession)) {
None
} else {
Try {
Expand Down Expand Up @@ -639,15 +639,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

private def prunePartitionColumns(dataStructSchema: StructType): StructType =
StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name)))

private def isSchemaEvolutionEnabled = {
// NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as
// t/h Spark Session configuration (for ex, for Spark SQL)
optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
}
}

object HoodieBaseRelation extends SparkAdapterSupport {
Expand Down Expand Up @@ -749,4 +740,13 @@ object HoodieBaseRelation extends SparkAdapterSupport {
})
}
}

def isSchemaEvolutionEnabledOnRead(optParams: Map[String, String], sparkSession: SparkSession): Boolean = {
// NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as
// t/h Spark Session configuration (for ex, for Spark SQL)
optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,17 @@ object HoodieSparkSqlWriter {
// Handle various save modes
handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
// Create the table if not present
if (!tableExists) {
val tableMetaClient = if (tableExists) {
HoodieTableMetaClient.builder
.setConf(sparkContext.hadoopConfiguration)
.setBasePath(path)
.build()
} else {
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT);

val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(tableType)
.setDatabaseName(databaseName)
.setTableName(tblName)
Expand All @@ -180,8 +183,8 @@ object HoodieSparkSqlWriter {
.setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
.initTable(sparkContext.hadoopConfiguration, path)
tableConfig = tableMetaClient.getTableConfig
}
}
tableConfig = tableMetaClient.getTableConfig

val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)

Expand All @@ -191,28 +194,23 @@ object HoodieSparkSqlWriter {
classOf[Schema]))

val shouldReconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean

val latestTableSchemaOpt = getLatestTableSchema(spark, basePath, tableIdentifier, sparkContext.hadoopConfiguration)
val latestTableSchemaOpt = getLatestTableSchema(spark, tableIdentifier, tableMetaClient)
// NOTE: We need to make sure that upon conversion of the schemas b/w Catalyst's [[StructType]] and
// Avro's [[Schema]] we're preserving corresponding "record-name" and "record-namespace" that
// play crucial role in establishing compatibility b/w schemas
val (avroRecordName, avroRecordNamespace) = latestTableSchemaOpt.map(s => (s.getName, s.getNamespace))
.getOrElse(getAvroRecordNameAndNamespace(tblName))

val sourceSchema = convertStructTypeToAvroSchema(df.schema, avroRecordName, avroRecordNamespace)
val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext).orElse {
val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
// In case we need to reconcile the schema and schema evolution is enabled,
// we will force-apply schema evolution to the writer's schema
if (shouldReconcileSchema && schemaEvolutionEnabled) {
val shouldRemoveMetaDataFromInternalSchema = sourceSchema.getFields().filter(f => f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty
// in case sourceSchema contains HoodieRecord.HOODIE_META_COLUMNS
val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean
Some(AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema), allowOperationMetaDataField)))
} else {
None
}
val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient).orElse {
// In case we need to reconcile the schema and schema evolution is enabled,
// we will force-apply schema evolution to the writer's schema
if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean
Some(AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema), allowOperationMetaDataField)))
} else {
None
}
}

// NOTE: Target writer's schema is deduced based on
Expand Down Expand Up @@ -252,7 +250,7 @@ object HoodieSparkSqlWriter {
}

// Create a HoodieWriteClient & issue the delete.
val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient)
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path, tblName,
mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
Expand Down Expand Up @@ -294,8 +292,6 @@ object HoodieSparkSqlWriter {
client.startCommitWithTime(instantTime, commitActionType)
val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime)
(writeStatuses, client)


case _ =>
// Here all other (than DELETE, DELETE_PARTITION) write operations are handled
//
Expand Down Expand Up @@ -562,25 +558,24 @@ object HoodieSparkSqlWriter {
}

/**
* get latest internalSchema from table
*
* @param fs instance of FileSystem.
* @param basePath base path.
* @param sparkContext instance of spark context.
* @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
*/
def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[InternalSchema] = {
try {
if (FSUtils.isTableExists(basePath.toString, fs)) {
val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build()
* get latest internalSchema from table
*
* @param config instance of {@link HoodieConfig}
* @param tableMetaClient instance of HoodieTableMetaClient
* @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
*/
def getLatestTableInternalSchema(config: HoodieConfig,
tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = {
if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
Option.empty[InternalSchema]
} else {
try {
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
val internalSchemaOpt = tableSchemaResolver.getTableInternalSchemaFromCommitMetadata
if (internalSchemaOpt.isPresent) Some(internalSchemaOpt.get()) else None
} else {
None
} catch {
case _: Exception => None
}
} catch {
case _: Exception => None
}
}

Expand All @@ -589,22 +584,11 @@ object HoodieSparkSqlWriter {
}

private def getLatestTableSchema(spark: SparkSession,
tableBasePath: Path,
tableId: TableIdentifier,
hadoopConf: Configuration): Option[Schema] = {
val fs = tableBasePath.getFileSystem(hadoopConf)
tableMetaClient: HoodieTableMetaClient): Option[Schema] = {
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
val latestTableSchemaFromCommitMetadata =
if (FSUtils.isTableExists(tableBasePath.toString, fs)) {
val tableMetaClient = HoodieTableMetaClient.builder
.setConf(hadoopConf)
.setBasePath(tableBasePath.toString)
.build()
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
} else {
None
}

toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
latestTableSchemaFromCommitMetadata.orElse {
getCatalogTable(spark, tableId).map { catalogTable =>
val (structName, namespace) = getAvroRecordNameAndNamespace(tableId.table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.hudi

import org.apache.avro.Schema
import org.apache.hadoop.fs.{GlobPattern, Path}
import org.apache.hudi.HoodieBaseRelation.isSchemaEvolutionEnabledOnRead
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.fs.FSUtils
Expand Down Expand Up @@ -47,6 +48,7 @@ import scala.collection.mutable
* Relation, that implements the Hoodie incremental view.
*
* Implemented for Copy_on_write storage.
* TODO: rebase w/ HoodieBaseRelation HUDI-5362
*
*/
class IncrementalRelation(val sqlContext: SQLContext,
Expand Down Expand Up @@ -90,7 +92,9 @@ class IncrementalRelation(val sqlContext: SQLContext,
val (usedSchema, internalSchema) = {
log.info("Inferring schema..")
val schemaResolver = new TableSchemaResolver(metaClient)
val iSchema = if (useEndInstantSchema && !commitsToReturn.isEmpty) {
val iSchema : InternalSchema = if (!isSchemaEvolutionEnabledOnRead(optParams, sqlContext.sparkSession)) {
InternalSchema.getEmptyInternalSchema
} else if (useEndInstantSchema && !commitsToReturn.isEmpty) {
InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable)
} else {
schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkUtils}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.functions.{arrays_zip, col, expr, lit}
import org.apache.spark.sql.types.StringType
Expand Down Expand Up @@ -174,6 +174,58 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
}

test("Test Enable and Disable Schema on read") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
if (HoodieSparkUtils.gteqSpark3_1) {
spark.sql("set hoodie.schema.on.read.enable=true")
// Create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '$tablePath'
| tblproperties (
| type = 'cow',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)

// Insert data to the new table.
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 10.0, 1000)
)

// add column
spark.sql(s"alter table $tableName add columns(new_col string)")
val catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName))
assertResult(Seq("id", "name", "price", "ts", "new_col")) {
HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name)
}
checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
Seq(1, "a1", 10.0, 1000, null)
)
// disable schema on read.
spark.sql("set hoodie.schema.on.read.enable=false")
spark.sql(s"refresh table $tableName")
// Insert data to the new table.
spark.sql(s"insert into $tableName values(2, 'a2', 12, 2000, 'e0')")
// write should succeed. and subsequent read should succeed as well.
checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
Seq(1, "a1", 10.0, 1000, null),
Seq(2, "a2", 12.0, 2000, "e0")
)
}
}
}

test("Test Partition Table alter ") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
Expand Down

0 comments on commit bf1c21d

Please sign in to comment.