[SPARK-43438][SQL] Error on missing input columns in INSERT
### What changes were proposed in this pull request?
In this PR, I propose to raise an error when a user uses V1 `INSERT` without a column list and the number of inserted columns doesn't match the number of actual table columns.

At the moment, Spark inserts the data successfully in such cases after PR #41262, which changed the behaviour of Spark 3.4.x.
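
For illustration, this is roughly what the current behaviour looks like in `spark-sql` (output shown approximately): the missing column is silently filled with its default value, or `NULL` when no default is defined:
```sql
spark-sql (default)> CREATE TABLE tabtest(c1 INT, c2 INT);
spark-sql (default)> INSERT INTO tabtest SELECT 1;  -- currently succeeds
spark-sql (default)> SELECT * FROM tabtest;
1	NULL
```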

### Why are the changes needed?
1. To conform to the SQL standard, which requires that the number of columns be the same:
![Screenshot 2023-08-07 at 11 01 27 AM](https://github.com/apache/spark/assets/1580697/c55badec-5716-490f-a83a-0bb6b22c84c7)

Accordingly, the insertion below must not succeed:
```sql
spark-sql (default)> CREATE TABLE tabtest(c1 INT, c2 INT);
spark-sql (default)> INSERT INTO tabtest SELECT 1;
```

2. To have the same behaviour as **Spark 3.4**:
```sql
spark-sql (default)> INSERT INTO tabtest SELECT 1;
`spark_catalog`.`default`.`tabtest` requires that the data to be inserted have the same number of columns as the target table: target table has 2 column(s) but the inserted data has 1 column(s), including 0 partition column(s) having constant value(s).
```

### Does this PR introduce _any_ user-facing change?
Yes.

After the changes:
```sql
spark-sql (default)> INSERT INTO tabtest SELECT 1;
[INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS] Cannot write to `spark_catalog`.`default`.`tabtest`, the reason is not enough data columns:
Table columns: `c1`, `c2`.
Data columns: `1`.
```
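
Users who relied on the implicit padding can keep inserting fewer values by listing the target columns explicitly; a sketch of the workaround on the same `tabtest` table (output shown approximately), where the omitted column still gets its default value or `NULL`:
```sql
spark-sql (default)> INSERT INTO tabtest (c1) SELECT 1;
spark-sql (default)> SELECT c1, c2 FROM tabtest;
1	NULL
```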

### How was this patch tested?
By running the modified tests:
```
$ build/sbt "test:testOnly *InsertSuite"
$ build/sbt "test:testOnly *ResolveDefaultColumnsSuite"
$ build/sbt -Phive "test:testOnly *HiveQuerySuite"
```

Closes #42393 from MaxGekk/fix-num-cols-insert.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
MaxGekk committed Aug 29, 2023
1 parent 8505084 commit a7eef21
Showing 6 changed files with 69 additions and 37 deletions.
```diff
@@ -103,22 +103,11 @@ object TableOutputResolver {
         errors += _,
         fillDefaultValue = supportColDefaultValue)
     } else {
-      // If the target table needs more columns than the input query, fill them with
-      // the columns' default values, if the `supportColDefaultValue` parameter is true.
-      val fillDefaultValue = supportColDefaultValue && actualExpectedCols.size > query.output.size
-      val queryOutputCols = if (fillDefaultValue) {
-        query.output ++ actualExpectedCols.drop(query.output.size).flatMap { expectedCol =>
-          getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues)
-        }
-      } else {
-        query.output
-      }
-      if (actualExpectedCols.size > queryOutputCols.size) {
+      if (actualExpectedCols.size > query.output.size) {
         throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
           tableName, actualExpectedCols.map(_.name), query)
       }
-
-      resolveColumnsByPosition(tableName, queryOutputCols, actualExpectedCols, conf, errors += _)
+      resolveColumnsByPosition(tableName, query.output, actualExpectedCols, conf, errors += _)
     }
 
     if (errors.nonEmpty) {
```
```diff
@@ -404,7 +404,11 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
     }
     val newQuery = try {
       TableOutputResolver.resolveOutputColumns(
-        tblName, expectedColumns, query, byName = hasColumnList || insert.byName, conf,
+        tblName,
+        expectedColumns,
+        query,
+        byName = hasColumnList || insert.byName,
+        conf,
         supportColDefaultValue = true)
     } catch {
       case e: AnalysisException if staticPartCols.nonEmpty &&
```
```diff
@@ -35,9 +35,15 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
 
       // INSERT without user-defined columns
       sql("truncate table t")
-      sql("insert into t values (timestamp'2020-12-31')")
-      checkAnswer(spark.table("t"),
-        sql("select timestamp'2020-12-31', null").collect().head)
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("insert into t values (timestamp'2020-12-31')")
+        },
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> "`spark_catalog`.`default`.`t`",
+          "tableColumns" -> "`c1`, `c2`",
+          "dataColumns" -> "`col1`"))
     }
   }
 
@@ -57,18 +63,31 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
 
       // INSERT without user-defined columns
      sql("truncate table t")
-      sql("insert into t values (timestamp'2020-12-31')")
-      checkAnswer(spark.table("t"),
-        sql("select timestamp'2020-12-31', timestamp'2020-01-01'").collect().head)
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("insert into t values (timestamp'2020-12-31')")
+        },
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> "`spark_catalog`.`default`.`t`",
+          "tableColumns" -> "`c1`, `c2`",
+          "dataColumns" -> "`col1`"))
     }
   }
 
   test("INSERT into partitioned tables") {
     sql("create table t(c1 int, c2 int, c3 int, c4 int) using parquet partitioned by (c3, c4)")
 
     // INSERT without static partitions
-    sql("insert into t values (1, 2, 3)")
-    checkAnswer(spark.table("t"), Row(1, 2, 3, null))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("insert into t values (1, 2, 3)")
+      },
+      errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+      parameters = Map(
+        "tableName" -> "`spark_catalog`.`default`.`t`",
+        "tableColumns" -> "`c1`, `c2`, `c3`, `c4`",
+        "dataColumns" -> "`col1`, `col2`, `col3`"))
 
     // INSERT without static partitions but with column list
     sql("truncate table t")
@@ -77,8 +96,16 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
 
     // INSERT with static partitions
     sql("truncate table t")
-    sql("insert into t partition(c3=3, c4=4) values (1)")
-    checkAnswer(spark.table("t"), Row(1, null, 3, 4))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("insert into t partition(c3=3, c4=4) values (1)")
+      },
+      errorClass = "INSERT_PARTITION_COLUMN_ARITY_MISMATCH",
+      parameters = Map(
+        "tableName" -> "`spark_catalog`.`default`.`t`",
+        "tableColumns" -> "`c1`, `c2`, `c3`, `c4`",
+        "dataColumns" -> "`col1`",
+        "staticPartCols" -> "`c3`, `c4`"))
 
     // INSERT with static partitions and with column list
     sql("truncate table t")
@@ -87,8 +114,16 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
 
     // INSERT with partial static partitions
     sql("truncate table t")
-    sql("insert into t partition(c3=3, c4) values (1, 2)")
-    checkAnswer(spark.table("t"), Row(1, 2, 3, null))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("insert into t partition(c3=3, c4) values (1, 2)")
+      },
+      errorClass = "INSERT_PARTITION_COLUMN_ARITY_MISMATCH",
+      parameters = Map(
+        "tableName" -> "`spark_catalog`.`default`.`t`",
+        "tableColumns" -> "`c1`, `c2`, `c3`, `c4`",
+        "dataColumns" -> "`col1`, `col2`",
+        "staticPartCols" -> "`c3`"))
 
     // INSERT with partial static partitions and with column list is not allowed
     intercept[AnalysisException](sql("insert into t partition(c3=3, c4) (c1) values (1, 4)"))
```
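For context on the partitioned-table tests above, a hypothetical `spark-sql` session showing the pattern they enforce: the by-position insert without a column list now fails, while the column-list form (as in the unchanged tests) still succeeds and leaves the unspecified column as `NULL`:
```sql
spark-sql (default)> CREATE TABLE t(c1 INT, c2 INT, c3 INT, c4 INT) USING parquet PARTITIONED BY (c3, c4);
spark-sql (default)> INSERT INTO t PARTITION (c3=3, c4=4) VALUES (1);       -- fails: INSERT_PARTITION_COLUMN_ARITY_MISMATCH
spark-sql (default)> INSERT INTO t PARTITION (c3=3, c4=4) (c1) VALUES (1);  -- succeeds: row (1, NULL, 3, 4)
```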
```diff
@@ -962,11 +962,15 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       (1 to 10).map(i => Row(i, null))
     )
 
-    sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt")
-    checkAnswer(
-      sql("SELECT a, b FROM jsonTable"),
-      (1 to 10).map(i => Row(i, null))
-    )
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt")
+      },
+      errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+      parameters = Map(
+        "tableName" -> "`unknown`",
+        "tableColumns" -> "`a`, `b`",
+        "dataColumns" -> "`a`"))
 
     sql("INSERT OVERWRITE TABLE jsonTable(a) SELECT a FROM jt")
     checkAnswer(
@@ -1027,7 +1031,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
     withTable("t") {
       sql("create table t(i int, s bigint default 42, x bigint) using parquet")
-      sql("insert into t values(1)")
+      sql("insert into t(i) values(1)")
       checkAnswer(spark.table("t"), Row(1, 42L, null))
     }
     // The table has a partitioning column and a default value is injected.
@@ -1495,7 +1499,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       sql(createTableIntCol)
       sql("alter table t add column s bigint default 42")
       sql("alter table t add column x bigint")
-      sql("insert into t values(1)")
+      sql("insert into t(i) values(1)")
       checkAnswer(spark.table("t"), Row(1, 42, null))
     }
     // The table has a partitioning column and a default value is injected.
```
```diff
@@ -391,7 +391,7 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
       sql(s"INSERT INTO TABLE $tableName PARTITION (c=11, b=10) SELECT 9, 12")
 
       // The data is missing a column. The default value for the missing column is null.
-      sql(s"INSERT INTO TABLE $tableName PARTITION (c=15, b=16) SELECT 13")
+      sql(s"INSERT INTO TABLE $tableName PARTITION (c=15, b=16) (a) SELECT 13")
 
       // c is defined twice. Analyzer will complain.
       intercept[ParseException] {
```
```diff
@@ -1258,11 +1258,11 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
           """INSERT INTO TABLE dp_test PARTITION(dp)
             |SELECT key, value, key % 5 FROM src""".stripMargin)
       },
-      errorClass = "_LEGACY_ERROR_TEMP_1169",
+      errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
       parameters = Map(
         "tableName" -> "`spark_catalog`.`default`.`dp_test`",
-        "normalizedPartSpec" -> "dp",
-        "partColNames" -> "dp,sp"))
+        "tableColumns" -> "`key`, `value`, `dp`, `sp`",
+        "dataColumns" -> "`key`, `value`, `(key % 5)`"))
 
     sql("SET hive.exec.dynamic.partition.mode=nonstrict")
 
```
