[SPARK-43742][SQL] Refactor default column value resolution #41262

Closed; wants to merge 6 commits.
27 changes: 12 additions & 15 deletions core/src/main/resources/error/error-classes.json
@@ -785,7 +785,18 @@
},
"INSERT_COLUMN_ARITY_MISMATCH" : {
"message" : [
"<tableName> requires that the data to be inserted have the same number of columns as the target table: target table has <targetColumns> column(s) but the inserted data has <insertedColumns> column(s), including <staticPartCols> partition column(s) having constant value(s)."
"Cannot write to '<tableName>', <reason>:",
Comment (author): Unify the errors between v1 and v2 inserts.

"Table columns: <tableColumns>.",
"Data columns: <dataColumns>."
],
"sqlState" : "21S01"
},
"INSERT_PARTITION_COLUMN_ARITY_MISMATCH" : {
"message" : [
"Cannot write to '<tableName>', <reason>:",
"Table columns: <tableColumns>.",
"Partition columns with static values: <staticPartCols>.",
"Data columns: <dataColumns>."
],
"sqlState" : "21S01"
},
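For context, here is a sketch of how the unified template above could be rendered once its placeholders are filled. It is plain Scala standing in for Spark's error framework; the <reason> strings are taken from the legacy templates removed further down, and the exact column-list formatting is an assumption.

```scala
// Plain-Scala stand-in for filling the INSERT_COLUMN_ARITY_MISMATCH template; not Spark's
// actual error framework. The reason strings mirror the legacy messages this PR removes.
def insertArityMismatchMessage(
    tableName: String,
    reason: String,          // e.g. "not enough data columns" or "too many data columns"
    tableColumns: Seq[String],
    dataColumns: Seq[String]): String =
  s"""Cannot write to '$tableName', $reason:
     |Table columns: ${tableColumns.mkString(", ")}.
     |Data columns: ${dataColumns.mkString(", ")}.""".stripMargin

// insertArityMismatchMessage("spark_catalog.default.t", "not enough data columns",
//   Seq("c1", "c2", "c3"), Seq("col1", "col2"))
```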
@@ -3173,20 +3184,6 @@
"Cannot resolve column name \"<colName>\" among (<fieldNames>)."
]
},
"_LEGACY_ERROR_TEMP_1202" : {
"message" : [
"Cannot write to '<tableName>', too many data columns:",
"Table columns: <tableColumns>.",
"Data columns: <dataColumns>."
]
},
"_LEGACY_ERROR_TEMP_1203" : {
"message" : [
"Cannot write to '<tableName>', not enough data columns:",
"Table columns: <tableColumns>.",
"Data columns: <dataColumns>."
]
},
"_LEGACY_ERROR_TEMP_1204" : {
"message" : [
"Cannot write incompatible data to table '<tableName>':",

This file was deleted.

@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.trees.AlwaysProcess
import org.apache.spark.sql.catalyst.trees.CurrentOrigin.withOrigin
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.util.{toPrettySQL, AUTO_GENERATED_ALIAS, CharVarcharUtils, StringUtils}
import org.apache.spark.sql.catalyst.util.{toPrettySQL, AUTO_GENERATED_ALIAS, CharVarcharUtils}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.connector.catalog.{View => _, _}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -55,8 +55,7 @@ import org.apache.spark.sql.internal.SQLConf.{PartitionOverwriteMode, StoreAssig
import org.apache.spark.sql.internal.connector.V1Function
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DayTimeIntervalType.DAY
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
import org.apache.spark.util.collection.{Utils => CUtils}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* A trivial [[Analyzer]] with a dummy [[SessionCatalog]] and
@@ -280,7 +279,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
KeepLegacyOutputs),
Batch("Resolution", fixedPoint,
new ResolveCatalogs(catalogManager) ::
ResolveUserSpecifiedColumns ::
ResolveInsertInto ::
ResolveRelations ::
ResolvePartitionSpec ::
@@ -313,7 +311,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
TimeWindowing ::
SessionWindowing ::
ResolveWindowTime ::
ResolveDefaultColumns(ResolveRelations.resolveRelationOrTempView) ::
ResolveInlineTables ::
ResolveLambdaVariables ::
ResolveTimeZone ::
@@ -1080,7 +1077,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor

def apply(plan: LogicalPlan)
: LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
case i @ InsertIntoStatement(table, _, _, _, _, _) if i.query.resolved =>
case i @ InsertIntoStatement(table, _, _, _, _, _) =>
Comment (cloud-fan, author, May 24, 2023): This change is needed. We want to resolve the table first, then resolve the column "DEFAULT" in the query. This means we can't wait for the query to be resolved before resolving the table.

Comment (contributor): Sounds good; with any luck this can help reduce dependencies on rule orderings within the analyzer.

Comment (member): If we remove the pattern guard in this code, some operations on i.query will fail later on. I created #44326 to fix this.

val relation = table match {
case u: UnresolvedRelation if !u.isStreaming =>
resolveRelation(u).getOrElse(u)
@@ -1278,53 +1275,10 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
}

/** Handle INSERT INTO for DSv2 */
object ResolveInsertInto extends Rule[LogicalPlan] {

/** Add a project to use the table column names for INSERT INTO BY NAME */
private def createProjectForByNameQuery(i: InsertIntoStatement): LogicalPlan = {
Comment (author): The code here is unchanged; it was just moved to ResolveInsertionBase.

SchemaUtils.checkColumnNameDuplication(i.userSpecifiedCols, resolver)

if (i.userSpecifiedCols.size != i.query.output.size) {
throw QueryCompilationErrors.writeTableWithMismatchedColumnsError(
i.userSpecifiedCols.size, i.query.output.size, i.query)
}
val projectByName = i.userSpecifiedCols.zip(i.query.output)
.map { case (userSpecifiedCol, queryOutputCol) =>
val resolvedCol = i.table.resolve(Seq(userSpecifiedCol), resolver)
.getOrElse(
throw QueryCompilationErrors.unresolvedAttributeError(
"UNRESOLVED_COLUMN", userSpecifiedCol, i.table.output.map(_.name), i.origin))
(queryOutputCol.dataType, resolvedCol.dataType) match {
case (input: StructType, expected: StructType) =>
// Rename inner fields of the input column to pass the by-name INSERT analysis.
Alias(Cast(queryOutputCol, renameFieldsInStruct(input, expected)), resolvedCol.name)()
case _ =>
Alias(queryOutputCol, resolvedCol.name)()
}
}
Project(projectByName, i.query)
}

private def renameFieldsInStruct(input: StructType, expected: StructType): StructType = {
if (input.length == expected.length) {
val newFields = input.zip(expected).map { case (f1, f2) =>
(f1.dataType, f2.dataType) match {
case (s1: StructType, s2: StructType) =>
f1.copy(name = f2.name, dataType = renameFieldsInStruct(s1, s2))
case _ =>
f1.copy(name = f2.name)
}
}
StructType(newFields)
} else {
input
}
}

object ResolveInsertInto extends ResolveInsertionBase {
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
AlwaysProcess.fn, ruleId) {
case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _)
if i.query.resolved =>
case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _) if i.query.resolved =>
// ifPartitionNotExists is append with validation, but validation is not supported
if (i.ifPartitionNotExists) {
throw QueryCompilationErrors.unsupportedIfNotExistsError(r.table.name)
@@ -1527,6 +1481,10 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
}

def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
// Don't wait for other rules to resolve the child plans of `InsertIntoStatement`: we need to
// resolve the column "DEFAULT" in those child plans, so they may still be unresolved here.
case i: InsertIntoStatement => ResolveColumnDefaultInInsert(i)

// Wait for other rules to resolve child plans first
case p: LogicalPlan if !p.childrenResolved => p

@@ -1646,6 +1604,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
// implementation and should be resolved based on the table schema.
o.copy(deleteExpr = resolveExpressionByPlanOutput(o.deleteExpr, o.table))

case u: UpdateTable => ResolveReferencesInUpdate(u)

case m @ MergeIntoTable(targetTable, sourceTable, _, _, _, _)
if !m.resolved && targetTable.resolved && sourceTable.resolved =>

@@ -1796,23 +1756,32 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
case MergeResolvePolicy.SOURCE => Project(Nil, mergeInto.sourceTable)
case MergeResolvePolicy.TARGET => Project(Nil, mergeInto.targetTable)
}
resolveMergeExprOrFail(c, resolvePlan)
val resolvedExpr = resolveExprInAssignment(c, resolvePlan)
val withDefaultResolved = if (conf.enableDefaultColumns) {
resolveColumnDefaultInAssignmentValue(
resolvedKey,
resolvedExpr,
QueryCompilationErrors
.defaultReferencesNotAllowedInComplexExpressionsInMergeInsertsOrUpdates())
} else {
resolvedExpr
}
checkResolvedMergeExpr(withDefaultResolved, resolvePlan)
withDefaultResolved
case o => o
}
Assignment(resolvedKey, resolvedValue)
}
}

private def resolveMergeExprOrFail(e: Expression, p: LogicalPlan): Expression = {
val resolved = resolveExpressionByPlanChildren(e, p)
resolved.references.filter { attribute: Attribute =>
!attribute.resolved &&
// We exclude attribute references named "DEFAULT" from consideration since they are
// handled exclusively by the ResolveDefaultColumns analysis rule. That rule checks the
// MERGE command for such references and either replaces each one with a corresponding
// value, or returns a custom error message.
normalizeFieldName(attribute.name) != normalizeFieldName(CURRENT_DEFAULT_COLUMN_NAME)
}.foreach { a =>
val resolved = resolveExprInAssignment(e, p)
checkResolvedMergeExpr(resolved, p)
resolved
}

private def checkResolvedMergeExpr(e: Expression, p: LogicalPlan): Unit = {
e.references.filter(!_.resolved).foreach { a =>
// Note: This will throw error only on unresolved attribute issues,
// not other resolution errors like mismatched data types.
val cols = p.inputSet.toSeq.map(_.sql).mkString(", ")
@@ -1822,10 +1791,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
"sqlExpr" -> a.sql,
"cols" -> cols))
}
resolved match {
case Alias(child: ExtractValue, _) => child
case other => other
}
}

// Expand the star expression using the input plan first. If failed, try resolve
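The assignment branch above only runs when conf.enableDefaultColumns is true. A hedged sketch of the intended semantics of resolveColumnDefaultInAssignmentValue: a bare DEFAULT as the entire assignment value is replaced by the target column's default, while DEFAULT nested inside a larger expression is rejected, matching the error name used in the diff. The Expr cases below are illustrative stand-ins, not Spark's Expression classes.

```scala
// Illustrative expression tree, purely to show the intended semantics.
sealed trait Expr
case object Default extends Expr
final case class Lit(value: Any) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr

def containsDefault(e: Expr): Boolean = e match {
  case Default   => true
  case Add(l, r) => containsDefault(l) || containsDefault(r)
  case _         => false
}

// SET c = DEFAULT is replaced by the column default; SET c = DEFAULT + 1 is rejected.
def resolveDefaultInAssignmentValue(value: Expr, columnDefault: Expr): Expr = value match {
  case Default                 => columnDefault
  case e if containsDefault(e) =>
    throw new IllegalArgumentException("DEFAULT is only allowed as the entire assignment value")
  case other                   => other
}
```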
@@ -3346,53 +3311,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
}
}

/**
* A special rule to reorder columns for DSv1 when users specify a column list in INSERT INTO.
* DSv2 is handled by [[ResolveInsertInto]] separately.
*/
object ResolveUserSpecifiedColumns extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
AlwaysProcess.fn, ruleId) {
case i: InsertIntoStatement if !i.table.isInstanceOf[DataSourceV2Relation] &&
i.table.resolved && i.query.resolved && i.userSpecifiedCols.nonEmpty =>
val resolved = resolveUserSpecifiedColumns(i)
val projection = addColumnListOnQuery(i.table.output, resolved, i.query)
i.copy(userSpecifiedCols = Nil, query = projection)
}

private def resolveUserSpecifiedColumns(i: InsertIntoStatement): Seq[NamedExpression] = {
SchemaUtils.checkColumnNameDuplication(i.userSpecifiedCols, resolver)

i.userSpecifiedCols.map { col =>
i.table.resolve(Seq(col), resolver).getOrElse {
val candidates = i.table.output.map(_.qualifiedName)
val orderedCandidates = StringUtils.orderSuggestedIdentifiersBySimilarity(col, candidates)
throw QueryCompilationErrors
.unresolvedAttributeError("UNRESOLVED_COLUMN", col, orderedCandidates, i.origin)
}
}
}

private def addColumnListOnQuery(
tableOutput: Seq[Attribute],
cols: Seq[NamedExpression],
query: LogicalPlan): LogicalPlan = {
if (cols.size != query.output.size) {
throw QueryCompilationErrors.writeTableWithMismatchedColumnsError(
cols.size, query.output.size, query)
}
val nameToQueryExpr = CUtils.toMap(cols, query.output)
// Static partition columns in the table output should not appear in the column list
// they will be handled in another rule ResolveInsertInto
Comment (author): It's fragile to resolve the insert column list and static partitions separately. This PR resolves both in PreprocessTableInsertion for v1 inserts; Spark already resolves both for v2 inserts in ResolveInsertInto.

val reordered = tableOutput.flatMap { nameToQueryExpr.get(_).orElse(None) }
if (reordered == query.output) {
query
} else {
Project(reordered, query)
}
}
}

private def validateStoreAssignmentPolicy(): Unit = {
// SPARK-28730: LEGACY store assignment policy is disallowed in data source v2.
if (conf.storeAssignmentPolicy == StoreAssignmentPolicy.LEGACY) {
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, Expression, GetStructField, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Assignment
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getDefaultValueExprOrNullLit
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.{DataType, StructType}
@@ -103,8 +104,11 @@ object AssignmentUtils extends SQLConfHelper with CastSupport {
case assignment if assignment.key.semanticEquals(attr) => assignment
}
val resolvedValue = if (matchingAssignments.isEmpty) {
errors += s"No assignment for '${attr.name}'"
attr
val defaultExpr = getDefaultValueExprOrNullLit(attr, conf)
if (defaultExpr.isEmpty) {
errors += s"No assignment for '${attr.name}'"
}
defaultExpr.getOrElse(attr)
} else if (matchingAssignments.length > 1) {
val conflictingValuesStr = matchingAssignments.map(_.value.sql).mkString(", ")
errors += s"Multiple assignments for '${attr.name}': $conflictingValuesStr"
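A small sketch of the new fallback in AssignmentUtils above: a column with no matching assignment now receives its declared default or a NULL literal, and only counts as an error when neither is available. The types, and the exact conditions under which getDefaultValueExprOrNullLit yields nothing, are simplified assumptions here.

```scala
import scala.collection.mutable

// Simplified stand-ins; the real getDefaultValueExprOrNullLit consults column metadata and SQLConf.
final case class TargetColumn(name: String, default: Option[String], nullable: Boolean)

def defaultValueExprOrNullLit(col: TargetColumn): Option[String] =
  col.default.orElse(if (col.nullable) Some("NULL") else None) // assumed fallback rule

def valueForUnassignedColumn(col: TargetColumn, errors: mutable.Buffer[String]): String = {
  val fallback = defaultValueExprOrNullLit(col)
  if (fallback.isEmpty) errors += s"No assignment for '${col.name}'"
  fallback.getOrElse(col.name) // mirrors the diff: keep the column reference when no fallback exists
}
```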
@@ -159,6 +159,21 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
}

def checkAnalysis0(plan: LogicalPlan): Unit = {
// The target table is not a child plan of the insert command. We should report table-not-found
// errors first, instead of errors in the input query of the insert command, by doing a
// top-down traversal.
plan.foreach {
case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _) =>
u.tableNotFound(u.multipartIdentifier)

// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand if write.table.isInstanceOf[UnresolvedRelation] =>
val tblName = write.table.asInstanceOf[UnresolvedRelation].multipartIdentifier
write.table.tableNotFound(tblName)

case _ =>
}

// We transform up and order the rules so as to catch the first possible failure instead
// of the result of cascading resolution failures.
plan.foreachUp {
@@ -195,14 +210,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
errorClass = "_LEGACY_ERROR_TEMP_2313",
messageParameters = Map("name" -> u.name))

case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _) =>
u.tableNotFound(u.multipartIdentifier)

// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand if write.table.isInstanceOf[UnresolvedRelation] =>
val tblName = write.table.asInstanceOf[UnresolvedRelation].multipartIdentifier
write.table.tableNotFound(tblName)

case command: V2PartitionCommand =>
command.table match {
case r @ ResolvedTable(_, _, table, _) => table match {
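A minimal sketch of why the unresolved-table check moved to a pre-order pass. As the comment in the diff notes, the target table is held on the insert command rather than as a child plan, so a top-down foreach visits the command, and can report its missing table, before any error inside the query subtree is reached; a bottom-up foreachUp would surface the query's errors first. The Node classes below are hypothetical, not Spark's LogicalPlan.

```scala
// Hypothetical tree with the two traversal orders used in the diff above.
sealed trait Node {
  def children: Seq[Node]
  def foreach(f: Node => Unit): Unit   = { f(this); children.foreach(_.foreach(f)) }   // top-down
  def foreachUp(f: Node => Unit): Unit = { children.foreach(_.foreachUp(f)); f(this) } // bottom-up
}
final case class Query(label: String) extends Node { val children = Seq.empty }
// The target table is a plain field, not a child, mirroring InsertIntoStatement.
final case class Insert(targetTable: String, query: Query) extends Node { val children = Seq(query) }
```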
@@ -384,6 +384,14 @@ trait ColumnResolutionHelper extends Logging {
allowOuter = allowOuter)
}

def resolveExprInAssignment(expr: Expression, hostPlan: LogicalPlan): Expression = {
resolveExpressionByPlanChildren(expr, hostPlan) match {
// Assignment keys and values do not need the alias when resolving nested columns.
case Alias(child: ExtractValue, _) => child
case other => other
}
}

private def resolveExpressionByPlanId(
e: Expression,
q: LogicalPlan): Expression = {
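A brief illustration of the alias stripping performed by resolveExprInAssignment: resolving a nested assignment target such as s.field typically yields the field extraction wrapped in an Alias, and assignments want the bare extraction. The expression types below are simplified stand-ins for Spark's ExtractValue and Alias.

```scala
// Simplified stand-ins for Spark's expression nodes, for illustration only.
sealed trait Expr
final case class AttributeRef(name: String) extends Expr
final case class GetField(child: Expr, field: String) extends Expr // plays the role of ExtractValue
final case class Alias(child: Expr, name: String) extends Expr

// Mirrors the new helper: peel off the alias added by nested-column resolution so that
// "UPDATE t SET s.field = ..." keys on the extraction itself rather than an aliased copy.
def stripAssignmentAlias(e: Expr): Expr = e match {
  case Alias(child: GetField, _) => child
  case other                     => other
}

// stripAssignmentAlias(Alias(GetField(AttributeRef("s"), "field"), "field"))
//   => GetField(AttributeRef("s"), "field")
```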