
Commit

addressing comments
nsivabalan committed Dec 9, 2022
1 parent cee024a commit a0e7091
Showing 5 changed files with 57 additions and 90 deletions.
@@ -140,7 +140,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
*/
protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = {
val schemaResolver = new TableSchemaResolver(metaClient)
val internalSchemaOpt = if (!HoodieParamUtils.isSchemaEvolutionEnabledOnRead(optParams, sparkSession)) {
val internalSchemaOpt = if (!isSchemaEvolutionEnabledOnRead(optParams, sparkSession)) {
None
} else {
Try {
@@ -740,4 +740,13 @@ object HoodieBaseRelation extends SparkAdapterSupport {
})
}
}

def isSchemaEvolutionEnabledOnRead(optParams: Map[String, String], sparkSession: SparkSession): Boolean = {
// NOTE: Schema evolution could be configured both through the optional (read) parameters as well as
// through the Spark Session configuration (for example, for Spark SQL)
optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
}
}
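
For context, a minimal usage sketch (not part of this commit) of the two configuration paths the isSchemaEvolutionEnabledOnRead helper above checks. It assumes only the standard Spark DataFrameReader and session-configuration APIs, with spark and basePath standing in for a SparkSession and a Hudi table path:

    // enable schema evolution for a single read via a datasource option
    val df = spark.read.format("hudi")
      .option(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, "true")
      .load(basePath)

    // or enable it through the Spark session configuration (for example, for Spark SQL)
    spark.conf.set(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, "true")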

This file was deleted.

@@ -150,42 +150,41 @@ object HoodieSparkSqlWriter {
// Handle various save modes
handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
// Create the table if not present
var tableMetaClient : HoodieTableMetaClient = null
if (!tableExists) {
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT);
tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableType(tableType)
.setDatabaseName(databaseName)
.setTableName(tblName)
.setBaseFileFormat(baseFileFormat)
.setArchiveLogFolder(archiveLogFolder)
.setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME))
// we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value,
// but we are interested in what user has set, hence fetching from optParams.
.setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
.setPartitionFields(partitionColumns)
.setPopulateMetaFields(populateMetaFields)
.setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
.setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED))
.setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
.setKeyGeneratorClassProp(originKeyGeneratorClassName)
.set(timestampKeyGeneratorConfigs)
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
.setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
.initTable(sparkContext.hadoopConfiguration, path)
tableConfig = tableMetaClient.getTableConfig
} else {
tableMetaClient = HoodieTableMetaClient.builder
.setConf(sparkContext.hadoopConfiguration)
.setBasePath(path)
.build()
val tableMetaClient = if (tableExists) {
HoodieTableMetaClient.builder
.setConf(sparkContext.hadoopConfiguration)
.setBasePath(path)
.build()
} else {
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT);
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(tableType)
.setDatabaseName(databaseName)
.setTableName(tblName)
.setBaseFileFormat(baseFileFormat)
.setArchiveLogFolder(archiveLogFolder)
.setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME))
// we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value,
// but we are interested in what user has set, hence fetching from optParams.
.setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
.setPartitionFields(partitionColumns)
.setPopulateMetaFields(populateMetaFields)
.setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
.setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED))
.setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
.setKeyGeneratorClassProp(originKeyGeneratorClassName)
.set(timestampKeyGeneratorConfigs)
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
.setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
.initTable(sparkContext.hadoopConfiguration, path)
}
tableConfig = tableMetaClient.getTableConfig
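
The rewrite above swaps a null-initialized var that was assigned in both branches for a val bound directly to the if/else expression. A simplified, self-contained sketch of that shape; loadExisting and initNew are hypothetical stand-ins for the HoodieTableMetaClient.builder()...build() and withPropertyBuilder()...initTable(...) calls in the diff, not Hudi APIs:

    // hypothetical stand-ins for the two branches shown in the diff
    def loadExisting(path: String): String = s"existing meta client at $path"
    def initNew(path: String): String = s"initialized meta client at $path"

    def resolveMetaClient(tableExists: Boolean, path: String): String =
      // expression-valued if/else replaces the earlier `var ... = null` pattern
      if (tableExists) loadExisting(path) else initNew(path)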

val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)

@@ -195,16 +194,15 @@
classOf[Schema]))

val shouldReconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
val latestTableSchemaOpt = getLatestTableSchema(spark, basePath, tableIdentifier, sparkContext.hadoopConfiguration,
tableMetaClient)
val latestTableSchemaOpt = getLatestTableSchema(spark, tableIdentifier, tableMetaClient)
// NOTE: We need to make sure that upon conversion of the schemas b/w Catalyst's [[StructType]] and
// Avro's [[Schema]] we're preserving corresponding "record-name" and "record-namespace" that
// play crucial role in establishing compatibility b/w schemas
val (avroRecordName, avroRecordNamespace) = latestTableSchemaOpt.map(s => (s.getName, s.getNamespace))
.getOrElse(getAvroRecordNameAndNamespace(tblName))

val sourceSchema = convertStructTypeToAvroSchema(df.schema, avroRecordName, avroRecordNamespace)
val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext, hoodieConfig, tableMetaClient).orElse {
val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient).orElse {
// In case we need to reconcile the schema and schema evolution is enabled,
// we will force-apply schema evolution to the writer's schema
if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
@@ -252,7 +250,7 @@
}

// Create a HoodieWriteClient & issue the delete.
val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext, hoodieConfig, tableMetaClient)
val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient)
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path, tblName,
mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
@@ -560,14 +558,13 @@
}

/**
* get latest internalSchema from table
*
* @param fs instance of FileSystem.
* @param basePath base path.
* @param sparkContext instance of spark context.
* @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
*/
def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext, config: HoodieConfig,
* Gets the latest internalSchema from the table.
*
* @param config instance of {@link HoodieConfig}
* @param tableMetaClient instance of {@link HoodieTableMetaClient}
* @return the latest {@link InternalSchema} of the table wrapped in an Option; empty if schema evolution is disabled or no internal schema can be resolved
*/
def getLatestTableInternalSchema(config: HoodieConfig,
tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = {
if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
Option.empty[InternalSchema]
@@ -587,9 +584,7 @@
}

private def getLatestTableSchema(spark: SparkSession,
tableBasePath: Path,
tableId: TableIdentifier,
hadoopConf: Configuration,
tableMetaClient: HoodieTableMetaClient): Option[Schema] = {
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
val latestTableSchemaFromCommitMetadata =
@@ -19,6 +19,7 @@ package org.apache.hudi

import org.apache.avro.Schema
import org.apache.hadoop.fs.{GlobPattern, Path}
import org.apache.hudi.HoodieBaseRelation.isSchemaEvolutionEnabledOnRead
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.fs.FSUtils
@@ -47,6 +48,7 @@ import scala.collection.mutable
* Relation, that implements the Hoodie incremental view.
*
* Implemented for Copy_on_write storage.
* TODO: rebase w/ HoodieBaseRelation HUDI-5362
*
*/
class IncrementalRelation(val sqlContext: SQLContext,
@@ -90,7 +92,7 @@
val (usedSchema, internalSchema) = {
log.info("Inferring schema..")
val schemaResolver = new TableSchemaResolver(metaClient)
val iSchema : InternalSchema = if (!HoodieParamUtils.isSchemaEvolutionEnabledOnRead(optParams, sqlContext.sparkSession)) {
val iSchema : InternalSchema = if (!isSchemaEvolutionEnabledOnRead(optParams, sqlContext.sparkSession)) {
InternalSchema.getEmptyInternalSchema
} else if (useEndInstantSchema && !commitsToReturn.isEmpty) {
InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable)
@@ -17,7 +17,6 @@

package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.catalyst.TableIdentifier

