From 275e99890191968cf543ef9acde8ed4ce4d3392f Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 31 Jan 2019 16:11:38 +0800 Subject: [PATCH 01/15] fix #SPARK-26794 SparkSession enableHiveSupport does not point to hive but in-memory while the SparkContext exists --- .../scala/org/apache/spark/sql/SparkSession.scala | 2 +- .../org/apache/spark/sql/internal/SharedState.scala | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 1c13a6819fe5a..091d68aa84f2b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -127,7 +127,7 @@ class SparkSession private( @Unstable @transient lazy val sharedState: SharedState = { - existingSharedState.getOrElse(new SharedState(sparkContext)) + existingSharedState.getOrElse(new SharedState(sparkContext, initialSessionOptions.toMap)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 5b6160e2b408f..864e3f2f800ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -40,7 +40,8 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils} /** * A class that holds all state shared across sessions in a given [[SQLContext]]. */ -private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { +private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[String, String]) + extends Logging { // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf. 
@@ -101,7 +102,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { */ lazy val externalCatalog: ExternalCatalogWithListener = { val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration]( - SharedState.externalCatalogClassName(sparkContext.conf), + SharedState.externalCatalogClassName(sparkContext.conf, initConfig), sparkContext.conf, sparkContext.hadoopConfiguration) @@ -165,8 +166,11 @@ object SharedState extends Logging { private val HIVE_EXTERNAL_CATALOG_CLASS_NAME = "org.apache.spark.sql.hive.HiveExternalCatalog" - private def externalCatalogClassName(conf: SparkConf): String = { - conf.get(CATALOG_IMPLEMENTATION) match { + private def externalCatalogClassName( + conf: SparkConf, + initSessionConfig: Map[String, String]): String = { + initSessionConfig + .getOrElse(CATALOG_IMPLEMENTATION.key, conf.get(CATALOG_IMPLEMENTATION)) match { case "hive" => HIVE_EXTERNAL_CATALOG_CLASS_NAME case "in-memory" => classOf[InMemoryCatalog].getCanonicalName } From e1eafe72f64a025ef5f3d17c94374b16e7197cfa Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 31 Jan 2019 17:05:29 +0800 Subject: [PATCH 02/15] fix compile --- .../main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 1db57b76ac24f..171697c65b0d8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -90,7 +90,7 @@ private[hive] class TestHiveExternalCatalog( private[hive] class TestHiveSharedState( sc: SparkContext, hiveClient: Option[HiveClient] = None) - extends SharedState(sc) { + extends SharedState(sc, Map.empty) { override lazy val externalCatalog: ExternalCatalogWithListener = { new ExternalCatalogWithListener(new TestHiveExternalCatalog( From 4a1a18b9baa789261fe7d2b3c7acc62d5a67c9e0 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 31 Jan 2019 20:31:52 +0800 Subject: [PATCH 03/15] add ut --- .../spark/sql/internal/SharedStateSuite.scala | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala new file mode 100644 index 0000000000000..52a2a18cbf910 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.internal + +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.sql.SparkSession + +class SharedStateSuite extends SparkFunSuite { + + test("the catalog should be determined at the very first") { + val conf = new SparkConf().setMaster("local").setAppName("SharedState Test") + val sc = new SparkContext(conf) + val ss = SparkSession.builder().enableHiveSupport().getOrCreate() + assert(ss.sharedState.externalCatalog.unwrapped.getClass.getName === + "org.apache.spark.sql.hive.HiveExternalCatalog", "The catalog should be hive ") + val ss2 = SparkSession.builder().getOrCreate() + + assert(ss2.sharedState.externalCatalog.unwrapped.getClass.getName === + "org.apache.spark.sql.hive.HiveExternalCatalog", "The catalog should shared across sessions") + } + +} From 62d2aa361b12df21aebf2c809ef14a77fff37b58 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 1 Feb 2019 11:01:08 +0800 Subject: [PATCH 04/15] clone conf and apply all initiate options to them --- .../spark/sql/internal/SharedState.scala | 41 +++++++++++-------- .../spark/sql/internal/SharedStateSuite.scala | 6 +-- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 864e3f2f800ab..cc997dbb3a5f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -36,12 +36,23 @@ import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.status.ElementTrackingStore import org.apache.spark.util.{MutableURLClassLoader, Utils} - /** * A class that holds all state shared across sessions in a given [[SQLContext]]. */ private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[String, String]) extends Logging { + private val conf = sparkContext.conf.clone() + private val hadoopConf = new Configuration(sparkContext.hadoopConfiguration) + + // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing + // `SharedState`, all `SparkSession` level configurations have higher priority to generate a + // `SharedState` instance. This will be done only once then shared across `SparkSession`s + initConfig.foreach { case (k, v) => + logDebug(s"Applying initiate SparkSession options to SparkConf/HadoopConf: $k -> $v") + conf.set(k, v) + hadoopConf.set(k, v) + } + logInfo("Applied all initiate SparkSession options to the brand new SharedState") // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf. 
@@ -49,17 +60,17 @@ private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[S val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml") if (configFile != null) { logInfo(s"loading hive config file: $configFile") - sparkContext.hadoopConfiguration.addResource(configFile) + hadoopConf.addResource(configFile) } // hive.metastore.warehouse.dir only stay in hadoopConf - sparkContext.conf.remove("hive.metastore.warehouse.dir") + conf.remove("hive.metastore.warehouse.dir") // Set the Hive metastore warehouse path to the one we use - val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir") - if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) { + val hiveWarehouseDir = hadoopConf.get("hive.metastore.warehouse.dir") + if (hiveWarehouseDir != null && !conf.contains(WAREHOUSE_PATH.key)) { // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set, // we will respect the value of hive.metastore.warehouse.dir. - sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir) + conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir) logInfo(s"${WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " + s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " + s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').") @@ -69,16 +80,15 @@ private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[S // the value of spark.sql.warehouse.dir. // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set, // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir. - val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH) + val sparkWarehouseDir = conf.get(WAREHOUSE_PATH) logInfo(s"Setting hive.metastore.warehouse.dir ('$hiveWarehouseDir') to the value of " + s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').") - sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir) + hadoopConf.set("hive.metastore.warehouse.dir", sparkWarehouseDir) sparkWarehouseDir } } logInfo(s"Warehouse path is '$warehousePath'.") - /** * Class for caching query results reused in future executions. */ @@ -102,9 +112,7 @@ private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[S */ lazy val externalCatalog: ExternalCatalogWithListener = { val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration]( - SharedState.externalCatalogClassName(sparkContext.conf, initConfig), - sparkContext.conf, - sparkContext.hadoopConfiguration) + SharedState.externalCatalogClassName(conf), conf, hadoopConf) val defaultDbDefinition = CatalogDatabase( SessionCatalog.DEFAULT_DATABASE, @@ -138,7 +146,7 @@ private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[S // System preserved database should not exists in metastore. However it's hard to guarantee it // for every session, because case-sensitivity differs. Here we always lowercase it to make our // life easier. 
- val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT) + val globalTempDB = conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT) if (externalCatalog.databaseExists(globalTempDB)) { throw new SparkException( s"$globalTempDB is a system preserved database, please rename your existing database " + @@ -166,11 +174,8 @@ object SharedState extends Logging { private val HIVE_EXTERNAL_CATALOG_CLASS_NAME = "org.apache.spark.sql.hive.HiveExternalCatalog" - private def externalCatalogClassName( - conf: SparkConf, - initSessionConfig: Map[String, String]): String = { - initSessionConfig - .getOrElse(CATALOG_IMPLEMENTATION.key, conf.get(CATALOG_IMPLEMENTATION)) match { + private def externalCatalogClassName(conf: SparkConf): String = { + conf.get(CATALOG_IMPLEMENTATION) match { case "hive" => HIVE_EXTERNAL_CATALOG_CLASS_NAME case "in-memory" => classOf[InMemoryCatalog].getCanonicalName } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala index 52a2a18cbf910..ed591977f9231 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala @@ -28,10 +28,10 @@ class SharedStateSuite extends SparkFunSuite { val ss = SparkSession.builder().enableHiveSupport().getOrCreate() assert(ss.sharedState.externalCatalog.unwrapped.getClass.getName === "org.apache.spark.sql.hive.HiveExternalCatalog", "The catalog should be hive ") - val ss2 = SparkSession.builder().getOrCreate() + val ss2 = SparkSession.builder().getOrCreate() assert(ss2.sharedState.externalCatalog.unwrapped.getClass.getName === - "org.apache.spark.sql.hive.HiveExternalCatalog", "The catalog should shared across sessions") + "org.apache.spark.sql.hive.HiveExternalCatalog", + "The catalog should be shared across sessions") } - } From 8004c19f773c64cd612e34c0630e9fcdaf4e2f87 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 1 Feb 2019 13:33:03 +0800 Subject: [PATCH 05/15] typo & fix ut --- .../main/scala/org/apache/spark/sql/SparkSession.scala | 2 +- .../org/apache/spark/sql/internal/SharedState.scala | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 091d68aa84f2b..a7bd2ef67eec5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -127,7 +127,7 @@ class SparkSession private( @Unstable @transient lazy val sharedState: SharedState = { - existingSharedState.getOrElse(new SharedState(sparkContext, initialSessionOptions.toMap)) + existingSharedState.getOrElse(new SharedState(sparkContext, initialSessionOptions)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index cc997dbb3a5f2..1f853eb8a4e0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.internal import java.net.URL import java.util.Locale +import scala.collection.Map import scala.reflect.ClassTag import scala.util.control.NonFatal @@ -36,6 +37,7 @@ import org.apache.spark.sql.internal.StaticSQLConf._ import 
org.apache.spark.status.ElementTrackingStore import org.apache.spark.util.{MutableURLClassLoader, Utils} + /** * A class that holds all state shared across sessions in a given [[SQLContext]]. */ @@ -48,11 +50,10 @@ private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[S // `SharedState`, all `SparkSession` level configurations have higher priority to generate a // `SharedState` instance. This will be done only once then shared across `SparkSession`s initConfig.foreach { case (k, v) => - logDebug(s"Applying initiate SparkSession options to SparkConf/HadoopConf: $k -> $v") + logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v") conf.set(k, v) hadoopConf.set(k, v) } - logInfo("Applied all initiate SparkSession options to the brand new SharedState") // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf. @@ -87,8 +88,10 @@ private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[S sparkWarehouseDir } } + sparkContext.conf.set(WAREHOUSE_PATH.key, warehousePath) logInfo(s"Warehouse path is '$warehousePath'.") + /** * Class for caching query results reused in future executions. */ @@ -118,7 +121,7 @@ private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[S SessionCatalog.DEFAULT_DATABASE, "default database", CatalogUtils.stringToURI(warehousePath), - Map()) + Map.empty[String, String]) // Create default database if it doesn't exist if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) { // There may be another Spark application creating default database at the same time, here we From 10ebff783f6f68d463b52ad45feba6b84e39a675 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 1 Feb 2019 14:16:01 +0800 Subject: [PATCH 06/15] not import map --- .../scala/org/apache/spark/sql/internal/SharedState.scala | 7 ++++--- .../org/apache/spark/sql/internal/SharedStateSuite.scala | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 1f853eb8a4e0f..d777605936064 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.internal import java.net.URL import java.util.Locale -import scala.collection.Map import scala.reflect.ClassTag import scala.util.control.NonFatal @@ -41,7 +40,9 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils} /** * A class that holds all state shared across sessions in a given [[SQLContext]]. 
*/ -private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[String, String]) +private[sql] class SharedState( + val sparkContext: SparkContext, + initConfig: scala.collection.Map[String, String]) extends Logging { private val conf = sparkContext.conf.clone() private val hadoopConf = new Configuration(sparkContext.hadoopConfiguration) @@ -121,7 +122,7 @@ private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[S SessionCatalog.DEFAULT_DATABASE, "default database", CatalogUtils.stringToURI(warehousePath), - Map.empty[String, String]) + Map()) // Create default database if it doesn't exist if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) { // There may be another Spark application creating default database at the same time, here we diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala index ed591977f9231..8b1e066a74c7f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala @@ -24,7 +24,7 @@ class SharedStateSuite extends SparkFunSuite { test("the catalog should be determined at the very first") { val conf = new SparkConf().setMaster("local").setAppName("SharedState Test") - val sc = new SparkContext(conf) + val sc = SparkContext.getOrCreate(conf) val ss = SparkSession.builder().enableHiveSupport().getOrCreate() assert(ss.sharedState.externalCatalog.unwrapped.getClass.getName === "org.apache.spark.sql.hive.HiveExternalCatalog", "The catalog should be hive ") From f5c15e06cec62b346c0deef4a0d4de131767f351 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 1 Feb 2019 14:18:51 +0800 Subject: [PATCH 07/15] naming --- .../scala/org/apache/spark/sql/internal/SharedState.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index d777605936064..94cab4bdbf06f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -42,7 +42,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils} */ private[sql] class SharedState( val sparkContext: SparkContext, - initConfig: scala.collection.Map[String, String]) + initialConfigs: scala.collection.Map[String, String]) extends Logging { private val conf = sparkContext.conf.clone() private val hadoopConf = new Configuration(sparkContext.hadoopConfiguration) @@ -50,7 +50,7 @@ private[sql] class SharedState( // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing // `SharedState`, all `SparkSession` level configurations have higher priority to generate a // `SharedState` instance. 
This will be done only once then shared across `SparkSession`s - initConfig.foreach { case (k, v) => + initialConfigs.foreach { case (k, v) => logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v") conf.set(k, v) hadoopConf.set(k, v) From d57226a2823fec0c881a2a32d9d595725d0d481b Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 1 Feb 2019 17:09:40 +0800 Subject: [PATCH 08/15] set conf to lazy val --- .../spark/sql/internal/SharedState.scala | 46 +++++++++++-------- .../apache/spark/sql/hive/test/TestHive.scala | 2 +- .../HiveSharedStateSuite.scala} | 4 +- 3 files changed, 29 insertions(+), 23 deletions(-) rename sql/hive/src/test/scala/org/apache/spark/sql/{internal/SharedStateSuite.scala => hive/HiveSharedStateSuite.scala} (94%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 94cab4bdbf06f..13a8df1860714 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -39,21 +39,28 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils} /** * A class that holds all state shared across sessions in a given [[SQLContext]]. + * + * @param sparkContext The Spark context associated with this SharedState + * @param initialConfigs The configs from the very first created SparkSession */ private[sql] class SharedState( val sparkContext: SparkContext, initialConfigs: scala.collection.Map[String, String]) extends Logging { - private val conf = sparkContext.conf.clone() - private val hadoopConf = new Configuration(sparkContext.hadoopConfiguration) - - // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing - // `SharedState`, all `SparkSession` level configurations have higher priority to generate a - // `SharedState` instance. This will be done only once then shared across `SparkSession`s - initialConfigs.foreach { case (k, v) => - logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v") - conf.set(k, v) - hadoopConf.set(k, v) + + private lazy val (conf, hadoopConf) = { + val confClone = sparkContext.conf.clone() + val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration) + // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing + // `SharedState`, all `SparkSession` level configurations have higher priority to generate a + // `SharedState` instance. 
This will be done only once then shared across `SparkSession`s + for((k, v) <- initialConfigs + if k != "hive.metastore.warehouse.dir" || k != WAREHOUSE_PATH.key) { + logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v") + confClone.set(k, v) + hadoopConfClone.set(k, v) + } + (confClone, hadoopConfClone) } // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on @@ -62,17 +69,17 @@ private[sql] class SharedState( val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml") if (configFile != null) { logInfo(s"loading hive config file: $configFile") - hadoopConf.addResource(configFile) + sparkContext.hadoopConfiguration.addResource(configFile) } // hive.metastore.warehouse.dir only stay in hadoopConf - conf.remove("hive.metastore.warehouse.dir") + sparkContext.conf.remove("hive.metastore.warehouse.dir") // Set the Hive metastore warehouse path to the one we use - val hiveWarehouseDir = hadoopConf.get("hive.metastore.warehouse.dir") - if (hiveWarehouseDir != null && !conf.contains(WAREHOUSE_PATH.key)) { + val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir") + if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) { // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set, // we will respect the value of hive.metastore.warehouse.dir. - conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir) + sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir) logInfo(s"${WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " + s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " + s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').") @@ -82,14 +89,13 @@ private[sql] class SharedState( // the value of spark.sql.warehouse.dir. // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set, // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir. - val sparkWarehouseDir = conf.get(WAREHOUSE_PATH) + val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH) logInfo(s"Setting hive.metastore.warehouse.dir ('$hiveWarehouseDir') to the value of " + s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').") - hadoopConf.set("hive.metastore.warehouse.dir", sparkWarehouseDir) + sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir) sparkWarehouseDir } } - sparkContext.conf.set(WAREHOUSE_PATH.key, warehousePath) logInfo(s"Warehouse path is '$warehousePath'.") @@ -102,9 +108,9 @@ private[sql] class SharedState( * A status store to query SQL status/metrics of this Spark application, based on SQL-specific * [[org.apache.spark.scheduler.SparkListenerEvent]]s. 
*/ - val statusStore: SQLAppStatusStore = { + lazy val statusStore: SQLAppStatusStore = { val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore] - val listener = new SQLAppStatusListener(sparkContext.conf, kvStore, live = true) + val listener = new SQLAppStatusListener(conf, kvStore, live = true) sparkContext.listenerBus.addToStatusQueue(listener) val statusStore = new SQLAppStatusStore(kvStore, Some(listener)) sparkContext.ui.foreach(new SQLTab(statusStore, _)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 171697c65b0d8..5aa08e0af44cb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -90,7 +90,7 @@ private[hive] class TestHiveExternalCatalog( private[hive] class TestHiveSharedState( sc: SparkContext, hiveClient: Option[HiveClient] = None) - extends SharedState(sc, Map.empty) { + extends SharedState(sc, initialConfigs = Map.empty) { override lazy val externalCatalog: ExternalCatalogWithListener = { new ExternalCatalogWithListener(new TestHiveExternalCatalog( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala similarity index 94% rename from sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala rename to sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala index 8b1e066a74c7f..695104de6784e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala @@ -15,12 +15,12 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.internal +package org.apache.spark.sql.hive import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.sql.SparkSession -class SharedStateSuite extends SparkFunSuite { +class HiveSharedStateSuite extends SparkFunSuite { test("the catalog should be determined at the very first") { val conf = new SparkConf().setMaster("local").setAppName("SharedState Test") From 0c07ff13bf88a15d1bec81e39db4573c38649ed9 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 1 Feb 2019 19:48:44 +0800 Subject: [PATCH 09/15] fix ut --- .../spark/sql/hive/HiveSharedStateSuite.scala | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala index 695104de6784e..328bc9e363b8a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala @@ -23,15 +23,23 @@ import org.apache.spark.sql.SparkSession class HiveSharedStateSuite extends SparkFunSuite { test("the catalog should be determined at the very first") { - val conf = new SparkConf().setMaster("local").setAppName("SharedState Test") - val sc = SparkContext.getOrCreate(conf) - val ss = SparkSession.builder().enableHiveSupport().getOrCreate() - assert(ss.sharedState.externalCatalog.unwrapped.getClass.getName === - "org.apache.spark.sql.hive.HiveExternalCatalog", "The catalog should be hive ") + SparkContext.getActive.foreach(_.stop()) + var sc: SparkContext = null + try { + val conf = new SparkConf().setMaster("local").setAppName("SharedState Test") + sc = new SparkContext(conf) + val ss = SparkSession.builder().enableHiveSupport().getOrCreate() + assert(ss.sharedState.externalCatalog.unwrapped.getClass.getName === + "org.apache.spark.sql.hive.HiveExternalCatalog", "The catalog should be hive ") - val ss2 = SparkSession.builder().getOrCreate() - assert(ss2.sharedState.externalCatalog.unwrapped.getClass.getName === - "org.apache.spark.sql.hive.HiveExternalCatalog", - "The catalog should be shared across sessions") + val ss2 = SparkSession.builder().getOrCreate() + assert(ss2.sharedState.externalCatalog.unwrapped.getClass.getName === + "org.apache.spark.sql.hive.HiveExternalCatalog", + "The catalog should be shared across sessions") + } finally { + if (sc != null && !sc.isStopped) { + sc.stop() + } + } } } From 35b3f1769803cf82065d186b8d99240f5d0446b2 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 1 Feb 2019 21:18:08 +0800 Subject: [PATCH 10/15] add some comments --- .../apache/spark/sql/internal/SharedState.scala | 17 ++++++++++++----- .../spark/sql/hive/HiveSharedStateSuite.scala | 9 ++++----- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 13a8df1860714..766e882a038e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -48,17 +48,24 @@ private[sql] class SharedState( initialConfigs: scala.collection.Map[String, String]) extends Logging { + // This variable should be lazy, because in the first place we need to load hive-site.xml into + // hadoopConf and determine the warehouse path which will be set into both spark conf and hadoop + // 
conf avoiding be affected by any SparkSession level options private lazy val (conf, hadoopConf) = { val confClone = sparkContext.conf.clone() val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration) // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing // `SharedState`, all `SparkSession` level configurations have higher priority to generate a // `SharedState` instance. This will be done only once then shared across `SparkSession`s - for((k, v) <- initialConfigs - if k != "hive.metastore.warehouse.dir" || k != WAREHOUSE_PATH.key) { - logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v") - confClone.set(k, v) - hadoopConfClone.set(k, v) + initialConfigs.foreach { + case (k, _) if k == "hive.metastore.warehouse.dir" || k == WAREHOUSE_PATH.key => + logWarning(s"Not allowing to set ${WAREHOUSE_PATH.key} or hive.metastore.warehouse.dir " + + s"in SparkSession's options, it should be set statically for cross-session usages") + case (k, v) => + logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v") + confClone.set(k, v) + hadoopConfClone.set(k, v) + } (confClone, hadoopConfClone) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala index 328bc9e363b8a..942c8d1660d8e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala @@ -29,13 +29,12 @@ class HiveSharedStateSuite extends SparkFunSuite { val conf = new SparkConf().setMaster("local").setAppName("SharedState Test") sc = new SparkContext(conf) val ss = SparkSession.builder().enableHiveSupport().getOrCreate() - assert(ss.sharedState.externalCatalog.unwrapped.getClass.getName === - "org.apache.spark.sql.hive.HiveExternalCatalog", "The catalog should be hive ") + assert(ss.sharedState.externalCatalog.unwrapped.getClass.getName + .contains("HiveExternalCatalog"), "The catalog should be hive ") val ss2 = SparkSession.builder().getOrCreate() - assert(ss2.sharedState.externalCatalog.unwrapped.getClass.getName === - "org.apache.spark.sql.hive.HiveExternalCatalog", - "The catalog should be shared across sessions") + assert(ss2.sharedState.externalCatalog.unwrapped.getClass.getName + .contains("HiveExternalCatalog"), "The catalog should be shared across sessions") } finally { if (sc != null && !sc.isStopped) { sc.stop() From 0cac5e6c353c32e71ae2bd49888d4755918acd7b Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Sat, 2 Feb 2019 00:53:23 +0800 Subject: [PATCH 11/15] add more ut --- .../spark/sql/hive/HiveSharedStateSuite.scala | 60 +++++++++++++------ 1 file changed, 43 insertions(+), 17 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala index 942c8d1660d8e..f9048a5a254ff 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala @@ -17,28 +17,54 @@ package org.apache.spark.sql.hive +import org.apache.hadoop.hive.conf.HiveConf.ConfVars + import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.SharedState +import org.apache.spark.sql.internal.StaticSQLConf._ +import 
org.apache.spark.util.Utils class HiveSharedStateSuite extends SparkFunSuite { test("the catalog should be determined at the very first") { - SparkContext.getActive.foreach(_.stop()) - var sc: SparkContext = null - try { - val conf = new SparkConf().setMaster("local").setAppName("SharedState Test") - sc = new SparkContext(conf) - val ss = SparkSession.builder().enableHiveSupport().getOrCreate() - assert(ss.sharedState.externalCatalog.unwrapped.getClass.getName - .contains("HiveExternalCatalog"), "The catalog should be hive ") - - val ss2 = SparkSession.builder().getOrCreate() - assert(ss2.sharedState.externalCatalog.unwrapped.getClass.getName - .contains("HiveExternalCatalog"), "The catalog should be shared across sessions") - } finally { - if (sc != null && !sc.isStopped) { - sc.stop() - } - } + val conf = new SparkConf().setMaster("local").setAppName("SharedState Test") + val sc = SparkContext.getOrCreate(conf) + val ss = SparkSession.builder().enableHiveSupport().getOrCreate() + assert(ss.sharedState.externalCatalog.unwrapped.getClass.getName + .contains("HiveExternalCatalog"), "The catalog should be hive ") + + val ss2 = SparkSession.builder().getOrCreate() + assert(ss2.sharedState.externalCatalog.unwrapped.getClass.getName + .contains("HiveExternalCatalog"), "The catalog should be shared across sessions") + + } + + test("using initial configs to generate SharedState") { + val conf = new SparkConf().setMaster("local").setAppName("SharedState Test") + val sc = SparkContext.getOrCreate(conf) + val invalidPath = "invalid/path" + val metastorePath = Utils.createTempDir() + val tmpDb = "tmp_db" + val initialConfigs = Map("spark.foo" -> "bar", + WAREHOUSE_PATH.key -> invalidPath, + ConfVars.METASTOREWAREHOUSE.varname -> invalidPath, + CATALOG_IMPLEMENTATION.key -> "hive", + ConfVars.METASTORECONNECTURLKEY.varname -> + s"jdbc:derby:;databaseName=$metastorePath/metastore_db;create=true", + GLOBAL_TEMP_DATABASE.key -> tmpDb) + val state = new SharedState(sc, initialConfigs) + assert(state.warehousePath !== invalidPath, "warehouse path can't be determined by session options") + assert(sc.conf.get(WAREHOUSE_PATH.key) !== invalidPath, + "warehouse conf in session options can't affect application-wide spark conf") + assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !== invalidPath, + "warehouse conf in session options can't affect application-wide hadoop conf") + + assert(!state.sparkContext.conf.contains("spark.foo"), + "static spark conf should not be affected by session") + assert(state.sparkContext.conf.get(CATALOG_IMPLEMENTATION) === "in-memory") + assert(state.globalTempViewManager.database === tmpDb) + assert(state.externalCatalog.unwrapped.isInstanceOf[HiveExternalCatalog], + "Initial SparkSession options can determine the catalog") } } From 92fec4d3777b1f4c909dbcfc6015f150393e0a27 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Sat, 2 Feb 2019 10:00:57 +0800 Subject: [PATCH 12/15] fix ut --- .../scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala index f9048a5a254ff..f785d5f821864 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala @@ -62,7 +62,6 @@ class HiveSharedStateSuite extends SparkFunSuite { 
assert(!state.sparkContext.conf.contains("spark.foo", "static spark conf should not be affected by session") - assert(state.sparkContext.conf.get(CATALOG_IMPLEMENTATION) === "in-memory") assert(state.globalTempViewManager.database === tmpDb) assert(state.externalCatalog.unwrapped.isInstanceOf[HiveExternalCatalog], "Initial SparkSession options can determine the catalog") } } From 391e14c55085bfe6d360351ff3a55ded1633bd61 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 12 Feb 2019 10:28:30 +0800 Subject: [PATCH 13/15] mv confs behind warehousePath --- .../spark/sql/internal/SharedState.scala | 45 +++++++++---------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 766e882a038e8..ea33c6b22ac8d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -48,28 +48,6 @@ private[sql] class SharedState( initialConfigs: scala.collection.Map[String, String]) extends Logging { - // This variable should be lazy, because in the first place we need to load hive-site.xml into - // hadoopConf and determine the warehouse path which will be set into both spark conf and hadoop - // conf avoiding be affected by any SparkSession level options - private lazy val (conf, hadoopConf) = { - val confClone = sparkContext.conf.clone() - val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration) - // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing - // `SharedState`, all `SparkSession` level configurations have higher priority to generate a - // `SharedState` instance. This will be done only once then shared across `SparkSession`s - initialConfigs.foreach { - case (k, _) if k == "hive.metastore.warehouse.dir" || k == WAREHOUSE_PATH.key => - logWarning(s"Not allowing to set ${WAREHOUSE_PATH.key} or hive.metastore.warehouse.dir " + - s"in SparkSession's options, it should be set statically for cross-session usages") - case (k, v) => - logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v") - confClone.set(k, v) - hadoopConfClone.set(k, v) - - } - (confClone, hadoopConfClone) - } - // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf. val warehousePath: String = { @@ -105,6 +83,27 @@ private[sql] class SharedState( } logInfo(s"Warehouse path is '$warehousePath'.") + // This variable should be initialized after `warehousePath`, because in the first place we need + // to load hive-site.xml into hadoopConf and determine the warehouse path which will be set into + // both spark conf and hadoop conf, avoiding being affected by any SparkSession level options + private val (conf, hadoopConf) = { + val confClone = sparkContext.conf.clone() + val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration) + // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing + // `SharedState`, all `SparkSession` level configurations have higher priority to generate a + // `SharedState` instance. 
This will be done only once then shared across `SparkSession`s + initialConfigs.foreach { + case (k, _) if k == "hive.metastore.warehouse.dir" || k == WAREHOUSE_PATH.key => + logWarning(s"Not allowing to set ${WAREHOUSE_PATH.key} or hive.metastore.warehouse.dir " + + s"in SparkSession's options; it should be set statically for cross-session usage") + case (k, v) => + logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v") + confClone.set(k, v) + hadoopConfClone.set(k, v) + + } + (confClone, hadoopConfClone) + } /** * Class for caching query results reused in future executions. */ @@ -115,7 +114,7 @@ private[sql] class SharedState( * A status store to query SQL status/metrics of this Spark application, based on SQL-specific * [[org.apache.spark.scheduler.SparkListenerEvent]]s. */ - lazy val statusStore: SQLAppStatusStore = { + val statusStore: SQLAppStatusStore = { val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore] val listener = new SQLAppStatusListener(conf, kvStore, live = true) sparkContext.listenerBus.addToStatusQueue(listener) From eff569b80cf5b41d73b6bf55af9e22cad1a92859 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 12 Feb 2019 11:44:49 +0800 Subject: [PATCH 14/15] meaningful titles for uts --- .../spark/sql/internal/SharedState.scala | 2 +- .../apache/spark/sql/hive/test/TestHive.scala | 2 +- .../spark/sql/hive/HiveSharedStateSuite.scala | 26 +++++++++++++------- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index ea33c6b22ac8d..3dd2ae61e8cd9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -83,7 +83,7 @@ private[sql] class SharedState( } logInfo(s"Warehouse path is '$warehousePath'.") - // This variable should be initialized after `warehousePath`, because in the first place we need + // These two variables should be initialized after `warehousePath`, because in the first place we need // to load hive-site.xml into hadoopConf and determine the warehouse path which will be set into // both spark conf and hadoop conf, avoiding being affected by any SparkSession level options private val (conf, hadoopConf) = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 5aa08e0af44cb..c45f3e706e9e7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -90,7 +90,7 @@ private[hive] class TestHiveExternalCatalog( private[hive] class TestHiveSharedState( sc: SparkContext, hiveClient: Option[HiveClient] = None) - extends SharedState(sc, initialConfigs = Map.empty) { + extends SharedState(sc, initialConfigs = Map.empty[String, String]) { override lazy val externalCatalog: ExternalCatalogWithListener = { new ExternalCatalogWithListener(new TestHiveExternalCatalog( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala index f785d5f821864..1f7ade6e3b023 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala @@ -27,25 +27,28 @@ import 
org.apache.spark.util.Utils class HiveSharedStateSuite extends SparkFunSuite { - test("the catalog should be determined at the very first") { + test("enableHiveSupport has right to determine the catalog while using an existing sc") { val conf = new SparkConf().setMaster("local").setAppName("SharedState Test") val sc = SparkContext.getOrCreate(conf) val ss = SparkSession.builder().enableHiveSupport().getOrCreate() - assert(ss.sharedState.externalCatalog.unwrapped.getClass.getName - .contains("HiveExternalCatalog"), "The catalog should be hive ") + assert(ss.sharedState.externalCatalog.unwrapped.isInstanceOf[HiveExternalCatalog], + "The catalog should be hive ") val ss2 = SparkSession.builder().getOrCreate() - assert(ss2.sharedState.externalCatalog.unwrapped.getClass.getName - .contains("HiveExternalCatalog"), "The catalog should be shared across sessions") + assert(ss2.sharedState.externalCatalog.unwrapped.isInstanceOf[HiveExternalCatalog], + "The catalog should be shared across sessions") } - test("using initial configs to generate SharedState") { + test("initial configs should be passed to SharedState but not SparkContext") { val conf = new SparkConf().setMaster("local").setAppName("SharedState Test") val sc = SparkContext.getOrCreate(conf) val invalidPath = "invalid/path" val metastorePath = Utils.createTempDir() val tmpDb = "tmp_db" + + // The initial configs used to generate SharedState; none of these should affect the global + // shared SparkContext's configurations. In particular, all these configs are passed to the cloned + // confs inside SharedState, except for the metastore warehouse dir. val initialConfigs = Map("spark.foo" -> "bar", WAREHOUSE_PATH.key -> invalidPath, ConfVars.METASTOREWAREHOUSE.varname -> invalidPath, CATALOG_IMPLEMENTATION.key -> "hive", ConfVars.METASTORECONNECTURLKEY.varname -> s"jdbc:derby:;databaseName=$metastorePath/metastore_db;create=true", GLOBAL_TEMP_DATABASE.key -> tmpDb) + val state = new SharedState(sc, initialConfigs) assert(state.warehousePath !== invalidPath, "warehouse path can't be determined by session options") assert(sc.conf.get(WAREHOUSE_PATH.key) !== invalidPath, "warehouse conf in session options can't affect application-wide spark conf") assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !== invalidPath, "warehouse conf in session options can't affect application-wide hadoop conf") @@ -62,8 +66,14 @@ class HiveSharedStateSuite extends SparkFunSuite { assert(!state.sparkContext.conf.contains("spark.foo"), "static spark conf should not be affected by session") - assert(state.globalTempViewManager.database === tmpDb) assert(state.externalCatalog.unwrapped.isInstanceOf[HiveExternalCatalog], "Initial SparkSession options can determine the catalog") + val client = state.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client + assert(client.getConf("spark.foo", "") === "bar", + "session-level conf should be passed to the catalog") + assert(client.getConf(ConfVars.METASTOREWAREHOUSE.varname, invalidPath) !== invalidPath, + "session-level conf should be passed to the catalog, except for the warehouse dir") + + assert(state.globalTempViewManager.database === tmpDb) } } From 748821c86d063e4b57c8297ae3e259934db544c2 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 13 Feb 2019 14:11:31 +0800 Subject: [PATCH 15/15] rm one ut --- .../spark/sql/hive/HiveSharedStateSuite.scala | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala index 1f7ade6e3b023..6e2dcfc04d498 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala +++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala @@ -20,25 +20,12 @@ package org.apache.spark.sql.hive import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.internal.SharedState import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.util.Utils class HiveSharedStateSuite extends SparkFunSuite { - test("enableHiveSupport has right to determine the catalog while using an existing sc") { - val conf = new SparkConf().setMaster("local").setAppName("SharedState Test") - val sc = SparkContext.getOrCreate(conf) - val ss = SparkSession.builder().enableHiveSupport().getOrCreate() - assert(ss.sharedState.externalCatalog.unwrapped.isInstanceOf[HiveExternalCatalog], - "The catalog should be hive ") - - val ss2 = SparkSession.builder().getOrCreate() - assert(ss2.sharedState.externalCatalog.unwrapped.isInstanceOf[HiveExternalCatalog], - "The catalog should be shared across sessions") - } - test("initial configs should be passed to SharedState but not SparkContext") { val conf = new SparkConf().setMaster("local").setAppName("SharedState Test") val sc = SparkContext.getOrCreate(conf)
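
For reference, the user-facing scenario this series addresses can be reproduced with a few lines of application code. The sketch below is not part of the patches above; it assumes a Spark build with the Hive module on the classpath, and the object name Spark26794Demo is made up for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object Spark26794Demo {
  def main(args: Array[String]): Unit = {
    // A SparkContext already exists before any SparkSession is built,
    // e.g. as in spark-shell or notebook environments.
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("demo"))
    // enableHiveSupport() only records spark.sql.catalogImplementation=hive as an initial
    // session option. Before this fix, SharedState read the catalog implementation from
    // sc.conf alone, so the option was silently dropped and the external catalog fell
    // back to the in-memory implementation; with the fix, the initial session options
    // are forwarded to SharedState and the Hive catalog is chosen.
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    // Expected with the fix: org.apache.spark.sql.hive.HiveExternalCatalog
    println(spark.sharedState.externalCatalog.unwrapped.getClass.getName)
    sc.stop()
  }
}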