[HUDI-5346][HUDI-5320] Fixing Create Table as Select (CTAS) performance gaps (#7370)

This PR addresses some of the performance traps detected while stress-testing Spark SQL's Create Table As Select (CTAS) command:

- Avoids reordering of the columns within CTAS (there is no need for it, since InsertIntoTableCommand will resolve the columns anyway)
- Fixes the validation sequence within InsertIntoTableCommand so that columns are resolved first and validation runs afterwards (currently it is done the other way around)
- Propagates properties specified in CTAS to HoodieSparkSqlWriter (for example, there is currently no way to disable the metadata table when using CTAS, precisely because these properties are not propagated)

Additionally, the following improvement was made to HoodieDatasetBulkInsertHelper:

- If meta-fields are disabled, the incoming Dataset is no longer dereferenced into an RDD; instead, stubbed-out meta-fields are added through an additional Projection (see the sketch below).
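For illustration only, a minimal sketch of the stubbing idea using the public DataFrame API (a hypothetical helper for exposition — the actual change, shown in the HoodieDatasetBulkInsertHelper diff below, builds the Project over the Catalyst plan directly):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

// Prepend constant (empty) meta-field columns as a projection: the query stays in
// the Dataset/Catalyst domain, so the incoming DataFrame never has to be
// dereferenced into an RDD just to attach the meta-fields.
def prependStubbedMetaFields(df: DataFrame, metaFieldNames: Seq[String]): DataFrame = {
  val stubs = metaFieldNames.map(name => lit("").as(name))
  df.select(stubs ++ df.columns.map(df.col): _*)
}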
alexeykudinkin authored Dec 9, 2022
1 parent 2da69d3 commit 8de5357
Showing 8 changed files with 106 additions and 97 deletions.
HoodieDatasetBulkInsertHelper.scala
@@ -33,7 +33,9 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeUtils, Row}
import org.apache.spark.unsafe.types.UTF8String

@@ -58,31 +60,6 @@ object HoodieDatasetBulkInsertHelper extends Logging {
val populateMetaFields = config.populateMetaFields()
val schema = df.schema

val keyGeneratorClassName = config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
"Key-generator class name is required")

val prependedRdd: RDD[InternalRow] =
df.queryExecution.toRdd.mapPartitions { iter =>
val keyGenerator =
ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps))
.asInstanceOf[SparkKeyGeneratorInterface]

iter.map { row =>
val (recordKey, partitionPath) =
if (populateMetaFields) {
(keyGenerator.getRecordKey(row, schema), keyGenerator.getPartitionPath(row, schema))
} else {
(UTF8String.EMPTY_UTF8, UTF8String.EMPTY_UTF8)
}
val commitTimestamp = UTF8String.EMPTY_UTF8
val commitSeqNo = UTF8String.EMPTY_UTF8
val filename = UTF8String.EMPTY_UTF8

// TODO use mutable row, avoid re-allocating
new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, partitionPath, filename, row, false)
}
}

val metaFields = Seq(
StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
@@ -92,11 +69,44 @@

val updatedSchema = StructType(metaFields ++ schema.fields)

val updatedDF = if (populateMetaFields && config.shouldCombineBeforeInsert) {
val dedupedRdd = dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
val updatedDF = if (populateMetaFields) {
val keyGeneratorClassName = config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
"Key-generator class name is required")

val prependedRdd: RDD[InternalRow] =
df.queryExecution.toRdd.mapPartitions { iter =>
val keyGenerator =
ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps))
.asInstanceOf[SparkKeyGeneratorInterface]

iter.map { row =>
val recordKey = keyGenerator.getRecordKey(row, schema)
val partitionPath = keyGenerator.getPartitionPath(row, schema)
val commitTimestamp = UTF8String.EMPTY_UTF8
val commitSeqNo = UTF8String.EMPTY_UTF8
val filename = UTF8String.EMPTY_UTF8

// TODO use mutable row, avoid re-allocating
new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, partitionPath, filename, row, false)
}
}

val dedupedRdd = if (config.shouldCombineBeforeInsert) {
dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
} else {
prependedRdd
}

HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema)
} else {
HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, prependedRdd, updatedSchema)
// NOTE: In cases when we're not populating meta-fields we actually don't
// need access to the [[InternalRow]] and therefore can avoid the need
// to dereference [[DataFrame]] into [[RDD]]
val query = df.queryExecution.logical
val metaFieldsStubs = metaFields.map(f => Alias(Literal(UTF8String.EMPTY_UTF8, dataType = StringType), f.name)())
val prependedQuery = Project(metaFieldsStubs ++ query.output, query)

HoodieUnsafeUtils.createDataFrameFrom(df.sparkSession, prependedQuery)
}

val trimmedDF = if (shouldDropPartitionColumns) {
HoodieUnsafeUtils.scala
@@ -30,6 +30,15 @@ import org.apache.spark.util.MutablePair
*/
object HoodieUnsafeUtils {

/**
* Creates [[DataFrame]] from provided [[plan]]
*
* @param spark spark's session
* @param plan given plan to wrap into [[DataFrame]]
*/
def createDataFrameFrom(spark: SparkSession, plan: LogicalPlan): DataFrame =
Dataset.ofRows(spark, plan)

/**
* Creates [[DataFrame]] from the in-memory [[Seq]] of [[Row]]s with provided [[schema]]
*
@@ -39,7 +48,6 @@ object HoodieUnsafeUtils {
* @param spark spark's session
* @param rows collection of rows to base [[DataFrame]] on
* @param schema target [[DataFrame]]'s schema
* @return
*/
def createDataFrameFromRows(spark: SparkSession, rows: Seq[Row], schema: StructType): DataFrame =
Dataset.ofRows(spark, LocalRelation.fromExternalRows(schema.toAttributes, rows))
@@ -53,7 +61,6 @@
* @param spark spark's session
* @param rows collection of rows to base [[DataFrame]] on
* @param schema target [[DataFrame]]'s schema
* @return
*/
def createDataFrameFromInternalRows(spark: SparkSession, rows: Seq[InternalRow], schema: StructType): DataFrame =
Dataset.ofRows(spark, LocalRelation(schema.toAttributes, rows))
@@ -65,7 +72,6 @@
* @param spark spark's session
* @param rdd RDD w/ [[Row]]s to base [[DataFrame]] on
* @param schema target [[DataFrame]]'s schema
* @return
*/
def createDataFrameFromRDD(spark: SparkSession, rdd: RDD[InternalRow], schema: StructType): DataFrame =
spark.internalCreateDataFrame(rdd, schema)
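A usage sketch (not part of this commit) contrasting the new helper with the RDD-based one, assuming a SparkSession `spark` and a DataFrame `df`:

// Wraps the existing logical plan directly, staying in the Catalyst plan domain
val fromPlan = HoodieUnsafeUtils.createDataFrameFrom(spark, df.queryExecution.logical)
// Older pattern: drops down to an RDD[InternalRow], losing the original logical plan
val fromRdd = HoodieUnsafeUtils.createDataFrameFromRDD(spark, df.queryExecution.toRdd, df.schema)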
HoodieSparkSqlWriter.scala
@@ -66,7 +66,6 @@ import scala.collection.JavaConversions._
import scala.collection.JavaConverters.setAsJavaSetConverter
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.matching.Regex

object HoodieSparkSqlWriter {

@@ -121,6 +120,7 @@ object HoodieSparkSqlWriter {
}
val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE))
var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
// TODO clean up
// It does not make sense to allow upsert() operation if INSERT_DROP_DUPS is true
// Auto-correct the operation to "insert" if OPERATION is set to "upsert" wrongly
// or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()) .
@@ -749,8 +749,7 @@ object HoodieSparkSqlWriter {
val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
if (userDefinedBulkInsertPartitionerOpt.isPresent) {
userDefinedBulkInsertPartitionerOpt.get
}
else {
} else {
BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig.getBulkInsertSortMode)
}
} else {
@@ -842,7 +841,7 @@ object HoodieSparkSqlWriter {
properties.put(HoodieSyncConfig.META_SYNC_SPARK_VERSION.key, SPARK_VERSION)
properties.put(HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA.key, hoodieConfig.getBoolean(HoodieMetadataConfig.ENABLE))

//Collect exceptions in list because we want all sync to run. Then we can throw
// Collect exceptions in list because we want all sync to run. Then we can throw
val metaSyncExceptions = new ListBuffer[HoodieException]()
syncClientToolClassSet.foreach(impl => {
try {
CreateHoodieTableAsSelectCommand.scala
@@ -19,16 +19,15 @@ package org.apache.spark.sql.hudi.command

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.sync.common.util.ConfigUtils

import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}

import scala.collection.JavaConverters._
@@ -43,8 +42,8 @@ case class CreateHoodieTableAsSelectCommand(
override def innerChildren: Seq[QueryPlan[_]] = Seq(query)

override def run(sparkSession: SparkSession): Seq[Row] = {
assert(table.tableType != CatalogTableType.VIEW)
assert(table.provider.isDefined)
checkState(table.tableType != CatalogTableType.VIEW)
checkState(table.provider.isDefined)

val hasQueryAsProp = (table.storage.properties ++ table.properties).contains(ConfigUtils.IS_QUERY_AS_RO_TABLE)
if (hasQueryAsProp) {
@@ -53,11 +52,11 @@ case class CreateHoodieTableAsSelectCommand(

val sessionState = sparkSession.sessionState
val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = table.identifier.copy(database = Some(db))
val tableName = tableIdentWithDB.unquotedString
val qualifiedTableIdentifier = table.identifier.copy(database = Some(db))
val tableName = qualifiedTableIdentifier.unquotedString

if (sessionState.catalog.tableExists(tableIdentWithDB)) {
assert(mode != SaveMode.Overwrite,
if (sessionState.catalog.tableExists(qualifiedTableIdentifier)) {
checkState(mode != SaveMode.Overwrite,
s"Expect the table $tableName has been dropped when the save mode is Overwrite")

if (mode == SaveMode.ErrorIfExists) {
@@ -72,47 +71,44 @@
}
}

// ReOrder the query which move the partition columns to the last of the project list
val reOrderedQuery = reOrderPartitionColumn(query, table.partitionColumnNames)
// Remove some properties should not be used
val newStorage = new CatalogStorageFormat(
table.storage.locationUri,
table.storage.inputFormat,
table.storage.outputFormat,
table.storage.serde,
table.storage.compressed,
table.storage.properties.--(needFilterProps))
val newTable = table.copy(
identifier = tableIdentWithDB,
storage = newStorage,
schema = reOrderedQuery.schema,
properties = table.properties.--(needFilterProps)
val updatedStorageFormat = table.storage.copy(
properties = table.storage.properties -- needFilterProps)

val updatedTable = table.copy(
identifier = qualifiedTableIdentifier,
storage = updatedStorageFormat,
// TODO need to add meta-fields here
schema = query.schema,
properties = table.properties -- needFilterProps
)

val hoodieCatalogTable = HoodieCatalogTable(sparkSession, newTable)
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, updatedTable)
val tablePath = hoodieCatalogTable.tableLocation
val hadoopConf = sparkSession.sessionState.newHadoopConf()

// Execute the insert query
try {
// init hoodie table
// Init hoodie table
hoodieCatalogTable.initHoodieTable()

val tblProperties = hoodieCatalogTable.catalogProperties
val options = Map(
val tableProperties = hoodieCatalogTable.catalogProperties
// NOTE: Users might be specifying write-configuration (inadvertently) as options or table properties
// in CTAS, therefore we need to make sure that these are appropriately propagated to the
// write operation
val options = tableProperties ++ Map(
HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType == CatalogTableType.MANAGED).toString,
HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(tblProperties.asJava),
HiveSyncConfigHolder.HIVE_TABLE_PROPERTIES.key -> ConfigUtils.configToString(newTable.properties.asJava),
HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(tableProperties.asJava),
HiveSyncConfigHolder.HIVE_TABLE_PROPERTIES.key -> ConfigUtils.configToString(updatedTable.properties.asJava),
DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(),
DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
)
val partitionSpec = newTable.partitionColumnNames.map((_, None)).toMap
val success = InsertIntoHoodieTableCommand.run(sparkSession, newTable, reOrderedQuery, partitionSpec,
val partitionSpec = updatedTable.partitionColumnNames.map((_, None)).toMap
val success = InsertIntoHoodieTableCommand.run(sparkSession, updatedTable, query, partitionSpec,
mode == SaveMode.Overwrite, refreshTable = false, extraOptions = options)
if (success) {
// If write success, create the table in catalog if it has not synced to the
// catalog by the meta sync.
if (!sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
if (!sparkSession.sessionState.catalog.tableExists(qualifiedTableIdentifier)) {
// create catalog table for this hoodie table
CreateHoodieTableCommand.createTableInCatalog(sparkSession, hoodieCatalogTable, mode == SaveMode.Ignore)
}
Expand All @@ -132,16 +128,4 @@ case class CreateHoodieTableAsSelectCommand(
val fs = path.getFileSystem(conf)
fs.delete(path, true)
}

private def reOrderPartitionColumn(query: LogicalPlan,
partitionColumns: Seq[String]): LogicalPlan = {
if (partitionColumns.isEmpty) {
query
} else {
val nonPartitionAttrs = query.output.filter(p => !partitionColumns.contains(p.name))
val partitionAttrs = query.output.filter(p => partitionColumns.contains(p.name))
val reorderAttrs = nonPartitionAttrs ++ partitionAttrs
Project(reorderAttrs, query)
}
}
}
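To make the property-propagation fix concrete, a hypothetical CTAS follows (table names and values are illustrative only): write configuration passed as table properties — e.g. hoodie.metadata.enable below — is now forwarded to HoodieSparkSqlWriter via the extra write options instead of being silently dropped.

// Hypothetical example, following the SQL style used in the tests below
spark.sql(
  """
    |create table target using hudi
    |tblproperties (primaryKey = 'id', 'hoodie.metadata.enable' = 'false')
    |as select id, name, price, dt from source
    |""".stripMargin)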
InsertIntoHoodieTableCommand.scala
@@ -132,7 +132,6 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
val targetPartitionSchema = catalogTable.partitionSchema
val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)

validate(removeMetaFields(query.schema), partitionsSpec, catalogTable)
// Make sure we strip out meta-fields from the incoming dataset (these will have to be discarded anyway)
val cleanedQuery = stripMetaFields(query)
// To validate and align properly output of the query, we simply filter out partition columns with already
@@ -144,6 +143,8 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
// positionally for example
val expectedQueryColumns = catalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name))
val coercedQueryOutput = coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery, catalogTable, conf)
// After potential reshaping validate that the output of the query conforms to the table's schema
validate(removeMetaFields(coercedQueryOutput.schema), partitionsSpec, catalogTable)

val staticPartitionValuesExprs = createStaticPartitionValuesExpressions(staticPartitionValues, targetPartitionSchema, conf)

HoodieSparkSqlTestBase.scala
@@ -128,7 +128,7 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
spark.sql(sql)
} catch {
case e: Throwable =>
assertResult(errorMsg)(e.getMessage)
assertResult(errorMsg.trim)(e.getMessage.trim)
hasException = true
}
assertResult(true)(hasException)
@@ -139,7 +139,7 @@
try {
spark.sql(sql)
} catch {
case e: Throwable if e.getMessage.contains(errorMsg) => hasException = true
case e: Throwable if e.getMessage.trim.contains(errorMsg.trim) => hasException = true
case f: Throwable => fail("Exception should contain: " + errorMsg + ", error message: " + f.getMessage, f)
}
assertResult(true)(hasException)
TestInsertTable.scala
@@ -529,17 +529,19 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
| tblproperties (primaryKey = 'id')
| partitioned by (dt)
""".stripMargin)
checkException(s"insert into $tableName partition(dt = '2021-06-20') select 1, 'a1', 10, '2021-06-20'") (
"Expected table's schema: " +
"[StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(price,DoubleType,true), StructField(dt,StringType,true)], " +
"query's output (including static partition values): " +
"[StructField(1,IntegerType,false), StructField(a1,StringType,false), StructField(10,IntegerType,false), StructField(2021-06-20,StringType,false), StructField(dt,StringType,true)]"
checkExceptionContain(s"insert into $tableName partition(dt = '2021-06-20') select 1, 'a1', 10, '2021-06-20'") (
"""
|too many data columns:
|Table columns: 'id', 'name', 'price'
|Data columns: '1', 'a1', '10', '2021-06-20'
|""".stripMargin
)
checkException(s"insert into $tableName select 1, 'a1', 10")(
"Expected table's schema: " +
"[StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(price,DoubleType,true), StructField(dt,StringType,true)], " +
"query's output (including static partition values): " +
"[StructField(1,IntegerType,false), StructField(a1,StringType,false), StructField(10,IntegerType,false)]"
checkExceptionContain(s"insert into $tableName select 1, 'a1', 10")(
"""
|not enough data columns:
|Table columns: 'id', 'name', 'price', 'dt'
|Data columns: '1', 'a1', '10'
|""".stripMargin
)
spark.sql("set hoodie.sql.bulk.insert.enable = true")
spark.sql("set hoodie.sql.insert.mode = strict")
HoodieSpark2CatalystPlanUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like}
import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan}
import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, ExplainCommand}
@@ -31,8 +32,14 @@ object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils {
expected: Seq[Attribute],
query: LogicalPlan,
byName: Boolean,
conf: SQLConf): LogicalPlan =
SimpleAnalyzer.ResolveOutputRelation.resolveOutputColumns(tableName, expected, query, byName)
conf: SQLConf): LogicalPlan = {
// NOTE: We have to apply [[ResolveUpCast]] and [[SimplifyCasts]] rules since by default Spark 2.x will
// always be wrapping matched attributes into [[UpCast]]s which aren't resolvable and render some
// APIs like [[QueryPlan.schema]] unusable
SimplifyCasts.apply(
SimpleAnalyzer.ResolveUpCast.apply(
SimpleAnalyzer.ResolveOutputRelation.resolveOutputColumns(tableName, expected, query, byName)))
}

def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan =
ExplainCommand(plan, extended = extended)
