[SPARK-43742][SQL] Refactor default column value resolution #41262
Conversation
 */
case object ResolveReferencesInInsert extends SQLConfHelper with ColumnResolutionHelper {

  def apply(plan: LogicalPlan): LogicalPlan = plan match {
nit: `plan` is always `InsertIntoStatement` from the caller side. I also noticed that the input of `ResolveReferencesInUpdate` is `UpdateTable`. Shall we make them consistent?
We will add v2 write commands later. I'll add a TODO here.
 * 2. The plan nodes between [[Project]]/[[UnresolvedInlineTable]] and [[InsertIntoStatement]] are
 *    all unary nodes that inherit the output columns from its child.
 */
case object ResolveReferencesInInsert extends SQLConfHelper with ColumnResolutionHelper {
`ResolveColumnDefaultInInsert`?
LGTM, pending on tests.
@@ -201,30 +201,6 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
    }
  }

  test("insert with column list - missing columns") {
Missing columns should not fail and we test it in ResolveDefaultColumnsSuite
      options = CaseInsensitiveStringMap.empty)
  }

  test("SPARK-43313: Add missing default values for MERGE INSERT actions") {
MERGE/UPDATE are tested in Align[Update|Merge]AssignmentsSuite
 *    all unary nodes that inherit the output columns from its child.
 */
case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
  // TODO: support v2 write commands as well.
If you don't mind, please file a JIRA and use the IDed TODO here.
object ResolveInsertInto extends Rule[LogicalPlan] {

  /** Add a project to use the table column names for INSERT INTO BY NAME */
  private def createProjectForByNameQuery(i: InsertIntoStatement): LogicalPlan = {
the code here is unchanged but just moved to ResolveInsertionBase
@@ -785,7 +785,18 @@
  },
  "INSERT_COLUMN_ARITY_MISMATCH" : {
    "message" : [
-     "<tableName> requires that the data to be inserted have the same number of columns as the target table: target table has <targetColumns> column(s) but the inserted data has <insertedColumns> column(s), including <staticPartCols> partition column(s) having constant value(s)."
+     "Cannot write to '<tableName>', <reason>:",
Unify the errors between v1 and v2 inserts.
@@ -1080,7 +1077,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor

    def apply(plan: LogicalPlan)
      : LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
-     case i @ InsertIntoStatement(table, _, _, _, _, _) if i.query.resolved =>
+     case i @ InsertIntoStatement(table, _, _, _, _, _) =>
This change is needed. We want to resolve the table first, then resolve the column "DEFAULT" in the query. This means we can't wait for the query to be resolved before resolving the table.
sounds good, with any luck this can help reduce dependencies on rule orderings within the analyzer.
If we remove the pattern guard in this code, some operations on `i.query` will fail later on. I created #44326 to fix this.
  }
  val nameToQueryExpr = CUtils.toMap(cols, query.output)
  // Static partition columns in the table output should not appear in the column list;
  // they will be handled in another rule, ResolveInsertInto.
It's fragile to resolve the insert column list and static partitions separately. This PR resolves both in `PreprocessTableInsertion` for v1 inserts. Spark already resolves both for v2 inserts in `ResolveInsertInto`.
@@ -36,7 +37,9 @@ object TableOutputResolver {
      expected: Seq[Attribute],
      query: LogicalPlan,
      byName: Boolean,
-     conf: SQLConf): LogicalPlan = {
+     conf: SQLConf,
+     // TODO: Only DS v1 writing will set it to true. We should enable it for DS v2 as well.
This is an existing issue that the default column value doesn't work for v2 inserts. I decided to fix it later as it needs to update quite some v2 tests.
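The missing-column handling that this PR moves into `TableOutputResolver` can be sketched in plain Scala, with no Spark dependencies. Everything below (`Column`, `fillMissingColumns`, string-valued defaults) is an illustrative stand-in for Spark's actual API, not the real implementation: for each expected output column the input query does not provide, use the declared default value if one exists, otherwise fall back to NULL.

```scala
// Illustrative model: a table column with an optional default expression,
// here represented as a SQL-literal string for simplicity.
case class Column(name: String, default: Option[String])

// For every expected column, take the query-provided value if present;
// otherwise use the column default, falling back to "NULL".
def fillMissingColumns(
    expected: Seq[Column],
    provided: Map[String, String]): Seq[(String, String)] =
  expected.map { col =>
    provided.get(col.name) match {
      case Some(value) => col.name -> value
      case None        => col.name -> col.default.getOrElse("NULL")
    }
  }

val table = Seq(Column("i", None), Column("s", Some("'abc'")))
val result = fillMissingColumns(table, Map("i" -> "1"))
println(result) // List((i,1), (s,'abc'))
```

Because this step lives in `TableOutputResolver`, it is shared by the v1 and v2 insertion resolution rules; sources with `ACCEPT_ANY_SCHEMA` skip it entirely.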
@@ -164,7 +164,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
      InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite)

    case i @ InsertIntoStatement(
-       l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, _, query, overwrite, _) =>
+       l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, _, query, overwrite, _)
+       if query.resolved =>
This is needed due to https://github.com/apache/spark/pull/41262/files#r1204075426. Now it's possible that the table is resolved but the query is not.
        fragment = s"INSERT INTO $t1(data, data)",
        start = 0,
        stop = 26))
      errorClass = "COLUMN_ALREADY_EXISTS",
The error was changed by 9f0bf51 and now it's restored. I think it's more accurate to report column already exists error rather than inline table error.
        |WHEN NOT MATCHED THEN
        |  INSERT (i, txt) VALUES (src.i, src.txt)
        |""".stripMargin,
      "No assignment for 'l'")
We have tests to make sure missing columns will be filled with default values, e.g. https://github.com/apache/spark/pull/41262/files#diff-960688d2ad5179d1592810c50de1a163364c01c5f164bbded5d0d0dce05b39fdR859
      None) if cast1.getTagValue(Cast.BY_TABLE_INSERTION).isDefined &&
        cast2.getTagValue(Cast.BY_TABLE_INSERTION).isDefined =>
      Assignment(i: AttributeReference, Literal(null, IntegerType)),
      Assignment(s: AttributeReference, Literal(null, StringType))),
Now we create the null literal with the expected data type directly.
@@ -1143,38 +1140,34 @@ class PlanResolutionSuite extends AnalysisTest {

    checkError(
      exception = intercept[AnalysisException] {
-       parseAndResolve(sql8)
+       parseAndResolve(sql8, checkAnalysis = true)
`sql8` is `s"UPDATE testcat.defaultvalues SET i=DEFAULT, s=DEFAULT WHERE i=DEFAULT"`.

I think it's almost impossible to find out all the improper places for hosting the column "DEFAULT", e.g. what about the UPDATE/MERGE assignment key? Other operators like Sort? This PR only checks the nested column "DEFAULT" and fails. If the column "DEFAULT" appears in improper places, we won't resolve it and users will hit an unresolved column error.
This sounds reasonable, we can leave this test here, but we don't have to exhaustively cover all the cases.
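The "explicit DEFAULT column" check at the heart of this discussion can be sketched in plain Scala. The helper name `isExplicitDefaultColumn` appears in the diff; the exact semantics below (a single, unqualified name part matching "DEFAULT" case-insensitively, so qualified references like `t.DEFAULT` are left unresolved) are an assumption for illustration, not Spark's actual implementation.

```scala
// Illustrative stand-in for Spark's UnresolvedAttribute: a possibly
// qualified column reference, e.g. Seq("t", "DEFAULT") for t.DEFAULT.
case class UnresolvedAttribute(nameParts: Seq[String])

// Only a bare, single-part "DEFAULT" counts as the DEFAULT keyword;
// qualified or nested references stay unresolved and surface as
// unresolved-column errors later.
def isExplicitDefaultColumn(u: UnresolvedAttribute): Boolean =
  u.nameParts.size == 1 && u.nameParts.head.equalsIgnoreCase("default")

println(isExplicitDefaultColumn(UnresolvedAttribute(Seq("DEFAULT"))))      // true
println(isExplicitDefaultColumn(UnresolvedAttribute(Seq("t", "DEFAULT")))) // false
```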
    |WHEN NOT MATCHED AND (target.s=DEFAULT)
    |  THEN INSERT (target.i, target.s) values (source.i, source.s)
    |WHEN NOT MATCHED BY SOURCE AND (target.s='delete') THEN DELETE
    |WHEN NOT MATCHED BY SOURCE AND (target.s='update') THEN UPDATE SET target.s = target.i
simplify the MERGE statement to focus on missing cols.
@@ -142,17 +142,6 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
    )
  }

  test("SELECT clause generating a different number of columns is not allowed.") {
It should be allowed. According to the tests, the default value resolution was not triggered in some cases before this PR.
should we keep the test, but change its result to assert that it succeeds? or is this behavior exercised elsewhere in this test file?
@@ -863,42 +844,39 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
  }

  test("Allow user to insert specified columns into insertable view") {
    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
remove this conf setting as it's true by default.
@@ -1258,13 +1258,11 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
        """INSERT INTO TABLE dp_test PARTITION(dp)
          |SELECT key, value, key % 5 FROM src""".stripMargin)
      },
-     errorClass = "INSERT_COLUMN_ARITY_MISMATCH",
-     sqlState = "21S01",
+     errorClass = "_LEGACY_ERROR_TEMP_1169",
It's more accurate to report that the partition column list in INSERT does not match the actual table partition columns.
Thanks for doing this! Took one initial review pass.
  def apply(plan: LogicalPlan): LogicalPlan = plan match {
    case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
        i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
      val staticPartCols = i.partitionSpec.filter(_._2.isDefined).keys.map(normalizeFieldName).toSet
this is a bit hard to read, can we split the transformations into different lines with vals, and use an explicit name instead of _2 to refer to the column?
`InsertIntoStatement#partitionSpec` is `Map[String, Option[String]]`, and in Scala we can only use `_2` to refer to the map value.
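The line under discussion can be sketched in plain Scala. `partitionSpec` is a `Map[String, Option[String]]` where a defined value marks a static partition, e.g. `PARTITION(p1=2023, p2)` gives `Map("p1" -> Some("2023"), "p2" -> None)`. Lower-casing here is only a stand-in for Spark's `normalizeFieldName`, which is case-sensitivity aware.

```scala
// A partition spec with one static partition (P1 has a constant value)
// and one dynamic partition (p2 does not).
val partitionSpec: Map[String, Option[String]] =
  Map("P1" -> Some("2023"), "p2" -> None)

// Keep only entries whose value is defined (static partitions), then
// normalize the names. _2 is the map value, as discussed above.
val staticPartCols: Set[String] =
  partitionSpec.filter(_._2.isDefined).keys.map(_.toLowerCase).toSet

println(staticPartCols) // Set(p1)
```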
    case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
        i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
      val staticPartCols = i.partitionSpec.filter(_._2.isDefined).keys.map(normalizeFieldName).toSet
      val expectedQuerySchema = i.table.schema.filter { field =>
can we have a brief comment saying what this is?
    case p: Project if acceptProject && p.child.resolved &&
        p.containsPattern(UNRESOLVED_ATTRIBUTE) &&
        p.projectList.length <= expectedQuerySchema.length =>
      val newProjectList = p.projectList.zipWithIndex.map {
can we have some comment here describing the logic of adding new unresolved attributes referring to "DEFAULT" if the provided query has fewer columns than the target table, or else converting such existing unresolved attributes to their corresponding values?
I'll add doc for the `resolveColumnDefault` method.
      exprs.zipWithIndex.map {
        case (u: UnresolvedAttribute, i) if isExplicitDefaultColumn(u) =>
          val field = expectedQuerySchema(i)
          getDefaultValueExpr(field).getOrElse(Literal(null, field.dataType))
We could integrate the `Literal(null)` part into `getDefaultValueExpr` since we want to use the NULL value if the default metadata is not present in every case. Or is this `getDefaultValueExprOrNullLiteral`, which we can use instead?
There is a subtle difference: For missing cols, the default null value is optional (controlled by a flag). For the column "DEFAULT", it's a new feature when we add default value support and we can always use null as the default value if it's not defined.
optional: should we add a boolean argument to `getDefaultValueExprOrNullLiteral` to switch the behavior between the two modes?
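The two modes discussed here can be sketched in plain Scala: for an explicit `DEFAULT` reference we can always fall back to NULL, while for missing columns the NULL fallback is gated by a config flag. All names below (`Field`, `defaultValueExprOrNullLit`, string-valued defaults) are illustrative, not Spark's actual API.

```scala
// Illustrative model of a column with an optional declared default.
case class Field(name: String, default: Option[String])

// useNullAsDefault = true  -> explicit DEFAULT: always fall back to NULL.
// useNullAsDefault = false -> missing column with the flag off: no value,
//                             so the caller must fail instead.
def defaultValueExprOrNullLit(
    field: Field,
    useNullAsDefault: Boolean): Option[String] =
  field.default.orElse(if (useNullAsDefault) Some("NULL") else None)

val noDefault = Field("i", None)
println(defaultValueExprOrNullLit(noDefault, useNullAsDefault = true))  // Some(NULL)
println(defaultValueExprOrNullLit(noDefault, useNullAsDefault = false)) // None
```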
      resolvedKey match {
        case attr: AttributeReference if conf.enableDefaultColumns =>
          resolved match {
            case u: UnresolvedAttribute if isExplicitDefaultColumn(u) =>
same, let's add a comment here mentioning that we're looking for unresolved attribute references to "DEFAULT" and replacing them?
I reviewed carefully through the whole PR again, the logic and testing looks good. For any tables with capability ACCEPT_ANY_SCHEMA, we will bypass all this logic and the rest of the work is up to custom logic for those tables. We might have to duplicate some of this if any of those tables want to support default column values. But that sounds fair given the intended meaning of this capability.
        p.projectList.length <= expectedQuerySchema.length =>
      val newProjectList = p.projectList.zipWithIndex.map {
        case (u: UnresolvedAttribute, i) if isExplicitDefaultColumn(u) =>
          val field = expectedQuerySchema(i)
optional: when I wrote the original `ResolveDefaultColumns` rule, I named this variable `insertTableSchemaWithoutPartitionColumns` because I found myself frequently confused when reading the variable name. We could name this `insertTargetTableSchema` to clarify this, or `insertTargetTableSchemaWithoutPartitionColumns` if you don't think that's too verbose.
`tableSchema` is not very accurate, and neither is `insertTargetTableSchemaWithoutPartitionColumns`. It's actually the table schema excluding partition columns with static values. That's why I chose `expectedQuerySchema`. People can read the comments of the caller of this function to understand how we define the expected query schema.
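The naming debate is easier to follow with a concrete sketch: the expected query schema is the full table schema minus the partition columns that have static values in the INSERT. Plain Scala; the column names are made up for illustration.

```scala
// Table schema: two data columns plus two partition columns.
val tableSchema = Seq("key", "value", "p1", "p2")

// Suppose the INSERT specified PARTITION(p1=2023, p2): p1 is static,
// p2 is dynamic and must still be supplied by the query.
val staticValuePartCols = Set("p1")

// The schema the input query is expected to produce: everything except
// statically-valued partition columns.
val expectedQuerySchema = tableSchema.filterNot(staticValuePartCols.contains)
println(expectedQuerySchema) // List(key, value, p2)
```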
        case _: GlobalLimit | _: LocalLimit | _: Offset | _: Sort if acceptProject =>
          plan.mapChildren(
            resolveColumnDefault(_, expectedQuerySchema, acceptInlineTable = false))
optional: It looks like the only purpose of `acceptInlineTable` is setting it to false here in the event of a LIMIT and/or OFFSET and/or ORDER BY on top of a VALUES list. Do you think this check is strictly necessary? If not, we can simplify by removing `acceptInlineTable` as an argument to this function.
I don't think it's necessary but just want to keep the old behavior. Let me remove it.
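The traversal being discussed can be sketched with a tiny plan ADT in plain Scala: the rule recurses through unary nodes that keep their child's output (LIMIT/OFFSET/SORT), and the `acceptInlineTable` flag stops inline-table rewriting below them. Everything here is an illustrative stand-in for Spark's logical plan; replacing `DEFAULT` with `"0"` is just a placeholder for resolving it to the real default expression.

```scala
// Minimal plan ADT standing in for Spark's LogicalPlan.
sealed trait Plan
case class Limit(child: Plan) extends Plan
case class Sort(child: Plan) extends Plan
case class Project(exprs: Seq[String]) extends Plan
case class InlineTable(rows: Seq[Seq[String]]) extends Plan

def resolveColumnDefault(plan: Plan, acceptInlineTable: Boolean = true): Plan =
  plan match {
    // Recurse through output-preserving unary nodes, but stop accepting
    // inline tables below them (the check this comment questions).
    case Limit(c) => Limit(resolveColumnDefault(c, acceptInlineTable = false))
    case Sort(c)  => Sort(resolveColumnDefault(c, acceptInlineTable = false))
    case Project(exprs) =>
      // "0" is a placeholder for the column's default expression.
      Project(exprs.map(e => if (e.equalsIgnoreCase("default")) "0" else e))
    case t: InlineTable if acceptInlineTable =>
      InlineTable(t.rows.map(_.map(e => if (e.equalsIgnoreCase("default")) "0" else e)))
    case other => other
  }

println(resolveColumnDefault(Sort(Project(Seq("a", "DEFAULT")))))
// Sort(Project(List(a, 0)))
```

Dropping the flag, as agreed above, would mean the `InlineTable` case also fires under LIMIT/OFFSET/SORT.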
  }
  withTable("t") {
    sql("create table t(i boolean default true, s bigint) using parquet")
    assert(intercept[AnalysisException] {
      sql("insert into t (i) values (default)")
-   }.getMessage.contains(addOneColButExpectedTwo))
+   }.getMessage.contains("Cannot find data for output column 's'"))
Can we dedup this expected error message substring into one place, or even better, use `checkError` to assert on the error class?
+1, LGTM.
Merged to master for Apache Spark 3.5.0.
### What changes were proposed in this pull request?

This PR refactors the default column value resolution so that we don't need an extra DS v2 API for external v2 sources. The general idea is to split the default column value resolution into two parts:

1. Resolve the column "DEFAULT" to the column default expression. This applies to `Project`/`UnresolvedInlineTable` under `InsertIntoStatement`, and assignment expressions in `UpdateTable`/`MergeIntoTable`.
2. Fill missing columns with column default values for the input query. This does not apply to UPDATE and non-INSERT actions of MERGE, as they use the column from the target table as the default value.

The first part should be done for all the data sources, as it's part of column resolution. The second part should not be applied to v2 data sources with `ACCEPT_ANY_SCHEMA`, as they are free to define how to handle missing columns.

More concretely, this PR:

1. Puts the column "DEFAULT" resolution logic in the rule `ResolveReferences`, with two new virtual rules. This is to follow apache#38888.
2. Puts the missing column handling in `TableOutputResolver`, which is shared by both the v1 and v2 insertion resolution rules. External v2 data sources can add custom catalyst rules to deal with missing columns for themselves.
3. Removes the old rule `ResolveDefaultColumns`. Note that, with the refactor, we no longer need to manually look up the table. We deal with column default values after the target table of INSERT/UPDATE/MERGE is resolved.
4. Removes the rule `ResolveUserSpecifiedColumns` and merges it into `PreprocessTableInsertion`. These two rules both resolve v1 insertion, and it's tricky to reason about their interactions. It's clearer to resolve the insertion in one pass.

### Why are the changes needed?

Code cleanup, and removal of an unneeded DS v2 API.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Updated tests

Closes apache#41262 from cloud-fan/def-val.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
… for non-nullable columns

### What changes were proposed in this pull request?

A followup of #41262 to fix a mistake. If a column has no default value and is not nullable, we should fail if people want to use its default value via the explicit `DEFAULT` name, and not fill missing columns in INSERT.

### Why are the changes needed?

Fix a wrong behavior.

### Does this PR introduce _any_ user-facing change?

Yes, otherwise the DML command will fail later at runtime.

### How was this patch tested?

New tests

Closes #41656 from cloud-fan/def-val.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?

In the PR, I propose to raise an error when a user uses V1 `INSERT` without a list of columns and the number of inserted columns doesn't match the number of actual table columns. At the moment, Spark inserts data successfully in such a case after PR #41262, which changed the behaviour of Spark 3.4.x.

### Why are the changes needed?

1. To conform to the SQL standard, which requires the number of columns to be the same: ![Screenshot 2023-08-07 at 11 01 27 AM](https://github.com/apache/spark/assets/1580697/c55badec-5716-490f-a83a-0bb6b22c84c7) Apparently, the insertion below must not succeed:

```sql
spark-sql (default)> CREATE TABLE tabtest(c1 INT, c2 INT);
spark-sql (default)> INSERT INTO tabtest SELECT 1;
```

2. To have the same behaviour as **Spark 3.4**:

```sql
spark-sql (default)> INSERT INTO tabtest SELECT 1;
`spark_catalog`.`default`.`tabtest` requires that the data to be inserted have the same number of columns as the target table: target table has 2 column(s) but the inserted data has 1 column(s), including 0 partition column(s) having constant value(s).
```

### Does this PR introduce _any_ user-facing change?

Yes. After the changes:

```sql
spark-sql (default)> INSERT INTO tabtest SELECT 1;
[INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS] Cannot write to `spark_catalog`.`default`.`tabtest`, the reason is not enough data columns: Table columns: `c1`, `c2`. Data columns: `1`.
```

### How was this patch tested?

By running the modified tests:

```
$ build/sbt "test:testOnly *InsertSuite"
$ build/sbt "test:testOnly *ResolveDefaultColumnsSuite"
$ build/sbt -Phive "test:testOnly *HiveQuerySuite"
```

Closes #42393 from MaxGekk/fix-num-cols-insert.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit a7eef21) Signed-off-by: Max Gekk <max.gekk@gmail.com>
        addError(s"Cannot find data for output column '${newColPath.quoted}'")
        None
      val defaultExpr = if (fillDefaultValue) {
        getDefaultValueExprOrNullLit(expectedCol, conf)
This pollutes the expressions with unreplaced char/varchar and could result in bugs
… to a table with char/varchar

### What changes were proposed in this pull request?

#41262 introduced a regression by applying literals with char/varchar type in query output for table insertions, see https://github.com/apache/spark/pull/41262/files#diff-6e331e8f1c67b5920fb46263b6e582ec6e6a253ee45543559c9692a72a1a40ecR187-R188

This causes bugs:

```java
24/07/03 16:29:01 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) org.apache.spark.SparkException: [INTERNAL_ERROR] Unsupported data type VarcharType(64). SQLSTATE: XX000 at org.apache.spark.SparkException$.internalError(SparkException.scala:92) at org.apache.spark.SparkException$.internalError(SparkException.scala:96)
```

```java
org.apache.spark.SparkUnsupportedOperationException: VarcharType(64) is not supported yet. at org.apache.spark.sql.errors.QueryExecutionErrors$.dataTypeUnsupportedYetError(QueryExecutionErrors.scala:993) at org.apache.spark.sql.execution.datasources.orc.OrcSerializer.newConverter(OrcSerializer.scala:209) at org.apache.spark.sql.execution.datasources.orc.OrcSerializer.$anonfun$converters$2(OrcSerializer.scala:35) at scala.collection.immutable.List.map(List.scala:247)
```

### Why are the changes needed?

Bugfix

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47198 from yaooqinn/SPARK-48792.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>