diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 5676b72aefb59..7fb2dc1ebd4ba 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -157,7 +157,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       val primaryKeys = hoodieCatalogTable.tableConfig.getRecordKeyFieldProp.split(",")
       // Only records that are not included in the target table can be inserted
       val insertSourceDF = sourceDF.join(targetDF, primaryKeys,"leftanti")
-      executeInsertOnly(insertSourceDF, parameters)
+
+      // Column order changed after the left anti join, so we should keep the column order of the source dataframe
+      val cols = removeMetaFields(sourceDF).columns
+      executeInsertOnly(insertSourceDF.select(cols.head, cols.tail:_*), parameters)
     }
     sparkSession.catalog.refreshTable(targetTableIdentify.unquotedString)
     Seq.empty[Row]
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index 1c2dc0aab61ba..b77b5c3dbdbf8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -636,4 +636,41 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Merge Into For Source Table With Different Column Order") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create a MOR partitioned table.
+      spark.sql(
+        s"""
+           | create table $tableName (
+           | id int,
+           | name string,
+           | price double,
+           | ts long,
+           | dt string
+           | ) using hudi
+           | tblproperties (
+           | type = 'mor',
+           | primaryKey = 'id',
+           | preCombineField = 'ts'
+           | )
+           | partitioned by(dt)
+           | location '${tmp.getCanonicalPath}'
+         """.stripMargin)
+      // Insert data that matches the insert condition.
+      spark.sql(
+        s"""
+           | merge into $tableName as t0
+           | using (
+           |  select 'a1' as name, 1 as id, 10 as price, 1000 as ts, '2021-03-21' as dt
+           | ) as s0
+           | on t0.id = s0.id
+           | when not matched and s0.id % 2 = 1 then insert *
+         """.stripMargin
+      )
+      checkAnswer(s"select id,name,price,dt from $tableName")(
+        Seq(1, "a1", 10, "2021-03-21")
+      )
+    }
+  }
 }