Skip to content

Commit

Permalink
[SPARK-48792][SQL] Fix regression for INSERT with partial column list…
Browse files Browse the repository at this point in the history
… to a table with char/varchar

### What changes were proposed in this pull request?

apache#41262 introduced a regression by applying literals with char/varchar type in query output for table insertions, see

https://github.com/apache/spark/pull/41262/files#diff-6e331e8f1c67b5920fb46263b6e582ec6e6a253ee45543559c9692a72a1a40ecR187-R188

This causes bugs such as:

```java
24/07/03 16:29:01 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: [INTERNAL_ERROR] Unsupported data type VarcharType(64). SQLSTATE: XX000
	at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
	at org.apache.spark.SparkException$.internalError(SparkException.scala:96)
```

```java
org.apache.spark.SparkUnsupportedOperationException: VarcharType(64) is not supported yet.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataTypeUnsupportedYetError(QueryExecutionErrors.scala:993)
	at org.apache.spark.sql.execution.datasources.orc.OrcSerializer.newConverter(OrcSerializer.scala:209)
	at org.apache.spark.sql.execution.datasources.orc.OrcSerializer.$anonfun$converters$2(OrcSerializer.scala:35)
	at scala.collection.immutable.List.map(List.scala:247)
```

### Why are the changes needed?

Bugfix

### Does this PR introduce _any_ user-facing change?

No.
### How was this patch tested?

new tests
### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#47198 from yaooqinn/SPARK-48792.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
  • Loading branch information
yaooqinn authored and ericm-db committed Jul 10, 2024
1 parent e7fee09 commit 9d7d908
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,9 @@ object TableOutputResolver extends SQLConfHelper with Logging {
// TODO: Only DS v1 writing will set it to true. We should enable in for DS v2 as well.
supportColDefaultValue: Boolean = false): LogicalPlan = {

val actualExpectedCols = expected.map { attr =>
attr.withDataType(CharVarcharUtils.getRawType(attr.metadata).getOrElse(attr.dataType))
}

if (actualExpectedCols.size < query.output.size) {
if (expected.size < query.output.size) {
throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(
tableName, actualExpectedCols.map(_.name), query.output)
tableName, expected.map(_.name), query.output)
}

val errors = new mutable.ArrayBuffer[String]()
Expand All @@ -100,21 +96,21 @@ object TableOutputResolver extends SQLConfHelper with Logging {
reorderColumnsByName(
tableName,
query.output,
actualExpectedCols,
expected,
conf,
errors += _,
fillDefaultValue = supportColDefaultValue)
} else {
if (actualExpectedCols.size > query.output.size) {
if (expected.size > query.output.size) {
throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
tableName, actualExpectedCols.map(_.name), query.output)
tableName, expected.map(_.name), query.output)
}
resolveColumnsByPosition(tableName, query.output, actualExpectedCols, conf, errors += _)
resolveColumnsByPosition(tableName, query.output, expected, conf, errors += _)
}

if (errors.nonEmpty) {
throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(
tableName, actualExpectedCols.map(_.name).map(toSQLId).mkString(", "))
tableName, expected.map(_.name).map(toSQLId).mkString(", "))
}

if (resolved == query.output) {
Expand Down Expand Up @@ -246,22 +242,25 @@ object TableOutputResolver extends SQLConfHelper with Logging {
case a: Alias => a.withName(expectedName)
case other => other
}
(matchedCol.dataType, expectedCol.dataType) match {
val actualExpectedCol = expectedCol.withDataType {
CharVarcharUtils.getRawType(expectedCol.metadata).getOrElse(expectedCol.dataType)
}
(matchedCol.dataType, actualExpectedCol.dataType) match {
case (matchedType: StructType, expectedType: StructType) =>
resolveStructType(
tableName, matchedCol, matchedType, expectedCol, expectedType,
tableName, matchedCol, matchedType, actualExpectedCol, expectedType,
byName = true, conf, addError, newColPath)
case (matchedType: ArrayType, expectedType: ArrayType) =>
resolveArrayType(
tableName, matchedCol, matchedType, expectedCol, expectedType,
tableName, matchedCol, matchedType, actualExpectedCol, expectedType,
byName = true, conf, addError, newColPath)
case (matchedType: MapType, expectedType: MapType) =>
resolveMapType(
tableName, matchedCol, matchedType, expectedCol, expectedType,
tableName, matchedCol, matchedType, actualExpectedCol, expectedType,
byName = true, conf, addError, newColPath)
case _ =>
checkField(
tableName, expectedCol, matchedCol, byName = true, conf, addError, newColPath)
tableName, actualExpectedCol, matchedCol, byName = true, conf, addError, newColPath)
}
}
}
Expand Down Expand Up @@ -292,34 +291,36 @@ object TableOutputResolver extends SQLConfHelper with Logging {
conf: SQLConf,
addError: String => Unit,
colPath: Seq[String] = Nil): Seq[NamedExpression] = {

if (inputCols.size > expectedCols.size) {
val extraColsStr = inputCols.takeRight(inputCols.size - expectedCols.size)
val actualExpectedCols = expectedCols.map { attr =>
attr.withDataType { CharVarcharUtils.getRawType(attr.metadata).getOrElse(attr.dataType) }
}
if (inputCols.size > actualExpectedCols.size) {
val extraColsStr = inputCols.takeRight(inputCols.size - actualExpectedCols.size)
.map(col => toSQLId(col.name))
.mkString(", ")
if (colPath.isEmpty) {
throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(tableName,
expectedCols.map(_.name), inputCols.map(_.toAttribute))
actualExpectedCols.map(_.name), inputCols.map(_.toAttribute))
} else {
throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
tableName, colPath.quoted, extraColsStr
)
}
} else if (inputCols.size < expectedCols.size) {
val missingColsStr = expectedCols.takeRight(expectedCols.size - inputCols.size)
} else if (inputCols.size < actualExpectedCols.size) {
val missingColsStr = actualExpectedCols.takeRight(actualExpectedCols.size - inputCols.size)
.map(col => toSQLId(col.name))
.mkString(", ")
if (colPath.isEmpty) {
throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(tableName,
expectedCols.map(_.name), inputCols.map(_.toAttribute))
actualExpectedCols.map(_.name), inputCols.map(_.toAttribute))
} else {
throw QueryCompilationErrors.incompatibleDataToTableStructMissingFieldsError(
tableName, colPath.quoted, missingColsStr
)
}
}

inputCols.zip(expectedCols).flatMap { case (inputCol, expectedCol) =>
inputCols.zip(actualExpectedCols).flatMap { case (inputCol, expectedCol) =>
val newColPath = colPath :+ expectedCol.name
(inputCol.dataType, expectedCol.dataType) match {
case (inputType: StructType, expectedType: StructType) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,19 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
}
}
}

test("SPARK-48792: Fix INSERT with partial column list to a table with char/varchar") {
assume(format != "foo",
"TODO: TableOutputResolver.resolveOutputColumns supportColDefaultValue is false")
Seq("char", "varchar").foreach { typ =>
withTable("students") {
sql(s"CREATE TABLE students (name $typ(64), address $typ(64)) USING $format")
sql("INSERT INTO students VALUES ('Kent Yao', 'Hangzhou')")
sql("INSERT INTO students (address) VALUES ('<unknown>')")
checkAnswer(sql("SELECT count(*) FROM students"), Row(2))
}
}
}
}

// Some basic char/varchar tests which doesn't rely on table implementation.
Expand Down

0 comments on commit 9d7d908

Please sign in to comment.