From e33403ff32b8bb2a49cb34a16c743fa515e43360 Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Tue, 27 Sep 2022 01:53:22 +0800 Subject: [PATCH 1/7] Revert "add auto col cast in MergeIntoHoodieTableCommand" This reverts commit 9ad4c06fb5eedab0725589e1958284301e751296. --- .../spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index d403f1998c6b..d38cce4827bb 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -153,6 +153,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie if (mergeInto.matchedActions.nonEmpty) { // Do the upsert executeUpsert(sourceDF, parameters) } else { // If there is no match actions in the statement, execute insert operation only. 
+ executeInsertOnly(sourceDF, parameters) val targetDF = Dataset.ofRows(sparkSession, mergeInto.targetTable) val primaryKeys = hoodieCatalogTable.tableConfig.getRecordKeyFieldProp.split(",") // Only records that are not included in the target table can be inserted From 31a6b32754018c8641c687b1ba18ce3859e8703d Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Tue, 27 Sep 2022 02:01:25 +0800 Subject: [PATCH 2/7] force to use ExpressionPayload in MergeIntoHoodieTableCommand --- .../spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index d38cce4827bb..c7094890a596 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -495,7 +495,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie PRECOMBINE_FIELD.key -> preCombineField, TBL_NAME.key -> hoodieCatalogTable.tableName, PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp, - PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName, HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning, KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName, @@ -514,6 +513,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL ) .filter { case (_, v) => v != null } - } + } + (PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName) } } From 10dd6bf7b3974db815a906a7804deb2410170d4b 
Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Wed, 28 Sep 2022 12:30:56 +0800 Subject: [PATCH 3/7] [HUDI-4925] minor update --- .../spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index c7094890a596..ceedbdeca03c 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -474,7 +474,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie val targetTableDb = targetTableIdentify.database.getOrElse("default") val targetTableName = targetTableIdentify.identifier val path = hoodieCatalogTable.tableLocation - val catalogProperties = hoodieCatalogTable.catalogProperties + // force to use ExpressionPayload as WRITE_PAYLOAD_CLASS_NAME in MergeIntoHoodieTableCommand + val catalogProperties = hoodieCatalogTable.catalogProperties + (PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName) val tableConfig = hoodieCatalogTable.tableConfig val tableSchema = hoodieCatalogTable.tableSchema val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase) @@ -513,6 +514,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL ) .filter { case (_, v) => v != null } - } + (PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName) + } } } From 90dd4ca5753b5bf1ce434bc1ab3a05b5381a9aa0 Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Wed, 28 Sep 2022 17:00:22 +0800 Subject: [PATCH 4/7] [HUDI-4925] minor update --- 
.../spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index ceedbdeca03c..c3e5fc35290e 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -153,7 +153,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie if (mergeInto.matchedActions.nonEmpty) { // Do the upsert executeUpsert(sourceDF, parameters) } else { // If there is no match actions in the statement, execute insert operation only. - executeInsertOnly(sourceDF, parameters) val targetDF = Dataset.ofRows(sparkSession, mergeInto.targetTable) val primaryKeys = hoodieCatalogTable.tableConfig.getRecordKeyFieldProp.split(",") // Only records that are not included in the target table can be inserted From 1323a940de5a28b75dcec8c422916032d6740fe3 Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Thu, 29 Sep 2022 23:26:14 +0800 Subject: [PATCH 5/7] [HUDI-4925] add ut --- .../command/MergeIntoHoodieTableCommand.scala | 2 +- .../spark/sql/hudi/TestMergeIntoTable2.scala | 38 ++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index c3e5fc35290e..e212b5153e37 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -488,7 +488,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf) val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable) - withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) { + withSparkConf(sparkSession, catalogProperties) { Map( "path" -> path, RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp, diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala index 8e6acd1be58c..9b1c72dc66d5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala @@ -674,7 +674,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { } } - test ("Test Merge into with String cast to Double") { + test("Test Merge into with String cast to Double") { withTempDir { tmp => val tableName = generateTableName // Create a cow partitioned table. @@ -713,4 +713,40 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { ) } } + test("Test Merge into where manually set DefaultHoodieRecordPayload") { + withTempDir { tmp => + val tableName = generateTableName + // Create a cow partitioned table. 
+ spark.sql( + s""" + | create table $tableName ( + | id int, + | name string, + | ts long + | ) using hudi + | tblproperties ( + | type = 'cow', + | primaryKey = 'id', + | preCombineField = 'ts', + | hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload' + | ) location '${tmp.getCanonicalPath}' + """.stripMargin) + // Insert data + spark.sql(s"insert into $tableName select 1, 'a1', 999") + spark.sql( + s""" + | merge into $tableName as t0 + | using ( + | select 'a2' as name, 1 as id, 1000 as ts + | ) as s0 + | on t0.id = s0.id + | when matched then update set t0.name = s0.name, t0.ts = s0.ts + | when not matched then insert * + """.stripMargin + ) + checkAnswer(s"select id,name,ts from $tableName")( + Seq(1, "a2", 1000) + ) + } + } } From c94db130d89f3e0c221d7f1972f3d2235ea97f04 Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Thu, 29 Sep 2022 23:33:57 +0800 Subject: [PATCH 6/7] [HUDI-4925] minor update --- .../scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala index 9b1c72dc66d5..f0102e8440f4 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala @@ -716,7 +716,8 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { test("Test Merge into where manually set DefaultHoodieRecordPayload") { withTempDir { tmp => val tableName = generateTableName - // Create a cow partitioned table. + // Create a cow table with default payload class, check whether it will be overwritten by ExpressionPayload. 
+ // if not, this UT cannot pass, since DefaultHoodieRecordPayload cannot promote int to long when inserting a ts with an Integer value spark.sql( s""" | create table $tableName ( From 51fe330035a595e4d65cdf58554077ed0916fd25 Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Thu, 29 Sep 2022 23:46:33 +0800 Subject: [PATCH 7/7] [HUDI-4925] minor update --- .../scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala index f0102e8440f4..8a6aa9691d93 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala @@ -713,6 +713,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { ) } } + test("Test Merge into where manually set DefaultHoodieRecordPayload") { withTempDir { tmp => val tableName = generateTableName