[HUDI-6109] Fix the scala compile ambiguity of Properties#putAll (apa…
danny0405 authored and yihua committed May 15, 2023
1 parent 72588d9 commit 22485f2
Showing 10 changed files with 65 additions and 35 deletions.
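
For context, the ambiguity quoted in the new TypedProperties javadoc below comes from calling putAll on a java.util.Properties instance from Scala. The following is a minimal sketch of the failing pattern, assuming a Scala build against a JDK on which Properties overrides putAll; the config key is a placeholder:

    import java.util.Properties
    import scala.collection.JavaConverters._

    object PutAllAmbiguityRepro {
      def main(args: Array[String]): Unit = {
        val properties = new Properties()
        // scalac cannot decide between Properties#putAll(Map[_, _]) and
        // Hashtable#putAll(Map[_ <: Object, _ <: Object]) for this call and
        // fails with "ambiguous reference to overloaded definition".
        properties.putAll(Map("hoodie.table.name" -> "t1").asJava)
      }
    }

The commit works around this by never calling putAll on the instance from Scala: call sites either build a TypedProperties through a static factory or delegate to a static helper, as the diffs below show.
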
@@ -22,7 +22,6 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory

import java.util.Properties
import scala.collection.JavaConverters._

object SparkKeyGenUtils {
@@ -31,8 +30,7 @@
* @param properties config properties
* @return partition columns
*/
def getPartitionColumns(properties: Properties): String = {
val props = new TypedProperties(properties)
def getPartitionColumns(props: TypedProperties): String = {
val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
getPartitionColumns(keyGenerator, props)
}
@@ -51,6 +51,10 @@ public HoodieConfig(Properties props) {
this.props = new TypedProperties(props);
}

public HoodieConfig(TypedProperties props) {
this.props = props;
}

public <T> void setValue(ConfigProperty<T> cfg, String val) {
cfg.checkValues(val);
props.setProperty(cfg.key(), val);
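
The new constructor lets Scala callers hand an existing TypedProperties to HoodieConfig directly instead of routing it through the Properties-based constructor, which copies into a fresh TypedProperties. A hypothetical sketch (the option key is a placeholder, and fromMap is the helper added to TypedProperties below):

    import org.apache.hudi.common.config.{HoodieConfig, TypedProperties}
    import scala.collection.JavaConverters._

    object HoodieConfigFromTypedProps {
      def main(args: Array[String]): Unit = {
        val options = Map("hoodie.table.name" -> "t1") // placeholder option
        // The TypedProperties built by fromMap is reused as-is; no second copy is made.
        val hoodieConfig = new HoodieConfig(TypedProperties.fromMap(options.asJava))
        println(hoodieConfig.getProps.getProperty("hoodie.table.name"))
      }
    }
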
@@ -24,6 +24,7 @@
import java.util.Arrays;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
@@ -119,4 +120,43 @@ public double getDouble(String property) {
public double getDouble(String property, double defaultValue) {
return containsKey(property) ? Double.parseDouble(getProperty(property)) : defaultValue;
}

/**
* This method is introduced to get rid of the scala compile error:
* <pre>
* <code>
* ambiguous reference to overloaded definition,
* both method putAll in class Properties of type (x$1: java.util.Map[_, _])Unit
* and method putAll in class Hashtable of type (x$1: java.util.Map[_ <: Object, _ <: Object])Unit
* match argument types (java.util.HashMap[Nothing,Nothing])
* properties.putAll(new java.util.HashMap())
* </code>
* </pre>
*
* @param items The new items to put
*/
public static TypedProperties fromMap(Map<?, ?> items) {
TypedProperties props = new TypedProperties();
props.putAll(items);
return props;
}

/**
* This method is introduced to get rid of the scala compile error:
* <pre>
* <code>
* ambiguous reference to overloaded definition,
* both method putAll in class Properties of type (x$1: java.util.Map[_, _])Unit
* and method putAll in class Hashtable of type (x$1: java.util.Map[_ <: Object, _ <: Object])Unit
* match argument types (java.util.HashMap[Nothing,Nothing])
* properties.putAll(new java.util.HashMap())
* </code>
* </pre>
*
* @param props The properties
* @param items The new items to put
*/
public static void putAll(TypedProperties props, Map<?, ?> items) {
props.putAll(items);
}
}
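
A short sketch of how Scala call sites can use the two helpers above in place of an instance-level putAll; the property keys are placeholders:

    import org.apache.hudi.common.config.TypedProperties
    import scala.collection.JavaConverters._

    object TypedPropertiesFromScala {
      def main(args: Array[String]): Unit = {
        // Build a TypedProperties from a Scala map through the static factory.
        val props = TypedProperties.fromMap(Map("hoodie.datasource.write.recordkey.field" -> "uuid").asJava)

        // Merge further entries through the static helper rather than props.putAll(...),
        // which is the call scalac rejects as ambiguous.
        TypedProperties.putAll(props, Map("hoodie.datasource.write.partitionpath.field" -> "dt").asJava)

        println(props.getProperty("hoodie.datasource.write.recordkey.field"))
      }
    }
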
@@ -22,7 +22,7 @@ import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, collectReferenc
import org.apache.hudi.HoodieSparkConfUtils.getConfigValue
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.{CollectionUtils, StringUtils}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
@@ -39,6 +39,7 @@ import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.unsafe.types.UTF8String

import java.text.SimpleDateFormat
import java.util
import javax.annotation.concurrent.NotThreadSafe
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -309,8 +310,7 @@ object HoodieFileIndex extends Logging {

def getConfigProperties(spark: SparkSession, options: Map[String, String]) = {
val sqlConf: SQLConf = spark.sessionState.conf
val properties = new TypedProperties()
properties.putAll(options.filter(p => p._2 != null).asJava)
val properties = TypedProperties.fromMap(options.filter(p => p._2 != null).asJava)

// TODO(HUDI-5361) clean up properties carry-over

@@ -871,8 +871,7 @@ object HoodieSparkSqlWriter {
if (metaSyncEnabled) {
val fs = basePath.getFileSystem(spark.sessionState.newHadoopConf())
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT);
val properties = new TypedProperties()
properties.putAll(hoodieConfig.getProps)
val properties = TypedProperties.fromMap(hoodieConfig.getProps)
properties.put(HiveSyncConfigHolder.HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key, spark.sessionState.conf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD).toString)
properties.put(HoodieSyncConfig.META_SYNC_SPARK_VERSION.key, SPARK_VERSION)
properties.put(HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA.key, hoodieConfig.getBoolean(HoodieMetadataConfig.ENABLE))
@@ -20,7 +20,7 @@ package org.apache.hudi
import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
import org.apache.hudi.DataSourceWriteOptions.{RECORD_MERGER_IMPLS, _}
import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, HoodieConfig}
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, HoodieConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.HiveSyncConfigHolder
@@ -29,6 +29,7 @@ import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.util.SparkKeyGenUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hudi.command.SqlKeyGenerator

import java.util.Properties
import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.JavaConverters._
@@ -46,8 +47,7 @@
*/
def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala
val props = new Properties()
props.putAll(parameters)
val props = TypedProperties.fromMap(parameters)
val hoodieConfig: HoodieConfig = new HoodieConfig(props)
hoodieConfig.setDefaultValue(OPERATION)
hoodieConfig.setDefaultValue(TABLE_TYPE)
@@ -93,8 +93,7 @@
*/
def getParamsWithAlternatives(parameters: Map[String, String]): Map[String, String] = {
val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala
val props = new Properties()
props.putAll(parameters)
val props = TypedProperties.fromMap(parameters)
val hoodieConfig: HoodieConfig = new HoodieConfig(props)
// do not set any default as this is called before validation.
Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
@@ -106,14 +105,11 @@
* @return
*/
def getPartitionColumns(parameters: Map[String, String]): String = {
val props = new Properties()
props.putAll(parameters.asJava)
SparkKeyGenUtils.getPartitionColumns(props)
SparkKeyGenUtils.getPartitionColumns(TypedProperties.fromMap(parameters))
}

def convertMapToHoodieConfig(parameters: Map[String, String]): HoodieConfig = {
val properties = new Properties()
properties.putAll(mapAsJavaMap(parameters))
val properties = TypedProperties.fromMap(mapAsJavaMap(parameters))
new HoodieConfig(properties)
}

@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
import org.apache.hudi.DataSourceWriteOptions.OPERATION
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.DFSPropertiesConfiguration
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, TypedProperties}
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
@@ -180,8 +180,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
table = table.copy(schema = finalSchema)

// Save all the table config to the hoodie.properties.
val properties = new Properties()
properties.putAll(tableConfigs.asJava)
val properties = TypedProperties.fromMap(tableConfigs.asJava)

val catalogDatabaseName = formatName(spark,
table.identifier.database.getOrElse(spark.sessionState.catalog.getCurrentDatabase))
@@ -20,15 +20,15 @@ package org.apache.spark.sql.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieMetadataConfig}
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline.parseDateFromInstantTime
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, DataSourceReadOptions, SparkAdapterSupport}
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, SparkAdapterSupport}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
@@ -41,9 +41,8 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession}

import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Locale, Properties}
import java.util.Locale
import scala.collection.JavaConverters._
import scala.collection.immutable.Map
import scala.util.Try

object HoodieSqlCommonUtils extends SparkAdapterSupport {
@@ -74,8 +73,7 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
def getAllPartitionPaths(spark: SparkSession, table: CatalogTable): Seq[String] = {
val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
val metadataConfig = {
val properties = new Properties()
properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties ++ table.properties).asJava)
val properties = TypedProperties.fromMap((spark.sessionState.conf.getAllConfs ++ table.storage.properties ++ table.properties).asJava)
HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
}
FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, getTableLocation(table, spark)).asScala
@@ -86,9 +84,7 @@
partitionPaths: Seq[String]): Map[String, Array[FileStatus]] = {
val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
val metadataConfig = {
val properties = new Properties()
properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties ++
table.properties).asJava)
val properties = TypedProperties.fromMap((spark.sessionState.conf.getAllConfs ++ table.storage.properties ++ table.properties).asJava)
HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
}
FSUtils.getFilesInPartitions(sparkEngine, metadataConfig, getTableLocation(table, spark),
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hudi.command

import org.apache.avro.generic.GenericRecord
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.HoodieKey
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.config.HoodieWriteConfig
@@ -30,11 +29,11 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
import org.joda.time.format.DateTimeFormat

import java.sql.Timestamp
import java.util
import java.util.Collections
import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS}
import org.apache.spark.sql.catalyst.InternalRow

/**
* Custom Spark-specific [[KeyGenerator]] overriding behavior handling [[TimestampType]] partition values
@@ -58,8 +57,7 @@ class SqlKeyGenerator(props: TypedProperties) extends BuiltinKeyGenerator(props)

val convertedKeyGenClassName = HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(originalKeyGenClassName)

val keyGenProps = new TypedProperties()
keyGenProps.putAll(props)
val keyGenProps = TypedProperties.fromMap(props)
keyGenProps.remove(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME)
keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, convertedKeyGenClassName)

@@ -119,7 +119,7 @@ class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Log
cfg.setBootstrapOverwrite(bootstrapOverwrite)

// add session bootstrap conf
properties.putAll(spark.sqlContext.conf.getAllConfs.asJava)
TypedProperties.putAll(properties, spark.sqlContext.conf.getAllConfs.asJava)
new BootstrapExecutorUtils(cfg, jsc, fs, jsc.hadoopConfiguration, properties).execute()
Seq(Row(0))
}