[SPARK-49152][SQL] V2SessionCatalog should use V2Command
### What changes were proposed in this pull request?

V2SessionCatalog should use V2Command when possible.

### Why are the changes needed?

The session catalog can be overridden by a custom implementation, and the overriding catalog must receive v2 commands; otherwise v1 commands would bypass it and still go to the Hive metastore or the built-in session catalog.

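For illustration, a minimal sketch of the scenario this fixes (the custom catalog class name is hypothetical; `spark.sql.catalog.spark_catalog` is the real `V2_SESSION_CATALOG_IMPLEMENTATION` config key used below):

```scala
import org.apache.spark.sql.SparkSession

// Plug a custom V2 catalog in as the session catalog.
// com.example.MyV2SessionCatalog is a made-up class name for this sketch.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.spark_catalog", "com.example.MyV2SessionCatalog")
  .getOrCreate()

// This DDL resolves against `spark_catalog`. With the override in place it must
// be planned as a v2 command so it reaches MyV2SessionCatalog; a v1 command
// would bypass the override and still hit the built-in session catalog.
spark.sql("CREATE TABLE t (id INT) USING parquet")
```
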
### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

NO

Closes #47724 from amaliujia/branch-3.5.

Lead-authored-by: Rui Wang <rui.wang@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
amaliujia and cloud-fan committed Aug 13, 2024
1 parent 3237b8e commit d824219
Showing 7 changed files with 76 additions and 41 deletions.
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, ResolveDefaultColumns => DefaultCols}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.command._
@@ -66,7 +66,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
throw QueryCompilationErrors.unsupportedTableOperationError(ident, "REPLACE COLUMNS")

case a @ AlterColumn(ResolvedTable(catalog, ident, table: V1Table, _), _, _, _, _, _, _)
-if isSessionCatalog(catalog) =>
+if supportsV1Command(catalog) =>
if (a.column.name.length > 1) {
throw QueryCompilationErrors.unsupportedTableOperationError(
catalog, ident, "ALTER COLUMN with qualified column")
@@ -117,13 +117,13 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case UnsetViewProperties(ResolvedViewIdentifier(ident), keys, ifExists) =>
AlterTableUnsetPropertiesCommand(ident, keys, ifExists, isView = true)

-case DescribeNamespace(DatabaseInSessionCatalog(db), extended, output) if conf.useV1Command =>
+case DescribeNamespace(ResolvedV1Database(db), extended, output) if conf.useV1Command =>
DescribeDatabaseCommand(db, extended, output)

-case SetNamespaceProperties(DatabaseInSessionCatalog(db), properties) if conf.useV1Command =>
+case SetNamespaceProperties(ResolvedV1Database(db), properties) if conf.useV1Command =>
AlterDatabasePropertiesCommand(db, properties)

-case SetNamespaceLocation(DatabaseInSessionCatalog(db), location) if conf.useV1Command =>
+case SetNamespaceLocation(ResolvedV1Database(db), location) if conf.useV1Command =>
if (StringUtils.isEmpty(location)) {
throw QueryExecutionErrors.invalidEmptyLocationError(location)
}
@@ -218,7 +218,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case DropTable(ResolvedIdentifier(FakeSystemCatalog, ident), _, _) =>
DropTempViewCommand(ident)

-case DropView(ResolvedV1Identifier(ident), ifExists) =>
+case DropView(ResolvedIdentifierInSessionCatalog(ident), ifExists) =>
DropTableCommand(ident, ifExists, isView = true, purge = false)

case DropView(r @ ResolvedIdentifier(catalog, ident), _) =>
@@ -237,14 +237,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}
CreateDatabaseCommand(name, c.ifNotExists, location, comment, newProperties)

-case d @ DropNamespace(DatabaseInSessionCatalog(db), _, _) if conf.useV1Command =>
+case d @ DropNamespace(ResolvedV1Database(db), _, _) if conf.useV1Command =>
DropDatabaseCommand(db, d.ifExists, d.cascade)

-case ShowTables(DatabaseInSessionCatalog(db), pattern, output) if conf.useV1Command =>
+case ShowTables(ResolvedV1Database(db), pattern, output) if conf.useV1Command =>
ShowTablesCommand(Some(db), pattern, output)

case ShowTableExtended(
-DatabaseInSessionCatalog(db),
+ResolvedV1Database(db),
pattern,
partitionSpec @ (None | Some(UnresolvedPartitionSpec(_, _))),
output) =>
@@ -265,7 +265,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
AnalyzePartitionCommand(ident, partitionSpec, noScan)
}

-case AnalyzeTables(DatabaseInSessionCatalog(db), noScan) =>
+case AnalyzeTables(ResolvedV1Database(db), noScan) =>
AnalyzeTablesCommand(Some(db), noScan)

case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
@@ -293,7 +293,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
if conf.useV1Command => ShowCreateTableCommand(ident, output)

case ShowCreateTable(ResolvedTable(catalog, _, table: V1Table, _), _, output)
-if isSessionCatalog(catalog) && DDLUtils.isHiveTable(table.catalogTable) =>
+if supportsV1Command(catalog) && DDLUtils.isHiveTable(table.catalogTable) =>
ShowCreateTableCommand(table.catalogTable.identifier, output)

case TruncateTable(ResolvedV1TableIdentifier(ident)) =>
@@ -367,7 +367,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case AlterViewAs(ResolvedViewIdentifier(ident), originalText, query) =>
AlterViewAsCommand(ident, originalText, query)

-case CreateView(ResolvedV1Identifier(ident), userSpecifiedColumns, comment,
+case CreateView(ResolvedIdentifierInSessionCatalog(ident), userSpecifiedColumns, comment,
properties, originalText, child, allowExisting, replace) =>
CreateViewCommand(
name = ident,
@@ -385,7 +385,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

case ShowViews(ns: ResolvedNamespace, pattern, output) =>
ns match {
-case DatabaseInSessionCatalog(db) => ShowViewsCommand(db, pattern, output)
+case ResolvedDatabaseInSessionCatalog(db) => ShowViewsCommand(db, pattern, output)
case _ =>
throw QueryCompilationErrors.missingCatalogAbilityError(ns.catalog, "views")
}
@@ -408,7 +408,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "functions")
}

-case ShowFunctions(DatabaseInSessionCatalog(db), userScope, systemScope, pattern, output) =>
+case ShowFunctions(
+ResolvedDatabaseInSessionCatalog(db), userScope, systemScope, pattern, output) =>
ShowFunctionsCommand(db, pattern, userScope, systemScope, output)

case DropFunction(ResolvedPersistentFunc(catalog, identifier, _), ifExists) =>
@@ -429,7 +430,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "REFRESH FUNCTION")
}

-case CreateFunction(ResolvedV1Identifier(ident), className, resources, ifExists, replace) =>
+case CreateFunction(
+ResolvedIdentifierInSessionCatalog(ident), className, resources, ifExists, replace) =>
CreateFunctionCommand(
FunctionIdentifier(ident.table, ident.database, ident.catalog),
className,
@@ -564,7 +566,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

object ResolvedV1TableIdentifier {
def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
-case ResolvedTable(catalog, _, t: V1Table, _) if isSessionCatalog(catalog) =>
+case ResolvedTable(catalog, _, t: V1Table, _) if supportsV1Command(catalog) =>
Some(t.catalogTable.identifier)
case _ => None
}
@@ -579,6 +581,18 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}

object ResolvedV1Identifier {
+def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
+case ResolvedIdentifier(catalog, ident) if supportsV1Command(catalog) =>
+if (ident.namespace().length != 1) {
+throw QueryCompilationErrors.requiresSinglePartNamespaceError(ident.namespace())
+}
+Some(TableIdentifier(ident.name, Some(ident.namespace.head), Some(catalog.name)))
+case _ => None
+}
+}
+
+// Use this object to help match commands that do not have a v2 implementation.
+object ResolvedIdentifierInSessionCatalog {
def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
case ResolvedIdentifier(catalog, ident) if isSessionCatalog(catalog) =>
if (ident.namespace().length != 1) {
@@ -610,7 +624,21 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}
}

-private object DatabaseInSessionCatalog {
+private object ResolvedV1Database {
+def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
+case ResolvedNamespace(catalog, _) if !supportsV1Command(catalog) => None
+case ResolvedNamespace(_, Seq()) =>
+throw QueryCompilationErrors.databaseFromV1SessionCatalogNotSpecifiedError()
+case ResolvedNamespace(_, Seq(dbName)) => Some(dbName)
+case _ =>
+assert(resolved.namespace.length > 1)
+throw QueryCompilationErrors.nestedDatabaseUnsupportedByV1SessionCatalogError(
+resolved.namespace.map(quoteIfNeeded).mkString("."))
+}
+}
+
+// Use this object to help match commands that do not have a v2 implementation.
+private object ResolvedDatabaseInSessionCatalog {
def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
case ResolvedNamespace(catalog, _) if !isSessionCatalog(catalog) => None
case ResolvedNamespace(_, Seq()) =>
@@ -625,11 +653,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

private object DatabaseNameInSessionCatalog {
def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
-case ResolvedNamespace(catalog, _) if !isSessionCatalog(catalog) => None
+case ResolvedNamespace(catalog, _) if !supportsV1Command(catalog) => None
case ResolvedNamespace(_, Seq(dbName)) => Some(dbName)
case _ =>
assert(resolved.namespace.length > 1)
throw QueryCompilationErrors.invalidDatabaseNameError(resolved.namespace.quoted)
}
}

+private def supportsV1Command(catalog: CatalogPlugin): Boolean = {
+catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) &&
+!SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined
+}
}
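The net effect of the renames in this file, summarized as a hypothetical helper (a simplified sketch, not Spark API; the authoritative logic is `supportsV1Command` and the `*InSessionCatalog` matchers above):

```scala
// Sketch: when does a session-catalog command fall back to a v1 command?
// Only when the catalog is the built-in `spark_catalog` with no custom
// implementation configured, or when the command has no v2 implementation
// at all (e.g. views and functions, matched via the *InSessionCatalog objects).
def plansAsV1(isSparkCatalog: Boolean, overridden: Boolean, hasV2Impl: Boolean): Boolean =
  isSparkCatalog && (!overridden || !hasV2Impl)
```

For example, `plansAsV1(isSparkCatalog = true, overridden = true, hasV2Impl = true)` is `false`: exactly the case where a v1 command would previously have bypassed the overriding catalog.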
@@ -69,6 +69,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
}
}

+private def hadoopConf = session.sessionState.newHadoopConf()

private def refreshCache(r: DataSourceV2Relation)(): Unit = {
session.sharedState.cacheManager.recacheByPlan(session, r)
}
@@ -103,7 +105,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
}

private def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
-tableSpec.withNewLocation(tableSpec.location.map(makeQualifiedDBObjectPath(_)))
+tableSpec.withNewLocation(tableSpec.location.map(loc => CatalogUtils.makeQualifiedPath(
+CatalogUtils.stringToURI(loc), hadoopConf).toString))
}

override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
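A minimal sketch of why the expected locations in the test suites below change from `file:/...` to `file:///...` (assuming a default local-filesystem Hadoop configuration; the calls mirror the new `qualifyLocInTableSpec` above, and the `CatalogUtils` import path is assumed):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.catalog.CatalogUtils

// Qualify a raw location string against the Hadoop configuration, as the new
// qualifyLocInTableSpec does. With the default local filesystem this produces
// a fully qualified URI with an authority component: file:///abc, not file:/abc.
val hadoopConf = new Configuration()
val qualified = CatalogUtils.makeQualifiedPath(CatalogUtils.stringToURI("/abc"), hadoopConf)
println(qualified.toString)
```
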
@@ -84,7 +84,7 @@ class DataSourceV2DataFrameSessionCatalogSuite
spark.range(20).write.format(v2Format).option("path", "/abc").saveAsTable(t1)
val cat = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]
val tableInfo = cat.loadTable(Identifier.of(Array("default"), t1))
-assert(tableInfo.properties().get("location") === "file:/abc")
+assert(tableInfo.properties().get("location") === "file:///abc")
assert(tableInfo.properties().get("provider") === v2Format)
}
}
@@ -441,8 +441,8 @@ class DataSourceV2SQLSuiteV1Filter
"AS SELECT id FROM source")
val location = spark.sql(s"DESCRIBE EXTENDED $identifier")
.filter("col_name = 'Location'")
-.select("data_type").head.getString(0)
-assert(location === "file:/tmp/foo")
+.select("data_type").head().getString(0)
+assert(location === "file:///tmp/foo")
}
}
}
@@ -458,8 +458,8 @@ class DataSourceV2SQLSuiteV1Filter
"AS SELECT id FROM source")
val location = spark.sql(s"DESCRIBE EXTENDED $identifier")
.filter("col_name = 'Location'")
-.select("data_type").head.getString(0)
-assert(location === "file:/tmp/foo")
+.select("data_type").head().getString(0)
+assert(location === "file:///tmp/foo")
}
}
}
@@ -2068,15 +2068,10 @@ class DataSourceV2SQLSuiteV1Filter
}

test("REPLACE TABLE: v1 table") {
-val e = intercept[AnalysisException] {
-sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
-}
-checkError(
-exception = e,
-errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
-sqlState = "0A000",
-parameters = Map("tableName" -> "`spark_catalog`.`default`.`tbl`",
-"operation" -> "REPLACE TABLE"))
+sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
+val v2Catalog = catalog("spark_catalog").asTableCatalog
+val table = v2Catalog.loadTable(Identifier.of(Array("default"), "tbl"))
+assert(table.properties().get(TableCatalog.PROP_PROVIDER) == classOf[SimpleScanSource].getName)
}

test("DeleteFrom: - delete with invalid predicate") {
@@ -79,18 +79,22 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating
partitions: Array[Transform],
properties: java.util.Map[String, String]): Table = {
val key = TestV2SessionCatalogBase.SIMULATE_ALLOW_EXTERNAL_PROPERTY
-val propsWithLocation = if (properties.containsKey(key)) {
+val newProps = new java.util.HashMap[String, String]()
+newProps.putAll(properties)
+if (properties.containsKey(TableCatalog.PROP_LOCATION)) {
+newProps.put(TableCatalog.PROP_EXTERNAL, "true")
+}
+
+val propsWithLocation = if (newProps.containsKey(key)) {
// Always set a location so that CREATE EXTERNAL TABLE won't fail with LOCATION not specified.
-if (!properties.containsKey(TableCatalog.PROP_LOCATION)) {
-val newProps = new java.util.HashMap[String, String]()
-newProps.putAll(properties)
+if (!newProps.containsKey(TableCatalog.PROP_LOCATION)) {
newProps.put(TableCatalog.PROP_LOCATION, "file:/abc")
newProps
} else {
-properties
+newProps
}
} else {
-properties
+newProps
}
val created = super.createTable(ident, schema, partitions, propsWithLocation)
val t = newTable(created.name(), schema, partitions, propsWithLocation)
@@ -106,7 +106,7 @@ class ShowCreateTableSuite extends command.ShowCreateTableSuiteBase with Command
"'via' = '2')",
"PARTITIONED BY (a)",
"COMMENT 'This is a comment'",
-"LOCATION 'file:/tmp'",
+"LOCATION 'file:///tmp'",
"TBLPROPERTIES (",
"'password' = '*********(redacted)',",
"'prop1' = '1',",
@@ -754,7 +754,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
assert(table.properties().get("comment").equals(description))
assert(table.properties().get("path").equals(dir.getAbsolutePath))
assert(table.properties().get("external").equals("true"))
-assert(table.properties().get("location").equals("file:" + dir.getAbsolutePath))
+assert(table.properties().get("location").equals("file://" + dir.getAbsolutePath))
}
}
