[SPARK-49183][SQL] V2SessionCatalog.createTable should respect PROP_IS_MANAGED_LOCATION

In `V2SessionCatalog.createTable`, the table should be created as a managed table when `PROP_IS_MANAGED_LOCATION` is specified, even if the table definition has a location.

It's a bug fix. A custom `spark_catalog` may generate a custom location for a managed table and delegate the actual table creation to `V2SessionCatalog`. The table should still be a managed table if `PROP_IS_MANAGED_LOCATION` is specified.
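
Concretely, such a catalog might look like the following minimal sketch — `MyCatalog` and the warehouse path are illustrative stand-ins, not part of this change:

    import java.util

    import org.apache.spark.sql.connector.catalog.{Column, DelegatingCatalogExtension, Identifier, Table, TableCatalog}
    import org.apache.spark.sql.connector.expressions.Transform

    // Hypothetical custom session catalog: it picks the location for "managed"
    // tables itself, then delegates the actual creation to V2SessionCatalog.
    class MyCatalog extends DelegatingCatalogExtension {
      override def createTable(
          ident: Identifier,
          columns: Array[Column],
          partitions: Array[Transform],
          properties: util.Map[String, String]): Table = {
        val newProps = new util.HashMap[String, String](properties)
        newProps.put(TableCatalog.PROP_LOCATION, s"/my/warehouse/${ident.name}")
        // Mark the location as catalog-managed: V2SessionCatalog should create
        // a MANAGED table here even though a location is set.
        newProps.put(TableCatalog.PROP_IS_MANAGED_LOCATION, "true")
        super.createTable(ident, columns, partitions, newProps)
      }
    }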

Yes. Users who use a custom `spark_catalog` that generates custom locations for managed tables can now correctly create managed tables.

a new test

no

Closes #47684 from cloud-fan/table.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit ed04e16)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan committed Aug 12, 2024
1 parent 7bfb4f0 commit 204dd81
Showing 4 changed files with 39 additions and 4 deletions.

@@ -52,7 +52,8 @@ public interface TableCatalog extends CatalogPlugin {

   /**
    * A reserved property to indicate that the table location is managed, not user-specified.
-   * If this property is "true", SHOW CREATE TABLE will not generate the LOCATION clause.
+   * If this property is "true", it means it's a managed table even if it has a location. As an
+   * example, SHOW CREATE TABLE will not generate the LOCATION clause.
    */
   String PROP_IS_MANAGED_LOCATION = "is_managed_location";

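On the consuming side, a catalog or command can interpret this flag with the same case-insensitive check the diff below uses — a minimal reading pattern, with `hasManagedLocation` as an illustrative helper name:

    import org.apache.spark.sql.connector.catalog.{Table, TableCatalog}

    // Illustrative helper: the location counts as catalog-managed only when the
    // reserved property is present and equals "true", case-insensitively.
    def hasManagedLocation(table: Table): Boolean =
      Option(table.properties.get(TableCatalog.PROP_IS_MANAGED_LOCATION))
        .exists(_.equalsIgnoreCase("true"))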

@@ -120,7 +120,7 @@ case class ShowCreateTableExec(
   private def showTableLocation(table: Table, builder: StringBuilder): Unit = {
     val isManagedOption = Option(table.properties.get(TableCatalog.PROP_IS_MANAGED_LOCATION))
     // Only generate LOCATION clause if it's not managed.
-    if (isManagedOption.forall(_.equalsIgnoreCase("false"))) {
+    if (isManagedOption.isEmpty || !isManagedOption.get.equalsIgnoreCase("true")) {
       Option(table.properties.get(TableCatalog.PROP_LOCATION))
         .map("LOCATION '" + escapeSingleQuotedString(_) + "'\n")
         .foreach(builder.append)
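
The rewritten condition keeps the same outcome for the expected values but now treats only an explicit "true" as managed — a standalone comparison of the two predicates (illustrative, not part of the diff):

    // Illustrative: both predicates decide whether the LOCATION clause is emitted.
    def oldCond(v: Option[String]): Boolean = v.forall(_.equalsIgnoreCase("false"))
    def newCond(v: Option[String]): Boolean = v.isEmpty || !v.get.equalsIgnoreCase("true")

    Seq(None, Some("false"), Some("true"), Some("1")).foreach { v =>
      println(s"$v: old=${oldCond(v)}, new=${newCond(v)}")
    }
    // None:        old=true,  new=true   -> LOCATION emitted
    // Some(false): old=true,  new=true   -> LOCATION emitted
    // Some(true):  old=false, new=false  -> LOCATION suppressed (managed)
    // Some(1):     old=false, new=true   -> only an explicit "true" suppresses it now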

@@ -121,7 +121,9 @@ class V2SessionCatalog(catalog: SessionCatalog)
     val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties.toMap))
       .copy(locationUri = location.map(CatalogUtils.stringToURI))
     val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL)
-    val tableType = if (isExternal || location.isDefined) {
+    val isManagedLocation = Option(properties.get(TableCatalog.PROP_IS_MANAGED_LOCATION))
+      .exists(_.equalsIgnoreCase("true"))
+    val tableType = if (isExternal || (location.isDefined && !isManagedLocation)) {
       CatalogTableType.EXTERNAL
     } else {
       CatalogTableType.MANAGED
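
This is the heart of the fix: a table with a defined location is no longer automatically EXTERNAL. Distilled into a standalone function (illustrative), the interesting cases are:

    // Illustrative distillation of the new tableType condition above.
    def tableType(isExternal: Boolean, hasLocation: Boolean, isManagedLocation: Boolean): String =
      if (isExternal || (hasLocation && !isManagedLocation)) "EXTERNAL" else "MANAGED"

    tableType(isExternal = false, hasLocation = false, isManagedLocation = false) // MANAGED  (no location)
    tableType(isExternal = false, hasLocation = true,  isManagedLocation = false) // EXTERNAL (user-specified location)
    tableType(isExternal = false, hasLocation = true,  isManagedLocation = true)  // MANAGED  (the fix)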

@@ -19,6 +19,7 @@ package org.apache.spark.sql.connector

 import java.sql.Timestamp
 import java.time.{Duration, LocalDate, Period}
+import java.util
 import java.util.Locale

 import scala.collection.JavaConverters._

@@ -35,7 +36,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.catalog.{Column => ColumnV2, _}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
-import org.apache.spark.sql.connector.expressions.LiteralValue
+import org.apache.spark.sql.connector.expressions.{LiteralValue, Transform}
 import org.apache.spark.sql.errors.QueryErrorsBase
 import org.apache.spark.sql.execution.FilterExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper

@@ -3340,12 +3341,29 @@ class DataSourceV2SQLSuiteV1Filter
   }

   test("SPARK-49099: Switch current schema with custom spark_catalog") {
+    // Reset CatalogManager to clear the materialized `spark_catalog` instance, so that we can
+    // configure a new implementation.
+    spark.sessionState.catalogManager.reset()
     withSQLConf(V2_SESSION_CATALOG_IMPLEMENTATION.key -> classOf[InMemoryCatalog].getName) {
       sql("CREATE DATABASE test_db")
       sql("USE test_db")
     }
   }

+  test("SPARK-49183: custom spark_catalog generates location for managed tables") {
+    // Reset CatalogManager to clear the materialized `spark_catalog` instance, so that we can
+    // configure a new implementation.
+    spark.sessionState.catalogManager.reset()
+    withSQLConf(V2_SESSION_CATALOG_IMPLEMENTATION.key -> classOf[SimpleDelegatingCatalog].getName) {
+      withTable("t") {
+        sql(s"CREATE TABLE t (i INT) USING $v2Format")
+        val table = catalog(SESSION_CATALOG_NAME).asTableCatalog
+          .loadTable(Identifier.of(Array("default"), "t"))
+        assert(!table.properties().containsKey(TableCatalog.PROP_EXTERNAL))
+      }
+    }
+  }
+
   private def testNotSupportedV2Command(
       sqlCommand: String,
       sqlParams: String,

@@ -3374,3 +3392,17 @@ class FakeV2Provider extends SimpleTableProvider {
 class ReserveSchemaNullabilityCatalog extends InMemoryCatalog {
   override def useNullableQuerySchema(): Boolean = false
 }
+
+class SimpleDelegatingCatalog extends DelegatingCatalogExtension {
+  override def createTable(
+      ident: Identifier,
+      columns: Array[ColumnV2],
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+    val newProps = new util.HashMap[String, String]
+    newProps.putAll(properties)
+    newProps.put(TableCatalog.PROP_LOCATION, "/tmp/test_path")
+    newProps.put(TableCatalog.PROP_IS_MANAGED_LOCATION, "true")
+    super.createTable(ident, columns, partitions, newProps)
+  }
+}
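
For reference, the new test's setup corresponds roughly to this session-level sketch (hypothetical; `spark.sql.catalog.spark_catalog` is the conf key behind `V2_SESSION_CATALOG_IMPLEMENTATION`). Because the session catalog instance is cached, the tests call `catalogManager.reset()` before reconfiguring it:

    // Hypothetical reproduction of the test scenario in a Spark session.
    spark.conf.set("spark.sql.catalog.spark_catalog", classOf[SimpleDelegatingCatalog].getName)
    spark.sql("CREATE TABLE t (i INT) USING parquet")
    // Before the fix this reported Type: EXTERNAL; with the fix it is MANAGED.
    spark.sql("DESCRIBE TABLE EXTENDED t").show(truncate = false)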
