[SPARK-49152][SQL][FOLLOWUP][3.5] table location string should be Hadoop Path string

### What changes were proposed in this pull request?

This is a followup of #47660 that restores the previous behavior: the table location string should be a Hadoop Path string, not a URI string that escapes all special characters.
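To illustrate the distinction, here is a minimal standalone sketch (not part of the patch; in Spark, `CatalogUtils.URIToString` is essentially `new Path(uri).toString`):

```scala
import java.net.URI

import org.apache.hadoop.fs.Path

object LocationStringDemo {
  def main(args: Array[String]): Unit = {
    // A table location containing a space, stored in the catalog as a URI.
    val uri = new URI("file:///path/special%20char")

    // URI rendering keeps the percent-escape and the empty authority.
    println(uri.toString)           // file:///path/special%20char

    // Hadoop Path rendering un-escapes the path and drops the empty
    // authority; this is the string format the followup restores.
    println(new Path(uri).toString) // file:/path/special char
  }
}
```

The golden-file updates below (`file:///path/to/table` becoming `file:/path/to/table`) fall out of the same normalization.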

### Why are the changes needed?

It restores an unintentional behavior change introduced by #47660.

### Does this PR introduce _any_ user-facing change?

No, the change being fixed has not been released yet.

### How was this patch tested?

A new test was added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47765 from cloud-fan/fix.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan committed Aug 15, 2024
1 parent bd2cbd6 commit 8d05bf2
Showing 9 changed files with 57 additions and 18 deletions.
======================================================================
@@ -46,7 +46,7 @@ public interface TableCatalog extends CatalogPlugin {
 
   /**
    * A reserved property to specify the location of the table. The files of the table
-   * should be under this location.
+   * should be under this location. The location is a Hadoop Path string.
    */
   String PROP_LOCATION = "location";
 
======================================================================
@@ -22,7 +22,7 @@ import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TableIdentifierHelper
 import org.apache.spark.sql.connector.catalog.V1Table.addV2TableProperties
 import org.apache.spark.sql.connector.expressions.{LogicalExpressions, Transform}
@@ -38,7 +38,7 @@ private[sql] case class V1Table(v1Table: CatalogTable) extends Table {
   lazy val options: Map[String, String] = {
     v1Table.storage.locationUri match {
       case Some(uri) =>
-        v1Table.storage.properties + ("path" -> uri.toString)
+        v1Table.storage.properties + ("path" -> CatalogUtils.URIToString(uri))
       case _ =>
         v1Table.storage.properties
     }
@@ -81,7 +81,9 @@ private[sql] object V1Table {
         TableCatalog.OPTION_PREFIX + key -> value } ++
       v1Table.provider.map(TableCatalog.PROP_PROVIDER -> _) ++
       v1Table.comment.map(TableCatalog.PROP_COMMENT -> _) ++
-      v1Table.storage.locationUri.map(TableCatalog.PROP_LOCATION -> _.toString) ++
+      v1Table.storage.locationUri.map { loc =>
+        TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(loc)
+      } ++
       (if (managed) Some(TableCatalog.PROP_IS_MANAGED_LOCATION -> "true") else None) ++
       (if (external) Some(TableCatalog.PROP_EXTERNAL -> "true") else None) ++
       Some(TableCatalog.PROP_OWNER -> v1Table.owner)
======================================================================
@@ -361,8 +361,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         serdeProperties,
         partitionSpec)
 
-    case SetTableLocation(ResolvedV1TableIdentifier(ident), partitionSpec, location) =>
-      AlterTableSetLocationCommand(ident, partitionSpec, location)
+    case SetTableLocation(ResolvedV1TableIdentifier(ident), None, location) =>
+      AlterTableSetLocationCommand(ident, None, location)
+
+    // V2 catalog doesn't support setting partition location yet, we must use v1 command here.
+    case SetTableLocation(
+        ResolvedTable(catalog, _, t: V1Table, _),
+        Some(partitionSpec),
+        location) if isSessionCatalog(catalog) =>
+      AlterTableSetLocationCommand(t.v1Table.identifier, Some(partitionSpec), location)
 
     case AlterViewAs(ResolvedViewIdentifier(ident), originalText, query) =>
      AlterViewAsCommand(ident, originalText, query)
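For reference, the new v1 fallback above is what keeps partition-level `SET LOCATION` working against the session catalog. A minimal sketch, assuming a local session (the table name, schema, and paths are illustrative):

```scala
import org.apache.spark.sql.SparkSession

object SetLocationDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()

    spark.sql("CREATE TABLE events (id INT, dt STRING) USING parquet PARTITIONED BY (dt)")
    spark.sql("ALTER TABLE events ADD PARTITION (dt = '2024-08-15')")

    // Table-level location: handled by the first case (partitionSpec = None).
    spark.sql("ALTER TABLE events SET LOCATION '/tmp/events'")

    // Partition-level location: v2 catalogs can't do this yet, so the new
    // second case routes it to the v1 AlterTableSetLocationCommand.
    spark.sql("ALTER TABLE events PARTITION (dt = '2024-08-15') " +
      "SET LOCATION '/tmp/events/dt=2024-08-15'")

    spark.stop()
  }
}
```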
======================================================================
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.collection.mutable
 
 import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
@@ -105,8 +106,19 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
   }
 
   private def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
-    tableSpec.withNewLocation(tableSpec.location.map(loc => CatalogUtils.makeQualifiedPath(
-      CatalogUtils.stringToURI(loc), hadoopConf).toString))
+    val newLoc = tableSpec.location.map { loc =>
+      val locationUri = CatalogUtils.stringToURI(loc)
+      val qualified = if (locationUri.isAbsolute) {
+        locationUri
+      } else if (new Path(locationUri).isAbsolute) {
+        CatalogUtils.makeQualifiedPath(locationUri, hadoopConf)
+      } else {
+        // Leave it to the catalog implementation to qualify relative paths.
+        locationUri
+      }
+      CatalogUtils.URIToString(qualified)
+    }
+    tableSpec.withNewLocation(newLoc)
   }
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
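A note on the two absoluteness checks in `qualifyLocInTableSpec`: `java.net.URI.isAbsolute` tests for the presence of a scheme, while `org.apache.hadoop.fs.Path.isAbsolute` tests whether the path component starts with `/`, which is why both branches are needed. A small standalone sketch (not part of the patch):

```scala
import java.net.URI

import org.apache.hadoop.fs.Path

object AbsolutenessDemo {
  def main(args: Array[String]): Unit = {
    // URI.isAbsolute is true only when the URI carries a scheme.
    println(new URI("s3://host/full/path").isAbsolute)      // true  -> keep as-is
    println(new URI("/absolute/path").isAbsolute)           // false -> fall through

    // Path.isAbsolute is true when the path component starts with "/".
    println(new Path(new URI("/absolute/path")).isAbsolute) // true  -> qualify with the default FS
    println(new Path(new URI("relative/path")).isAbsolute)  // false -> leave to the catalog
  }
}
```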
======================================================================
@@ -78,7 +78,7 @@ CREATE TABLE spark_catalog.default.tbl (
   b STRING,
   c INT)
 USING parquet
-LOCATION 'file:///path/to/table'
+LOCATION 'file:/path/to/table'
 
 
 -- !query
@@ -108,7 +108,7 @@ CREATE TABLE spark_catalog.default.tbl (
   b STRING,
   c INT)
 USING parquet
-LOCATION 'file:///path/to/table'
+LOCATION 'file:/path/to/table'
 
 
 -- !query
======================================================================
@@ -84,7 +84,7 @@ class DataSourceV2DataFrameSessionCatalogSuite
       spark.range(20).write.format(v2Format).option("path", "/abc").saveAsTable(t1)
       val cat = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]
       val tableInfo = cat.loadTable(Identifier.of(Array("default"), t1))
-      assert(tableInfo.properties().get("location") === "file:///abc")
+      assert(tableInfo.properties().get("location") === "file:/abc")
       assert(tableInfo.properties().get("provider") === v2Format)
     }
   }
======================================================================
@@ -432,6 +432,25 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
+  test("SPARK-49152: CreateTable should store location as qualified") {
+    val tbl = "testcat.table_name"
+
+    def testWithLocation(location: String, qualified: String): Unit = {
+      withTable(tbl) {
+        sql(s"CREATE TABLE $tbl USING foo LOCATION '$location'")
+        val loc = catalog("testcat").asTableCatalog
+          .loadTable(Identifier.of(Array.empty, "table_name"))
+          .properties().get(TableCatalog.PROP_LOCATION)
+        assert(loc === qualified)
+      }
+    }
+
+    testWithLocation("/absolute/path", "file:/absolute/path")
+    testWithLocation("s3://host/full/path", "s3://host/full/path")
+    testWithLocation("relative/path", "relative/path")
+    testWithLocation("/path/special+ char", "file:/path/special+ char")
+  }
+
   test("SPARK-37545: CreateTableAsSelect should store location as qualified") {
     val basicIdentifier = "testcat.table_name"
     val atomicIdentifier = "testcat_atomic.table_name"
@@ -442,7 +461,7 @@ class DataSourceV2SQLSuiteV1Filter
         val location = spark.sql(s"DESCRIBE EXTENDED $identifier")
           .filter("col_name = 'Location'")
           .select("data_type").head().getString(0)
-        assert(location === "file:///tmp/foo")
+        assert(location === "file:/tmp/foo")
       }
     }
   }
@@ -459,7 +478,7 @@ class DataSourceV2SQLSuiteV1Filter
         val location = spark.sql(s"DESCRIBE EXTENDED $identifier")
           .filter("col_name = 'Location'")
           .select("data_type").head().getString(0)
-        assert(location === "file:///tmp/foo")
+        assert(location === "file:/tmp/foo")
       }
     }
   }
@@ -1357,8 +1376,7 @@ class DataSourceV2SQLSuiteV1Filter
      val identifier = Identifier.of(Array(), "reservedTest")
      val location = tableCatalog.loadTable(identifier).properties()
        .get(TableCatalog.PROP_LOCATION)
-     assert(location.startsWith("file:") && location.endsWith("foo"),
-       "path as a table property should not have side effects")
+     assert(location == "foo", "path as a table property should not have side effects")
      assert(tableCatalog.loadTable(identifier).properties().get("path") == "bar",
        "path as a table property should not have side effects")
      assert(tableCatalog.loadTable(identifier).properties().get("Path") == "noop",
@@ -3148,7 +3166,7 @@ class DataSourceV2SQLSuiteV1Filter
      val properties = table.properties
      assert(properties.get(TableCatalog.PROP_PROVIDER) == "parquet")
      assert(properties.get(TableCatalog.PROP_COMMENT) == "This is a comment")
-     assert(properties.get(TableCatalog.PROP_LOCATION) == "file:///tmp")
+     assert(properties.get(TableCatalog.PROP_LOCATION) == "file:/tmp")
      assert(properties.containsKey(TableCatalog.PROP_OWNER))
      assert(properties.get(TableCatalog.PROP_EXTERNAL) == "true")
      assert(properties.get(s"${TableCatalog.OPTION_PREFIX}from") == "0")
======================================================================
@@ -106,7 +106,7 @@ class ShowCreateTableSuite extends command.ShowCreateTableSuiteBase with Command
       "'via' = '2')",
       "PARTITIONED BY (a)",
       "COMMENT 'This is a comment'",
-      "LOCATION 'file:///tmp'",
+      "LOCATION 'file:/tmp'",
       "TBLPROPERTIES (",
       "'password' = '*********(redacted)',",
       "'prop1' = '1',",
======================================================================
@@ -754,7 +754,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
       assert(table.properties().get("comment").equals(description))
       assert(table.properties().get("path").equals(dir.getAbsolutePath))
       assert(table.properties().get("external").equals("true"))
-      assert(table.properties().get("location").equals("file://" + dir.getAbsolutePath))
+      assert(table.properties().get("location").equals("file:" + dir.getAbsolutePath))
     }
   }
