diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala index 5435aad05e88a..cad30eca24469 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala @@ -763,4 +763,22 @@ class TestCreateTable extends HoodieSparkSqlTestBase { assertResult(true)(shown.contains("COMMENT 'This is a simple hudi table'")) } } + + test("Test CTAS using an illegal definition -- a COW table with compaction enabled.") { + val tableName = generateTableName + checkExceptionContain( + s""" + | create table $tableName using hudi + | tblproperties( + | primaryKey = 'id', + | type = 'cow', + | hoodie.compact.inline='true' + | ) + | AS + | select 1 as id, 'a1' as name, 10 as price, 1000 as ts + |""".stripMargin)("Compaction is not supported on a CopyOnWrite table") + val dbPath = spark.sessionState.catalog.getDatabaseMetadata("default").locationUri.getPath + val tablePath = s"${dbPath}/${tableName}" + assertResult(false)(existsPath(tablePath)) + } } diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala index 5f4572dcc9388..7550ccd29118a 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig} import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.{Dataset, SaveMode, SparkSession, _} +import java.net.URI import java.util import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter} @@ -49,7 +50,9 @@ class HoodieCatalog extends DelegatingCatalogExtension override def stageCreate(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = { if (sparkAdapter.isHoodieTable(properties)) { - HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_CREATE) + val locUriAndTableType = deduceTableLocationURIAndTableType(ident, properties) + HoodieStagedTable(ident, locUriAndTableType, this, schema, partitions, + properties, TableCreationMode.STAGE_CREATE) } else { BasicStagedTable( ident, @@ -60,7 +63,9 @@ class HoodieCatalog extends DelegatingCatalogExtension override def stageReplace(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = { if (sparkAdapter.isHoodieTable(properties)) { - HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_REPLACE) + val locUriAndTableType = deduceTableLocationURIAndTableType(ident, properties) + HoodieStagedTable(ident, locUriAndTableType, this, schema, partitions, + properties, TableCreationMode.STAGE_REPLACE) } else { super.dropTable(ident) BasicStagedTable( @@ -75,8 +80,9 @@ class HoodieCatalog extends DelegatingCatalogExtension partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = { if (sparkAdapter.isHoodieTable(properties)) { - HoodieStagedTable( - ident, this, schema, partitions, properties, TableCreationMode.CREATE_OR_REPLACE) + val locUriAndTableType = deduceTableLocationURIAndTableType(ident, properties) + HoodieStagedTable(ident, locUriAndTableType, this, schema, partitions, + properties, TableCreationMode.CREATE_OR_REPLACE) } else { try super.dropTable(ident) catch { case _: NoSuchTableException => // ignore the exception @@ -111,7 +117,9 @@ class HoodieCatalog extends DelegatingCatalogExtension schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): Table = { - createHoodieTable(ident, schema, partitions, properties, Map.empty, Option.empty, TableCreationMode.CREATE) + val locUriAndTableType = deduceTableLocationURIAndTableType(ident, properties) + createHoodieTable(ident, schema, locUriAndTableType, partitions, properties, + Map.empty, Option.empty, TableCreationMode.CREATE) } override def tableExists(ident: Identifier): Boolean = super.tableExists(ident) @@ -192,8 +200,30 @@ class HoodieCatalog extends DelegatingCatalogExtension loadTable(ident) } + private def deduceTableLocationURIAndTableType( + ident: Identifier, properties: util.Map[String, String]): (URI, CatalogTableType) = { + val locOpt = if (isPathIdentifier(ident)) { + Option(ident.name()) + } else { + Option(properties.get("location")) + } + val tableType = if (locOpt.nonEmpty) { + CatalogTableType.EXTERNAL + } else { + CatalogTableType.MANAGED + } + val locUriOpt = locOpt.map(CatalogUtils.stringToURI) + val tableIdent = ident.asTableIdentifier + val existingTableOpt = getExistingTableIfExists(tableIdent) + val locURI = locUriOpt + .orElse(existingTableOpt.flatMap(_.storage.locationUri)) + .getOrElse(spark.sessionState.catalog.defaultTablePath(tableIdent)) + (locURI, tableType) + } + def createHoodieTable(ident: Identifier, schema: StructType, + locUriAndTableType: (URI, CatalogTableType), partitions: Array[Transform], allTableProperties: util.Map[String, String], writeOptions: Map[String, String], @@ -205,29 +235,17 @@ class HoodieCatalog extends DelegatingCatalogExtension val newPartitionColumns = partitionColumns val newBucketSpec = maybeBucketSpec - val isByPath = isPathIdentifier(ident) - - val location = if (isByPath) Option(ident.name()) else Option(allTableProperties.get("location")) - val id = ident.asTableIdentifier - - val locUriOpt = location.map(CatalogUtils.stringToURI) - val existingTableOpt = getExistingTableIfExists(id) - val loc = locUriOpt - .orElse(existingTableOpt.flatMap(_.storage.locationUri)) - .getOrElse(spark.sessionState.catalog.defaultTablePath(id)) val storage = DataSource.buildStorageFormatFromOptions(writeOptions) - .copy(locationUri = Option(loc)) - val tableType = - if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED + .copy(locationUri = Option(locUriAndTableType._1)) val commentOpt = Option(allTableProperties.get("comment")) val tablePropertiesNew = new util.HashMap[String, String](allTableProperties) // put path to table properties. - tablePropertiesNew.put("path", loc.getPath) + tablePropertiesNew.put("path", locUriAndTableType._1.getPath) val tableDesc = new CatalogTable( - identifier = id, - tableType = tableType, + identifier = ident.asTableIdentifier, + tableType = locUriAndTableType._2, storage = storage, schema = newSchema, provider = Option("hudi"), diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieStagedTable.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieStagedTable.scala index 4034862167aa5..e18f23ebde03f 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieStagedTable.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieStagedTable.scala @@ -21,16 +21,18 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceWriteOptions.RECORDKEY_FIELD import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, SupportsWrite, TableCapability} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.write.{LogicalWriteInfo, V1Write, WriteBuilder} -import org.apache.spark.sql.sources.InsertableRelation import org.apache.spark.sql.types.StructType +import java.net.URI import java.util import scala.collection.JavaConverters.{mapAsScalaMapConverter, setAsJavaSetConverter} case class HoodieStagedTable(ident: Identifier, + locUriAndTableType: (URI, CatalogTableType), catalog: HoodieCatalog, override val schema: StructType, partitions: Array[Transform], @@ -59,13 +61,14 @@ case class HoodieStagedTable(ident: Identifier, props.putAll(properties) props.put("hoodie.table.name", ident.name()) props.put(RECORDKEY_FIELD.key, properties.get("primaryKey")) - catalog.createHoodieTable(ident, schema, partitions, props, writeOptions, sourceQuery, mode) + catalog.createHoodieTable( + ident, schema, locUriAndTableType, partitions, props, writeOptions, sourceQuery, mode) } override def name(): String = ident.name() override def abortStagedChanges(): Unit = { - clearTablePath(properties.get("location"), catalog.spark.sparkContext.hadoopConfiguration) + clearTablePath(locUriAndTableType._1.getPath, catalog.spark.sparkContext.hadoopConfiguration) } private def clearTablePath(tablePath: String, conf: Configuration): Unit = { @@ -85,13 +88,9 @@ case class HoodieStagedTable(ident: Identifier, * WriteBuilder for creating a Hoodie table. */ private class HoodieV1WriteBuilder extends WriteBuilder { - override def build(): V1Write = new V1Write { - override def toInsertableRelation(): InsertableRelation = { - new InsertableRelation { - override def insert(data: DataFrame, overwrite: Boolean): Unit = { - sourceQuery = Option(data) - } - } + override def build(): V1Write = () => { + (data: DataFrame, overwrite: Boolean) => { + sourceQuery = Option(data) } } }