diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 4b6d8e06f748..91bcf38bb60a 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -19,9 +19,8 @@ package org.apache.spark.sql.hudi.command
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
-
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, WriteOperationType}
 import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
@@ -31,7 +30,6 @@ import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.hudi.sql.InsertMode
 import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
@@ -41,7 +39,6 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hudi.HoodieSqlUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
-
 import java.util.Properties
 
 import scala.collection.JavaConverters._
@@ -216,8 +213,10 @@ object InsertIntoHoodieTableCommand extends Logging {
     val keyGeneratorClassName = Option(tableConfig.getKeyGeneratorClassName)
       .getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)
     val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
-      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean ||
+      parameters.get(DataSourceWriteOptions.OPERATION.key).exists(_.equalsIgnoreCase(WriteOperationType.BULK_INSERT.value))
+
     val dropDuplicate = sparkSession.conf
       .getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean