Skip to content

Commit

Permalink
[SPARK-48618][SQL] Utilize the ErrorCode and SQLState returned in…
Browse files Browse the repository at this point in the history
… SQLException to make errors more accurate

### What changes were proposed in this pull request?
The pr aims to use the `ErrorCode` and `SQLState` returned in SQLException to make errors more accurate.

### Why are the changes needed?
- e.g. for `mysql`
  ```
  withTable(s"$catalogName.tbl1", s"$catalogName.tbl2") {
        sql(s"CREATE TABLE $catalogName.tbl1 (col1 INT, col2 INT)")
        sql(s"CREATE TABLE $catalogName.tbl2 (col1 INT, col2 INT)")
        sql(s"ALTER TABLE $catalogName.tbl2 RENAME TO tbl1")
      }
  ```
- Before:
  ```
  [FAILED_JDBC.UNCLASSIFIED] Failed JDBC jdbc: on the operation: Failed table renaming from tbl2 to tbl1 SQLSTATE:
  HV000
  org.apache.spark.sql.AnalysisException: [FAILED_JDBC.UNCLASSIFIED] Failed JDBC jdbc: on the operation: Failed table
  renaming from tbl2 to tbl1 SQLSTATE: HV000
  	  at org.apache.spark.sql.jdbc.JdbcDialect.classifyException(JdbcDialects.scala:767)
	  at org.apache.spark.sql.jdbc.JdbcDialect.classifyException(JdbcDialects.scala:751)
	  at org.apache.spark.sql.jdbc.MySQLDialect.classifyException(MySQLDialect.scala:348)
	  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.classifyException(JdbcUtils.scala:1271)
	  at org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog.$anonfun$renameTable$1(JDBCTableCatalog.scala:125)
	  at org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog.$anonfun$renameTable$1$adapted(JDBCTableCatalog.scala:116)
	  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.withConnection(JdbcUtils.scala:1279)
  ```

- After:
  ```
  [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `tbl1` because it already exists.
  Choose a different name, drop or replace the existing object, or add the IF NOT EXISTS clause to tolerate pre-existing
  objects. SQLSTATE: 42P07
  org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `tbl1` because it already exists.
  Choose a different name, drop or replace the existing object, or add the IF NOT EXISTS clause to tolerate pre-existing objects. SQLSTATE: 42P07
	  at org.apache.spark.sql.errors.QueryCompilationErrors$.tableAlreadyExistsError(QueryCompilationErrors.scala:2643)
	  at org.apache.spark.sql.jdbc.MySQLDialect.classifyException(MySQLDialect.scala:343)
	  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.classifyException(JdbcUtils.scala:1271)
	  at org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog.$anonfun$renameTable$1(JDBCTableCatalog.scala:125)
	  at org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog.$anonfun$renameTable$1$adapted(JDBCTableCatalog.scala:116)
	  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.withConnection(JdbcUtils.scala:1279)
  ```

- Aligning similar logic to `PostgresDialect`
https://github.com/apache/spark/blob/08e741b92b8fc9e43c838d0849317916218414ce/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala#L273-L275

### Does this PR introduce _any_ user-facing change?
Yes, end users will accurately know the cause of the error.

### How was this patch tested?
Add new UT.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#46969 from panbingkun/fine-grained_jdbc_error.

Authored-by: panbingkun <panbingkun@baidu.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
  • Loading branch information
panbingkun authored and attilapiros committed Oct 4, 2024
1 parent 20335d6 commit 27b2a70
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -944,4 +944,19 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
assert(row(2).getDouble(0) === 0.0)
}
}

  // SPARK-48618: verifies that renaming a table onto an existing table name raises the
  // precise TABLE_OR_VIEW_ALREADY_EXISTS error class (mapped from the driver's error
  // code / SQLSTATE) rather than the generic FAILED_JDBC.UNCLASSIFIED error.
  test("SPARK-48618: Renaming the table to the name of an existing table") {
    withTable(s"$catalogName.tbl1", s"$catalogName.tbl2") {
      sql(s"CREATE TABLE $catalogName.tbl1 (col1 INT, col2 INT)")
      sql(s"CREATE TABLE $catalogName.tbl2 (col3 INT, col4 INT)")

      // Renaming tbl2 to the already-taken name tbl1 must fail with the specific
      // error class and quote the conflicting relation name.
      checkError(
        exception = intercept[AnalysisException] {
          sql(s"ALTER TABLE $catalogName.tbl2 RENAME TO tbl1")
        },
        errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
        parameters = Map("relationName" -> "`tbl1`")
      )
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -167,6 +168,9 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLe
namespace = messageParameters.get("namespace").toArray,
details = sqlException.getMessage,
cause = Some(e))
case "42710" if errorClass == "FAILED_JDBC.RENAME_TABLE" =>
val newTable = messageParameters("newName")
throw QueryCompilationErrors.tableAlreadyExistsError(newTable)
case _ => super.classifyException(e, errorClass, messageParameters, description)
}
case _ => super.classifyException(e, errorClass, messageParameters, description)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.expressions.{Expression, NullOrdering, SortDirection}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.jdbc.MsSqlServerDialect.{GEOGRAPHY, GEOMETRY}
Expand Down Expand Up @@ -216,6 +216,9 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCErr
namespace = messageParameters.get("namespace").toArray,
details = sqlException.getMessage,
cause = Some(e))
case 15335 if errorClass == "FAILED_JDBC.RENAME_TABLE" =>
val newTable = messageParameters("newName")
throw QueryCompilationErrors.tableAlreadyExistsError(newTable)
case _ => super.classifyException(e, errorClass, messageParameters, description)
}
case _ => super.classifyException(e, errorClass, messageParameters, description)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSu
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.index.TableIndex
import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference, NullOrdering, SortDirection}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -337,6 +337,9 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with No
case sqlException: SQLException =>
sqlException.getErrorCode match {
// ER_DUP_KEYNAME
case 1050 if errorClass == "FAILED_JDBC.RENAME_TABLE" =>
val newTable = messageParameters("newName")
throw QueryCompilationErrors.tableAlreadyExistsError(newTable)
case 1061 if errorClass == "FAILED_JDBC.CREATE_INDEX" =>
val indexName = messageParameters("indexName")
val tableName = messageParameters("tableName")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@

package org.apache.spark.sql.jdbc

import java.sql.{Date, Timestamp, Types}
import java.sql.{Date, SQLException, Timestamp, Types}
import java.util.Locale

import scala.util.control.NonFatal

import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.jdbc.OracleDialect._
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -229,6 +231,23 @@ private case class OracleDialect() extends JdbcDialect with SQLConfHelper with N
override def supportsLimit: Boolean = true

override def supportsOffset: Boolean = true

override def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getErrorCode match {
case 955 if errorClass == "FAILED_JDBC.RENAME_TABLE" =>
val newTable = messageParameters("newName")
throw QueryCompilationErrors.tableAlreadyExistsError(newTable)
case _ => super.classifyException(e, errorClass, messageParameters, description)
}
case _ => super.classifyException(e, errorClass, messageParameters, description)
}
}
}

private[jdbc] object OracleDialect {
Expand Down

0 comments on commit 27b2a70

Please sign in to comment.