[SPARK-49152][SQL] V2SessionCatalog should use V2Command #47724

Closed · wants to merge 3 commits · changes from all commits
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, ResolveDefaultColumns => DefaultCols}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.command._
@@ -66,7 +66,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         throw QueryCompilationErrors.unsupportedTableOperationError(ident, "REPLACE COLUMNS")

     case a @ AlterColumn(ResolvedTable(catalog, ident, table: V1Table, _), _, _, _, _, _, _)
-        if isSessionCatalog(catalog) =>
+        if supportsV1Command(catalog) =>
       if (a.column.name.length > 1) {
         throw QueryCompilationErrors.unsupportedTableOperationError(
           catalog, ident, "ALTER COLUMN with qualified column")
@@ -117,13 +117,13 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case UnsetViewProperties(ResolvedViewIdentifier(ident), keys, ifExists) =>
       AlterTableUnsetPropertiesCommand(ident, keys, ifExists, isView = true)

-    case DescribeNamespace(DatabaseInSessionCatalog(db), extended, output) if conf.useV1Command =>
+    case DescribeNamespace(ResolvedV1Database(db), extended, output) if conf.useV1Command =>
       DescribeDatabaseCommand(db, extended, output)

-    case SetNamespaceProperties(DatabaseInSessionCatalog(db), properties) if conf.useV1Command =>
+    case SetNamespaceProperties(ResolvedV1Database(db), properties) if conf.useV1Command =>
       AlterDatabasePropertiesCommand(db, properties)

-    case SetNamespaceLocation(DatabaseInSessionCatalog(db), location) if conf.useV1Command =>
+    case SetNamespaceLocation(ResolvedV1Database(db), location) if conf.useV1Command =>
       if (StringUtils.isEmpty(location)) {
         throw QueryExecutionErrors.invalidEmptyLocationError(location)
       }
@@ -218,7 +218,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case DropTable(ResolvedIdentifier(FakeSystemCatalog, ident), _, _) =>
      DropTempViewCommand(ident)

-    case DropView(ResolvedV1Identifier(ident), ifExists) =>
+    case DropView(ResolvedIdentifierInSessionCatalog(ident), ifExists) =>
      DropTableCommand(ident, ifExists, isView = true, purge = false)

     case DropView(r @ ResolvedIdentifier(catalog, ident), _) =>
Expand All @@ -237,14 +237,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}
CreateDatabaseCommand(name, c.ifNotExists, location, comment, newProperties)

case d @ DropNamespace(DatabaseInSessionCatalog(db), _, _) if conf.useV1Command =>
case d @ DropNamespace(ResolvedV1Database(db), _, _) if conf.useV1Command =>
DropDatabaseCommand(db, d.ifExists, d.cascade)

case ShowTables(DatabaseInSessionCatalog(db), pattern, output) if conf.useV1Command =>
case ShowTables(ResolvedV1Database(db), pattern, output) if conf.useV1Command =>
ShowTablesCommand(Some(db), pattern, output)

case ShowTableExtended(
DatabaseInSessionCatalog(db),
ResolvedV1Database(db),
pattern,
partitionSpec @ (None | Some(UnresolvedPartitionSpec(_, _))),
output) =>
Expand All @@ -265,7 +265,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
AnalyzePartitionCommand(ident, partitionSpec, noScan)
}

case AnalyzeTables(DatabaseInSessionCatalog(db), noScan) =>
case AnalyzeTables(ResolvedV1Database(db), noScan) =>
AnalyzeTablesCommand(Some(db), noScan)

case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
@@ -293,7 +293,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         if conf.useV1Command => ShowCreateTableCommand(ident, output)

     case ShowCreateTable(ResolvedTable(catalog, _, table: V1Table, _), _, output)
-        if isSessionCatalog(catalog) && DDLUtils.isHiveTable(table.catalogTable) =>
+        if supportsV1Command(catalog) && DDLUtils.isHiveTable(table.catalogTable) =>
       ShowCreateTableCommand(table.catalogTable.identifier, output)

     case TruncateTable(ResolvedV1TableIdentifier(ident)) =>
@@ -367,7 +367,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case AlterViewAs(ResolvedViewIdentifier(ident), originalText, query) =>
       AlterViewAsCommand(ident, originalText, query)

-    case CreateView(ResolvedV1Identifier(ident), userSpecifiedColumns, comment,
+    case CreateView(ResolvedIdentifierInSessionCatalog(ident), userSpecifiedColumns, comment,
         properties, originalText, child, allowExisting, replace) =>
       CreateViewCommand(
         name = ident,
Expand All @@ -385,7 +385,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

case ShowViews(ns: ResolvedNamespace, pattern, output) =>
ns match {
case DatabaseInSessionCatalog(db) => ShowViewsCommand(db, pattern, output)
case ResolvedDatabaseInSessionCatalog(db) => ShowViewsCommand(db, pattern, output)
case _ =>
throw QueryCompilationErrors.missingCatalogAbilityError(ns.catalog, "views")
}
Expand All @@ -408,7 +408,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "functions")
}

case ShowFunctions(DatabaseInSessionCatalog(db), userScope, systemScope, pattern, output) =>
case ShowFunctions(
ResolvedDatabaseInSessionCatalog(db), userScope, systemScope, pattern, output) =>
ShowFunctionsCommand(db, pattern, userScope, systemScope, output)

case DropFunction(ResolvedPersistentFunc(catalog, identifier, _), ifExists) =>
Expand All @@ -429,7 +430,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "REFRESH FUNCTION")
}

case CreateFunction(ResolvedV1Identifier(ident), className, resources, ifExists, replace) =>
case CreateFunction(
ResolvedIdentifierInSessionCatalog(ident), className, resources, ifExists, replace) =>
CreateFunctionCommand(
FunctionIdentifier(ident.table, ident.database, ident.catalog),
className,
@@ -564,7 +566,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

   object ResolvedV1TableIdentifier {
     def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
-      case ResolvedTable(catalog, _, t: V1Table, _) if isSessionCatalog(catalog) =>
+      case ResolvedTable(catalog, _, t: V1Table, _) if supportsV1Command(catalog) =>
         Some(t.catalogTable.identifier)
       case _ => None
     }
Expand All @@ -579,6 +581,18 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}

object ResolvedV1Identifier {
def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
case ResolvedIdentifier(catalog, ident) if supportsV1Command(catalog) =>

Review thread on this line:

Contributor:
@amaliujia @cloud-fan
This change appears to have broken creating a V1 table from a V2_SESSION_CATALOG_IMPLEMENTATION such as Iceberg's SparkSessionCatalog:
    // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the
    // session catalog and the table provider is not v2.
    case c @ CreateTable(ResolvedV1Identifier(ident), _, _, tableSpec: TableSpec, _) =>
      val (storageFormat, provider) = getStorageFormatAndProvider(
        c.tableSpec.provider, tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
        ctas = false)
      if (!isV2Provider(provider)) {
        constructV1TableCmd(None, c.tableSpec, ident, c.tableSchema, c.partitioning,
          c.ignoreIfExists, storageFormat, provider)
      } else {
        c
      }
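
A minimal way to reproduce the reported behavior might look like this; a hedged sketch only, assuming the Iceberg Spark runtime is on the classpath (the catalog class name is Iceberg's published one):

import org.apache.spark.sql.SparkSession

// With a custom V2_SESSION_CATALOG_IMPLEMENTATION configured,
// supportsV1Command() returns false, so per the report above even a
// table with a v1 provider is planned through the v2 CreateTable path
// instead of the v1 CreateTableCommand.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.iceberg.spark.SparkSessionCatalog")
  .getOrCreate()

spark.sql("CREATE TABLE t (id INT) USING parquet")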

Contributor:
Does the Iceberg catalog extend DelegatingCatalogExtension?

Contributor:
We do want to use v2 commands for custom catalogs that do not extend DelegatingCatalogExtension.

Contributor @manuzhang (Sep 19, 2024):
> Does the Iceberg catalog extend DelegatingCatalogExtension?

Nope.

> We do want to use v2 commands for custom catalogs that do not extend DelegatingCatalogExtension.

Even so, is it the right time to introduce such a behavior change in a bug-fix release?

Contributor:
We can consider it a bug: people implementing DS v2 catalog APIs expect to see v2 commands so they can customize table behaviors. And there is a backdoor: DelegatingCatalogExtension.

For Iceberg, it should be easy to work around this by extending DelegatingCatalogExtension. The Iceberg catalog can keep all of its methods unchanged and simply not use the delegate.
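
For illustration, a minimal sketch of that workaround; the class name and helper methods below are hypothetical, and only DelegatingCatalogExtension with its delegating defaults comes from Spark:

import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, Identifier, Table}

// A session catalog that extends DelegatingCatalogExtension, so per the
// discussion above Spark can keep planning v1 commands for it, while it
// keeps its own table resolution and only falls back to the delegate
// (the built-in session catalog) when it does not manage a table.
class CustomSessionCatalog extends DelegatingCatalogExtension {
  override def loadTable(ident: Identifier): Table = {
    if (managesTable(ident)) loadOwnTable(ident) else super.loadTable(ident)
  }

  // Placeholders standing in for the catalog's real logic.
  private def managesTable(ident: Identifier): Boolean = false
  private def loadOwnTable(ident: Identifier): Table =
    throw new UnsupportedOperationException("managed-table lookup goes here")
}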

Contributor:
Iceberg's SparkSessionCatalog already extends a base class, so there is no easy way to extend DelegatingCatalogExtension without major refactoring.

Contributor:
We need to make either the Iceberg BaseCatalog or the Spark DelegatingCatalogExtension an interface. It looks easier to make BaseCatalog an interface?
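
A purely illustrative sketch of that direction, using a stand-in trait rather than Iceberg's actual BaseCatalog (all names below are hypothetical):

import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension

// If BaseCatalog's behavior lived in a trait (interface), a session
// catalog could mix it in while inheriting DelegatingCatalogExtension,
// since Scala and Java allow only one superclass but many interfaces.
trait BaseCatalogBehavior {
  def catalogSpecificSetup(): Unit = () // placeholder member
}

class SessionCatalogSketch extends DelegatingCatalogExtension with BaseCatalogBehavior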

         if (ident.namespace().length != 1) {
           throw QueryCompilationErrors.requiresSinglePartNamespaceError(ident.namespace())
         }
         Some(TableIdentifier(ident.name, Some(ident.namespace.head), Some(catalog.name)))
       case _ => None
     }
   }
+
+  // Use this object to help match commands that do not have a v2 implementation.
+  object ResolvedIdentifierInSessionCatalog {
+    def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
+      case ResolvedIdentifier(catalog, ident) if isSessionCatalog(catalog) =>
+        if (ident.namespace().length != 1) {
@@ -610,7 +624,21 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     }
   }

-  private object DatabaseInSessionCatalog {
+  private object ResolvedV1Database {
+    def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
+      case ResolvedNamespace(catalog, _) if !supportsV1Command(catalog) => None
+      case ResolvedNamespace(_, Seq()) =>
+        throw QueryCompilationErrors.databaseFromV1SessionCatalogNotSpecifiedError()
+      case ResolvedNamespace(_, Seq(dbName)) => Some(dbName)
+      case _ =>
+        assert(resolved.namespace.length > 1)
+        throw QueryCompilationErrors.nestedDatabaseUnsupportedByV1SessionCatalogError(
+          resolved.namespace.map(quoteIfNeeded).mkString("."))
+    }
+  }
+
+  // Use this object to help match commands that do not have a v2 implementation.
+  private object ResolvedDatabaseInSessionCatalog {
     def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
       case ResolvedNamespace(catalog, _) if !isSessionCatalog(catalog) => None
       case ResolvedNamespace(_, Seq()) =>
Expand All @@ -625,11 +653,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

private object DatabaseNameInSessionCatalog {
def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
case ResolvedNamespace(catalog, _) if !isSessionCatalog(catalog) => None
case ResolvedNamespace(catalog, _) if !supportsV1Command(catalog) => None
case ResolvedNamespace(_, Seq(dbName)) => Some(dbName)
case _ =>
assert(resolved.namespace.length > 1)
throw QueryCompilationErrors.invalidDatabaseNameError(resolved.namespace.quoted)
}
}

private def supportsV1Command(catalog: CatalogPlugin): Boolean = {
catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) &&
!SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined
}
}
@@ -69,6 +69,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     }
   }

+  private def hadoopConf = session.sessionState.newHadoopConf()
+
   private def refreshCache(r: DataSourceV2Relation)(): Unit = {
     session.sharedState.cacheManager.recacheByPlan(session, r)
   }
@@ -103,7 +105,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
   }

   private def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
-    tableSpec.withNewLocation(tableSpec.location.map(makeQualifiedDBObjectPath(_)))
+    tableSpec.withNewLocation(tableSpec.location.map(loc => CatalogUtils.makeQualifiedPath(
+      CatalogUtils.stringToURI(loc), hadoopConf).toString))
   }

   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@@ -84,7 +84,7 @@ class DataSourceV2DataFrameSessionCatalogSuite
       spark.range(20).write.format(v2Format).option("path", "/abc").saveAsTable(t1)
       val cat = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]
       val tableInfo = cat.loadTable(Identifier.of(Array("default"), t1))
-      assert(tableInfo.properties().get("location") === "file:/abc")
+      assert(tableInfo.properties().get("location") === "file:///abc")
       assert(tableInfo.properties().get("provider") === v2Format)
     }
   }
@@ -441,8 +441,8 @@ class DataSourceV2SQLSuiteV1Filter
           "AS SELECT id FROM source")
         val location = spark.sql(s"DESCRIBE EXTENDED $identifier")
           .filter("col_name = 'Location'")
-          .select("data_type").head.getString(0)
-        assert(location === "file:/tmp/foo")
+          .select("data_type").head().getString(0)
+        assert(location === "file:///tmp/foo")
       }
     }
   }
Expand All @@ -458,8 +458,8 @@ class DataSourceV2SQLSuiteV1Filter
"AS SELECT id FROM source")
val location = spark.sql(s"DESCRIBE EXTENDED $identifier")
.filter("col_name = 'Location'")
.select("data_type").head.getString(0)
assert(location === "file:/tmp/foo")
.select("data_type").head().getString(0)
assert(location === "file:///tmp/foo")
}
}
}
@@ -2068,15 +2068,10 @@ class DataSourceV2SQLSuiteV1Filter
   }

   test("REPLACE TABLE: v1 table") {
-    val e = intercept[AnalysisException] {
-      sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
-    }
-    checkError(
-      exception = e,
-      errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
-      sqlState = "0A000",
-      parameters = Map("tableName" -> "`spark_catalog`.`default`.`tbl`",
-        "operation" -> "REPLACE TABLE"))
+    sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
+    val v2Catalog = catalog("spark_catalog").asTableCatalog
+    val table = v2Catalog.loadTable(Identifier.of(Array("default"), "tbl"))
+    assert(table.properties().get(TableCatalog.PROP_PROVIDER) == classOf[SimpleScanSource].getName)
   }

   test("DeleteFrom: - delete with invalid predicate") {
@@ -79,18 +79,22 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating
       partitions: Array[Transform],
       properties: java.util.Map[String, String]): Table = {
     val key = TestV2SessionCatalogBase.SIMULATE_ALLOW_EXTERNAL_PROPERTY
-    val propsWithLocation = if (properties.containsKey(key)) {
+    val newProps = new java.util.HashMap[String, String]()
+    newProps.putAll(properties)
+    if (properties.containsKey(TableCatalog.PROP_LOCATION)) {
+      newProps.put(TableCatalog.PROP_EXTERNAL, "true")
+    }
+
+    val propsWithLocation = if (newProps.containsKey(key)) {
       // Always set a location so that CREATE EXTERNAL TABLE won't fail with LOCATION not specified.
-      if (!properties.containsKey(TableCatalog.PROP_LOCATION)) {
-        val newProps = new java.util.HashMap[String, String]()
-        newProps.putAll(properties)
+      if (!newProps.containsKey(TableCatalog.PROP_LOCATION)) {
         newProps.put(TableCatalog.PROP_LOCATION, "file:/abc")
         newProps
       } else {
-        properties
+        newProps
       }
     } else {
-      properties
+      newProps
     }
     val created = super.createTable(ident, schema, partitions, propsWithLocation)
     val t = newTable(created.name(), schema, partitions, propsWithLocation)
@@ -106,7 +106,7 @@ class ShowCreateTableSuite extends command.ShowCreateTableSuiteBase with Command
       "'via' = '2')",
       "PARTITIONED BY (a)",
       "COMMENT 'This is a comment'",
-      "LOCATION 'file:/tmp'",
+      "LOCATION 'file:///tmp'",
       "TBLPROPERTIES (",
       "'password' = '*********(redacted)',",
       "'prop1' = '1',",
@@ -754,7 +754,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
       assert(table.properties().get("comment").equals(description))
       assert(table.properties().get("path").equals(dir.getAbsolutePath))
       assert(table.properties().get("external").equals("true"))
-      assert(table.properties().get("location").equals("file:" + dir.getAbsolutePath))
+      assert(table.properties().get("location").equals("file://" + dir.getAbsolutePath))
     }
   }