diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 864e3f2f800ab..cc997dbb3a5f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -36,12 +36,23 @@ import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.status.ElementTrackingStore
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-
 /**
  * A class that holds all state shared across sessions in a given [[SQLContext]].
  */
 private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[String, String])
   extends Logging {
 
+  private val conf = sparkContext.conf.clone()
+  private val hadoopConf = new Configuration(sparkContext.hadoopConfiguration)
+
+  // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
+  // `SharedState`, all `SparkSession` level configurations have higher priority to generate a
+  // `SharedState` instance. This will be done only once, then shared across `SparkSession`s.
+  initConfig.foreach { case (k, v) =>
+    logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v")
+    conf.set(k, v)
+    hadoopConf.set(k, v)
+  }
+  logInfo("Applied all initial SparkSession options to the brand new SharedState")
   // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
   // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
@@ -49,17 +60,17 @@ private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[S
     val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
     if (configFile != null) {
       logInfo(s"loading hive config file: $configFile")
-      sparkContext.hadoopConfiguration.addResource(configFile)
+      hadoopConf.addResource(configFile)
     }
 
     // hive.metastore.warehouse.dir only stay in hadoopConf
-    sparkContext.conf.remove("hive.metastore.warehouse.dir")
+    conf.remove("hive.metastore.warehouse.dir")
     // Set the Hive metastore warehouse path to the one we use
-    val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
-    if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) {
+    val hiveWarehouseDir = hadoopConf.get("hive.metastore.warehouse.dir")
+    if (hiveWarehouseDir != null && !conf.contains(WAREHOUSE_PATH.key)) {
       // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
       // we will respect the value of hive.metastore.warehouse.dir.
-      sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
+      conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
       logInfo(s"${WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
         s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " +
         s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
@@ -69,16 +80,15 @@ private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[S
       // the value of spark.sql.warehouse.dir.
       // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
       // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
-      val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
+      val sparkWarehouseDir = conf.get(WAREHOUSE_PATH)
       logInfo(s"Setting hive.metastore.warehouse.dir ('$hiveWarehouseDir') to the value of " +
         s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
-      sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
+      hadoopConf.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
       sparkWarehouseDir
     }
   }
   logInfo(s"Warehouse path is '$warehousePath'.")
 
-
   /**
    * Class for caching query results reused in future executions.
    */
@@ -102,9 +112,7 @@ private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[S
    */
   lazy val externalCatalog: ExternalCatalogWithListener = {
     val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
-      SharedState.externalCatalogClassName(sparkContext.conf, initConfig),
-      sparkContext.conf,
-      sparkContext.hadoopConfiguration)
+      SharedState.externalCatalogClassName(conf), conf, hadoopConf)
 
     val defaultDbDefinition = CatalogDatabase(
       SessionCatalog.DEFAULT_DATABASE,
@@ -138,7 +146,7 @@ private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[S
     // System preserved database should not exists in metastore. However it's hard to guarantee it
     // for every session, because case-sensitivity differs. Here we always lowercase it to make our
     // life easier.
-    val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
+    val globalTempDB = conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
     if (externalCatalog.databaseExists(globalTempDB)) {
       throw new SparkException(
         s"$globalTempDB is a system preserved database, please rename your existing database " +
@@ -166,11 +174,8 @@ object SharedState extends Logging {
   private val HIVE_EXTERNAL_CATALOG_CLASS_NAME = "org.apache.spark.sql.hive.HiveExternalCatalog"
 
-  private def externalCatalogClassName(
-      conf: SparkConf,
-      initSessionConfig: Map[String, String]): String = {
-    initSessionConfig
-      .getOrElse(CATALOG_IMPLEMENTATION.key, conf.get(CATALOG_IMPLEMENTATION)) match {
+  private def externalCatalogClassName(conf: SparkConf): String = {
+    conf.get(CATALOG_IMPLEMENTATION) match {
       case "hive" => HIVE_EXTERNAL_CATALOG_CLASS_NAME
       case "in-memory" => classOf[InMemoryCatalog].getCanonicalName
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
index 52a2a18cbf910..ed591977f9231 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
@@ -28,10 +28,10 @@ class SharedStateSuite extends SparkFunSuite {
     val ss = SparkSession.builder().enableHiveSupport().getOrCreate()
     assert(ss.sharedState.externalCatalog.unwrapped.getClass.getName ===
       "org.apache.spark.sql.hive.HiveExternalCatalog", "The catalog should be hive ")
 
-    val ss2 = SparkSession.builder().getOrCreate()
+    val ss2 = SparkSession.builder().getOrCreate()
     assert(ss2.sharedState.externalCatalog.unwrapped.getClass.getName ===
-      "org.apache.spark.sql.hive.HiveExternalCatalog", "The catalog should shared across sessions")
+      "org.apache.spark.sql.hive.HiveExternalCatalog",
+      "The catalog should be shared across sessions")
   }
-
 }
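
For context on the behavior this patch produces: `SharedState` now works on a clone of the `SparkConf` and a copy of the Hadoop `Configuration`, with the first session's builder options (`initConfig`) layered on top, so options such as the `spark.sql.catalogImplementation=hive` set by `enableHiveSupport()` take effect even when the `SparkSession` is built over a pre-existing `SparkContext`. Below is a minimal sketch of that behavior, not part of the patch: the object name `SharedStateDemo` is hypothetical, it assumes a local master and `spark-hive` on the classpath, and it is placed in `org.apache.spark.sql.internal` because `SharedState` is `private[sql]`.

```scala
package org.apache.spark.sql.internal

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

// Hypothetical demo object, not part of the patch.
object SharedStateDemo {
  def main(args: Array[String]): Unit = {
    // A SparkContext that exists before any SparkSession, with no SQL options set.
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("SharedState demo"))

    // enableHiveSupport() records spark.sql.catalogImplementation=hive in the
    // builder's options, which SharedState now copies into its cloned
    // SparkConf/Configuration before choosing the external catalog.
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    // Expected: org.apache.spark.sql.hive.HiveExternalCatalog
    println(spark.sharedState.externalCatalog.unwrapped.getClass.getName)

    // A second session built without enableHiveSupport() reuses the same
    // SharedState, so it resolves to the same Hive external catalog.
    val spark2 = SparkSession.builder().getOrCreate()
    println(spark2.sharedState.externalCatalog.unwrapped.getClass.getName)

    spark.stop()
  }
}
```

This mirrors the `SharedStateSuite` test above: the catalog implementation is fixed by whichever session first materializes the `SharedState`, and `externalCatalogClassName` no longer needs the separate `initSessionConfig` map because those options are already merged into `conf`.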