Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-5296] Allow disable schema on read after enabling #7421

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
*/
protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = {
val schemaResolver = new TableSchemaResolver(metaClient)
val internalSchemaOpt = if (!isSchemaEvolutionEnabled) {
val internalSchemaOpt = if (!isSchemaEvolutionEnabledOnRead(optParams, sparkSession)) {
None
} else {
Try {
Expand Down Expand Up @@ -639,15 +639,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

private def prunePartitionColumns(dataStructSchema: StructType): StructType =
StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name)))

private def isSchemaEvolutionEnabled = {
// NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as
// t/h Spark Session configuration (for ex, for Spark SQL)
optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
}
}

object HoodieBaseRelation extends SparkAdapterSupport {
Expand Down Expand Up @@ -749,4 +740,13 @@ object HoodieBaseRelation extends SparkAdapterSupport {
})
}
}

def isSchemaEvolutionEnabledOnRead(optParams: Map[String, String], sparkSession: SparkSession): Boolean = {
// NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as
// t/h Spark Session configuration (for ex, for Spark SQL)
optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,17 @@ object HoodieSparkSqlWriter {
// Handle various save modes
handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
// Create the table if not present
if (!tableExists) {
val tableMetaClient = if (tableExists) {
HoodieTableMetaClient.builder
.setConf(sparkContext.hadoopConfiguration)
.setBasePath(path)
.build()
} else {
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT);

val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(tableType)
.setDatabaseName(databaseName)
.setTableName(tblName)
Expand All @@ -180,8 +183,8 @@ object HoodieSparkSqlWriter {
.setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
.initTable(sparkContext.hadoopConfiguration, path)
tableConfig = tableMetaClient.getTableConfig
}
}
tableConfig = tableMetaClient.getTableConfig

val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)

Expand All @@ -191,28 +194,23 @@ object HoodieSparkSqlWriter {
classOf[Schema]))

val shouldReconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean

val latestTableSchemaOpt = getLatestTableSchema(spark, basePath, tableIdentifier, sparkContext.hadoopConfiguration)
val latestTableSchemaOpt = getLatestTableSchema(spark, tableIdentifier, tableMetaClient)
// NOTE: We need to make sure that upon conversion of the schemas b/w Catalyst's [[StructType]] and
// Avro's [[Schema]] we're preserving corresponding "record-name" and "record-namespace" that
// play crucial role in establishing compatibility b/w schemas
val (avroRecordName, avroRecordNamespace) = latestTableSchemaOpt.map(s => (s.getName, s.getNamespace))
.getOrElse(getAvroRecordNameAndNamespace(tblName))

val sourceSchema = convertStructTypeToAvroSchema(df.schema, avroRecordName, avroRecordNamespace)
val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext).orElse {
val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
// In case we need to reconcile the schema and schema evolution is enabled,
// we will force-apply schema evolution to the writer's schema
if (shouldReconcileSchema && schemaEvolutionEnabled) {
val shouldRemoveMetaDataFromInternalSchema = sourceSchema.getFields().filter(f => f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty
// in case sourceSchema contains HoodieRecord.HOODIE_META_COLUMNS
val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean
Some(AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema), allowOperationMetaDataField)))
} else {
None
}
val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient).orElse {
// In case we need to reconcile the schema and schema evolution is enabled,
// we will force-apply schema evolution to the writer's schema
if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean
Some(AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema), allowOperationMetaDataField)))
} else {
None
}
}

// NOTE: Target writer's schema is deduced based on
Expand Down Expand Up @@ -252,7 +250,7 @@ object HoodieSparkSqlWriter {
}

// Create a HoodieWriteClient & issue the delete.
val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient)
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path, tblName,
mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
Expand Down Expand Up @@ -294,8 +292,6 @@ object HoodieSparkSqlWriter {
client.startCommitWithTime(instantTime, commitActionType)
val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime)
(writeStatuses, client)


case _ =>
// Here all other (than DELETE, DELETE_PARTITION) write operations are handled
//
Expand Down Expand Up @@ -562,25 +558,24 @@ object HoodieSparkSqlWriter {
}

/**
* get latest internalSchema from table
*
* @param fs instance of FileSystem.
* @param basePath base path.
* @param sparkContext instance of spark context.
* @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
*/
def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[InternalSchema] = {
try {
if (FSUtils.isTableExists(basePath.toString, fs)) {
val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build()
* get latest internalSchema from table
*
* @param config instance of {@link HoodieConfig}
* @param tableMetaClient instance of HoodieTableMetaClient
* @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
*/
def getLatestTableInternalSchema(config: HoodieConfig,
tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = {
if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
Option.empty[InternalSchema]
} else {
try {
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
val internalSchemaOpt = tableSchemaResolver.getTableInternalSchemaFromCommitMetadata
if (internalSchemaOpt.isPresent) Some(internalSchemaOpt.get()) else None
} else {
None
} catch {
case _: Exception => None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do vaguely remember that there were a lot of noise coming from TableScheamResolver, but can't recollect exactly what that was. Do you have context on what those are?

In general swallowing exceptions in a blanket fashion w/o a log like that it's not a good idea (it's fine for particular exception which is not an issue, but not for all exceptions)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to address in this one, we can take as a follow-up

}
} catch {
case _: Exception => None
}
}

Expand All @@ -589,22 +584,11 @@ object HoodieSparkSqlWriter {
}

private def getLatestTableSchema(spark: SparkSession,
tableBasePath: Path,
tableId: TableIdentifier,
hadoopConf: Configuration): Option[Schema] = {
val fs = tableBasePath.getFileSystem(hadoopConf)
tableMetaClient: HoodieTableMetaClient): Option[Schema] = {
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
val latestTableSchemaFromCommitMetadata =
if (FSUtils.isTableExists(tableBasePath.toString, fs)) {
val tableMetaClient = HoodieTableMetaClient.builder
.setConf(hadoopConf)
.setBasePath(tableBasePath.toString)
.build()
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
} else {
None
}

toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
latestTableSchemaFromCommitMetadata.orElse {
getCatalogTable(spark, tableId).map { catalogTable =>
val (structName, namespace) = getAvroRecordNameAndNamespace(tableId.table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.hudi

import org.apache.avro.Schema
import org.apache.hadoop.fs.{GlobPattern, Path}
import org.apache.hudi.HoodieBaseRelation.isSchemaEvolutionEnabledOnRead
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.fs.FSUtils
Expand Down Expand Up @@ -47,6 +48,7 @@ import scala.collection.mutable
* Relation, that implements the Hoodie incremental view.
*
* Implemented for Copy_on_write storage.
* TODO: rebase w/ HoodieBaseRelation HUDI-5362
*
*/
class IncrementalRelation(val sqlContext: SQLContext,
Expand Down Expand Up @@ -90,7 +92,9 @@ class IncrementalRelation(val sqlContext: SQLContext,
val (usedSchema, internalSchema) = {
log.info("Inferring schema..")
val schemaResolver = new TableSchemaResolver(metaClient)
val iSchema = if (useEndInstantSchema && !commitsToReturn.isEmpty) {
val iSchema : InternalSchema = if (!isSchemaEvolutionEnabledOnRead(optParams, sqlContext.sparkSession)) {
InternalSchema.getEmptyInternalSchema
} else if (useEndInstantSchema && !commitsToReturn.isEmpty) {
InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable)
} else {
schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkUtils}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.functions.{arrays_zip, col, expr, lit}
import org.apache.spark.sql.types.StringType
Expand Down Expand Up @@ -174,6 +174,58 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
}

test("Test Enable and Disable Schema on read") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
if (HoodieSparkUtils.gteqSpark3_1) {
spark.sql("set hoodie.schema.on.read.enable=true")
// Create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '$tablePath'
| tblproperties (
| type = 'cow',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)

// Insert data to the new table.
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 10.0, 1000)
)

// add column
spark.sql(s"alter table $tableName add columns(new_col string)")
val catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName))
assertResult(Seq("id", "name", "price", "ts", "new_col")) {
HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name)
}
checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
Seq(1, "a1", 10.0, 1000, null)
)
// disable schema on read.
spark.sql("set hoodie.schema.on.read.enable=false")
spark.sql(s"refresh table $tableName")
// Insert data to the new table.
spark.sql(s"insert into $tableName values(2, 'a2', 12, 2000, 'e0')")
// write should succeed. and subsequent read should succeed as well.
checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
Seq(1, "a1", 10.0, 1000, null),
Seq(2, "a2", 12.0, 2000, "e0")
)
}
}
}

test("Test Partition Table alter ") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
Expand Down