Skip to content

Commit

Permalink
clone conf and apply all initiate options to them
Browse files Browse the repository at this point in the history
  • Loading branch information
yaooqinn committed Feb 1, 2019
1 parent 4a1a18b commit 62d2aa3
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,41 @@ import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.{MutableURLClassLoader, Utils}


/**
* A class that holds all state shared across sessions in a given [[SQLContext]].
*/
private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[String, String])
extends Logging {
private val conf = sparkContext.conf.clone()
private val hadoopConf = new Configuration(sparkContext.hadoopConfiguration)

// If a `SparkSession` is instantiated using an existing `SparkContext` instance and there is no
// existing `SharedState`, all `SparkSession`-level configurations take priority when generating
// the `SharedState` instance. This happens only once; the resulting instance is then shared
// across `SparkSession`s.
initConfig.foreach { case (k, v) =>
logDebug(s"Applying initiate SparkSession options to SparkConf/HadoopConf: $k -> $v")
conf.set(k, v)
hadoopConf.set(k, v)
}
logInfo("Applied all initiate SparkSession options to the brand new SharedState")

// Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
// the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
val warehousePath: String = {
val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
if (configFile != null) {
logInfo(s"loading hive config file: $configFile")
sparkContext.hadoopConfiguration.addResource(configFile)
hadoopConf.addResource(configFile)
}

// hive.metastore.warehouse.dir should only stay in hadoopConf, so remove it from the Spark conf
sparkContext.conf.remove("hive.metastore.warehouse.dir")
conf.remove("hive.metastore.warehouse.dir")
// Set the Hive metastore warehouse path to the one we use
val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) {
val hiveWarehouseDir = hadoopConf.get("hive.metastore.warehouse.dir")
if (hiveWarehouseDir != null && !conf.contains(WAREHOUSE_PATH.key)) {
// If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
// we will respect the value of hive.metastore.warehouse.dir.
sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
logInfo(s"${WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " +
s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
Expand All @@ -69,16 +80,15 @@ private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[S
// the value of spark.sql.warehouse.dir.
// When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
// we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
val sparkWarehouseDir = conf.get(WAREHOUSE_PATH)
logInfo(s"Setting hive.metastore.warehouse.dir ('$hiveWarehouseDir') to the value of " +
s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
hadoopConf.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
sparkWarehouseDir
}
}
logInfo(s"Warehouse path is '$warehousePath'.")


/**
* Class for caching query results reused in future executions.
*/
Expand All @@ -102,9 +112,7 @@ private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[S
*/
lazy val externalCatalog: ExternalCatalogWithListener = {
val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
SharedState.externalCatalogClassName(sparkContext.conf, initConfig),
sparkContext.conf,
sparkContext.hadoopConfiguration)
SharedState.externalCatalogClassName(conf), conf, hadoopConf)

val defaultDbDefinition = CatalogDatabase(
SessionCatalog.DEFAULT_DATABASE,
Expand Down Expand Up @@ -138,7 +146,7 @@ private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[S
// The system preserved database should not exist in the metastore. However, it's hard to
// guarantee this for every session, because case-sensitivity differs. Here we always lowercase
// it to make our life easier.
val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
val globalTempDB = conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
if (externalCatalog.databaseExists(globalTempDB)) {
throw new SparkException(
s"$globalTempDB is a system preserved database, please rename your existing database " +
Expand Down Expand Up @@ -166,11 +174,8 @@ object SharedState extends Logging {

private val HIVE_EXTERNAL_CATALOG_CLASS_NAME = "org.apache.spark.sql.hive.HiveExternalCatalog"

private def externalCatalogClassName(
conf: SparkConf,
initSessionConfig: Map[String, String]): String = {
initSessionConfig
.getOrElse(CATALOG_IMPLEMENTATION.key, conf.get(CATALOG_IMPLEMENTATION)) match {
private def externalCatalogClassName(conf: SparkConf): String = {
conf.get(CATALOG_IMPLEMENTATION) match {
case "hive" => HIVE_EXTERNAL_CATALOG_CLASS_NAME
case "in-memory" => classOf[InMemoryCatalog].getCanonicalName
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ class SharedStateSuite extends SparkFunSuite {
val ss = SparkSession.builder().enableHiveSupport().getOrCreate()
assert(ss.sharedState.externalCatalog.unwrapped.getClass.getName ===
"org.apache.spark.sql.hive.HiveExternalCatalog", "The catalog should be hive ")
val ss2 = SparkSession.builder().getOrCreate()

val ss2 = SparkSession.builder().getOrCreate()
assert(ss2.sharedState.externalCatalog.unwrapped.getClass.getName ===
"org.apache.spark.sql.hive.HiveExternalCatalog", "The catalog should shared across sessions")
"org.apache.spark.sql.hive.HiveExternalCatalog",
"The catalog should be shared across sessions")
}

}

0 comments on commit 62d2aa3

Please sign in to comment.