Skip to content

Commit

Permalink
[SPARK-48851][SQL] Change the value of SCHEMA_NOT_FOUND from `names…
Browse files Browse the repository at this point in the history
…pace` to `catalog.namespace`

### What changes were proposed in this pull request?
The pr aims to change the value of `SCHEMA_NOT_FOUND` from `namespace` to `catalog.namespace`.

### Why are the changes needed?
As discussing apache#47038 (comment), we should provide more friendly and clear prompt error message.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Update existed UT & Pass GA.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#47276 from panbingkun/db_with_catalog.

Authored-by: panbingkun <panbingkun@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
panbingkun authored and cloud-fan committed Jul 11, 2024
1 parent 225b6cb commit b4e3c2a
Show file tree
Hide file tree
Showing 30 changed files with 96 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.{QueryContext, QueryContextType, SparkArithmeticExceptio
import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, FetchErrorDetailsResponse, SparkConnectServiceGrpc, UserContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.streaming.StreamingQueryException
Expand Down Expand Up @@ -219,6 +219,8 @@ private[client] object GrpcExceptionConverter {
params.errorClass.orNull,
params.messageParameters,
params.cause)),
errorConstructor(params =>
new NoSuchNamespaceException(params.errorClass.orNull, params.messageParameters)),
errorConstructor(params =>
new NoSuchTableException(params.errorClass.orNull, params.messageParameters, params.cause)),
errorConstructor[NumberFormatException](params =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.scalatest.PrivateMethodTester

import org.apache.spark.{SparkArithmeticException, SparkException, SparkUpgradeException}
import org.apache.spark.SparkBuildInfo.{spark_version => SPARK_VERSION}
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, TableAlreadyExistsException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, TableAlreadyExistsException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.catalyst.parser.ParseException
Expand Down Expand Up @@ -165,8 +165,8 @@ class ClientE2ETestSuite
}
}

test("throw NoSuchDatabaseException") {
val ex = intercept[NoSuchDatabaseException] {
test("throw NoSuchNamespaceException") {
val ex = intercept[NoSuchNamespaceException] {
spark.sql("use database123")
}
assert(ex.getErrorClass != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ class SessionCatalog(

private def requireDbExists(db: String): Unit = {
if (!databaseExists(db)) {
throw new NoSuchDatabaseException(db)
throw new NoSuchNamespaceException(Seq(CatalogManager.SESSION_CATALOG_NAME, db))
}
}

Expand Down Expand Up @@ -291,7 +291,8 @@ class SessionCatalog(
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
val dbName = format(db)
if (dbName == DEFAULT_DATABASE) {
throw QueryCompilationErrors.cannotDropDefaultDatabaseError(dbName)
throw QueryCompilationErrors.cannotDropDefaultDatabaseError(
Seq(CatalogManager.SESSION_CATALOG_NAME, dbName))
}
if (!ignoreIfNotExists) {
requireDbExists(dbName)
Expand Down Expand Up @@ -527,7 +528,7 @@ class SessionCatalog(
* We replace char/varchar with "annotated" string type in the table schema, as the query
* engine doesn't support char/varchar yet.
*/
@throws[NoSuchDatabaseException]
@throws[NoSuchNamespaceException]
@throws[NoSuchTableException]
def getTableMetadata(name: TableIdentifier): CatalogTable = {
val t = getTableRawMetadata(name)
Expand All @@ -538,7 +539,7 @@ class SessionCatalog(
* Retrieve the metadata of an existing permanent table/view. If no database is specified,
* assume the table/view is in the current database.
*/
@throws[NoSuchDatabaseException]
@throws[NoSuchNamespaceException]
@throws[NoSuchTableException]
def getTableRawMetadata(name: TableIdentifier): CatalogTable = {
val qualifiedIdent = qualifyIdentifier(name)
Expand All @@ -556,7 +557,7 @@ class SessionCatalog(
* For example, if none of the requested tables could be retrieved, an empty list is returned.
* There is no guarantee of ordering of the returned tables.
*/
@throws[NoSuchDatabaseException]
@throws[NoSuchNamespaceException]
def getTablesByName(names: Seq[TableIdentifier]): Seq[CatalogTable] = {
if (names.nonEmpty) {
val qualifiedIdents = names.map(qualifyIdentifier)
Expand Down Expand Up @@ -1056,7 +1057,6 @@ class SessionCatalog(
getTempViewOrPermanentTableMetadata(ident).tableType == CatalogTableType.VIEW
} catch {
case _: NoSuchTableException => false
case _: NoSuchDatabaseException => false
case _: NoSuchNamespaceException => false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1072,10 +1072,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
messageParameters = Map("database" -> database))
}

def cannotDropDefaultDatabaseError(database: String): Throwable = {
def cannotDropDefaultDatabaseError(nameParts: Seq[String]): Throwable = {
new AnalysisException(
errorClass = "UNSUPPORTED_FEATURE.DROP_DATABASE",
messageParameters = Map("database" -> toSQLId(database)))
messageParameters = Map("database" -> toSQLId(nameParts)))
}

def cannotUsePreservedDatabaseAsCurrentDatabaseError(database: String): Throwable = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {

test("get database should throw exception when the database does not exist") {
withBasicCatalog { catalog =>
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.getDatabaseMetadata("db_that_does_not_exist")
}
}
Expand Down Expand Up @@ -283,7 +283,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {

test("drop database when the database does not exist") {
withBasicCatalog { catalog =>
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false)
}
catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false)
Expand All @@ -295,7 +295,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
catalog.setCurrentDatabase("db1")
assert(catalog.getCurrentDatabase == "db1")
catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = true)
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.createTable(newTable("tbl1", "db1"), ignoreIfExists = false)
}
catalog.setCurrentDatabase("default")
Expand All @@ -321,7 +321,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {

test("alter database should throw exception when the database does not exist") {
withBasicCatalog { catalog =>
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.alterDatabase(newDb("unknown_db"))
}
}
Expand All @@ -332,7 +332,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
assert(catalog.getCurrentDatabase == "default")
catalog.setCurrentDatabase("db2")
assert(catalog.getCurrentDatabase == "db2")
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.setCurrentDatabase("deebo")
}
catalog.createDatabase(newDb("deebo"), ignoreIfExists = false)
Expand Down Expand Up @@ -370,10 +370,10 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
test("create table when database does not exist") {
withBasicCatalog { catalog =>
// Creating table in non-existent database should always fail
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = false)
}
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = true)
}
// Table already exists
Expand Down Expand Up @@ -419,11 +419,11 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
test("drop table when database/table does not exist") {
withBasicCatalog { catalog =>
// Should always throw exception when the database does not exist
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = false,
purge = false)
}
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = true,
purge = false)
}
Expand Down Expand Up @@ -494,7 +494,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {

test("rename table when database/table does not exist") {
withBasicCatalog { catalog =>
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.renameTable(TableIdentifier("tbl1", Some("unknown_db")), TableIdentifier("tbl2"))
}
intercept[NoSuchTableException] {
Expand Down Expand Up @@ -543,7 +543,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {

test("alter table when database/table does not exist") {
withBasicCatalog { catalog =>
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.alterTable(newTable("tbl1", "unknown_db"))
}
intercept[NoSuchTableException] {
Expand Down Expand Up @@ -608,7 +608,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {

test("get table when database/table does not exist") {
withBasicCatalog { catalog =>
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.getTableMetadata(TableIdentifier("tbl1", Some("unknown_db")))
}
intercept[NoSuchTableException] {
Expand Down Expand Up @@ -856,7 +856,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
TableIdentifier("tbl4"),
TableIdentifier("tbl1", Some("db2")),
TableIdentifier("tbl2", Some("db2"))))
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.listTables("unknown_db")
}
}
Expand All @@ -876,7 +876,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
TableIdentifier("tbl2", Some("db2"))))
assert(catalog.listTables("db2", "*1").toSet ==
Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2"))))
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.listTables("unknown_db", "*")
}
}
Expand Down Expand Up @@ -970,7 +970,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {

test("create partitions when database/table does not exist") {
withBasicCatalog { catalog =>
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.createPartitions(
TableIdentifier("tbl1", Some("unknown_db")), Seq(), ignoreIfExists = false)
}
Expand Down Expand Up @@ -1077,7 +1077,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {

test("drop partitions when database/table does not exist") {
withBasicCatalog { catalog =>
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.dropPartitions(
TableIdentifier("tbl1", Some("unknown_db")),
Seq(),
Expand Down Expand Up @@ -1177,7 +1177,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {

test("get partition when database/table does not exist") {
withBasicCatalog { catalog =>
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.getPartition(TableIdentifier("tbl1", Some("unknown_db")), part1.spec)
}
intercept[NoSuchTableException] {
Expand Down Expand Up @@ -1258,7 +1258,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {

test("rename partitions when database/table does not exist") {
withBasicCatalog { catalog =>
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.renamePartitions(
TableIdentifier("tbl1", Some("unknown_db")), Seq(part1.spec), Seq(part2.spec))
}
Expand Down Expand Up @@ -1349,7 +1349,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {

test("alter partitions when database/table does not exist") {
withBasicCatalog { catalog =>
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.alterPartitions(TableIdentifier("tbl1", Some("unknown_db")), Seq(part1))
}
intercept[NoSuchTableException] {
Expand Down Expand Up @@ -1497,7 +1497,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {

test("list partitions when database/table does not exist") {
withBasicCatalog { catalog =>
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.listPartitions(TableIdentifier("tbl1", Some("unknown_db")))
}
intercept[NoSuchTableException] {
Expand Down Expand Up @@ -1544,7 +1544,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {

test("create function when database does not exist") {
withBasicCatalog { catalog =>
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.createFunction(
newFunc("func5", Some("does_not_exist")), ignoreIfExists = false)
}
Expand Down Expand Up @@ -1687,7 +1687,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {

test("drop function when database/function does not exist") {
withBasicCatalog { catalog =>
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.dropFunction(
FunctionIdentifier("something", Some("unknown_db")), ignoreIfNotExists = false)
}
Expand Down Expand Up @@ -1746,7 +1746,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {

test("get function when database/function does not exist") {
withBasicCatalog { catalog =>
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("unknown_db")))
}
intercept[NoSuchFunctionException] {
Expand Down Expand Up @@ -1799,7 +1799,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {

test("list functions when database does not exist") {
withBasicCatalog { catalog =>
intercept[NoSuchDatabaseException] {
intercept[NoSuchNamespaceException] {
catalog.listFunctions("unknown_db", "func*")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp
case _ if namespaceExists(namespace) =>
util.Collections.emptyMap[String, String]
case _ =>
throw new NoSuchNamespaceException(namespace)
throw new NoSuchNamespaceException(name() +: namespace)
}
}

Expand Down Expand Up @@ -256,7 +256,7 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp
if (namespace.isEmpty || namespaceExists(namespace)) {
super.listTables(namespace)
} else {
throw new NoSuchNamespaceException(namespace)
throw new NoSuchNamespaceException(name() +: namespace)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ case class DropNamespaceExec(
throw QueryCompilationErrors.cannotDropNonemptyNamespaceError(namespace)
}
} else if (!ifExists) {
throw QueryCompilationErrors.noSuchNamespaceError(ns)
throw QueryCompilationErrors.noSuchNamespaceError(catalog.name() +: ns)
}

Seq.empty
Expand Down
Loading

0 comments on commit b4e3c2a

Please sign in to comment.