[HUDI-4946] fix merge into with no preCombineField having dup row by only insert (#6824)
KnightChess authored Oct 27, 2022
1 parent 7accc47 commit 5b9fcc4
Showing 2 changed files with 152 additions and 51 deletions.
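
In short, the fix removes the separate insert-only code path: every MERGE INTO statement now goes through executeUpsert, which picks the write operation based on whether a preCombineField is configured. A minimal sketch of that decision, condensed from the diff below (surrounding method context omitted, so this is illustrative rather than drop-in code):

    // Condensed from the diff below; not self-contained.
    // Without a preCombineField, Hudi has no way to pick a winner among
    // duplicate-key source rows, so the write falls back to a plain insert
    // and keeps all of them instead of upserting.
    val operation = if (StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, ""))) {
      INSERT_OPERATION_OPT_VAL   // no preCombineField: insert, duplicate-key rows survive
    } else {
      UPSERT_OPERATION_OPT_VAL   // preCombineField present: upsert dedupes by record key
    }

The new tests in TestMergeIntoTable2 below exercise both cases: with a preCombineField the duplicate source rows collapse to the one with the latest ts, and without one both rows land in the table.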
@@ -185,19 +185,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie

// Create the write parameters
val parameters = buildMergeIntoConfig(hoodieCatalogTable)
executeUpsert(sourceDF, parameters)

if (mergeInto.matchedActions.nonEmpty) { // Do the upsert
executeUpsert(sourceDF, parameters)
} else { // If there is no match actions in the statement, execute insert operation only.
val targetDF = Dataset.ofRows(sparkSession, mergeInto.targetTable)
val primaryKeys = hoodieCatalogTable.tableConfig.getRecordKeyFieldProp.split(",")
// Only records that are not included in the target table can be inserted
val insertSourceDF = sourceDF.join(targetDF, primaryKeys,"leftanti")

// The column order changes after the left anti join, so we should keep the column order of the source dataframe
val cols = removeMetaFields(sourceDF).columns
executeInsertOnly(insertSourceDF.select(cols.head, cols.tail:_*), parameters)
}
sparkSession.catalog.refreshTable(targetTableIdentify.unquotedString)
Seq.empty[Row]
}
@@ -299,35 +288,30 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
* expressions to the ExpressionPayload#getInsertValue.
*/
private def executeUpsert(sourceDF: DataFrame, parameters: Map[String, String]): Unit = {
val updateActions = mergeInto.matchedActions.filter(_.isInstanceOf[UpdateAction])
.map(_.asInstanceOf[UpdateAction])
// Check for the update actions
checkUpdateAssignments(updateActions)

val deleteActions = mergeInto.matchedActions.filter(_.isInstanceOf[DeleteAction])
.map(_.asInstanceOf[DeleteAction])
assert(deleteActions.size <= 1, "Should be only one delete action in the merge into statement.")
val deleteAction = deleteActions.headOption

val insertActions =
mergeInto.notMatchedActions.map(_.asInstanceOf[InsertAction])

// Check for the insert actions
checkInsertAssignments(insertActions)

// Append the table schema to the parameters. In the case of merge into, the schema of sourceDF
// may be different from that of the target table, because there may be transform logic in the
// update or insert actions.
val operation = if (StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, ""))) {
INSERT_OPERATION_OPT_VAL
} else {
UPSERT_OPERATION_OPT_VAL
}

// Append the table schema to the parameters. In the case of merge into, the schema of sourceDF
// may be different from that of the target table, because there may be transform logic in the
// update or insert actions.
var writeParams = parameters +
(OPERATION.key -> operation) +
(HoodieWriteConfig.WRITE_SCHEMA.key -> getTableSchema.toString) +
(DataSourceWriteOptions.TABLE_TYPE.key -> targetTableType)

val updateActions = mergeInto.matchedActions.filter(_.isInstanceOf[UpdateAction])
.map(_.asInstanceOf[UpdateAction])
// Check for the update actions
checkUpdateAssignments(updateActions)

val deleteActions = mergeInto.matchedActions.filter(_.isInstanceOf[DeleteAction])
.map(_.asInstanceOf[DeleteAction])
assert(deleteActions.size <= 1, "Should be only one delete action in the merge into statement.")
val deleteAction = deleteActions.headOption

// Map of Condition -> Assignments
val updateConditionToAssignments =
updateActions.map(update => {
@@ -352,28 +336,13 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
writeParams += (PAYLOAD_DELETE_CONDITION -> serializedDeleteCondition)
}

// Serialize the Map[InsertCondition, InsertAssignments] to base64 string
writeParams += (PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS ->
serializedInsertConditionAndExpressions(insertActions))

// Remove the meta fields from the sourceDF as we do not need these when writing.
val sourceDFWithoutMetaFields = removeMetaFields(sourceDF)
HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDFWithoutMetaFields)
}
val insertActions =
mergeInto.notMatchedActions.map(_.asInstanceOf[InsertAction])

/**
* If there are no matched actions, we only execute the insert operation.
* @param sourceDF
* @param parameters
*/
private def executeInsertOnly(sourceDF: DataFrame, parameters: Map[String, String]): Unit = {
val insertActions = mergeInto.notMatchedActions.map(_.asInstanceOf[InsertAction])
// Check for the insert actions
checkInsertAssignments(insertActions)

var writeParams = parameters +
(OPERATION.key -> INSERT_OPERATION_OPT_VAL) +
(HoodieWriteConfig.WRITE_SCHEMA.key -> getTableSchema.toString)

// Serialize the Map[InsertCondition, InsertAssignments] to base64 string
writeParams += (PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS ->
serializedInsertConditionAndExpressions(insertActions))

@@ -673,7 +673,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase
}
}

test("Test Merge into with String cast to Double") {
test ("Test Merge into with String cast to Double") {
withTempDir { tmp =>
val tableName = generateTableName
// Create a cow partitioned table.
@@ -750,4 +750,136 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
)
}
}

test("Test only insert for source table in dup key with preCombineField") {
Seq("cow", "mor").foreach {
tableType => {
withTempDir { tmp =>
val tableName = generateTableName
// Create a cow partitioned table with preCombineField
spark.sql(
s"""
| create table $tableName (
| id int,
| name string,
| price double,
| ts long,
| dt string
| ) using hudi
| tblproperties (
| type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
| partitioned by(dt)
| location '${tmp.getCanonicalPath}'
""".stripMargin)
// Insert data without match condition
spark.sql(
s"""
| merge into $tableName as t0
| using (
| select 1 as id, 'a1' as name, 10.1 as price, 1000 as ts, '2021-03-21' as dt
| union all
| select 1 as id, 'a2' as name, 10.2 as price, 1002 as ts, '2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when not matched then insert *
""".stripMargin
)
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a2", 10.2, 1002, "2021-03-21")
)

// Insert data with match condition
spark.sql(
s"""
| merge into $tableName as t0
| using (
| select 1 as id, 'a1_new' as name, 10.1 as price, 1003 as ts, '2021-03-21' as dt
| union all
| select 3 as id, 'a3' as name, 10.3 as price, 1003 as ts, '2021-03-21' as dt
| union all
| select 3 as id, 'a3' as name, 10.3 as price, 1003 as ts, '2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when matched then update set *
| when not matched then insert *
""".stripMargin
)
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1_new", 10.1, 1003, "2021-03-21"),
Seq(3, "a3", 10.3, 1003, "2021-03-21")
)
}
}
}
}

test("Test only insert for source table in dup key without preCombineField") {
Seq("cow", "mor").foreach {
tableType => {
withTempDir { tmp =>
val tableName = generateTableName
spark.sql(
s"""
| create table $tableName (
| id int,
| name string,
| price double,
| ts long,
| dt string
| ) using hudi
| tblproperties (
| type = '$tableType',
| primaryKey = 'id'
| )
| partitioned by(dt)
| location '${tmp.getCanonicalPath}'
""".stripMargin)
// Appending records to a small file uses the update bucket by default; set this conf to use the concat handler instead
spark.sql("set hoodie.merge.allow.duplicate.on.inserts = true")

// Insert data without matched condition
spark.sql(
s"""
| merge into $tableName as t0
| using (
| select 1 as id, 'a1' as name, 10.1 as price, 1000 as ts, '2021-03-21' as dt
| union all
| select 1 as id, 'a2' as name, 10.2 as price, 1002 as ts, '2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when not matched then insert *
""".stripMargin
)
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1", 10.1, 1000, "2021-03-21"),
Seq(1, "a2", 10.2, 1002, "2021-03-21")
)

// Insert data with matched condition
spark.sql(
s"""
| merge into $tableName as t0
| using (
| select 3 as id, 'a3' as name, 10.3 as price, 1003 as ts, '2021-03-21' as dt
| union all
| select 1 as id, 'a2' as name, 10.2 as price, 1002 as ts, '2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when matched then update set *
| when not matched then insert *
""".stripMargin
)
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1", 10.1, 1000, "2021-03-21"),
Seq(1, "a2", 10.2, 1002, "2021-03-21"),
Seq(3, "a3", 10.3, 1003, "2021-03-21"),
Seq(1, "a2", 10.2, 1002, "2021-03-21")
)
}
}
}
}
}
