-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-49152][SQL] V2SessionCatalog should use V2Command #47724
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ | |||||||||||||||||||||||||
import org.apache.spark.sql.catalyst.rules.Rule | ||||||||||||||||||||||||||
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, ResolveDefaultColumns => DefaultCols} | ||||||||||||||||||||||||||
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ | ||||||||||||||||||||||||||
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table} | ||||||||||||||||||||||||||
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table} | ||||||||||||||||||||||||||
import org.apache.spark.sql.connector.expressions.Transform | ||||||||||||||||||||||||||
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} | ||||||||||||||||||||||||||
import org.apache.spark.sql.execution.command._ | ||||||||||||||||||||||||||
|
@@ -66,7 +66,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) | |||||||||||||||||||||||||
throw QueryCompilationErrors.unsupportedTableOperationError(ident, "REPLACE COLUMNS") | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
case a @ AlterColumn(ResolvedTable(catalog, ident, table: V1Table, _), _, _, _, _, _, _) | ||||||||||||||||||||||||||
if isSessionCatalog(catalog) => | ||||||||||||||||||||||||||
if supportsV1Command(catalog) => | ||||||||||||||||||||||||||
if (a.column.name.length > 1) { | ||||||||||||||||||||||||||
throw QueryCompilationErrors.unsupportedTableOperationError( | ||||||||||||||||||||||||||
catalog, ident, "ALTER COLUMN with qualified column") | ||||||||||||||||||||||||||
|
@@ -117,13 +117,13 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) | |||||||||||||||||||||||||
case UnsetViewProperties(ResolvedViewIdentifier(ident), keys, ifExists) => | ||||||||||||||||||||||||||
AlterTableUnsetPropertiesCommand(ident, keys, ifExists, isView = true) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
case DescribeNamespace(DatabaseInSessionCatalog(db), extended, output) if conf.useV1Command => | ||||||||||||||||||||||||||
case DescribeNamespace(ResolvedV1Database(db), extended, output) if conf.useV1Command => | ||||||||||||||||||||||||||
DescribeDatabaseCommand(db, extended, output) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
case SetNamespaceProperties(DatabaseInSessionCatalog(db), properties) if conf.useV1Command => | ||||||||||||||||||||||||||
case SetNamespaceProperties(ResolvedV1Database(db), properties) if conf.useV1Command => | ||||||||||||||||||||||||||
AlterDatabasePropertiesCommand(db, properties) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
case SetNamespaceLocation(DatabaseInSessionCatalog(db), location) if conf.useV1Command => | ||||||||||||||||||||||||||
case SetNamespaceLocation(ResolvedV1Database(db), location) if conf.useV1Command => | ||||||||||||||||||||||||||
if (StringUtils.isEmpty(location)) { | ||||||||||||||||||||||||||
throw QueryExecutionErrors.invalidEmptyLocationError(location) | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
@@ -218,7 +218,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) | |||||||||||||||||||||||||
case DropTable(ResolvedIdentifier(FakeSystemCatalog, ident), _, _) => | ||||||||||||||||||||||||||
DropTempViewCommand(ident) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
case DropView(ResolvedV1Identifier(ident), ifExists) => | ||||||||||||||||||||||||||
case DropView(ResolvedIdentifierInSessionCatalog(ident), ifExists) => | ||||||||||||||||||||||||||
DropTableCommand(ident, ifExists, isView = true, purge = false) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
case DropView(r @ ResolvedIdentifier(catalog, ident), _) => | ||||||||||||||||||||||||||
|
@@ -237,14 +237,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) | |||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
CreateDatabaseCommand(name, c.ifNotExists, location, comment, newProperties) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
case d @ DropNamespace(DatabaseInSessionCatalog(db), _, _) if conf.useV1Command => | ||||||||||||||||||||||||||
case d @ DropNamespace(ResolvedV1Database(db), _, _) if conf.useV1Command => | ||||||||||||||||||||||||||
DropDatabaseCommand(db, d.ifExists, d.cascade) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
case ShowTables(DatabaseInSessionCatalog(db), pattern, output) if conf.useV1Command => | ||||||||||||||||||||||||||
case ShowTables(ResolvedV1Database(db), pattern, output) if conf.useV1Command => | ||||||||||||||||||||||||||
ShowTablesCommand(Some(db), pattern, output) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
case ShowTableExtended( | ||||||||||||||||||||||||||
DatabaseInSessionCatalog(db), | ||||||||||||||||||||||||||
ResolvedV1Database(db), | ||||||||||||||||||||||||||
pattern, | ||||||||||||||||||||||||||
partitionSpec @ (None | Some(UnresolvedPartitionSpec(_, _))), | ||||||||||||||||||||||||||
output) => | ||||||||||||||||||||||||||
|
@@ -265,7 +265,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) | |||||||||||||||||||||||||
AnalyzePartitionCommand(ident, partitionSpec, noScan) | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
case AnalyzeTables(DatabaseInSessionCatalog(db), noScan) => | ||||||||||||||||||||||||||
case AnalyzeTables(ResolvedV1Database(db), noScan) => | ||||||||||||||||||||||||||
AnalyzeTablesCommand(Some(db), noScan) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) => | ||||||||||||||||||||||||||
|
@@ -293,7 +293,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) | |||||||||||||||||||||||||
if conf.useV1Command => ShowCreateTableCommand(ident, output) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
case ShowCreateTable(ResolvedTable(catalog, _, table: V1Table, _), _, output) | ||||||||||||||||||||||||||
if isSessionCatalog(catalog) && DDLUtils.isHiveTable(table.catalogTable) => | ||||||||||||||||||||||||||
if supportsV1Command(catalog) && DDLUtils.isHiveTable(table.catalogTable) => | ||||||||||||||||||||||||||
ShowCreateTableCommand(table.catalogTable.identifier, output) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
case TruncateTable(ResolvedV1TableIdentifier(ident)) => | ||||||||||||||||||||||||||
|
@@ -367,7 +367,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) | |||||||||||||||||||||||||
case AlterViewAs(ResolvedViewIdentifier(ident), originalText, query) => | ||||||||||||||||||||||||||
AlterViewAsCommand(ident, originalText, query) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
case CreateView(ResolvedV1Identifier(ident), userSpecifiedColumns, comment, | ||||||||||||||||||||||||||
case CreateView(ResolvedIdentifierInSessionCatalog(ident), userSpecifiedColumns, comment, | ||||||||||||||||||||||||||
properties, originalText, child, allowExisting, replace) => | ||||||||||||||||||||||||||
CreateViewCommand( | ||||||||||||||||||||||||||
name = ident, | ||||||||||||||||||||||||||
|
@@ -385,7 +385,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) | |||||||||||||||||||||||||
|
||||||||||||||||||||||||||
case ShowViews(ns: ResolvedNamespace, pattern, output) => | ||||||||||||||||||||||||||
ns match { | ||||||||||||||||||||||||||
case DatabaseInSessionCatalog(db) => ShowViewsCommand(db, pattern, output) | ||||||||||||||||||||||||||
case ResolvedDatabaseInSessionCatalog(db) => ShowViewsCommand(db, pattern, output) | ||||||||||||||||||||||||||
case _ => | ||||||||||||||||||||||||||
throw QueryCompilationErrors.missingCatalogAbilityError(ns.catalog, "views") | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
@@ -408,7 +408,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) | |||||||||||||||||||||||||
throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "functions") | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
case ShowFunctions(DatabaseInSessionCatalog(db), userScope, systemScope, pattern, output) => | ||||||||||||||||||||||||||
case ShowFunctions( | ||||||||||||||||||||||||||
ResolvedDatabaseInSessionCatalog(db), userScope, systemScope, pattern, output) => | ||||||||||||||||||||||||||
ShowFunctionsCommand(db, pattern, userScope, systemScope, output) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
case DropFunction(ResolvedPersistentFunc(catalog, identifier, _), ifExists) => | ||||||||||||||||||||||||||
|
@@ -429,7 +430,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) | |||||||||||||||||||||||||
throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "REFRESH FUNCTION") | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
case CreateFunction(ResolvedV1Identifier(ident), className, resources, ifExists, replace) => | ||||||||||||||||||||||||||
case CreateFunction( | ||||||||||||||||||||||||||
ResolvedIdentifierInSessionCatalog(ident), className, resources, ifExists, replace) => | ||||||||||||||||||||||||||
CreateFunctionCommand( | ||||||||||||||||||||||||||
FunctionIdentifier(ident.table, ident.database, ident.catalog), | ||||||||||||||||||||||||||
className, | ||||||||||||||||||||||||||
|
@@ -564,7 +566,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) | |||||||||||||||||||||||||
|
||||||||||||||||||||||||||
object ResolvedV1TableIdentifier { | ||||||||||||||||||||||||||
def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match { | ||||||||||||||||||||||||||
case ResolvedTable(catalog, _, t: V1Table, _) if isSessionCatalog(catalog) => | ||||||||||||||||||||||||||
case ResolvedTable(catalog, _, t: V1Table, _) if supportsV1Command(catalog) => | ||||||||||||||||||||||||||
Some(t.catalogTable.identifier) | ||||||||||||||||||||||||||
case _ => None | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
@@ -579,6 +581,18 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) | |||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
object ResolvedV1Identifier { | ||||||||||||||||||||||||||
def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match { | ||||||||||||||||||||||||||
case ResolvedIdentifier(catalog, ident) if supportsV1Command(catalog) => | ||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @amaliujia @cloud-fan spark/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala Lines 159 to 170 in e693e18
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does Iceberg catalog extend There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We do want to use v2 commands for custom catalogs that do not extend There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Nope.
Even so, is it the right time to introduce such a behavior change in a bug fix release? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can consider it as a bug. People implementing DS V2 catalog APIs expect to see v2 commands to customize the table behaviors. And there is a backdoor: For iceberg, it should be easy to work around it by extending There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Iceberg's SparkSessionCatalog already extends a base class. There's no easy way to extend There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to make either the iceberg |
||||||||||||||||||||||||||
if (ident.namespace().length != 1) { | ||||||||||||||||||||||||||
throw QueryCompilationErrors.requiresSinglePartNamespaceError(ident.namespace()) | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
Some(TableIdentifier(ident.name, Some(ident.namespace.head), Some(catalog.name))) | ||||||||||||||||||||||||||
case _ => None | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
// Use this object to help match commands that do not have a v2 implementation. | ||||||||||||||||||||||||||
object ResolvedIdentifierInSessionCatalog{ | ||||||||||||||||||||||||||
def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match { | ||||||||||||||||||||||||||
case ResolvedIdentifier(catalog, ident) if isSessionCatalog(catalog) => | ||||||||||||||||||||||||||
if (ident.namespace().length != 1) { | ||||||||||||||||||||||||||
|
@@ -610,7 +624,21 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) | |||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
private object DatabaseInSessionCatalog { | ||||||||||||||||||||||||||
private object ResolvedV1Database { | ||||||||||||||||||||||||||
def unapply(resolved: ResolvedNamespace): Option[String] = resolved match { | ||||||||||||||||||||||||||
case ResolvedNamespace(catalog, _) if !supportsV1Command(catalog) => None | ||||||||||||||||||||||||||
case ResolvedNamespace(_, Seq()) => | ||||||||||||||||||||||||||
throw QueryCompilationErrors.databaseFromV1SessionCatalogNotSpecifiedError() | ||||||||||||||||||||||||||
case ResolvedNamespace(_, Seq(dbName)) => Some(dbName) | ||||||||||||||||||||||||||
case _ => | ||||||||||||||||||||||||||
assert(resolved.namespace.length > 1) | ||||||||||||||||||||||||||
throw QueryCompilationErrors.nestedDatabaseUnsupportedByV1SessionCatalogError( | ||||||||||||||||||||||||||
resolved.namespace.map(quoteIfNeeded).mkString(".")) | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
// Use this object to help match commands that do not have a v2 implementation. | ||||||||||||||||||||||||||
private object ResolvedDatabaseInSessionCatalog { | ||||||||||||||||||||||||||
def unapply(resolved: ResolvedNamespace): Option[String] = resolved match { | ||||||||||||||||||||||||||
case ResolvedNamespace(catalog, _) if !isSessionCatalog(catalog) => None | ||||||||||||||||||||||||||
case ResolvedNamespace(_, Seq()) => | ||||||||||||||||||||||||||
|
@@ -625,11 +653,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) | |||||||||||||||||||||||||
|
||||||||||||||||||||||||||
private object DatabaseNameInSessionCatalog { | ||||||||||||||||||||||||||
def unapply(resolved: ResolvedNamespace): Option[String] = resolved match { | ||||||||||||||||||||||||||
case ResolvedNamespace(catalog, _) if !isSessionCatalog(catalog) => None | ||||||||||||||||||||||||||
case ResolvedNamespace(catalog, _) if !supportsV1Command(catalog) => None | ||||||||||||||||||||||||||
case ResolvedNamespace(_, Seq(dbName)) => Some(dbName) | ||||||||||||||||||||||||||
case _ => | ||||||||||||||||||||||||||
assert(resolved.namespace.length > 1) | ||||||||||||||||||||||||||
throw QueryCompilationErrors.invalidDatabaseNameError(resolved.namespace.quoted) | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
private def supportsV1Command(catalog: CatalogPlugin): Boolean = { | ||||||||||||||||||||||||||
catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) && | ||||||||||||||||||||||||||
!SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.