[SPARK-44555][SQL] Use checkError() to check Exception in command Suite & assign some error class names

### What changes were proposed in this pull request?
This PR aims to:
1. Use `checkError()` to check exceptions in the `command` test suites.
2. Assign error class names to some errors, including `UNSUPPORTED_FEATURE.PURGE_PARTITION` and `UNSUPPORTED_FEATURE.PURGE_TABLE`. The migrated assertion pattern is sketched below.
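
A minimal sketch of that pattern, using the `UNSUPPORTED_FEATURE.PURGE_TABLE` case from this diff (`sql`, `catalog`, and `checkError` come from the suite scaffolding):

```scala
import org.apache.spark.SparkUnsupportedOperationException

// Intercept the typed exception and match on the error class and
// parameters instead of asserting on message substrings.
checkError(
  exception = intercept[SparkUnsupportedOperationException] {
    sql(s"DROP TABLE $catalog.ns.tbl PURGE")
  },
  errorClass = "UNSUPPORTED_FEATURE.PURGE_TABLE",
  parameters = Map.empty
)
```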

### Why are the changes needed?
The changes improve the error framework: these errors get stable error classes, and the tests assert on error classes and parameters rather than on message substrings.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Manually tested.
- Passed GA (GitHub Actions).

Closes apache#42169 from panbingkun/checkError_for_command.

Authored-by: panbingkun <pbk1982@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
panbingkun authored and ragnarok56 committed Mar 2, 2024
1 parent 9497342 commit 0b5bb10
Showing 22 changed files with 172 additions and 85 deletions.
10 changes: 10 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
@@ -3020,6 +3020,16 @@
"Pivoting by the value '<value>' of the column data type <type>."
]
},
"PURGE_PARTITION" : {
"message" : [
"Partition purge."
]
},
"PURGE_TABLE" : {
"message" : [
"Purge table."
]
},
"PYTHON_UDF_IN_ON_CLAUSE" : {
"message" : [
"Python UDF in the ON clause of a <joinType> JOIN. In case of an INNNER JOIN consider rewriting to a CROSS JOIN with a WHERE clause."
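Note: both new leaf classes nest under `UNSUPPORTED_FEATURE`, whose parent template prefixes the leaf text, so raising `UNSUPPORTED_FEATURE.PURGE_TABLE` should render roughly as `[UNSUPPORTED_FEATURE.PURGE_TABLE] The feature is not supported: Purge table.` (exact rendering assumed; not shown in this diff).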
8 changes: 8 additions & 0 deletions docs/sql-error-conditions-unsupported-feature-error-class.md
@@ -141,6 +141,14 @@ PIVOT clause following a GROUP BY clause. Consider pushing the GROUP BY into a s

Pivoting by the value '`<value>`' of the column data type `<type>`.

+## PURGE_PARTITION
+
+Partition purge.
+
+## PURGE_TABLE
+
+Purge table.
+
## PYTHON_UDF_IN_ON_CLAUSE

Python UDF in the ON clause of a `<joinType>` JOIN. In case of an INNER JOIN consider rewriting to a CROSS JOIN with a WHERE clause.
@@ -23,6 +23,7 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException;
import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException;
+import org.apache.spark.sql.errors.QueryExecutionErrors;

/**
* An atomic partition interface of {@link Table} to operate multiple partitions atomically.
@@ -107,7 +108,7 @@ void createPartitions(
*/
default boolean purgePartitions(InternalRow[] idents)
throws NoSuchPartitionException, UnsupportedOperationException {
-    throw new UnsupportedOperationException("Partition purge is not supported");
+    throw QueryExecutionErrors.unsupportedPurgePartitionError();
}

/**
@@ -23,6 +23,7 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException;
import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException;
+import org.apache.spark.sql.errors.QueryExecutionErrors;
import org.apache.spark.sql.types.StructType;

/**
@@ -88,7 +89,7 @@ void createPartition(
*/
default boolean purgePartition(InternalRow ident)
throws NoSuchPartitionException, UnsupportedOperationException {
-    throw new UnsupportedOperationException("Partition purge is not supported");
+    throw QueryExecutionErrors.unsupportedPurgePartitionError();
}

/**
@@ -23,6 +23,7 @@
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.errors.QueryCompilationErrors;
+import org.apache.spark.sql.errors.QueryExecutionErrors;
import org.apache.spark.sql.types.StructType;

import java.util.Collections;
@@ -256,7 +257,7 @@ Table alterTable(
* @since 3.1.0
*/
default boolean purgeTable(Identifier ident) throws UnsupportedOperationException {
-    throw new UnsupportedOperationException("Purge table is not supported.");
+    throw QueryExecutionErrors.unsupportedPurgeTableError();
}

/**
@@ -2795,4 +2795,16 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
errorClass = "MERGE_CARDINALITY_VIOLATION",
messageParameters = Map.empty)
}

+  def unsupportedPurgePartitionError(): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException(
+      errorClass = "UNSUPPORTED_FEATURE.PURGE_PARTITION",
+      messageParameters = Map.empty)
+  }
+
+  def unsupportedPurgeTableError(): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException(
+      errorClass = "UNSUPPORTED_FEATURE.PURGE_TABLE",
+      messageParameters = Map.empty)
+  }
}
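
For context, a minimal sketch of what these helpers return (accessor names from `SparkThrowable`; the asserted values mirror this diff):

```scala
// Both helpers return a SparkUnsupportedOperationException that carries
// an error class and no message parameters; checkError() in the test
// suites below matches on exactly these fields.
val e = QueryExecutionErrors.unsupportedPurgeTableError()
assert(e.getErrorClass == "UNSUPPORTED_FEATURE.PURGE_TABLE")
assert(e.getMessageParameters.isEmpty)
```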
@@ -19,7 +19,7 @@ package org.apache.spark.sql.connector.catalog

import java.util

-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException}
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
@@ -117,10 +117,13 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
partTable.createPartitions(
partIdents,
Array(new util.HashMap[String, String](), new util.HashMap[String, String]()))
-    val errMsg = intercept[UnsupportedOperationException] {
-      partTable.purgePartitions(partIdents)
-    }.getMessage
-    assert(errMsg.contains("purge is not supported"))
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        partTable.purgePartitions(partIdents)
+      },
+      errorClass = "UNSUPPORTED_FEATURE.PURGE_PARTITION",
+      parameters = Map.empty
+    )
}

test("dropPartitions failed if partition not exists") {
@@ -21,7 +21,7 @@ import java.util

import scala.collection.JavaConverters._

-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException}
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
@@ -89,10 +89,13 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
-    val errMsg = intercept[UnsupportedOperationException] {
-      partTable.purgePartition(InternalRow.apply("3"))
-    }.getMessage
-    assert(errMsg.contains("purge is not supported"))
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        partTable.purgePartition(InternalRow.apply("3"))
+      },
+      errorClass = "UNSUPPORTED_FEATURE.PURGE_PARTITION",
+      parameters = Map.empty
+    )
}

test("replacePartitionMetadata") {
@@ -39,11 +39,15 @@ trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuit
test("empty string as partition value") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (col1 INT, p1 STRING) $defaultUsing PARTITIONED BY (p1)")
-      val errMsg = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t ADD PARTITION (p1 = '')")
-      }.getMessage
-      assert(errMsg.contains("Partition spec is invalid. " +
-        "The spec ([p1=]) contains an empty partition column value"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $t ADD PARTITION (p1 = '')")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1076",
+        parameters = Map(
+          "details" -> "The spec ([p1=]) contains an empty partition column value"
+        )
+      )
}
}

@@ -79,11 +79,13 @@ class AlterTableDropPartitionSuite
test("empty string as partition value") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (col1 INT, p1 STRING) $defaultUsing PARTITIONED BY (p1)")
-      val errMsg = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t DROP PARTITION (p1 = '')")
-      }.getMessage
-      assert(errMsg.contains("Partition spec is invalid. " +
-        "The spec ([p1=]) contains an empty partition column value"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $t DROP PARTITION (p1 = '')")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1076",
+        parameters = Map("details" -> "The spec ([p1=]) contains an empty partition column value")
+      )
}
}
}
@@ -37,10 +37,13 @@ trait AlterTableRenameSuiteBase extends command.AlterTableRenameSuiteBase with Q
sql(s"CREATE NAMESPACE $catalog.src_ns")
val src = dst.replace("dst", "src")
sql(s"CREATE TABLE $src (c0 INT) $defaultUsing")
-      val errMsg = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $src RENAME TO dst_ns.dst_tbl")
-      }.getMessage
-      assert(errMsg.contains("source and destination databases do not match"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $src RENAME TO dst_ns.dst_tbl")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1073",
+        parameters = Map("db" -> "src_ns", "newDb" -> "dst_ns")
+      )
}
}
}
@@ -89,10 +89,13 @@ trait AlterTableSetLocationSuiteBase extends command.AlterTableSetLocationSuiteB
checkLocation(tableIdent, new URI("/path/to/part/ways2"), Some(partSpec))
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
-        val e = intercept[AnalysisException] {
-          sql(s"ALTER TABLE $t PARTITION (A='1', B='2') SET LOCATION '/path/to/part/ways3'")
-        }.getMessage
-        assert(e.contains("not a valid partition column"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql(s"ALTER TABLE $t PARTITION (A='1', B='2') SET LOCATION '/path/to/part/ways3'")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1231",
+          parameters = Map("key" -> "A", "tblName" -> "`spark_catalog`.`ns`.`tbl`")
+        )
}

sessionCatalog.setCurrentDatabase("ns")
@@ -158,11 +158,13 @@ trait ShowCreateTableSuiteBase extends command.ShowCreateTableSuiteBase
""".stripMargin
)

-    val cause = intercept[AnalysisException] {
-      getShowCreateDDL(t, true)
-    }
-
-    assert(cause.getMessage.contains("Use `SHOW CREATE TABLE` without `AS SERDE` instead"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        getShowCreateDDL(t, true)
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_1274",
+      parameters = Map("table" -> "`spark_catalog`.`ns1`.`tbl`")
+    )
}
}

@@ -53,10 +53,13 @@ trait ShowTablesSuiteBase extends command.ShowTablesSuiteBase with command.Tests
}

test("only support single-level namespace") {
-    val errMsg = intercept[AnalysisException] {
-      runShowTablesSql("SHOW TABLES FROM a.b", Seq())
-    }.getMessage
-    assert(errMsg.contains("Nested databases are not supported by v1 session catalog: a.b"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        runShowTablesSql("SHOW TABLES FROM a.b", Seq())
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_1126",
+      parameters = Map("catalog" -> "a.b")
+    )
}

test("SHOW TABLE EXTENDED from default") {
@@ -96,10 +99,13 @@
Seq(
s"SHOW TABLES IN $catalog",
s"SHOW TABLE EXTENDED IN $catalog LIKE '*tbl'").foreach { showTableCmd =>
-      val errMsg = intercept[AnalysisException] {
-        sql(showTableCmd)
-      }.getMessage
-      assert(errMsg.contains("Database from v1 session catalog is not specified"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(showTableCmd)
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1125",
+        parameters = Map.empty
+      )
}
}

@@ -195,10 +195,13 @@ class TruncateTableSuite extends TruncateTableSuiteBase with CommandSuiteBase {
withNamespaceAndTable("ns", "tbl") { t =>
(("a", "b") :: Nil).toDF().write.parquet(tempDir.getCanonicalPath)
sql(s"CREATE TABLE $t $defaultUsing LOCATION '${tempDir.toURI}'")
-      val errMsg = intercept[AnalysisException] {
-        sql(s"TRUNCATE TABLE $t")
-      }.getMessage
-      assert(errMsg.contains("Operation not allowed: TRUNCATE TABLE on external tables"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"TRUNCATE TABLE $t")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1266",
+        parameters = Map("tableIdentWithDB" -> "`spark_catalog`.`ns`.`tbl`")
+      )
}
}
}
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.command.v2

+import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -56,10 +57,13 @@ class AlterTableDropPartitionSuite
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
sql(s"ALTER TABLE $t ADD PARTITION (id=1)")
try {
-        val errMsg = intercept[UnsupportedOperationException] {
-          sql(s"ALTER TABLE $t DROP PARTITION (id=1) PURGE")
-        }.getMessage
-        assert(errMsg.contains("purge is not supported"))
+        checkError(
+          exception = intercept[SparkUnsupportedOperationException] {
+            sql(s"ALTER TABLE $t DROP PARTITION (id=1) PURGE")
+          },
+          errorClass = "UNSUPPORTED_FEATURE.PURGE_PARTITION",
+          parameters = Map.empty
+        )
} finally {
sql(s"ALTER TABLE $t DROP PARTITION (id=1)")
}
@@ -31,10 +31,13 @@ class AlterTableRecoverPartitionsSuite
test("partition recovering of v2 tables is not supported") {
withNamespaceAndTable("ns", "tbl") { t =>
spark.sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
-      val errMsg = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t RECOVER PARTITIONS")
-      }.getMessage
-      assert(errMsg.contains("ALTER TABLE ... RECOVER PARTITIONS is not supported for v2 tables"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $t RECOVER PARTITIONS")
+        },
+        errorClass = "NOT_SUPPORTED_COMMAND_FOR_V2_TABLE",
+        parameters = Map("cmd" -> "ALTER TABLE ... RECOVER PARTITIONS")
+      )
}
}
}
@@ -56,11 +56,13 @@ class AlterTableSetLocationSuite
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (id int) USING foo")

-      val e = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t PARTITION(ds='2017-06-10') SET LOCATION 's3://bucket/path'")
-      }
-      assert(e.getMessage.contains(
-        "ALTER TABLE SET LOCATION does not support partition for v2 tables"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $t PARTITION(ds='2017-06-10') SET LOCATION 's3://bucket/path'")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1045",
+        parameters = Map.empty
+      )
}
}
}
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.command.v2

+import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.Row
import org.apache.spark.sql.connector.InMemoryTableSessionCatalog
import org.apache.spark.sql.execution.command
@@ -29,11 +30,14 @@ class DropTableSuite extends command.DropTableSuiteBase with CommandSuiteBase {
test("purge option") {
withNamespaceAndTable("ns", "tbl") { t =>
createTable(t)
-      val errMsg = intercept[UnsupportedOperationException] {
-        sql(s"DROP TABLE $catalog.ns.tbl PURGE")
-      }.getMessage
      // The default TableCatalog.purgeTable implementation throws an exception.
-      assert(errMsg.contains("Purge table is not supported"))
+      checkError(
+        exception = intercept[SparkUnsupportedOperationException] {
+          sql(s"DROP TABLE $catalog.ns.tbl PURGE")
+        },
+        errorClass = "UNSUPPORTED_FEATURE.PURGE_TABLE",
+        parameters = Map.empty
+      )
}
}

Expand Down
