[SPARK-44269][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[2310-2314]

### What changes were proposed in this pull request?
This PR assigns names to the error classes _LEGACY_ERROR_TEMP_[2310-2314].
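
As a hedged illustration (not part of the patch): once an error condition has a stable name, callers can match on the error class and its typed parameters instead of parsing message text. The streaming `DataFrame` below is an assumption, e.g. one built via `spark.readStream`.

```scala
import org.apache.spark.sql.AnalysisException

// `streamingDf` is assumed to be a streaming DataFrame (isStreaming == true).
try {
  streamingDf.write
} catch {
  case e: AnalysisException =>
    // Stable identifiers, independent of the human-readable message.
    assert(e.getErrorClass == "CALL_ON_STREAMING_DATASET_UNSUPPORTED")
    assert(e.getMessageParameters.get("methodName") == "`write`")
}
```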

### Why are the changes needed?
Improve the error framework.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
Updated existing test cases and added new test cases.

Closes #41816 from beliefer/SPARK-44269.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
beliefer authored and MaxGekk committed Jul 4, 2023
1 parent b573cca commit 7bc28d5
Showing 7 changed files with 54 additions and 46 deletions.
25 changes: 5 additions & 20 deletions common/utils/src/main/resources/error/error-classes.json
@@ -91,6 +91,11 @@
],
"sqlState" : "22003"
},
"CALL_ON_STREAMING_DATASET_UNSUPPORTED" : {
"message" : [
"The method <methodName> can not be called on streaming Dataset/DataFrame."
]
},
"CANNOT_CAST_DATATYPE" : {
"message" : [
"Cannot cast <sourceType> to <targetType>."
@@ -5609,26 +5614,6 @@
"The input <valueType> '<input>' does not match the given number format: '<format>'."
]
},
"_LEGACY_ERROR_TEMP_2311" : {
"message" : [
"'writeTo' can not be called on streaming Dataset/DataFrame."
]
},
"_LEGACY_ERROR_TEMP_2312" : {
"message" : [
"'write' can not be called on streaming Dataset/DataFrame."
]
},
"_LEGACY_ERROR_TEMP_2313" : {
"message" : [
"Hint not found: <name>."
]
},
"_LEGACY_ERROR_TEMP_2314" : {
"message" : [
"cannot resolve '<sqlExpr>' due to argument data type mismatch: <msg>"
]
},
"_LEGACY_ERROR_TEMP_2315" : {
"message" : [
"cannot resolve '<sqlExpr>' due to data type mismatch: <msg><hint>."
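
An aside on how these templates are consumed (a simplified sketch, not Spark's actual implementation, which lives in `ErrorClassesJsonReader`): the `<param>` placeholders in a message are filled from the message parameters supplied at the throw site.

```scala
// Minimal sketch of message-template substitution, assuming plain string
// replacement; Spark's real resolver also handles quoting and validation.
def formatMessage(template: String, parameters: Map[String, String]): String =
  parameters.foldLeft(template) { case (message, (name, value)) =>
    message.replace(s"<$name>", value)
  }

formatMessage(
  "The method <methodName> can not be called on streaming Dataset/DataFrame.",
  Map("methodName" -> "`write`"))
// => "The method `write` can not be called on streaming Dataset/DataFrame."
```
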
6 changes: 6 additions & 0 deletions docs/sql-error-conditions.md
@@ -111,6 +111,12 @@ Unable to find batch `<batchMetadataFile>`.

`<value1>` `<symbol>` `<value2>` caused overflow.

### CALL_ON_STREAMING_DATASET_UNSUPPORTED

SQLSTATE: none assigned

The method `<methodName>` can not be called on streaming Dataset/DataFrame.

### CANNOT_CAST_DATATYPE

[SQLSTATE: 42846](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -209,9 +209,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
u.origin)

case u: UnresolvedHint =>
u.failAnalysis(
errorClass = "_LEGACY_ERROR_TEMP_2313",
messageParameters = Map("name" -> u.name))
throw SparkException.internalError(
msg = s"Hint not found: ${toSQLId(u.name)}",
context = u.origin.getQueryContext,
summary = u.origin.context.summary)

case command: V2PartitionCommand =>
command.table match {
@@ -245,10 +246,6 @@
hof.checkArgumentDataTypes() match {
case checkRes: TypeCheckResult.DataTypeMismatch =>
hof.dataTypeMismatch(hof, checkRes)
case TypeCheckResult.TypeCheckFailure(message) =>
hof.failAnalysis(
errorClass = "_LEGACY_ERROR_TEMP_2314",
messageParameters = Map("sqlExpr" -> hof.sql, "msg" -> message))
case checkRes: TypeCheckResult.InvalidFormat =>
hof.setTagValue(INVALID_FORMAT_ERROR, true)
hof.invalidFormat(checkRes)
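
The switch above from a `_LEGACY_ERROR_TEMP_2313` analysis error to `SparkException.internalError` reflects a design choice (my reading, not stated in the diff): the `Remove Unresolved Hints` batch should eliminate every `UnresolvedHint` before `CheckAnalysis` runs, so reaching this branch signals a Spark bug rather than a user mistake. A hedged sketch of how that surfaces to a caller:

```scala
import org.apache.spark.SparkException

// `plan` is assumed to be a logical plan that still contains an
// UnresolvedHint, which should only happen in tests or due to a bug.
try {
  SimpleAnalyzer.checkAnalysis(plan)
} catch {
  case e: SparkException =>
    // Internal errors carry the reserved INTERNAL_ERROR class.
    assert(e.getErrorClass == "INTERNAL_ERROR")
}
```
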
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.analysis
import org.scalatest.Assertions._

import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
@@ -1166,10 +1165,12 @@ class AnalysisErrorSuite extends AnalysisTest {
)
assert(plan.resolved)

val error = intercept[AnalysisException] {
SimpleAnalyzer.checkAnalysis(plan)
}
assert(error.message.contains(s"Hint not found: ${hintName}"))
checkError(
exception = intercept[SparkException] {
SimpleAnalyzer.checkAnalysis(plan)
},
errorClass = "INTERNAL_ERROR",
parameters = Map("message" -> "Hint not found: `some_random_hint_that_does_not_exist`"))

// UnresolvedHint is removed by the batch `Remove Unresolved Hints`
assertAnalysisSuccess(plan, true)
8 changes: 4 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -4036,8 +4036,8 @@ class Dataset[T] private[sql](
def write: DataFrameWriter[T] = {
if (isStreaming) {
logicalPlan.failAnalysis(
errorClass = "_LEGACY_ERROR_TEMP_2312",
messageParameters = Map.empty)
errorClass = "CALL_ON_STREAMING_DATASET_UNSUPPORTED",
messageParameters = Map("methodName" -> toSQLId("write")))
}
new DataFrameWriter[T](this)
}
@@ -4065,8 +4065,8 @@
// TODO: streaming could be adapted to use this interface
if (isStreaming) {
logicalPlan.failAnalysis(
errorClass = "_LEGACY_ERROR_TEMP_2311",
messageParameters = Map.empty)
errorClass = "CALL_ON_STREAMING_DATASET_UNSUPPORTED",
messageParameters = Map("methodName" -> toSQLId("writeTo")))
}
new DataFrameWriterV2[T](table, this)
}
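
A small note on `toSQLId` (my gloss, not in the diff): it renders an identifier in SQL style with backticks — `QueryErrorsBase` defines one such helper — which is why the tests below expect `` `write` `` and `` `writeTo` ``. A self-contained sketch of the assumed behavior:

```scala
// Hedged sketch of what toSQLId appears to do for a simple identifier,
// inferred from the expected test parameters below; the real helper also
// handles multi-part and already-quoted names.
def toSQLIdSketch(name: String): String = s"`$name`"

assert(toSQLIdSketch("write") == "`write`")
assert(toSQLIdSketch("writeTo") == "`writeTo`")
```
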
@@ -32,6 +32,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAM
import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.FakeSourceOne
import org.apache.spark.sql.test.SharedSparkSession
@@ -767,4 +768,22 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(table.partitioning === Seq(BucketTransform(LiteralValue(4, IntegerType),
Seq(FieldReference(Seq("ts", "timezone"))))))
}

test("can not be called on streaming Dataset/DataFrame") {
val ds = MemoryStream[Int].toDS()

checkError(
exception = intercept[AnalysisException] {
ds.write
},
errorClass = "CALL_ON_STREAMING_DATASET_UNSUPPORTED",
parameters = Map("methodName" -> "`write`"))

checkError(
exception = intercept[AnalysisException] {
ds.writeTo("testcat.table_name")
},
errorClass = "CALL_ON_STREAMING_DATASET_UNSUPPORTED",
parameters = Map("methodName" -> "`writeTo`"))
}
}
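
To reproduce the same failure outside the suite, a hedged sketch (assumptions: a local `SparkSession`; `MemoryStream` requires an implicit `SQLContext`, which the suite normally gets from its test helpers):

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._                           // Int encoder for MemoryStream
implicit val sqlCtx: SQLContext = spark.sqlContext // required by MemoryStream.apply

val ds = MemoryStream[Int].toDS()
ds.write // throws AnalysisException: CALL_ON_STREAMING_DATASET_UNSUPPORTED
```
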
@@ -121,16 +121,16 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
}

test("write cannot be called on streaming datasets") {
val e = intercept[AnalysisException] {
spark.readStream
.format("org.apache.spark.sql.streaming.test")
.load()
.write
.save()
}
Seq("'write'", "not", "streaming Dataset/DataFrame").foreach { s =>
assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT)))
}
checkError(
exception = intercept[AnalysisException] {
spark.readStream
.format("org.apache.spark.sql.streaming.test")
.load()
.write
.save()
},
errorClass = "CALL_ON_STREAMING_DATASET_UNSUPPORTED",
parameters = Map("methodName" -> "`write`"))
}

test("resolve default source") {
