From 4fc31ba36fb632f66da23fce789a1de97233bac0 Mon Sep 17 00:00:00 2001
From: Sivabalan Narayanan
Date: Wed, 7 Dec 2022 09:17:27 -0800
Subject: [PATCH] [HUDI-5163] Fix failure handling with spark datasource write
 (#7140)

---
 .../src/main/scala/org/apache/hudi/DefaultSource.scala   | 8 ++++++--
 .../command/AlterHoodieTableDropPartitionCommand.scala   | 6 +++++-
 .../sql/hudi/command/TruncateHoodieTableCommand.scala    | 6 +++++-
 .../sql/hudi/command/InsertIntoHoodieTableCommand.scala  | 4 ++++
 .../sql/hudi/command/MergeIntoHoodieTableCommand.scala   | 7 ++++++-
 5 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index ad1e2059bf5e..41e4f8cf7559 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -144,11 +144,15 @@ class DefaultSource extends RelationProvider
 
     if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) {
       HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
+      HoodieSparkSqlWriter.cleanup()
     } else {
-      HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
+      val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
+      HoodieSparkSqlWriter.cleanup()
+      if (!success) {
+        throw new HoodieException("Write to Hudi failed")
+      }
     }
-    HoodieSparkSqlWriter.cleanup()
 
     new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
index 628f383b6903..c6aa2e7aedac 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hudi.command
 
 import org.apache.hudi.HoodieSparkSqlWriter
+import org.apache.hudi.exception.HoodieException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
@@ -59,11 +60,14 @@ case class AlterHoodieTableDropPartitionCommand(
     // delete partition files by enabling cleaner and setting retention policies.
     val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs)
     val parameters = buildHoodieDropPartitionsConfig(sparkSession, hoodieCatalogTable, partitionsToDrop)
-    HoodieSparkSqlWriter.write(
+    val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(
       sparkSession.sqlContext,
       SaveMode.Append,
       parameters,
       sparkSession.emptyDataFrame)
+    if (!success) {
+      throw new HoodieException("Alter table command failed")
+    }
 
     sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
     logInfo(s"Finish execute alter table drop partition command for $fullTableName")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
index f5349ee5feed..05f96efdae53 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.HoodieSparkSqlWriter
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.exception.HoodieException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, HoodieCatalogTable}
@@ -85,11 +86,14 @@ case class TruncateHoodieTableCommand(
       // drop partitions to lazy clean
       val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs)
       val parameters = buildHoodieDropPartitionsConfig(sparkSession, hoodieCatalogTable, partitionsToDrop)
-      HoodieSparkSqlWriter.write(
+      val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(
        sparkSession.sqlContext,
        SaveMode.Append,
        parameters,
        sparkSession.emptyDataFrame)
+      if (!success) {
+        throw new HoodieException("Truncate Hoodie Table command failed")
+      }
     }
 
     // After deleting the data, refresh the table to make sure we don't keep around a stale
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 8bd81df3d271..125e8028020b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -100,6 +100,10 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
     val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config,
       Dataset.ofRows(sparkSession, alignedQuery))
 
+    if (!success) {
+      throw new HoodieException("Insert Into to Hudi table failed")
+    }
+
     if (success && refreshTable) {
       sparkSession.catalog.refreshTable(table.identifier.unquotedString)
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 20402c600fe9..b90807858f3b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -23,6 +23,8 @@ import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, TBL_NAME}
+import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
+import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, SparkAdapterSupport}
@@ -354,7 +356,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     writeParams += (PAYLOAD_RECORD_AVRO_SCHEMA ->
       convertStructTypeToAvroSchema(trimmedSourceDF.schema, "record", "").toString)
 
-    HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, trimmedSourceDF)
+    val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, trimmedSourceDF)
+    if (!success) {
+      throw new HoodieException("Merge into Hoodie table command failed")
+    }
   }
 
   private def checkUpdateAssignments(updateActions: Seq[UpdateAction]): Unit = {
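For reference, a minimal sketch of the failure-handling pattern the patch applies in each command; it is not part of the patch. The object and method names below are hypothetical, and the only behavior assumed from the diff above is that HoodieSparkSqlWriter.write returns a tuple whose first element is a success flag, that HoodieSparkSqlWriter.cleanup() releases writer resources, and that HoodieException is used to surface the failure to the caller.

import org.apache.hudi.HoodieSparkSqlWriter
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

object HoodieWriteFailureHandling {
  // Hypothetical helper: run the Hudi write, always clean up, and raise an
  // exception when the write fails instead of returning silently (the behavior
  // the patch fixes in DefaultSource and the SQL commands).
  def writeOrThrow(sqlContext: SQLContext,
                   mode: SaveMode,
                   params: Map[String, String],
                   df: DataFrame,
                   errorMessage: String): Unit = {
    // Per the diff, the first element of the returned tuple is the success flag;
    // the remaining elements are ignored here.
    val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sqlContext, mode, params, df)
    HoodieSparkSqlWriter.cleanup()
    if (!success) {
      throw new HoodieException(errorMessage)
    }
  }
}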