From 9f341af662f0cda9d6ac61997c37e389f4110961 Mon Sep 17 00:00:00 2001
From: panbingkun
Date: Fri, 7 Jun 2024 16:10:06 +0800
Subject: [PATCH 1/3] [SPARK-46393][SQL][TESTS] Fix UT

---
 .../apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 36 +++++++++----------
 1 file changed, 17 insertions(+), 19 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index 45c4f41ffb77b..b81645ab35afe 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -20,9 +20,8 @@ package org.apache.spark.sql.jdbc.v2
 import org.apache.logging.log4j.Level
 
 import org.apache.spark.sql.{AnalysisException, DataFrame}
-import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Sample, Sort}
-import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.NullOrdering
@@ -84,6 +83,17 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
 
   def testCreateTableWithProperty(tbl: String): Unit = {}
 
+  def checkErrorFailedLoadTable(e: AnalysisException): Unit = {
+    checkError(
+      exception = e,
+      errorClass = "FAILED_JDBC.UNCLASSIFIED",
+      parameters = Map(
+        "url" -> "jdbc:",
+        "message" -> "Failed to load table: not_existing_table"
+      )
+    )
+  }
+
   test("SPARK-33034: ALTER TABLE ... add new columns") {
     withTable(s"$catalogName.alt_table") {
       sql(s"CREATE TABLE $catalogName.alt_table (ID STRING)")
@@ -122,9 +132,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     val e = intercept[AnalysisException] {
       sql(s"ALTER TABLE $catalogName.not_existing_table ADD COLUMNS (C4 STRING)")
     }
-    checkErrorTableNotFound(e, s"`$catalogName`.`not_existing_table`",
-      ExpectedContext(s"$catalogName.not_existing_table", 12,
-        11 + s"$catalogName.not_existing_table".length))
+    checkErrorFailedLoadTable(e)
   }
 
   test("SPARK-33034: ALTER TABLE ... drop column") {
@@ -146,9 +154,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     val e = intercept[AnalysisException] {
       sql(s"ALTER TABLE $catalogName.not_existing_table DROP COLUMN C1")
     }
-    checkErrorTableNotFound(e, s"`$catalogName`.`not_existing_table`",
-      ExpectedContext(s"$catalogName.not_existing_table", 12,
-        11 + s"$catalogName.not_existing_table".length))
+    checkErrorFailedLoadTable(e)
   }
 
   test("SPARK-33034: ALTER TABLE ... update column type") {
@@ -164,9 +170,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     val e = intercept[AnalysisException] {
       sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN id TYPE DOUBLE")
     }
-    checkErrorTableNotFound(e, s"`$catalogName`.`not_existing_table`",
-      ExpectedContext(s"$catalogName.not_existing_table", 12,
-        11 + s"$catalogName.not_existing_table".length))
+    checkErrorFailedLoadTable(e)
   }
 
   test("SPARK-33034: ALTER TABLE ... rename column") {
@@ -194,11 +198,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     val e = intercept[AnalysisException] {
       sql(s"ALTER TABLE $catalogName.not_existing_table RENAME COLUMN ID TO C")
     }
-    checkErrorTableNotFound(e,
-      UnresolvedAttribute.parseAttributeName(s"$catalogName.not_existing_table")
-        .map(part => quoteIdentifier(part)).mkString("."),
-      ExpectedContext(s"$catalogName.not_existing_table", 12,
-        11 + s"$catalogName.not_existing_table".length))
+    checkErrorFailedLoadTable(e)
   }
 
   test("SPARK-33034: ALTER TABLE ... update column nullability") {
@@ -209,9 +209,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     val e = intercept[AnalysisException] {
      sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN ID DROP NOT NULL")
     }
-    checkErrorTableNotFound(e, s"`$catalogName`.`not_existing_table`",
-      ExpectedContext(s"$catalogName.not_existing_table", 12,
-        11 + s"$catalogName.not_existing_table".length))
+    checkErrorFailedLoadTable(e)
   }
 
   test("CREATE TABLE with table comment") {
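
An aside for reviewers: patch 1 repoints the negative ALTER TABLE tests from checkErrorTableNotFound to a single helper asserting the classified JDBC error. The helper leans on the contract of Spark's checkError test utility: match the stable error class and its message parameters, never the rendered message text. Below is a minimal, self-contained Scala sketch of that contract; FakeAnalysisException and this standalone checkError are invented stand-ins for Spark's AnalysisException and SparkFunSuite.checkError, not the real classes.

```scala
// Invented stand-in for Spark's AnalysisException, for illustration only.
case class FakeAnalysisException(
    errorClass: String,
    messageParameters: Map[String, String]) extends Exception(errorClass)

object CheckErrorSketch {
  // The contract the new helper relies on: assert the stable error class and
  // its structured parameters, not the human-readable message.
  def checkError(
      exception: FakeAnalysisException,
      errorClass: String,
      parameters: Map[String, String]): Unit = {
    assert(exception.errorClass == errorClass)
    assert(exception.messageParameters == parameters)
  }

  def main(args: Array[String]): Unit = {
    val e = FakeAnalysisException(
      errorClass = "FAILED_JDBC.UNCLASSIFIED",
      messageParameters = Map(
        "url" -> "jdbc:",
        "message" -> "Failed to load table: not_existing_table"))
    checkError(e, "FAILED_JDBC.UNCLASSIFIED",
      Map("url" -> "jdbc:", "message" -> "Failed to load table: not_existing_table"))
    println("assertions passed")
  }
}
```
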
rename column") { @@ -194,11 +198,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu val e = intercept[AnalysisException] { sql(s"ALTER TABLE $catalogName.not_existing_table RENAME COLUMN ID TO C") } - checkErrorTableNotFound(e, - UnresolvedAttribute.parseAttributeName(s"$catalogName.not_existing_table") - .map(part => quoteIdentifier(part)).mkString("."), - ExpectedContext(s"$catalogName.not_existing_table", 12, - 11 + s"$catalogName.not_existing_table".length)) + checkErrorFailedLoadTable(e) } test("SPARK-33034: ALTER TABLE ... update column nullability") { @@ -209,9 +209,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu val e = intercept[AnalysisException] { sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN ID DROP NOT NULL") } - checkErrorTableNotFound(e, s"`$catalogName`.`not_existing_table`", - ExpectedContext(s"$catalogName.not_existing_table", 12, - 11 + s"$catalogName.not_existing_table".length)) + checkErrorFailedLoadTable(e) } test("CREATE TABLE with table comment") { From b8a40c0d0f96dba619d4982cd946802f18af32a9 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Fri, 7 Jun 2024 16:24:11 +0800 Subject: [PATCH 2/3] update --- .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index b81645ab35afe..c78e87d0b8463 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -83,13 +83,13 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu def testCreateTableWithProperty(tbl: String): Unit = {} - def checkErrorFailedLoadTable(e: AnalysisException): Unit = { + def checkErrorFailedLoadTable(e: AnalysisException, tbl: String): Unit = { checkError( exception = e, errorClass = "FAILED_JDBC.UNCLASSIFIED", parameters = Map( "url" -> "jdbc:", - "message" -> "Failed to load table: not_existing_table" + "message" -> s"Failed to load table: $tbl" ) ) } @@ -132,7 +132,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu val e = intercept[AnalysisException] { sql(s"ALTER TABLE $catalogName.not_existing_table ADD COLUMNS (C4 STRING)") } - checkErrorFailedLoadTable(e) + checkErrorFailedLoadTable(e, "not_existing_table") } test("SPARK-33034: ALTER TABLE ... drop column") { @@ -154,7 +154,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu val e = intercept[AnalysisException] { sql(s"ALTER TABLE $catalogName.not_existing_table DROP COLUMN C1") } - checkErrorFailedLoadTable(e) + checkErrorFailedLoadTable(e, "not_existing_table") } test("SPARK-33034: ALTER TABLE ... update column type") { @@ -170,7 +170,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu val e = intercept[AnalysisException] { sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN id TYPE DOUBLE") } - checkErrorFailedLoadTable(e) + checkErrorFailedLoadTable(e, "not_existing_table") } test("SPARK-33034: ALTER TABLE ... 
rename column") { @@ -198,7 +198,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu val e = intercept[AnalysisException] { sql(s"ALTER TABLE $catalogName.not_existing_table RENAME COLUMN ID TO C") } - checkErrorFailedLoadTable(e) + checkErrorFailedLoadTable(e, "not_existing_table") } test("SPARK-33034: ALTER TABLE ... update column nullability") { @@ -209,7 +209,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu val e = intercept[AnalysisException] { sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN ID DROP NOT NULL") } - checkErrorFailedLoadTable(e) + checkErrorFailedLoadTable(e, "not_existing_table") } test("CREATE TABLE with table comment") { From 60b4228b2a8aa008f36f095aa429248b91a66388 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 7 Jun 2024 10:11:40 +0800 Subject: [PATCH 3/3] [SPARK-46393][SQL][FOLLOWUP] Classify exceptions in JDBCTableCatalog.loadTable ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/44335 , which missed to handle `loadTable` ### Why are the changes needed? better error message ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing test ### Was this patch authored or co-authored using generative AI tooling? no Closes #46905 from cloud-fan/jdbc. Authored-by: Wenchen Fan Signed-off-by: Kent Yao --- .../src/main/resources/error/error-conditions.json | 5 +++++ .../datasources/v2/jdbc/JDBCTableCatalog.scala | 13 ++++++++----- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 7b88300737700..36d8fe1daa379 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -1255,6 +1255,11 @@ "List namespaces." ] }, + "LOAD_TABLE" : { + "message" : [ + "Load the table ." + ] + }, "NAMESPACE_EXISTS" : { "message" : [ "Check that the namespace exists." diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index dbd8ee5981daa..e7a3fe0f8aa7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala @@ -131,13 +131,16 @@ class JDBCTableCatalog extends TableCatalog checkNamespace(ident.namespace()) val optionsWithTableName = new JDBCOptions( options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident))) - try { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.LOAD_TABLE", + messageParameters = Map( + "url" -> options.getRedactUrl(), + "tableName" -> toSQLId(ident)), + dialect, + description = s"Failed to load table: $ident" + ) { val schema = JDBCRDD.resolveTable(optionsWithTableName) JDBCTable(ident, schema, optionsWithTableName) - } catch { - case e: SQLException => - logWarning("Failed to load table", e) - throw QueryCompilationErrors.noSuchTableError(ident) } }