Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-46393][SQL][FOLLOWUP] Classify exceptions in JDBCTableCatalog.loadTable and Fix UT #46912

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,11 @@
"List namespaces."
]
},
"LOAD_TABLE" : {
"message" : [
"Load the table <tableName>."
]
},
"NAMESPACE_EXISTS" : {
"message" : [
"Check that the namespace <namespace> exists."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ package org.apache.spark.sql.jdbc.v2
import org.apache.logging.log4j.Level

import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Sample, Sort}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.NullOrdering
Expand Down Expand Up @@ -84,6 +83,17 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu

def testCreateTableWithProperty(tbl: String): Unit = {}

def checkErrorFailedLoadTable(e: AnalysisException, tbl: String): Unit = {
checkError(
exception = e,
errorClass = "FAILED_JDBC.UNCLASSIFIED",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be LOAD_TABLE?

Copy link
Contributor Author

@panbingkun panbingkun Jun 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
classifyException(description, e)
}
/**
* Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
* @param message The error message to be placed to the returned exception.
* @param e The dialect specific exception.
* @return `AnalysisException` or its sub-class.
*/
@deprecated("Please override the classifyException method with an error class", "4.0.0")
def classifyException(message: String, e: Throwable): AnalysisException = {
new AnalysisException(
errorClass = "FAILED_JDBC.UNCLASSIFIED",
messageParameters = Map(
"url" -> "jdbc:",
"message" -> message),
cause = Some(e))
}

image
It lost the upstream errorClass in this place

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we should correct it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If necessary, I handle it together with the prefix logic above?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM

parameters = Map(
"url" -> "jdbc:",
"message" -> s"Failed to load table: $tbl"
)
)
}

test("SPARK-33034: ALTER TABLE ... add new columns") {
withTable(s"$catalogName.alt_table") {
sql(s"CREATE TABLE $catalogName.alt_table (ID STRING)")
Expand Down Expand Up @@ -122,9 +132,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table ADD COLUMNS (C4 STRING)")
}
checkErrorTableNotFound(e, s"`$catalogName`.`not_existing_table`",
ExpectedContext(s"$catalogName.not_existing_table", 12,
11 + s"$catalogName.not_existing_table".length))
checkErrorFailedLoadTable(e, "not_existing_table")
}

test("SPARK-33034: ALTER TABLE ... drop column") {
Expand All @@ -146,9 +154,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table DROP COLUMN C1")
}
checkErrorTableNotFound(e, s"`$catalogName`.`not_existing_table`",
ExpectedContext(s"$catalogName.not_existing_table", 12,
11 + s"$catalogName.not_existing_table".length))
checkErrorFailedLoadTable(e, "not_existing_table")
}

test("SPARK-33034: ALTER TABLE ... update column type") {
Expand All @@ -164,9 +170,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN id TYPE DOUBLE")
}
checkErrorTableNotFound(e, s"`$catalogName`.`not_existing_table`",
ExpectedContext(s"$catalogName.not_existing_table", 12,
11 + s"$catalogName.not_existing_table".length))
checkErrorFailedLoadTable(e, "not_existing_table")
}

test("SPARK-33034: ALTER TABLE ... rename column") {
Expand Down Expand Up @@ -194,11 +198,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table RENAME COLUMN ID TO C")
}
checkErrorTableNotFound(e,
UnresolvedAttribute.parseAttributeName(s"$catalogName.not_existing_table")
.map(part => quoteIdentifier(part)).mkString("."),
ExpectedContext(s"$catalogName.not_existing_table", 12,
11 + s"$catalogName.not_existing_table".length))
checkErrorFailedLoadTable(e, "not_existing_table")
}

test("SPARK-33034: ALTER TABLE ... update column nullability") {
Expand All @@ -209,9 +209,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN ID DROP NOT NULL")
}
checkErrorTableNotFound(e, s"`$catalogName`.`not_existing_table`",
ExpectedContext(s"$catalogName.not_existing_table", 12,
11 + s"$catalogName.not_existing_table".length))
checkErrorFailedLoadTable(e, "not_existing_table")
}

test("CREATE TABLE with table comment") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,16 @@ class JDBCTableCatalog extends TableCatalog
checkNamespace(ident.namespace())
val optionsWithTableName = new JDBCOptions(
options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
try {
JdbcUtils.classifyException(
errorClass = "FAILED_JDBC.LOAD_TABLE",
messageParameters = Map(
"url" -> options.getRedactUrl(),
"tableName" -> toSQLId(ident)),
dialect,
description = s"Failed to load table: $ident"
Copy link
Contributor Author

@panbingkun panbingkun Jun 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan @yaooqinn
Also, do we need catalogname as the prefix for the table here?
Although I found this class JDBCTableCatalog.scala, similar ones do not have this prefix attached.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine to add catalog info. Let's do it in a separated PR, as not only loadTable needs to be updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, Let me to do it in a separated PR.

) {
val schema = JDBCRDD.resolveTable(optionsWithTableName)
JDBCTable(ident, schema, optionsWithTableName)
} catch {
case e: SQLException =>
logWarning("Failed to load table", e)
throw QueryCompilationErrors.noSuchTableError(ident)
}
}

Expand Down