diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 5435aad05e88a..c774e4df5cb39 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -383,80 +383,86 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
   }
 
   test("Test Create Table As Select With Tblproperties For Filter Props") {
-    Seq("cow", "mor").foreach { tableType =>
-      val tableName = generateTableName
-      spark.sql(
-        s"""
-           | create table $tableName using hudi
-           | partitioned by (dt)
-           | tblproperties(
-           | hoodie.database.name = "databaseName",
-           | hoodie.table.name = "tableName",
-           | primaryKey = 'id',
-           | preCombineField = 'ts',
-           | hoodie.datasource.write.operation = 'upsert',
-           | type = '$tableType'
-           | )
-           | AS
-           | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt, 1000 as ts
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             | create table $tableName using hudi
+             | partitioned by (dt)
+             | tblproperties(
+             | hoodie.database.name = "databaseName",
+             | hoodie.table.name = "tableName",
+             | primaryKey = 'id',
+             | preCombineField = 'ts',
+             | hoodie.datasource.write.operation = 'upsert',
+             | type = '$tableType'
+             | )
+             | location '${tmp.getCanonicalPath}/$tableName'
+             | AS
+             | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt, 1000 as ts
        """.stripMargin
-      )
-      checkAnswer(s"select id, name, price, dt from $tableName")(
-        Seq(1, "a1", 10, "2021-04-01")
-      )
-      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
-      assertFalse(table.properties.contains(HoodieTableConfig.DATABASE_NAME.key()))
-      assertFalse(table.properties.contains(HoodieTableConfig.NAME.key()))
-      assertFalse(table.properties.contains(OPERATION.key()))
+        )
+        checkAnswer(s"select id, name, price, dt from $tableName")(
+          Seq(1, "a1", 10, "2021-04-01")
+        )
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+        assertFalse(table.properties.contains(HoodieTableConfig.DATABASE_NAME.key()))
+        assertFalse(table.properties.contains(HoodieTableConfig.NAME.key()))
+        assertFalse(table.properties.contains(OPERATION.key()))
 
-      val tablePath = table.storage.properties("path")
-      val metaClient = HoodieTableMetaClient.builder()
-        .setBasePath(tablePath)
-        .setConf(spark.sessionState.newHadoopConf())
-        .build()
-      val tableConfig = metaClient.getTableConfig.getProps.asScala.toMap
-      assertResult("default")(tableConfig(HoodieTableConfig.DATABASE_NAME.key()))
-      assertResult(tableName)(tableConfig(HoodieTableConfig.NAME.key()))
-      assertFalse(tableConfig.contains(OPERATION.key()))
+        val tablePath = table.storage.properties("path")
+        val metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(tablePath)
+          .setConf(spark.sessionState.newHadoopConf())
+          .build()
+        val tableConfig = metaClient.getTableConfig.getProps.asScala.toMap
+        assertResult("default")(tableConfig(HoodieTableConfig.DATABASE_NAME.key()))
+        assertResult(tableName)(tableConfig(HoodieTableConfig.NAME.key()))
+        assertFalse(tableConfig.contains(OPERATION.key()))
+      }
     }
   }
 
   test("Test Create Table As Select With Options For Filter Props") {
-    Seq("cow", "mor").foreach { tableType =>
-      val tableName = generateTableName
-      spark.sql(
-        s"""
-           | create table $tableName using hudi
-           | partitioned by (dt)
-           | options(
-           | hoodie.database.name = "databaseName",
-           | hoodie.table.name = "tableName",
-           | primaryKey = 'id',
-           | preCombineField = 'ts',
-           | hoodie.datasource.write.operation = 'upsert',
-           | type = '$tableType'
-           | )
-           | AS
-           | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt, 1000 as ts
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             | create table $tableName using hudi
+             | partitioned by (dt)
+             | options(
+             | hoodie.database.name = "databaseName",
+             | hoodie.table.name = "tableName",
+             | primaryKey = 'id',
+             | preCombineField = 'ts',
+             | hoodie.datasource.write.operation = 'upsert',
+             | type = '$tableType'
+             | )
+             | location '${tmp.getCanonicalPath}/$tableName'
+             | AS
+             | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt, 1000 as ts
        """.stripMargin
-      )
-      checkAnswer(s"select id, name, price, dt from $tableName")(
-        Seq(1, "a1", 10, "2021-04-01")
-      )
-      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
-      assertFalse(table.properties.contains(HoodieTableConfig.DATABASE_NAME.key()))
-      assertFalse(table.properties.contains(HoodieTableConfig.NAME.key()))
-      assertFalse(table.properties.contains(OPERATION.key()))
+        )
+        checkAnswer(s"select id, name, price, dt from $tableName")(
+          Seq(1, "a1", 10, "2021-04-01")
+        )
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+        assertFalse(table.properties.contains(HoodieTableConfig.DATABASE_NAME.key()))
+        assertFalse(table.properties.contains(HoodieTableConfig.NAME.key()))
+        assertFalse(table.properties.contains(OPERATION.key()))
 
-      val tablePath = table.storage.properties("path")
-      val metaClient = HoodieTableMetaClient.builder()
-        .setBasePath(tablePath)
-        .setConf(spark.sessionState.newHadoopConf())
-        .build()
-      val tableConfig = metaClient.getTableConfig.getProps.asScala.toMap
-      assertResult("default")(tableConfig(HoodieTableConfig.DATABASE_NAME.key()))
-      assertResult(tableName)(tableConfig(HoodieTableConfig.NAME.key()))
-      assertFalse(tableConfig.contains(OPERATION.key()))
+        val tablePath = table.storage.properties("path")
+        val metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(tablePath)
+          .setConf(spark.sessionState.newHadoopConf())
+          .build()
+        val tableConfig = metaClient.getTableConfig.getProps.asScala.toMap
+        assertResult("default")(tableConfig(HoodieTableConfig.DATABASE_NAME.key()))
+        assertResult(tableName)(tableConfig(HoodieTableConfig.NAME.key()))
+        assertFalse(tableConfig.contains(OPERATION.key()))
+      }
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
index 5f4572dcc9388..2c5261a12f146 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
 import org.apache.spark.sql.HoodieSpark3SqlUtils.convertTransforms
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils, HoodieCatalogTable}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange, UpdateColumnComment, UpdateColumnType}
@@ -215,7 +216,7 @@ class HoodieCatalog extends DelegatingCatalogExtension
     val loc = locUriOpt
       .orElse(existingTableOpt.flatMap(_.storage.locationUri))
       .getOrElse(spark.sessionState.catalog.defaultTablePath(id))
-    val storage = DataSource.buildStorageFormatFromOptions(writeOptions)
+    val storage = DataSource.buildStorageFormatFromOptions(writeOptions.--(needFilterProps))
       .copy(locationUri = Option(loc))
     val tableType =
       if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
@@ -233,7 +234,7 @@ class HoodieCatalog extends DelegatingCatalogExtension
       provider = Option("hudi"),
       partitionColumnNames = newPartitionColumns,
       bucketSpec = newBucketSpec,
-      properties = tablePropertiesNew.asScala.toMap,
+      properties = tablePropertiesNew.asScala.toMap.--(needFilterProps),
       comment = commentOpt)
 
     val hoodieCatalogTable = HoodieCatalogTable(spark, tableDesc)
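
The common thread of the patch is the `map.--(keys)` filtering applied in HoodieCatalog: Hudi-managed properties supplied through tblproperties/options must not leak into the Spark catalog entry's `properties` or storage options, while the on-disk `hoodie.properties` keeps the canonical values (the tests assert `default` for the database name and the actual table name). Below is a minimal, self-contained sketch of that pattern; the key list here is an assumption mirroring only the three keys the tests assert, whereas the real set lives behind `HoodieCatalogTable.needFilterProps`:

```scala
// Hypothetical standalone sketch, not Hudi's implementation: demonstrates the
// `map -- keys` filtering the patch applies in HoodieCatalog.
object NeedFilterPropsSketch {
  // Assumed key set: the three keys the tests assert are absent from the
  // catalog metadata. The authoritative list is HoodieCatalogTable.needFilterProps.
  val needFilterProps: Seq[String] = Seq(
    "hoodie.database.name",
    "hoodie.table.name",
    "hoodie.datasource.write.operation"
  )

  // Same call shape as the patch: writeOptions.--(needFilterProps) and
  // tablePropertiesNew.asScala.toMap.--(needFilterProps).
  def filterCatalogProps(props: Map[String, String]): Map[String, String] =
    props -- needFilterProps

  def main(args: Array[String]): Unit = {
    val userOptions = Map(
      "hoodie.database.name" -> "databaseName",
      "hoodie.table.name" -> "tableName",
      "primaryKey" -> "id",
      "preCombineField" -> "ts",
      "type" -> "cow"
    )
    // Only primaryKey, preCombineField and type survive into the catalog entry.
    println(filterCatalogProps(userOptions))
  }
}
```

The tests also gain an explicit `location '${tmp.getCanonicalPath}/$tableName'` under `withTempDir`, so each CTAS writes into a scratch directory that is cleaned up with the test rather than into the default warehouse path.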