[HUDI-5163] Fix failure handling with spark datasource write (apache#…
nsivabalan authored and fengjian committed Apr 5, 2023
1 parent 486585b commit 94ff928
Showing 5 changed files with 26 additions and 5 deletions.
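All five files apply the same fix: capture the success flag returned by HoodieSparkSqlWriter.write and throw a HoodieException when the write fails, rather than ignoring the result. Below is a minimal sketch of that pattern, using only the calls visible in this diff; the helper name writeOrThrow and its error message are illustrative, not part of the commit.

import org.apache.hudi.HoodieSparkSqlWriter
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

// Illustrative helper (not part of the commit): capture the first element of the
// tuple returned by HoodieSparkSqlWriter.write and surface a failure as an
// exception instead of returning silently.
def writeOrThrow(sqlContext: SQLContext, mode: SaveMode,
                 params: Map[String, String], df: DataFrame): Unit = {
  val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sqlContext, mode, params, df)
  // DefaultSource also releases writer resources before surfacing the failure.
  HoodieSparkSqlWriter.cleanup()
  if (!success) {
    throw new HoodieException("Write to Hudi failed")
  }
}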
DefaultSource.scala
@@ -144,11 +144,15 @@ class DefaultSource extends RelationProvider

     if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) {
       HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
+      HoodieSparkSqlWriter.cleanup()
     } else {
-      HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
+      val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
+      HoodieSparkSqlWriter.cleanup()
+      if (!success) {
+        throw new HoodieException("Write to Hudi failed")
+      }
     }

-    HoodieSparkSqlWriter.cleanup()
     new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
   }

AlterHoodieTableDropPartitionCommand.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hudi.command

 import org.apache.hudi.HoodieSparkSqlWriter
+import org.apache.hudi.exception.HoodieException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
@@ -59,11 +60,14 @@ case class AlterHoodieTableDropPartitionCommand(
     // delete partition files by enabling cleaner and setting retention policies.
     val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs)
     val parameters = buildHoodieDropPartitionsConfig(sparkSession, hoodieCatalogTable, partitionsToDrop)
-    HoodieSparkSqlWriter.write(
+    val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(
       sparkSession.sqlContext,
       SaveMode.Append,
       parameters,
       sparkSession.emptyDataFrame)
+    if (!success) {
+      throw new HoodieException("Alter table command failed")
+    }

     sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
     logInfo(s"Finish execute alter table drop partition command for $fullTableName")

TruncateHoodieTableCommand.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.HoodieSparkSqlWriter
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.exception.HoodieException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, HoodieCatalogTable}
@@ -85,11 +86,14 @@ case class TruncateHoodieTableCommand(
       // drop partitions to lazy clean
       val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs)
       val parameters = buildHoodieDropPartitionsConfig(sparkSession, hoodieCatalogTable, partitionsToDrop)
-      HoodieSparkSqlWriter.write(
+      val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(
         sparkSession.sqlContext,
         SaveMode.Append,
         parameters,
         sparkSession.emptyDataFrame)
+      if (!success) {
+        throw new HoodieException("Truncate Hoodie Table command failed")
+      }
     }

     // After deleting the data, refresh the table to make sure we don't keep around a stale

InsertIntoHoodieTableCommand.scala
@@ -100,6 +100,10 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi

     val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, Dataset.ofRows(sparkSession, alignedQuery))

+    if (!success) {
+      throw new HoodieException("Insert Into to Hudi table failed")
+    }
+
     if (success && refreshTable) {
       sparkSession.catalog.refreshTable(table.identifier.unquotedString)
     }

MergeIntoHoodieTableCommand.scala
@@ -23,6 +23,8 @@ import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, TBL_NAME}
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
+import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, SparkAdapterSupport}
@@ -354,7 +356,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     writeParams += (PAYLOAD_RECORD_AVRO_SCHEMA ->
       convertStructTypeToAvroSchema(trimmedSourceDF.schema, "record", "").toString)

-    HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, trimmedSourceDF)
+    val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, trimmedSourceDF)
+    if (!success) {
+      throw new HoodieException("Merge into Hoodie table command failed")
+    }
   }

   private def checkUpdateAssignments(updateActions: Seq[UpdateAction]): Unit = {
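
Caller-facing effect, as a hypothetical sketch (not part of the commit): a failed datasource write now surfaces as a HoodieException to the Spark job instead of returning as if it had succeeded, so callers handle or propagate the failure. The table name, key fields, and path below are placeholders.

import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hudi-write-example").getOrCreate()
val df = spark.range(0, 10).toDF("key")

try {
  df.write.format("hudi")
    .option("hoodie.table.name", "example_table")              // placeholder table name
    .option("hoodie.datasource.write.recordkey.field", "key")  // placeholder record key
    .option("hoodie.datasource.write.precombine.field", "key") // placeholder precombine field
    .mode("append")
    .save("/tmp/hudi/example_table")                           // placeholder path
} catch {
  case e: HoodieException =>
    // After this commit, write failures must be handled (or propagated) by the caller.
    println(s"Hudi write failed: ${e.getMessage}")
}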
