[HUDI-5296] Allow disable schema on read after enabling #7333

Closed
Changes from 2 commits
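In short, the change lets a table that has used schema-on-read (schema evolution) turn it back off: the InternalSchema lookups on both the write and read paths are now gated on DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED. An illustrative spark-shell sketch of the scenario, mirroring the new test below (table name, column names, and values are placeholders, not part of this PR):

// Illustrative only: schema-on-read was previously enabled and a column was
// added via ALTER TABLE ... ADD COLUMNS; turning the flag off again should
// leave subsequent writes and reads working.
import org.apache.hudi.DataSourceReadOptions

spark.sessionState.conf.setConfString(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false")
spark.sql("refresh table my_hudi_table")
spark.sql("insert into my_hudi_table values(2, 'a2', 12, 2000, 'e0')")
spark.sql("select id, name, price, ts, new_col from my_hudi_table").show()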
HoodieSparkSqlWriter.scala
@@ -35,7 +35,6 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.{CommitUtils, StringUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME, KEYGEN_CLASS_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
@@ -66,7 +65,6 @@ import org.apache.spark.{SPARK_VERSION, SparkContext}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters.setAsJavaSetConverter
import scala.collection.mutable
import scala.util.matching.Regex

object HoodieSparkSqlWriter {

@@ -200,16 +198,20 @@ object HoodieSparkSqlWriter {
.getOrElse(getAvroRecordNameAndNamespace(tblName))

val sourceSchema = convertStructTypeToAvroSchema(df.schema, avroRecordName, avroRecordNamespace)
val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext).orElse {
val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
// In case we need to reconcile the schema and schema evolution is enabled,
// we will force-apply schema evolution to the writer's schema
if (shouldReconcileSchema && schemaEvolutionEnabled) {
Some(AvroInternalSchemaConverter.convert(sourceSchema))
} else {
None
val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
val internalSchemaOpt = if (schemaEvolutionEnabled) {
getLatestTableInternalSchema(fs, basePath, sparkContext).orElse {
// In case we need to reconcile the schema and schema evolution is enabled,
// we will force-apply schema evolution to the writer's schema
if (shouldReconcileSchema) {
Some(AvroInternalSchemaConverter.convert(sourceSchema))
} else {
None
}
}
} else {
None
}

// NOTE: Target writer's schema is deduced based on
@@ -249,7 +251,13 @@ }
}

// Create a HoodieWriteClient & issue the delete.
val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
// Create a HoodieWriteClient & issue the delete.
val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
val internalSchemaOpt : Option[org.apache.hudi.internal.schema.InternalSchema]= if (schemaEvolutionEnabled) {
getLatestTableInternalSchema(fs, basePath, sparkContext)
} else {
None
}
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path, tblName,
mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
@@ -291,8 +299,6 @@ object HoodieSparkSqlWriter {
client.startCommitWithTime(instantTime, commitActionType)
val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime)
(writeStatuses, client)


case _ =>
// Here all other (than DELETE, DELETE_PARTITION) write operations are handled
//
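Net effect of the HoodieSparkSqlWriter changes above: the latest table InternalSchema is only resolved when schema evolution is enabled; otherwise internalSchemaOpt stays None and the writer (including the delete path) behaves as if schema-on-read had never been turned on. A simplified, self-contained sketch of that decision, where Schema, lookupLatest and convertSource are stand-ins for the actual Hudi types and calls (InternalSchema, getLatestTableInternalSchema, AvroInternalSchemaConverter.convert):

// Simplified stand-in for the gating added in HoodieSparkSqlWriter:
// only consult the table's InternalSchema when schema evolution is enabled.
object InternalSchemaGate {
  type Schema = String // stand-in for org.apache.hudi.internal.schema.InternalSchema

  def resolveInternalSchema(schemaEvolutionEnabled: Boolean,
                            shouldReconcileSchema: Boolean,
                            lookupLatest: () => Option[Schema],
                            convertSource: () => Schema): Option[Schema] = {
    if (schemaEvolutionEnabled) {
      // Prefer the schema already tracked on the table; if none exists yet and
      // reconciliation is requested, force-apply evolution to the writer's schema.
      lookupLatest().orElse(if (shouldReconcileSchema) Some(convertSource()) else None)
    } else {
      // Schema-on-read disabled: never materialize an InternalSchema.
      None
    }
  }
}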
IncrementalRelation.scala
@@ -90,10 +90,14 @@ class IncrementalRelation(val sqlContext: SQLContext,
val (usedSchema, internalSchema) = {
log.info("Inferring schema..")
val schemaResolver = new TableSchemaResolver(metaClient)
val iSchema = if (useEndInstantSchema && !commitsToReturn.isEmpty) {
InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable)
val iSchema = if (isSchemaEvolutionEnabledOnRead) {
if (useEndInstantSchema && !commitsToReturn.isEmpty) {
InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable)
} else {
schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null)
}
} else {
schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null)
Option.empty.asInstanceOf[InternalSchema]
}

val tableSchema = if (useEndInstantSchema && iSchema.isEmptySchema) {
@@ -121,6 +125,13 @@

override def schema: StructType = usedSchema

private def isSchemaEvolutionEnabledOnRead = {
optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
sqlContext.sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
}

override def buildScan(): RDD[Row] = {
if (usedSchema == StructType(Nil)) {
// if first commit in a table is an empty commit without schema, return empty RDD here
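On the read side, IncrementalRelation now resolves an InternalSchema only when isSchemaEvolutionEnabledOnRead is true; the new helper checks the per-query read options first and falls back to the Spark session conf, both keyed by DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED. A minimal, self-contained sketch of that lookup (the two maps stand in for optParams and the session conf):

// Minimal stand-in for the read-side check: the flag may arrive either as a
// per-query read option or as a Spark session conf entry; either one enables it.
def isSchemaEvolutionEnabledOnRead(optParams: Map[String, String],
                                   sessionConf: Map[String, String],
                                   key: String,
                                   default: Boolean = false): Boolean = {
  optParams.get(key).map(_.toBoolean).getOrElse(default) ||
    sessionConf.get(key).map(_.toBoolean).getOrElse(default)
}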
TestAlterTable.scala
@@ -17,6 +17,7 @@

package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.catalyst.TableIdentifier

@@ -168,6 +169,56 @@ class TestAlterTable extends HoodieSparkSqlTestBase {
}
}

test("Test Enable and Disable Schema on read") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
// Create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '$tablePath'
| tblproperties (
| type = 'cow',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)

// Insert data to the new table.
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 10.0, 1000)
)

// add column
spark.sql(s"alter table $tableName add columns(new_col string)")
val catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName))
assertResult(Seq("id", "name", "price", "ts", "new_col")) {
HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name)
}
checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
Seq(1, "a1", 10.0, 1000, null)
)
// disable schema on read.
spark.sessionState.conf.setConfString(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false")
spark.sql(s"refresh table $tableName")
// Insert data to the new table.
spark.sql(s"insert into $tableName values(2, 'a2', 12, 2000, 'e0')")
// write should succeed. and subsequent read should succeed as well.
checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
Seq(1, "a1", 10.0, 1000, null),
Seq(2, "a2", 12.0, 2000, "e0")
)
}
}


test("Test Alter Rename Table") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>