[SPARK-41575][SQL] Assign name to _LEGACY_ERROR_TEMP_2054
### What changes were proposed in this pull request?

This PR proposes to assign the name `TASK_WRITE_FAILED` to the error class `_LEGACY_ERROR_TEMP_2054`.
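
For illustration, a minimal sketch of both sides of the change, mirroring the hunks below (`description.path`, `e`, and `tableName` are stand-ins taken from the changed code):

```scala
// Raising side: the error now carries the output path instead of the
// wrapped cause's message.
throw QueryExecutionErrors.taskFailedWhileWritingRowsError(description.path, t)

// Asserting side: tests can match on the error class and its `path`
// parameter, using a regex since the concrete path is environment-dependent.
checkError(
  exception = e.getCause.asInstanceOf[SparkException],
  errorClass = "TASK_WRITE_FAILED",
  parameters = Map("path" -> s".*$tableName.*"),
  matchPVals = true)
```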

### Why are the changes needed?

We should assign proper names to the `_LEGACY_ERROR_TEMP_*` error classes.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

`./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"`
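
The touched suites can also be run individually; for example (assuming the usual sbt wildcard test filter): `./build/sbt "sql/testOnly *CharVarcharTestSuite"`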

Closes #39394 from itholic/LEGACY_2054.

Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
itholic authored and MaxGekk committed Jan 10, 2023
1 parent 8871f6d commit aaee89a
Showing 6 changed files with 104 additions and 39 deletions.
10 changes: 5 additions & 5 deletions core/src/main/resources/error/error-classes.json
@@ -1187,6 +1187,11 @@
     ],
     "sqlState" : "42000"
   },
+  "TASK_WRITE_FAILED" : {
+    "message" : [
+      "Task failed while writing rows to <path>."
+    ]
+  },
   "TEMP_TABLE_OR_VIEW_ALREADY_EXISTS" : {
     "message" : [
       "Cannot create the temporary view <relationName> because it already exists.",
@@ -3728,11 +3733,6 @@
       "buildReader is not supported for <format>"
     ]
   },
-  "_LEGACY_ERROR_TEMP_2054" : {
-    "message" : [
-      "Task failed while writing rows. <message>"
-    ]
-  },
   "_LEGACY_ERROR_TEMP_2055" : {
     "message" : [
       "<message>",
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -782,10 +782,10 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       messageParameters = Map("format" -> format))
   }
 
-  def taskFailedWhileWritingRowsError(cause: Throwable): Throwable = {
+  def taskFailedWhileWritingRowsError(path: String, cause: Throwable): Throwable = {
     new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2054",
-      messageParameters = Map("message" -> cause.getMessage),
+      errorClass = "TASK_WRITE_FAILED",
+      messageParameters = Map("path" -> path),
       cause = cause)
   }
 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -423,7 +423,7 @@ object FileFormatWriter extends Logging {
         // We throw the exception and let Executor throw ExceptionFailure to abort the job.
         throw new TaskOutputFileAlreadyExistException(f)
       case t: Throwable =>
-        throw QueryExecutionErrors.taskFailedWhileWritingRowsError(t)
+        throw QueryExecutionErrors.taskFailedWhileWritingRowsError(description.path, t)
     }
   }

sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -178,26 +178,6 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
     }
   }
 
-  test("char/varchar type values length check: partitioned columns of other types") {
-    Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
-      withTable("t") {
-        sql(s"CREATE TABLE t(i STRING, c $typ) USING $format PARTITIONED BY (c)")
-        Seq(1, 10, 100, 1000, 10000).foreach { v =>
-          sql(s"INSERT OVERWRITE t VALUES ('1', $v)")
-          checkPlainResult(spark.table("t"), typ, v.toString)
-          sql(s"ALTER TABLE t DROP PARTITION(c=$v)")
-          checkAnswer(spark.table("t"), Nil)
-        }
-
-        val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE t VALUES ('1', 100000)"))
-        assert(e1.getCause.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
-
-        val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP PARTITION(c=100000)"))
-        assert(e2.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
-      }
-    }
-  }
-
   test("char type values should be padded: nested in struct") {
     withTable("t") {
       sql(s"CREATE TABLE t(i STRING, c STRUCT<c: CHAR(5)>) USING $format")
@@ -332,12 +312,18 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
   test("length check for input string values: partitioned columns") {
     // DS V2 doesn't support partitioned table.
     if (!conf.contains(SQLConf.DEFAULT_CATALOG.key)) {
+      val tableName = "t"
       testTableWrite { typeName =>
-        sql(s"CREATE TABLE t(i INT, c $typeName(5)) USING $format PARTITIONED BY (c)")
-        sql("INSERT INTO t VALUES (1, null)")
-        checkAnswer(spark.table("t"), Row(1, null))
-        val e = intercept[SparkException](sql("INSERT INTO t VALUES (1, '123456')"))
-        assert(e.getCause.getMessage.contains(s"Exceeds char/varchar type length limitation: 5"))
+        sql(s"CREATE TABLE $tableName(i INT, c $typeName(5)) USING $format PARTITIONED BY (c)")
+        sql(s"INSERT INTO $tableName VALUES (1, null)")
+        checkAnswer(spark.table(tableName), Row(1, null))
+        val e = intercept[SparkException](sql(s"INSERT INTO $tableName VALUES (1, '123456')"))
+        checkError(
+          exception = e.getCause.asInstanceOf[SparkException],
+          errorClass = "TASK_WRITE_FAILED",
+          parameters = Map("path" -> s".*$tableName.*"),
+          matchPVals = true
+        )
       }
     }
   }
@@ -884,6 +870,32 @@ class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSparkSession {
       }
     }
   }
+
+  test("char/varchar type values length check: partitioned columns of other types") {
+    val tableName = "t"
+    Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
+      withTable(tableName) {
+        sql(s"CREATE TABLE $tableName(i STRING, c $typ) USING $format PARTITIONED BY (c)")
+        Seq(1, 10, 100, 1000, 10000).foreach { v =>
+          sql(s"INSERT OVERWRITE $tableName VALUES ('1', $v)")
+          checkPlainResult(spark.table(tableName), typ, v.toString)
+          sql(s"ALTER TABLE $tableName DROP PARTITION(c=$v)")
+          checkAnswer(spark.table(tableName), Nil)
+        }
+
+        val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE $tableName VALUES ('1', 100000)"))
+        checkError(
+          exception = e1.getCause.asInstanceOf[SparkException],
+          errorClass = "TASK_WRITE_FAILED",
+          parameters = Map("path" -> s".*$tableName"),
+          matchPVals = true
+        )
+
+        val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP PARTITION(c=100000)"))
+        assert(e2.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+      }
+    }
+  }
 }
 
 class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
@@ -894,4 +906,24 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
     .set("spark.sql.catalog.testcat", classOf[InMemoryPartitionTableCatalog].getName)
     .set(SQLConf.DEFAULT_CATALOG.key, "testcat")
   }
+
+  test("char/varchar type values length check: partitioned columns of other types") {
+    Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
+      withTable("t") {
+        sql(s"CREATE TABLE t(i STRING, c $typ) USING $format PARTITIONED BY (c)")
+        Seq(1, 10, 100, 1000, 10000).foreach { v =>
+          sql(s"INSERT OVERWRITE t VALUES ('1', $v)")
+          checkPlainResult(spark.table("t"), typ, v.toString)
+          sql(s"ALTER TABLE t DROP PARTITION(c=$v)")
+          checkAnswer(spark.table("t"), Nil)
+        }
+
+        val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE t VALUES ('1', 100000)"))
+        assert(e1.getCause.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+
+        val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP PARTITION(c=100000)"))
+        assert(e2.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+      }
+    }
+  }
 }
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -2027,27 +2027,33 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
   }
 
   test("Stop task set if FileAlreadyExistsException was thrown") {
+    val tableName = "t"
     Seq(true, false).foreach { fastFail =>
       withSQLConf("fs.file.impl" -> classOf[FileExistingTestFileSystem].getName,
         "fs.file.impl.disable.cache" -> "true",
         SQLConf.FASTFAIL_ON_FILEFORMAT_OUTPUT.key -> fastFail.toString) {
-        withTable("t") {
+        withTable(tableName) {
           sql(
-            """
-              |CREATE TABLE t(i INT, part1 INT) USING PARQUET
+            s"""
+              |CREATE TABLE $tableName(i INT, part1 INT) USING PARQUET
               |PARTITIONED BY (part1)
             """.stripMargin)
 
           val df = Seq((1, 1)).toDF("i", "part1")
           val err = intercept[SparkException] {
-            df.write.mode("overwrite").format("parquet").insertInto("t")
+            df.write.mode("overwrite").format("parquet").insertInto(tableName)
           }
 
           if (fastFail) {
             assert(err.getMessage.contains("can not write to output file: " +
               "org.apache.hadoop.fs.FileAlreadyExistsException"))
           } else {
-            assert(err.getMessage.contains("Task failed while writing rows"))
+            checkError(
+              exception = err.getCause.asInstanceOf[SparkException],
+              errorClass = "TASK_WRITE_FAILED",
+              parameters = Map("path" -> s".*$tableName"),
+              matchPVals = true
+            )
           }
         }
       }
sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.command.CharVarcharDDLTestBase
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
@@ -73,6 +74,32 @@ class HiveCharVarcharTestSuite extends CharVarcharTestSuite with TestHiveSingleton {
       }
     }
   }
+
+  test("char/varchar type values length check: partitioned columns of other types") {
+    val tableName = "t"
+    Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
+      withTable(tableName) {
+        sql(s"CREATE TABLE $tableName(i STRING, c $typ) USING $format PARTITIONED BY (c)")
+        Seq(1, 10, 100, 1000, 10000).foreach { v =>
+          sql(s"INSERT OVERWRITE $tableName VALUES ('1', $v)")
+          checkPlainResult(spark.table(tableName), typ, v.toString)
+          sql(s"ALTER TABLE $tableName DROP PARTITION(c=$v)")
+          checkAnswer(spark.table(tableName), Nil)
+        }
+
+        val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE $tableName VALUES ('1', 100000)"))
+        checkError(
+          exception = e1.getCause.asInstanceOf[SparkException],
+          errorClass = "TASK_WRITE_FAILED",
+          parameters = Map("path" -> s".*$tableName.*"),
+          matchPVals = true
+        )
+
+        val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP PARTITION(c=100000)"))
+        assert(e2.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+      }
+    }
+  }
 }
 
 class HiveCharVarcharDDLTestSuite extends CharVarcharDDLTestBase with TestHiveSingleton {
