Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-26794][SQL] SparkSession enableHiveSupport does not point to hive but in-memory while the SparkContext exists #23709

Closed
wants to merge 15 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class SparkSession private(
@Unstable
@transient
lazy val sharedState: SharedState = {
  // Reuse an existing SharedState when one was handed to this session (sessions
  // created via `newSession`/`cloneSession` share state). Otherwise build a new
  // one, forwarding the options captured at builder time (`initialSessionOptions`)
  // so that settings such as `spark.sql.catalogImplementation` chosen on the
  // builder are honored even when the SparkContext already existed (SPARK-26794).
  existingSharedState.getOrElse(new SharedState(sparkContext, initialSessionOptions.toMap))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
/**
* A class that holds all state shared across sessions in a given [[SQLContext]].
*/
private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[String, String])
HyukjinKwon marked this conversation as resolved.
Show resolved Hide resolved
extends Logging {

// Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
// the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
Expand Down Expand Up @@ -101,7 +102,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
*/
lazy val externalCatalog: ExternalCatalogWithListener = {
val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
SharedState.externalCatalogClassName(sparkContext.conf),
SharedState.externalCatalogClassName(sparkContext.conf, initConfig),
sparkContext.conf,
sparkContext.hadoopConfiguration)

Expand Down Expand Up @@ -165,8 +166,11 @@ object SharedState extends Logging {

private val HIVE_EXTERNAL_CATALOG_CLASS_NAME = "org.apache.spark.sql.hive.HiveExternalCatalog"

private def externalCatalogClassName(conf: SparkConf): String = {
conf.get(CATALOG_IMPLEMENTATION) match {
private def externalCatalogClassName(
conf: SparkConf,
initSessionConfig: Map[String, String]): String = {
HyukjinKwon marked this conversation as resolved.
Show resolved Hide resolved
initSessionConfig
.getOrElse(CATALOG_IMPLEMENTATION.key, conf.get(CATALOG_IMPLEMENTATION)) match {
case "hive" => HIVE_EXTERNAL_CATALOG_CLASS_NAME
case "in-memory" => classOf[InMemoryCatalog].getCanonicalName
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private[hive] class TestHiveExternalCatalog(
private[hive] class TestHiveSharedState(
sc: SparkContext,
hiveClient: Option[HiveClient] = None)
extends SharedState(sc) {
extends SharedState(sc, Map.empty) {
yaooqinn marked this conversation as resolved.
Show resolved Hide resolved

override lazy val externalCatalog: ExternalCatalogWithListener = {
new ExternalCatalogWithListener(new TestHiveExternalCatalog(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.internal
HyukjinKwon marked this conversation as resolved.
Show resolved Hide resolved

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.sql.SparkSession

/**
 * Tests for [[org.apache.spark.sql.internal.SharedState]].
 *
 * Verifies that the external catalog implementation is fixed by the options of
 * the FIRST session that creates the SharedState, and is then shared by all
 * later sessions (SPARK-26794).
 */
class SharedStateSuite extends SparkFunSuite {

  test("the catalog should be determined at the very first") {
    val conf = new SparkConf().setMaster("local").setAppName("SharedState Test")
    val sc = new SparkContext(conf)
    try {
      // The first session enables Hive support, so even though the SparkContext
      // already exists, the shared external catalog must be the Hive one.
      val ss = SparkSession.builder().enableHiveSupport().getOrCreate()
      assert(ss.sharedState.externalCatalog.unwrapped.getClass.getName ===
        "org.apache.spark.sql.hive.HiveExternalCatalog", "The catalog should be hive")

      // A second session built WITHOUT enableHiveSupport must still see the
      // Hive catalog, because SharedState is shared across sessions.
      val ss2 = SparkSession.builder().getOrCreate()
      assert(ss2.sharedState.externalCatalog.unwrapped.getClass.getName ===
        "org.apache.spark.sql.hive.HiveExternalCatalog",
        "The catalog should be shared across sessions")
    } finally {
      // Tear down the session and context so later suites start clean.
      SparkSession.getActiveSession.foreach(_.stop())
      sc.stop()
    }
  }
}