diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index d569f1ed484cc..02ad2e79a5645 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, ResolveDefaultColumns => DefaultCols}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, DelegatingCatalogExtension, LookupCatalog, SupportsNamespaces, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command._
@@ -284,10 +284,20 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
       AnalyzeColumnCommand(ident, columnNames, allColumns)
 
-    case RepairTable(ResolvedV1TableIdentifier(ident), addPartitions, dropPartitions) =>
+    // V2 catalog doesn't support REPAIR TABLE yet, we must use v1 command here.
+    case RepairTable(
+        ResolvedV1TableIdentifierInSessionCatalog(ident),
+        addPartitions,
+        dropPartitions) =>
       RepairTableCommand(ident, addPartitions, dropPartitions)
 
-    case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) =>
+    // V2 catalog doesn't support LOAD DATA yet, we must use v1 command here.
+    case LoadData(
+        ResolvedV1TableIdentifierInSessionCatalog(ident),
+        path,
+        isLocal,
+        isOverwrite,
+        partition) =>
       LoadDataCommand(
         ident,
         path,
@@ -336,7 +346,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       }
       ShowColumnsCommand(db, v1TableName, output)
 
-    case RecoverPartitions(ResolvedV1TableIdentifier(ident)) =>
+    // V2 catalog doesn't support RECOVER PARTITIONS yet, we must use v1 command here.
+    case RecoverPartitions(ResolvedV1TableIdentifierInSessionCatalog(ident)) =>
       RepairTableCommand(
         ident,
         enableAddPartitions = true,
@@ -364,8 +375,9 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         purge,
         retainData = false)
 
+    // V2 catalog doesn't support setting serde properties yet, we must use v1 command here.
     case SetTableSerDeProperties(
-        ResolvedV1TableIdentifier(ident),
+        ResolvedV1TableIdentifierInSessionCatalog(ident),
         serdeClassName,
         serdeProperties,
         partitionSpec) =>
@@ -380,10 +392,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     // V2 catalog doesn't support setting partition location yet, we must use v1 command here.
     case SetTableLocation(
-        ResolvedTable(catalog, _, t: V1Table, _),
+        ResolvedV1TableIdentifierInSessionCatalog(ident),
         Some(partitionSpec),
-        location) if isSessionCatalog(catalog) =>
-      AlterTableSetLocationCommand(t.v1Table.identifier, Some(partitionSpec), location)
+        location) =>
+      AlterTableSetLocationCommand(ident, Some(partitionSpec), location)
 
     case AlterViewAs(ResolvedViewIdentifier(ident), originalText, query) =>
       AlterViewAsCommand(ident, originalText, query)
@@ -600,6 +612,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     }
   }
 
+  object ResolvedV1TableIdentifierInSessionCatalog {
+    def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
+      case ResolvedTable(catalog, _, t: V1Table, _) if isSessionCatalog(catalog) =>
+        Some(t.catalogTable.identifier)
+      case _ => None
+    }
+  }
+
   object ResolvedV1TableOrViewIdentifier {
     def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
       case ResolvedV1TableIdentifier(ident) => Some(ident)
@@ -684,7 +704,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
   }
 
   private def supportsV1Command(catalog: CatalogPlugin): Boolean = {
-    isSessionCatalog(catalog) &&
-      SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty
+    isSessionCatalog(catalog) && (
+      SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty ||
+        catalog.isInstanceOf[DelegatingCatalogExtension])
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala
index 95624f3f61c5c..7463eb34d17ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala
@@ -71,4 +71,12 @@ class DataSourceV2SQLSessionCatalogSuite
       sql(s"CREATE EXTERNAL TABLE t (i INT) USING $v2Format TBLPROPERTIES($prop)")
     }
   }
+
+  test("SPARK-49152: partition columns should be put at the end") {
+    withTable("t") {
+      sql("CREATE TABLE t (c1 INT, c2 INT) USING json PARTITIONED BY (c1)")
+      // partition columns should be put at the end.
+      assert(getTableMetadata("default.t").columns().map(_.name()) === Seq("c2", "c1"))
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 1d37c6aa4eb7f..922bf01b541a1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2125,10 +2125,18 @@ class DataSourceV2SQLSuiteV1Filter
   }
 
   test("REPLACE TABLE: v1 table") {
-    sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
-    val v2Catalog = catalog("spark_catalog").asTableCatalog
-    val table = v2Catalog.loadTable(Identifier.of(Array("default"), "tbl"))
-    assert(table.properties().get(TableCatalog.PROP_PROVIDER) == classOf[SimpleScanSource].getName)
+    val e = intercept[AnalysisException] {
+      sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
+    }
+    checkError(
+      exception = e,
+      errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+      sqlState = "0A000",
+      parameters = Map(
+        "tableName" -> "`spark_catalog`.`default`.`tbl`",
+        "operation" -> "REPLACE TABLE"
+      )
+    )
   }
 
   test("DeleteFrom: - delete with invalid predicate") {
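For readers unfamiliar with the pattern, the new ResolvedV1TableIdentifierInSessionCatalog object in the patch above is a Scala extractor: its unapply returns Some(identifier) only when the resolved plan is a V1 table in the session catalog, so the rewritten RepairTable/LoadData/SetTableLocation cases fall back to v2 handling otherwise. The sketch below illustrates that guarded-extractor pattern in isolation; the Plan, V1TablePlan, and TableIdentifier types are hypothetical stand-ins for this sketch only, not Spark's ResolvedTable/V1Table classes.

// Minimal sketch of the guarded-extractor pattern used by
// ResolvedV1TableIdentifierInSessionCatalog. All types here are hypothetical
// stand-ins for illustration, not Spark's analysis classes.
sealed trait Plan
final case class TableIdentifier(table: String, database: Option[String])
final case class V1TablePlan(catalogName: String, ident: TableIdentifier) extends Plan
final case class OtherPlan(description: String) extends Plan

object ResolvedInSessionCatalog {
  // The pattern guard plays the role of `if isSessionCatalog(catalog)`: only
  // plans resolved against the session catalog yield a TableIdentifier.
  def unapply(plan: Plan): Option[TableIdentifier] = plan match {
    case V1TablePlan("spark_catalog", ident) => Some(ident)
    case _ => None
  }
}

object ExtractorDemo extends App {
  val plans: Seq[Plan] = Seq(
    V1TablePlan("spark_catalog", TableIdentifier("t", Some("default"))),
    V1TablePlan("other_catalog", TableIdentifier("t2", None)),
    OtherPlan("a view"))

  plans.foreach {
    // Matches only when unapply returns Some(ident), mirroring how the new
    // RepairTable/LoadData cases pick the v1 command path.
    case ResolvedInSessionCatalog(ident) => println(s"v1 command for $ident")
    case other => println(s"left for v2 handling: $other")
  }
}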