Skip to content

Commit

Permalink
[MINOR] Fix the issue when handling conf hoodie.datasource.write.oper…
Browse files Browse the repository at this point in the history
…ation=bulk_insert in sql mode apache#5679
  • Loading branch information
neverdizzy committed May 28, 2022
1 parent c8e0522 commit ae57789
Showing 1 changed file with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ package org.apache.spark.sql.hudi.command

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, WriteOperationType}
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
Expand All @@ -31,7 +30,6 @@ import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
Expand All @@ -41,7 +39,6 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}

import java.util.Properties

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -216,8 +213,12 @@ object InsertIntoHoodieTableCommand extends Logging {
val keyGeneratorClassName = Option(tableConfig.getKeyGeneratorClassName)
.getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)

// val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
// DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean ||
parameters.get(DataSourceWriteOptions.OPERATION.key).exists(_.equalsIgnoreCase(WriteOperationType.BULK_INSERT.value))

val dropDuplicate = sparkSession.conf
.getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean

Expand Down

0 comments on commit ae57789

Please sign in to comment.