diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index 29c2da307a0f6..b990f59bfd90e 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -46,7 +46,7 @@ public interface TableCatalog extends CatalogPlugin {
 
   /**
    * A reserved property to specify the location of the table. The files of the table
-   * should be under this location.
+   * should be under this location. The location is a Hadoop Path string.
    */
   String PROP_LOCATION = "location";
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
index da201e816497c..8928ba57f06c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
@@ -22,7 +22,7 @@ import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TableIdentifierHelper
 import org.apache.spark.sql.connector.catalog.V1Table.addV2TableProperties
 import org.apache.spark.sql.connector.expressions.{LogicalExpressions, Transform}
@@ -38,7 +38,7 @@ private[sql] case class V1Table(v1Table: CatalogTable) extends Table {
   lazy val options: Map[String, String] = {
     v1Table.storage.locationUri match {
       case Some(uri) =>
-        v1Table.storage.properties + ("path" -> uri.toString)
+        v1Table.storage.properties + ("path" -> CatalogUtils.URIToString(uri))
       case _ =>
         v1Table.storage.properties
     }
@@ -81,7 +81,9 @@ private[sql] object V1Table {
         TableCatalog.OPTION_PREFIX + key -> value
       } ++
       v1Table.provider.map(TableCatalog.PROP_PROVIDER -> _) ++
       v1Table.comment.map(TableCatalog.PROP_COMMENT -> _) ++
-      v1Table.storage.locationUri.map(TableCatalog.PROP_LOCATION -> _.toString) ++
+      v1Table.storage.locationUri.map { loc =>
+        TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(loc)
+      } ++
       (if (managed) Some(TableCatalog.PROP_IS_MANAGED_LOCATION -> "true") else None) ++
       (if (external) Some(TableCatalog.PROP_EXTERNAL -> "true") else None) ++
       Some(TableCatalog.PROP_OWNER -> v1Table.owner)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 8c679c4d57fc3..aee243b6529da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -361,8 +361,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         serdeProperties,
         partitionSpec)
 
-    case SetTableLocation(ResolvedV1TableIdentifier(ident), partitionSpec, location) =>
-      AlterTableSetLocationCommand(ident, partitionSpec, location)
+    case SetTableLocation(ResolvedV1TableIdentifier(ident), None, location) =>
+      AlterTableSetLocationCommand(ident, None, location)
+
+    // V2 catalog doesn't support setting partition location yet, so fall back to the v1 command.
+ case SetTableLocation( + ResolvedTable(catalog, _, t: V1Table, _), + Some(partitionSpec), + location) if isSessionCatalog(catalog) => + AlterTableSetLocationCommand(t.v1Table.identifier, Some(partitionSpec), location) case AlterViewAs(ResolvedViewIdentifier(ident), originalText, query) => AlterViewAsCommand(ident, originalText, query) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 5fd4aa970a62a..d46c5116e6151 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.mutable import org.apache.commons.lang3.StringUtils +import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.internal.Logging @@ -105,8 +106,19 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat } private def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = { - tableSpec.withNewLocation(tableSpec.location.map(loc => CatalogUtils.makeQualifiedPath( - CatalogUtils.stringToURI(loc), hadoopConf).toString)) + val newLoc = tableSpec.location.map { loc => + val locationUri = CatalogUtils.stringToURI(loc) + val qualified = if (locationUri.isAbsolute) { + locationUri + } else if (new Path(locationUri).isAbsolute) { + CatalogUtils.makeQualifiedPath(locationUri, hadoopConf) + } else { + // Leave it to the catalog implementation to qualify relative paths. + locationUri + } + CatalogUtils.URIToString(qualified) + } + tableSpec.withNewLocation(newLoc) } override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { diff --git a/sql/core/src/test/resources/sql-tests/results/show-create-table.sql.out b/sql/core/src/test/resources/sql-tests/results/show-create-table.sql.out index dcb96b9d2dce6..e1f4e3068b458 100644 --- a/sql/core/src/test/resources/sql-tests/results/show-create-table.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/show-create-table.sql.out @@ -78,7 +78,7 @@ CREATE TABLE spark_catalog.default.tbl ( b STRING, c INT) USING parquet -LOCATION 'file:///path/to/table' +LOCATION 'file:/path/to/table' -- !query @@ -108,7 +108,7 @@ CREATE TABLE spark_catalog.default.tbl ( b STRING, c INT) USING parquet -LOCATION 'file:///path/to/table' +LOCATION 'file:/path/to/table' -- !query diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala index 219c8e198fa00..79fbabbeacaa6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala @@ -84,7 +84,7 @@ class DataSourceV2DataFrameSessionCatalogSuite spark.range(20).write.format(v2Format).option("path", "/abc").saveAsTable(t1) val cat = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog] val tableInfo = cat.loadTable(Identifier.of(Array("default"), t1)) - assert(tableInfo.properties().get("location") === "file:///abc") + assert(tableInfo.properties().get("location") === "file:/abc") assert(tableInfo.properties().get("provider") === v2Format) } } 
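
Note (reviewer aside, not part of the patch): the rewritten qualifyLocInTableSpec above distinguishes three shapes of location string. Below is a minimal Scala sketch of the same decision, assuming only Hadoop on the classpath; QualifyLocationSketch and classify are made-up names for illustration, and the real code uses CatalogUtils.stringToURI, which also tolerates characters that java.net.URI rejects (e.g. spaces).

import java.net.URI
import org.apache.hadoop.fs.Path

object QualifyLocationSketch {
  // Mirrors the three branches of qualifyLocInTableSpec.
  def classify(loc: String): String = {
    if (new URI(loc).isAbsolute) {
      // Has a scheme (e.g. s3://host/full/path): already qualified, keep as-is.
      "already qualified"
    } else if (new Path(loc).isAbsolute) {
      // Rooted but scheme-less (e.g. /tmp/foo): qualify against the default
      // filesystem, as makeQualifiedPath does in the real code.
      "absolute, qualified against the default filesystem"
    } else {
      // Relative (e.g. relative/path): leave it for the catalog to resolve.
      "relative, left to the catalog"
    }
  }

  def main(args: Array[String]): Unit = {
    Seq("s3://host/full/path", "/absolute/path", "relative/path")
      .foreach(l => println(s"$l -> ${classify(l)}"))
  }
}

This lines up with the new SPARK-49152 test expectations below: scheme-qualified and relative locations round-trip unchanged, while rooted local paths come back as file:/... on a local default filesystem.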
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 77e447062d40e..0638b50cfb9d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -432,6 +432,25 @@ class DataSourceV2SQLSuiteV1Filter } } + test("SPARK-49152: CreateTable should store location as qualified") { + val tbl = "testcat.table_name" + + def testWithLocation(location: String, qualified: String): Unit = { + withTable(tbl) { + sql(s"CREATE TABLE $tbl USING foo LOCATION '$location'") + val loc = catalog("testcat").asTableCatalog + .loadTable(Identifier.of(Array.empty, "table_name")) + .properties().get(TableCatalog.PROP_LOCATION) + assert(loc === qualified) + } + } + + testWithLocation("/absolute/path", "file:/absolute/path") + testWithLocation("s3://host/full/path", "s3://host/full/path") + testWithLocation("relative/path", "relative/path") + testWithLocation("/path/special+ char", "file:/path/special+ char") + } + test("SPARK-37545: CreateTableAsSelect should store location as qualified") { val basicIdentifier = "testcat.table_name" val atomicIdentifier = "testcat_atomic.table_name" @@ -442,7 +461,7 @@ class DataSourceV2SQLSuiteV1Filter val location = spark.sql(s"DESCRIBE EXTENDED $identifier") .filter("col_name = 'Location'") .select("data_type").head().getString(0) - assert(location === "file:///tmp/foo") + assert(location === "file:/tmp/foo") } } } @@ -459,7 +478,7 @@ class DataSourceV2SQLSuiteV1Filter val location = spark.sql(s"DESCRIBE EXTENDED $identifier") .filter("col_name = 'Location'") .select("data_type").head().getString(0) - assert(location === "file:///tmp/foo") + assert(location === "file:/tmp/foo") } } } @@ -1357,8 +1376,7 @@ class DataSourceV2SQLSuiteV1Filter val identifier = Identifier.of(Array(), "reservedTest") val location = tableCatalog.loadTable(identifier).properties() .get(TableCatalog.PROP_LOCATION) - assert(location.startsWith("file:") && location.endsWith("foo"), - "path as a table property should not have side effects") + assert(location == "foo", "path as a table property should not have side effects") assert(tableCatalog.loadTable(identifier).properties().get("path") == "bar", "path as a table property should not have side effects") assert(tableCatalog.loadTable(identifier).properties().get("Path") == "noop", @@ -3148,7 +3166,7 @@ class DataSourceV2SQLSuiteV1Filter val properties = table.properties assert(properties.get(TableCatalog.PROP_PROVIDER) == "parquet") assert(properties.get(TableCatalog.PROP_COMMENT) == "This is a comment") - assert(properties.get(TableCatalog.PROP_LOCATION) == "file:///tmp") + assert(properties.get(TableCatalog.PROP_LOCATION) == "file:/tmp") assert(properties.containsKey(TableCatalog.PROP_OWNER)) assert(properties.get(TableCatalog.PROP_EXTERNAL) == "true") assert(properties.get(s"${TableCatalog.OPTION_PREFIX}from") == "0") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala index fec33d811b461..adda9dcfffe46 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala @@ -106,7 +106,7 @@ class ShowCreateTableSuite extends 
command.ShowCreateTableSuiteBase with Command "'via' = '2')", "PARTITIONED BY (a)", "COMMENT 'This is a comment'", - "LOCATION 'file:///tmp'", + "LOCATION 'file:/tmp'", "TBLPROPERTIES (", "'password' = '*********(redacted)',", "'prop1' = '1',", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index c70675497064e..c6bf220e45d52 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -754,7 +754,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf assert(table.properties().get("comment").equals(description)) assert(table.properties().get("path").equals(dir.getAbsolutePath)) assert(table.properties().get("external").equals("true")) - assert(table.properties().get("location").equals("file://" + dir.getAbsolutePath)) + assert(table.properties().get("location").equals("file:" + dir.getAbsolutePath)) } }
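
Note (reviewer aside, not part of the patch): the file:/// -> file:/ churn across the test expectations is the same URI in two spellings. java.net.URI.toString keeps the empty authority, while Hadoop's Path drops it, and CatalogUtils.URIToString renders through Path; once locations flow through CatalogUtils everywhere, the Path spelling wins. A minimal sketch of the difference, assuming Hadoop on the classpath (UriRenderingSketch is an illustrative name):

import java.net.URI
import org.apache.hadoop.fs.Path

object UriRenderingSketch {
  def main(args: Array[String]): Unit = {
    val uri = new URI("file:///abc")
    // java.net.URI preserves the empty authority in its string form.
    println(uri.toString)           // file:///abc
    // Hadoop Path drops the empty authority, which is the canonical form
    // the updated assertions now expect.
    println(new Path(uri).toString) // file:/abc
  }
}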