[SPARK-26794][SQL] SparkSession enableHiveSupport does not point to hive but in-memory while the SparkContext exists #23709

Closed
wants to merge 15 commits
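For context, the reported bug is easy to reproduce. Below is a hedged sketch based on the regression test added in this PR, assuming a Spark build with the Hive classes on the classpath:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

// Create a SparkContext before any SparkSession exists.
val conf = new SparkConf().setMaster("local").setAppName("repro")
SparkContext.getOrCreate(conf)

// enableHiveSupport() only records spark.sql.catalogImplementation=hive as a
// session-level option. Before this patch, the lazily built SharedState read
// sparkContext.conf alone, so the option was lost and the catalog fell back
// to in-memory:
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
spark.sharedState.externalCatalog.unwrapped  // was an InMemoryCatalog; expected HiveExternalCatalog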
SparkSession.scala:
@@ -127,7 +127,7 @@ class SparkSession private(
  @Unstable
  @transient
  lazy val sharedState: SharedState = {
-    existingSharedState.getOrElse(new SharedState(sparkContext))
+    existingSharedState.getOrElse(new SharedState(sparkContext, initialSessionOptions))
  }

  /**
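initialSessionOptions holds the options collected by SparkSession.Builder before the session was created; enableHiveSupport() contributes the spark.sql.catalogImplementation=hive entry. A hedged sketch of the flow this one-line change enables (illustrative values, not the literal builder code):

val spark = SparkSession.builder()
  .enableHiveSupport()                            // builder option: spark.sql.catalogImplementation=hive
  .config("spark.sql.warehouse.dir", "/tmp/wh")   // example session-level option
  .getOrCreate()
// When this first session materializes sharedState, the builder options are
// forwarded into SharedState and applied on top of a clone of sparkContext.conf
// before the external catalog and warehouse path are determined.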
SharedState.scala:
@@ -40,25 +40,39 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
/**
 * A class that holds all state shared across sessions in a given [[SQLContext]].
 */
-private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
+private[sql] class SharedState(
+    val sparkContext: SparkContext,
+    initialConfigs: scala.collection.Map[String, String])
+  extends Logging {
+  private val conf = sparkContext.conf.clone()
+  private val hadoopConf = new Configuration(sparkContext.hadoopConfiguration)
+
+  // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
+  // `SharedState`, all `SparkSession` level configurations have higher priority to generate a
+  // `SharedState` instance. This will be done only once then shared across `SparkSession`s
+  initialConfigs.foreach { case (k, v) =>
+    logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v")
+    conf.set(k, v)
+    hadoopConf.set(k, v)
+  }

  // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
  // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
  val warehousePath: String = {
    val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
    if (configFile != null) {
      logInfo(s"loading hive config file: $configFile")
-      sparkContext.hadoopConfiguration.addResource(configFile)
+      hadoopConf.addResource(configFile)
    }

    // hive.metastore.warehouse.dir only stay in hadoopConf
-    sparkContext.conf.remove("hive.metastore.warehouse.dir")
+    conf.remove("hive.metastore.warehouse.dir")
    // Set the Hive metastore warehouse path to the one we use
-    val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
-    if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) {
+    val hiveWarehouseDir = hadoopConf.get("hive.metastore.warehouse.dir")
+    if (hiveWarehouseDir != null && !conf.contains(WAREHOUSE_PATH.key)) {
      // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
      // we will respect the value of hive.metastore.warehouse.dir.
-      sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
+      conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
      logInfo(s"${WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
        s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " +
        s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
@@ -68,13 +82,14 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
      // the value of spark.sql.warehouse.dir.
      // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
      // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
-      val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
+      val sparkWarehouseDir = conf.get(WAREHOUSE_PATH)
      logInfo(s"Setting hive.metastore.warehouse.dir ('$hiveWarehouseDir') to the value of " +
        s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
-      sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
+      hadoopConf.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
      sparkWarehouseDir
    }
  }
+  sparkContext.conf.set(WAREHOUSE_PATH.key, warehousePath)
  logInfo(s"Warehouse path is '$warehousePath'.")


@@ -101,9 +116,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
   */
  lazy val externalCatalog: ExternalCatalogWithListener = {
    val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
-      SharedState.externalCatalogClassName(sparkContext.conf),
-      sparkContext.conf,
-      sparkContext.hadoopConfiguration)
+      SharedState.externalCatalogClassName(conf), conf, hadoopConf)

    val defaultDbDefinition = CatalogDatabase(
      SessionCatalog.DEFAULT_DATABASE,
@@ -137,7 +150,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
    // System preserved database should not exists in metastore. However it's hard to guarantee it
    // for every session, because case-sensitivity differs. Here we always lowercase it to make our
    // life easier.
-    val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
+    val globalTempDB = conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
    if (externalCatalog.databaseExists(globalTempDB)) {
      throw new SparkException(
        s"$globalTempDB is a system preserved database, please rename your existing database " +
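Distilled, the warehouse resolution above follows a fixed precedence. The sketch below restates it against the cloned confs; defaultWarehouseDir is a hypothetical stand-in for the built-in default of spark.sql.warehouse.dir:

// 1. An explicit spark.sql.warehouse.dir always wins (and is copied into hadoopConf).
// 2. Otherwise hive.metastore.warehouse.dir (e.g. from hive-site.xml) is respected.
// 3. Otherwise the default of spark.sql.warehouse.dir applies.
val warehouse =
  if (conf.contains("spark.sql.warehouse.dir")) {
    conf.get("spark.sql.warehouse.dir")
  } else {
    Option(hadoopConf.get("hive.metastore.warehouse.dir"))
      .getOrElse(defaultWarehouseDir)  // hypothetical name for the default
  }
// Finally the resolved path is written back to sparkContext.conf so later
// sessions observe the same warehouse.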
TestHive.scala:
@@ -90,7 +90,7 @@ private[hive] class TestHiveExternalCatalog(
private[hive] class TestHiveSharedState(
    sc: SparkContext,
    hiveClient: Option[HiveClient] = None)
-  extends SharedState(sc) {
+  extends SharedState(sc, Map.empty) {

  override lazy val externalCatalog: ExternalCatalogWithListener = {
    new ExternalCatalogWithListener(new TestHiveExternalCatalog(
SharedStateSuite.scala (new file):
@@ -0,0 +1,37 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.internal

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.sql.SparkSession

class SharedStateSuite extends SparkFunSuite {

test("the catalog should be determined at the very first") {
val conf = new SparkConf().setMaster("local").setAppName("SharedState Test")
val sc = SparkContext.getOrCreate(conf)
Review thread on the line above (val sc = SparkContext.getOrCreate(conf)):

Member: Is this being used?

@gatorsmile (Member, Feb 1, 2019) suggested:
    // Create a spark context first
    SparkContext.getOrCreate(conf)
    val ss = SparkSession.builder().enableHiveSupport().getOrCreate()
    assert(ss.sharedState.externalCatalog.unwrapped.getClass.getName ===
      "org.apache.spark.sql.hive.HiveExternalCatalog", "The catalog should be hive ")

    val ss2 = SparkSession.builder().getOrCreate()
    assert(ss2.sharedState.externalCatalog.unwrapped.getClass.getName ===
      "org.apache.spark.sql.hive.HiveExternalCatalog",
      "The catalog should be shared across sessions")
  }
}
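Note on the second assertion: ss2 never requests Hive support, yet it still sees HiveExternalCatalog. Both sessions reuse the SharedState that the first session created, so the catalog implementation is fixed exactly once, when the shared state is first instantiated; that is what the test title means by "determined at the very first".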