diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala index 296abaf4f5e2..79fa67acdb9d 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala @@ -33,7 +33,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeUtils, Row} import org.apache.spark.unsafe.types.UTF8String @@ -58,31 +60,6 @@ object HoodieDatasetBulkInsertHelper extends Logging { val populateMetaFields = config.populateMetaFields() val schema = df.schema - val keyGeneratorClassName = config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME, - "Key-generator class name is required") - - val prependedRdd: RDD[InternalRow] = - df.queryExecution.toRdd.mapPartitions { iter => - val keyGenerator = - ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)) - .asInstanceOf[SparkKeyGeneratorInterface] - - iter.map { row => - val (recordKey, partitionPath) = - if (populateMetaFields) { - (keyGenerator.getRecordKey(row, schema), keyGenerator.getPartitionPath(row, schema)) - } else { - (UTF8String.EMPTY_UTF8, UTF8String.EMPTY_UTF8) - } - val commitTimestamp = UTF8String.EMPTY_UTF8 - val commitSeqNo = UTF8String.EMPTY_UTF8 - val filename = UTF8String.EMPTY_UTF8 - - // TODO use mutable row, avoid re-allocating - new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, partitionPath, filename, row, false) - } - } - val metaFields = Seq( StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType), StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType), @@ -92,11 +69,44 @@ object HoodieDatasetBulkInsertHelper extends Logging { val updatedSchema = StructType(metaFields ++ schema.fields) - val updatedDF = if (populateMetaFields && config.shouldCombineBeforeInsert) { - val dedupedRdd = dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config)) + val updatedDF = if (populateMetaFields) { + val keyGeneratorClassName = config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME, + "Key-generator class name is required") + + val prependedRdd: RDD[InternalRow] = + df.queryExecution.toRdd.mapPartitions { iter => + val keyGenerator = + ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)) + .asInstanceOf[SparkKeyGeneratorInterface] + + iter.map { row => + val recordKey = keyGenerator.getRecordKey(row, schema) + val partitionPath = keyGenerator.getPartitionPath(row, schema) + val commitTimestamp = UTF8String.EMPTY_UTF8 + val commitSeqNo = UTF8String.EMPTY_UTF8 + val filename = UTF8String.EMPTY_UTF8 + + // TODO use mutable row, avoid re-allocating + new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, partitionPath, filename, row, false) + } + } + + val dedupedRdd = if 
(config.shouldCombineBeforeInsert) { + dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config)) + } else { + prependedRdd + } + HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema) } else { - HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, prependedRdd, updatedSchema) + // NOTE: In cases when we're not populating meta-fields we actually don't + // need access to the [[InternalRow]] and therefore can avoid the need + // to dereference [[DataFrame]] into [[RDD]] + val query = df.queryExecution.logical + val metaFieldsStubs = metaFields.map(f => Alias(Literal(UTF8String.EMPTY_UTF8, dataType = StringType), f.name)()) + val prependedQuery = Project(metaFieldsStubs ++ query.output, query) + + HoodieUnsafeUtils.createDataFrameFrom(df.sparkSession, prependedQuery) } val trimmedDF = if (shouldDropPartitionColumns) { diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala index edf05f2db2ec..c981cd8113ca 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala @@ -30,6 +30,15 @@ import org.apache.spark.util.MutablePair */ object HoodieUnsafeUtils { + /** + * Creates [[DataFrame]] from provided [[plan]] + * + * @param spark spark's session + * @param plan given plan to wrap into [[DataFrame]] + */ + def createDataFrameFrom(spark: SparkSession, plan: LogicalPlan): DataFrame = + Dataset.ofRows(spark, plan) + /** * Creates [[DataFrame]] from the in-memory [[Seq]] of [[Row]]s with provided [[schema]] * @@ -39,7 +48,6 @@ object HoodieUnsafeUtils { * @param spark spark's session * @param rows collection of rows to base [[DataFrame]] on * @param schema target [[DataFrame]]'s schema - * @return */ def createDataFrameFromRows(spark: SparkSession, rows: Seq[Row], schema: StructType): DataFrame = Dataset.ofRows(spark, LocalRelation.fromExternalRows(schema.toAttributes, rows)) @@ -53,7 +61,6 @@ object HoodieUnsafeUtils { * @param spark spark's session * @param rows collection of rows to base [[DataFrame]] on * @param schema target [[DataFrame]]'s schema - * @return */ def createDataFrameFromInternalRows(spark: SparkSession, rows: Seq[InternalRow], schema: StructType): DataFrame = Dataset.ofRows(spark, LocalRelation(schema.toAttributes, rows)) @@ -65,7 +72,6 @@ object HoodieUnsafeUtils { * @param spark spark's session * @param rdd RDD w/ [[Row]]s to base [[DataFrame]] on * @param schema target [[DataFrame]]'s schema - * @return */ def createDataFrameFromRDD(spark: SparkSession, rdd: RDD[InternalRow], schema: StructType): DataFrame = spark.internalCreateDataFrame(rdd, schema) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 97c2c805ccd5..faa6fada1314 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -66,7 +66,6 @@ import scala.collection.JavaConversions._ import scala.collection.JavaConverters.setAsJavaSetConverter import scala.collection.mutable import scala.collection.mutable.ListBuffer -import 
scala.util.matching.Regex object HoodieSparkSqlWriter { @@ -121,6 +120,7 @@ object HoodieSparkSqlWriter { } val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE)) var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION)) + // TODO clean up // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS is true // Auto-correct the operation to "insert" if OPERATION is set to "upsert" wrongly // or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()) . @@ -749,8 +749,7 @@ object HoodieSparkSqlWriter { val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig) if (userDefinedBulkInsertPartitionerOpt.isPresent) { userDefinedBulkInsertPartitionerOpt.get - } - else { + } else { BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig.getBulkInsertSortMode) } } else { @@ -842,7 +841,7 @@ object HoodieSparkSqlWriter { properties.put(HoodieSyncConfig.META_SYNC_SPARK_VERSION.key, SPARK_VERSION) properties.put(HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA.key, hoodieConfig.getBoolean(HoodieMetadataConfig.ENABLE)) - //Collect exceptions in list because we want all sync to run. Then we can throw + // Collect exceptions in list because we want all sync to run. Then we can throw val metaSyncExceptions = new ListBuffer[HoodieException]() syncClientToolClassSet.foreach(impl => { try { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala index 6a93f0c7afb0..1f8d00953014 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala @@ -19,16 +19,15 @@ package org.apache.spark.sql.hudi.command import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path - import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.hive.HiveSyncConfigHolder import org.apache.hudi.sql.InsertMode import org.apache.hudi.sync.common.util.ConfigUtils - import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps -import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, HoodieCatalogTable} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable} import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} import scala.collection.JavaConverters._ @@ -43,8 +42,8 @@ case class CreateHoodieTableAsSelectCommand( override def innerChildren: Seq[QueryPlan[_]] = Seq(query) override def run(sparkSession: SparkSession): Seq[Row] = { - assert(table.tableType != CatalogTableType.VIEW) - assert(table.provider.isDefined) + checkState(table.tableType != CatalogTableType.VIEW) + checkState(table.provider.isDefined) val hasQueryAsProp = (table.storage.properties ++ table.properties).contains(ConfigUtils.IS_QUERY_AS_RO_TABLE) if (hasQueryAsProp) { @@ -53,11 +52,11 @@ case class 
CreateHoodieTableAsSelectCommand( val sessionState = sparkSession.sessionState val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase) - val tableIdentWithDB = table.identifier.copy(database = Some(db)) - val tableName = tableIdentWithDB.unquotedString + val qualifiedTableIdentifier = table.identifier.copy(database = Some(db)) + val tableName = qualifiedTableIdentifier.unquotedString - if (sessionState.catalog.tableExists(tableIdentWithDB)) { - assert(mode != SaveMode.Overwrite, + if (sessionState.catalog.tableExists(qualifiedTableIdentifier)) { + checkState(mode != SaveMode.Overwrite, s"Expect the table $tableName has been dropped when the save mode is Overwrite") if (mode == SaveMode.ErrorIfExists) { @@ -72,47 +71,44 @@ case class CreateHoodieTableAsSelectCommand( } } - // ReOrder the query which move the partition columns to the last of the project list - val reOrderedQuery = reOrderPartitionColumn(query, table.partitionColumnNames) // Remove some properties should not be used - val newStorage = new CatalogStorageFormat( - table.storage.locationUri, - table.storage.inputFormat, - table.storage.outputFormat, - table.storage.serde, - table.storage.compressed, - table.storage.properties.--(needFilterProps)) - val newTable = table.copy( - identifier = tableIdentWithDB, - storage = newStorage, - schema = reOrderedQuery.schema, - properties = table.properties.--(needFilterProps) + val updatedStorageFormat = table.storage.copy( + properties = table.storage.properties -- needFilterProps) + + val updatedTable = table.copy( + identifier = qualifiedTableIdentifier, + storage = updatedStorageFormat, + // TODO need to add meta-fields here + schema = query.schema, + properties = table.properties -- needFilterProps ) - val hoodieCatalogTable = HoodieCatalogTable(sparkSession, newTable) + val hoodieCatalogTable = HoodieCatalogTable(sparkSession, updatedTable) val tablePath = hoodieCatalogTable.tableLocation val hadoopConf = sparkSession.sessionState.newHadoopConf() - // Execute the insert query try { - // init hoodie table + // Init hoodie table hoodieCatalogTable.initHoodieTable() - val tblProperties = hoodieCatalogTable.catalogProperties - val options = Map( + val tableProperties = hoodieCatalogTable.catalogProperties + // NOTE: Users might be specifying write-configuration (inadvertently) as options or table properties + // in CTAS, therefore we need to make sure that these are appropriately propagated to the + // write operation + val options = tableProperties ++ Map( HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType == CatalogTableType.MANAGED).toString, - HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(tblProperties.asJava), - HiveSyncConfigHolder.HIVE_TABLE_PROPERTIES.key -> ConfigUtils.configToString(newTable.properties.asJava), + HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(tableProperties.asJava), + HiveSyncConfigHolder.HIVE_TABLE_PROPERTIES.key -> ConfigUtils.configToString(updatedTable.properties.asJava), DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(), DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true" ) - val partitionSpec = newTable.partitionColumnNames.map((_, None)).toMap - val success = InsertIntoHoodieTableCommand.run(sparkSession, newTable, reOrderedQuery, partitionSpec, + val partitionSpec = updatedTable.partitionColumnNames.map((_, None)).toMap + val success = InsertIntoHoodieTableCommand.run(sparkSession, updatedTable, 
query, partitionSpec, mode == SaveMode.Overwrite, refreshTable = false, extraOptions = options) if (success) { // If write success, create the table in catalog if it has not synced to the // catalog by the meta sync. - if (!sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) { + if (!sparkSession.sessionState.catalog.tableExists(qualifiedTableIdentifier)) { // create catalog table for this hoodie table CreateHoodieTableCommand.createTableInCatalog(sparkSession, hoodieCatalogTable, mode == SaveMode.Ignore) } @@ -132,16 +128,4 @@ case class CreateHoodieTableAsSelectCommand( val fs = path.getFileSystem(conf) fs.delete(path, true) } - - private def reOrderPartitionColumn(query: LogicalPlan, - partitionColumns: Seq[String]): LogicalPlan = { - if (partitionColumns.isEmpty) { - query - } else { - val nonPartitionAttrs = query.output.filter(p => !partitionColumns.contains(p.name)) - val partitionAttrs = query.output.filter(p => partitionColumns.contains(p.name)) - val reorderAttrs = nonPartitionAttrs ++ partitionAttrs - Project(reorderAttrs, query) - } - } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 125e8028020b..0228e5ddcf7c 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -132,7 +132,6 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi val targetPartitionSchema = catalogTable.partitionSchema val staticPartitionValues = filterStaticPartitionValues(partitionsSpec) - validate(removeMetaFields(query.schema), partitionsSpec, catalogTable) // Make sure we strip out meta-fields from the incoming dataset (these will have to be discarded anyway) val cleanedQuery = stripMetaFields(query) // To validate and align properly output of the query, we simply filter out partition columns with already @@ -144,6 +143,8 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi // positionally for example val expectedQueryColumns = catalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name)) val coercedQueryOutput = coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery, catalogTable, conf) + // After potential reshaping validate that the output of the query conforms to the table's schema + validate(removeMetaFields(coercedQueryOutput.schema), partitionsSpec, catalogTable) val staticPartitionValuesExprs = createStaticPartitionValuesExpressions(staticPartitionValues, targetPartitionSchema, conf) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala index e7848320ff35..13800e6c8b8c 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala @@ -128,7 +128,7 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll { spark.sql(sql) } catch { case e: Throwable => - assertResult(errorMsg)(e.getMessage) + 
assertResult(errorMsg.trim)(e.getMessage.trim) hasException = true } assertResult(true)(hasException) @@ -139,7 +139,7 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll { try { spark.sql(sql) } catch { - case e: Throwable if e.getMessage.contains(errorMsg) => hasException = true + case e: Throwable if e.getMessage.trim.contains(errorMsg.trim) => hasException = true case f: Throwable => fail("Exception should contain: " + errorMsg + ", error message: " + f.getMessage, f) } assertResult(true)(hasException) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index 2fa6b939acbb..4c2bec48a684 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -529,17 +529,19 @@ class TestInsertTable extends HoodieSparkSqlTestBase { | tblproperties (primaryKey = 'id') | partitioned by (dt) """.stripMargin) - checkException(s"insert into $tableName partition(dt = '2021-06-20') select 1, 'a1', 10, '2021-06-20'") ( - "Expected table's schema: " + - "[StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(price,DoubleType,true), StructField(dt,StringType,true)], " + - "query's output (including static partition values): " + - "[StructField(1,IntegerType,false), StructField(a1,StringType,false), StructField(10,IntegerType,false), StructField(2021-06-20,StringType,false), StructField(dt,StringType,true)]" + checkExceptionContain(s"insert into $tableName partition(dt = '2021-06-20') select 1, 'a1', 10, '2021-06-20'") ( + """ + |too many data columns: + |Table columns: 'id', 'name', 'price' + |Data columns: '1', 'a1', '10', '2021-06-20' + |""".stripMargin ) - checkException(s"insert into $tableName select 1, 'a1', 10")( - "Expected table's schema: " + - "[StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(price,DoubleType,true), StructField(dt,StringType,true)], " + - "query's output (including static partition values): " + - "[StructField(1,IntegerType,false), StructField(a1,StringType,false), StructField(10,IntegerType,false)]" + checkExceptionContain(s"insert into $tableName select 1, 'a1', 10")( + """ + |not enough data columns: + |Table columns: 'id', 'name', 'price', 'dt' + |Data columns: '1', 'a1', '10' + |""".stripMargin ) spark.sql("set hoodie.sql.bulk.insert.enable = true") spark.sql("set hoodie.sql.insert.mode = strict") diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala index 2672e2c4cbee..9be7198e6d43 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedRelation} import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like} +import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts import org.apache.spark.sql.catalyst.plans.JoinType import 
org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan} import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, ExplainCommand} @@ -31,8 +32,14 @@ object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils { expected: Seq[Attribute], query: LogicalPlan, byName: Boolean, - conf: SQLConf): LogicalPlan = - SimpleAnalyzer.ResolveOutputRelation.resolveOutputColumns(tableName, expected, query, byName) + conf: SQLConf): LogicalPlan = { + // NOTE: We have to apply [[ResolveUpCast]] and [[SimplifyCasts]] rules since by default Spark 2.x will + // always be wrapping matched attributes into [[UpCast]]s which aren't resolvable and render some + // APIs like [[QueryPlan.schema]] unusable + SimplifyCasts.apply( + SimpleAnalyzer.ResolveUpCast.apply( + SimpleAnalyzer.ResolveOutputRelation.resolveOutputColumns(tableName, expected, query, byName))) + } def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan = ExplainCommand(plan, extended = extended)
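
Illustrative sketch (not part of the patch): the key idea in the HoodieDatasetBulkInsertHelper change above is that, when meta-fields don't need to be populated, the empty meta-field columns can be prepended as literal stubs via a Catalyst Project over the logical plan instead of mapping the DataFrame's underlying RDD[InternalRow]. The helper name below (prependEmptyStringColumns) is hypothetical; it is placed in the org.apache.spark.sql package only because Dataset.ofRows is private[sql], which is the same reason the patch exposes it through HoodieUnsafeUtils.createDataFrameFrom.

// Minimal sketch, assuming Spark 3.x APIs; object and method names are illustrative only.
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String

object MetaFieldStubSketch {

  // Prepends empty string-typed columns (e.g. meta-field stubs) to a DataFrame by
  // rewriting its logical plan, avoiding a round-trip through RDD[InternalRow].
  def prependEmptyStringColumns(df: DataFrame, names: Seq[String]): DataFrame = {
    val plan = df.queryExecution.logical
    // UTF8String.EMPTY_UTF8 is the Catalyst-internal representation of "" for StringType
    val stubs = names.map(name => Alias(Literal(UTF8String.EMPTY_UTF8, StringType), name)())
    // Project the stub columns ahead of the query's existing output columns
    Dataset.ofRows(df.sparkSession, Project(stubs ++ plan.output, plan))
  }
}

Because the stubs stay inside the logical plan, Catalyst can still optimize the whole query and no extra serialization boundary is introduced, which is the rationale behind the NOTE in the non-meta-fields branch of the helper.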