-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session has an identical copy of the SessionState #16826
Changes from 37 commits
18ce1b8
9beb78d
a343d8a
4210079
6da6bda
579d0b7
2837e73
8c00344
f423f74
b1371d8
2cee190
e2bbfa8
8ac778a
0c732ce
3c995e1
292011a
b027412
295ee41
847b484
9beba84
3d2e4a6
4f70d12
dd2dedd
8a8d47b
ffc2058
16824f9
fd11ee2
437b0bc
300d3a0
3ee271f
c3f052f
0bdc81c
2740c63
0f167db
2f0b1ad
c41e7bc
5eb6733
05abcf8
4c23e7a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,7 +50,6 @@ object SessionCatalog { | |
class SessionCatalog( | ||
externalCatalog: ExternalCatalog, | ||
globalTempViewManager: GlobalTempViewManager, | ||
functionResourceLoader: FunctionResourceLoader, | ||
functionRegistry: FunctionRegistry, | ||
conf: CatalystConf, | ||
hadoopConf: Configuration, | ||
|
@@ -66,16 +65,19 @@ class SessionCatalog( | |
this( | ||
externalCatalog, | ||
new GlobalTempViewManager("global_temp"), | ||
DummyFunctionResourceLoader, | ||
functionRegistry, | ||
conf, | ||
new Configuration(), | ||
CatalystSqlParser) | ||
functionResourceLoader = DummyFunctionResourceLoader | ||
} | ||
|
||
// For testing only. | ||
def this(externalCatalog: ExternalCatalog) { | ||
this(externalCatalog, new SimpleFunctionRegistry, new SimpleCatalystConf(true)) | ||
this( | ||
externalCatalog, | ||
new SimpleFunctionRegistry, | ||
SimpleCatalystConf(caseSensitiveAnalysis = true)) | ||
} | ||
|
||
/** List of temporary tables, mapping from table name to their logical plan. */ | ||
|
@@ -89,6 +91,8 @@ class SessionCatalog( | |
@GuardedBy("this") | ||
protected var currentDb = formatDatabaseName(DEFAULT_DATABASE) | ||
|
||
@volatile var functionResourceLoader: FunctionResourceLoader = _ | ||
|
||
/** | ||
* Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"), | ||
* i.e. if this name only contains characters, numbers, and _. | ||
|
@@ -987,6 +991,9 @@ class SessionCatalog( | |
* by a tuple (resource type, resource uri). | ||
*/ | ||
def loadFunctionResources(resources: Seq[FunctionResource]): Unit = { | ||
if (functionResourceLoader == null) { | ||
throw new IllegalStateException("functionResourceLoader has not yet been initialized") | ||
} | ||
resources.foreach(functionResourceLoader.loadResource) | ||
} | ||
|
||
|
@@ -1182,4 +1189,29 @@ class SessionCatalog( | |
} | ||
} | ||
|
||
/** | ||
* Create a new [[SessionCatalog]] with the provided parameters. `externalCatalog` and | ||
* `globalTempViewManager` are `inherited`, while `currentDb` and `tempTables` are copied. | ||
*/ | ||
def newSessionCatalogWith( | ||
conf: CatalystConf, | ||
hadoopConf: Configuration, | ||
functionRegistry: FunctionRegistry, | ||
parser: ParserInterface): SessionCatalog = { | ||
val catalog = new SessionCatalog( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if you are copying... shouldnt all the fields be copied, instead of just being reused? |
||
externalCatalog, | ||
globalTempViewManager, | ||
functionRegistry, | ||
conf, | ||
hadoopConf, | ||
parser) | ||
|
||
synchronized { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. resolved There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. doesnt look like. its still There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, I should mention we decided to keep it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, it should be |
||
catalog.currentDb = currentDb | ||
// copy over temporary tables | ||
tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2)) | ||
} | ||
|
||
catalog | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,8 @@ | |
|
||
package org.apache.spark.sql.catalyst.catalog | ||
|
||
import org.apache.hadoop.conf.Configuration | ||
|
||
import org.apache.spark.sql.AnalysisException | ||
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SimpleCatalystConf, TableIdentifier} | ||
import org.apache.spark.sql.catalyst.analysis._ | ||
|
@@ -1197,6 +1199,59 @@ class SessionCatalogSuite extends PlanTest { | |
} | ||
} | ||
|
||
test("clone SessionCatalog - temp views") { | ||
val externalCatalog = newEmptyCatalog() | ||
val original = new SessionCatalog(externalCatalog) | ||
val tempTable1 = Range(1, 10, 1, 10) | ||
original.createTempView("copytest1", tempTable1, overrideIfExists = false) | ||
|
||
// check if tables copied over | ||
val clone = original.newSessionCatalogWith( | ||
SimpleCatalystConf(caseSensitiveAnalysis = true), | ||
new Configuration(), | ||
new SimpleFunctionRegistry, | ||
CatalystSqlParser) | ||
assert(original ne clone) | ||
assert(clone.getTempView("copytest1") == Some(tempTable1)) | ||
|
||
// check if clone and original independent | ||
clone.dropTable(TableIdentifier("copytest1"), ignoreIfNotExists = false, purge = false) | ||
assert(original.getTempView("copytest1") == Some(tempTable1)) | ||
|
||
val tempTable2 = Range(1, 20, 2, 10) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should also test that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a test for this. |
||
original.createTempView("copytest2", tempTable2, overrideIfExists = false) | ||
assert(clone.getTempView("copytest2").isEmpty) | ||
} | ||
|
||
test("clone SessionCatalog - current db") { | ||
val externalCatalog = newEmptyCatalog() | ||
val db1 = "db1" | ||
val db2 = "db2" | ||
val db3 = "db3" | ||
|
||
externalCatalog.createDatabase(newDb(db1), ignoreIfExists = true) | ||
externalCatalog.createDatabase(newDb(db2), ignoreIfExists = true) | ||
externalCatalog.createDatabase(newDb(db3), ignoreIfExists = true) | ||
|
||
val original = new SessionCatalog(externalCatalog) | ||
original.setCurrentDatabase(db1) | ||
|
||
// check if current db copied over | ||
val clone = original.newSessionCatalogWith( | ||
SimpleCatalystConf(caseSensitiveAnalysis = true), | ||
new Configuration(), | ||
new SimpleFunctionRegistry, | ||
CatalystSqlParser) | ||
assert(original ne clone) | ||
assert(clone.getCurrentDatabase == db1) | ||
|
||
// check if clone and original independent | ||
clone.setCurrentDatabase(db2) | ||
assert(original.getCurrentDatabase == db1) | ||
original.setCurrentDatabase(db3) | ||
assert(clone.getCurrentDatabase == db2) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about? test("clone SessionCatalog - current db") {
val externalCatalog = newEmptyCatalog()
val db1 = "db1"
val db2 = "db2"
val db3 = "db3"
externalCatalog.createDatabase(newDb(db1), ignoreIfExists = true)
externalCatalog.createDatabase(newDb(db2), ignoreIfExists = true)
externalCatalog.createDatabase(newDb(db3), ignoreIfExists = true)
val original = new SessionCatalog(externalCatalog)
original.setCurrentDatabase(db1)
// check if current db copied over
val clone = original.clone(
SimpleCatalystConf(caseSensitiveAnalysis = true),
new Configuration(),
new SimpleFunctionRegistry,
CatalystSqlParser)
assert(original != clone)
assert(clone.getCurrentDatabase == db1)
// check if clone and original independent
clone.setCurrentDatabase(db2)
assert(original.getCurrentDatabase == db1)
original.setCurrentDatabase(db3)
assert(clone.getCurrentDatabase == db2)
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
||
test("SPARK-19737: detect undefined functions without triggering relation resolution") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this supposed to be part of this PR? |
||
import org.apache.spark.sql.catalyst.dsl.plans._ | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,4 +46,10 @@ class ExperimentalMethods private[sql]() { | |
|
||
@volatile var extraOptimizations: Seq[Rule[LogicalPlan]] = Nil | ||
|
||
override def clone(): ExperimentalMethods = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It sounds like we also need to add sync for both There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, added a sync block. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gatorsmile any reason you want to add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When I reviewing the code, I just found they are not thread-safe, although they have already been declared volatile. I am fine to keep them unchanged. How about leaving a comment in the code to emphasize that it is not thread-safe. |
||
val result = new ExperimentalMethods | ||
result.extraStrategies = extraStrategies | ||
result.extraOptimizations = extraOptimizations | ||
result | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,7 +21,6 @@ import java.io.Closeable | |
import java.util.concurrent.atomic.AtomicReference | ||
|
||
import scala.collection.JavaConverters._ | ||
import scala.reflect.ClassTag | ||
import scala.reflect.runtime.universe.TypeTag | ||
import scala.util.control.NonFatal | ||
|
||
|
@@ -43,7 +42,7 @@ import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState} | |
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION | ||
import org.apache.spark.sql.sources.BaseRelation | ||
import org.apache.spark.sql.streaming._ | ||
import org.apache.spark.sql.types.{DataType, LongType, StructType} | ||
import org.apache.spark.sql.types.{DataType, StructType} | ||
import org.apache.spark.sql.util.ExecutionListenerManager | ||
import org.apache.spark.util.Utils | ||
|
||
|
@@ -67,15 +66,22 @@ import org.apache.spark.util.Utils | |
* .config("spark.some.config.option", "some-value") | ||
* .getOrCreate() | ||
* }}} | ||
* | ||
* @param sparkContext The Spark context associated with this Spark session. | ||
* @param existingSharedState If supplied, use the existing shared state | ||
* instead of creating a new one. | ||
* @param parentSessionState If supplied, inherit all session state (i.e. temporary | ||
* views, SQL config, UDFs etc) from parent. | ||
*/ | ||
@InterfaceStability.Stable | ||
class SparkSession private( | ||
@transient val sparkContext: SparkContext, | ||
@transient private val existingSharedState: Option[SharedState]) | ||
@transient private val existingSharedState: Option[SharedState], | ||
@transient private val parentSessionState: Option[SessionState]) | ||
extends Serializable with Closeable with Logging { self => | ||
|
||
private[sql] def this(sc: SparkContext) { | ||
this(sc, None) | ||
this(sc, None, None) | ||
} | ||
|
||
sparkContext.assertNotStopped() | ||
|
@@ -108,6 +114,7 @@ class SparkSession private( | |
/** | ||
* State isolated across sessions, including SQL configurations, temporary tables, registered | ||
* functions, and everything else that accepts a [[org.apache.spark.sql.internal.SQLConf]]. | ||
* If `parentSessionState` is not null, the `SessionState` will be a copy of the parent. | ||
* | ||
* This is internal to Spark and there is no guarantee on interface stability. | ||
* | ||
|
@@ -116,9 +123,13 @@ class SparkSession private( | |
@InterfaceStability.Unstable | ||
@transient | ||
lazy val sessionState: SessionState = { | ||
SparkSession.reflect[SessionState, SparkSession]( | ||
SparkSession.sessionStateClassName(sparkContext.conf), | ||
self) | ||
parentSessionState | ||
.map(_.clone(this)) | ||
.getOrElse { | ||
SparkSession.instantiateSessionState( | ||
SparkSession.sessionStateClassName(sparkContext.conf), | ||
self) | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -208,7 +219,25 @@ class SparkSession private( | |
* @since 2.0.0 | ||
*/ | ||
def newSession(): SparkSession = { | ||
new SparkSession(sparkContext, Some(sharedState)) | ||
new SparkSession(sparkContext, Some(sharedState), parentSessionState = None) | ||
} | ||
|
||
/** | ||
* Create an identical copy of this `SparkSession`, sharing the underlying `SparkContext` | ||
* and shared state. All the state of this session (i.e. SQL configurations, temporary tables, | ||
* registered functions) is copied over, and the cloned session is set up with the same shared | ||
* state as this session. The cloned session is independent of this session, that is, any | ||
* non-global change in either session is not reflected in the other. | ||
* | ||
* @note Other than the `SparkContext`, all shared state is initialized lazily. | ||
* This method will force the initialization of the shared state to ensure that parent | ||
* and child sessions are set up with the same shared state. If the underlying catalog | ||
* implementation is Hive, this will initialize the metastore, which may take some time. | ||
*/ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: please add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not remove the boolean flag and just call this cloneSession? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That seems cleaner, fixed. |
||
private[sql] def cloneSession(): SparkSession = { | ||
val result = new SparkSession(sparkContext, Some(sharedState), Some(sessionState)) | ||
result.sessionState // force copy of SessionState | ||
result | ||
} | ||
|
||
|
||
|
@@ -971,16 +1000,18 @@ object SparkSession { | |
} | ||
|
||
/** | ||
* Helper method to create an instance of [[T]] using a single-arg constructor that | ||
* accepts an [[Arg]]. | ||
* Helper method to create an instance of `SessionState` based on `className` from conf. | ||
* The result is either `SessionState` or `HiveSessionState`. | ||
*/ | ||
private def reflect[T, Arg <: AnyRef]( | ||
private def instantiateSessionState( | ||
className: String, | ||
ctorArg: Arg)(implicit ctorArgTag: ClassTag[Arg]): T = { | ||
sparkSession: SparkSession): SessionState = { | ||
|
||
try { | ||
// get `SessionState.apply(SparkSession)` | ||
val clazz = Utils.classForName(className) | ||
val ctor = clazz.getDeclaredConstructor(ctorArgTag.runtimeClass) | ||
ctor.newInstance(ctorArg).asInstanceOf[T] | ||
val method = clazz.getMethod("apply", sparkSession.getClass) | ||
method.invoke(null, sparkSession).asInstanceOf[SessionState] | ||
} catch { | ||
case NonFatal(e) => | ||
throw new IllegalArgumentException(s"Error while instantiating '$className':", e) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -66,7 +66,8 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] { | |
* Preprocess [[CreateTable]], to do some normalization and checking. | ||
*/ | ||
case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[LogicalPlan] { | ||
private val catalog = sparkSession.sessionState.catalog | ||
// catalog is a def and not a val/lazy val as the latter would introduce a circular reference | ||
private def catalog = sparkSession.sessionState.catalog | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add comments saying that this should be a def and not a val or lazy val as this would introduce circular references. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point |
||
|
||
def apply(plan: LogicalPlan): LogicalPlan = plan transform { | ||
// When we CREATE TABLE without specifying the table schema, we should fail the query if | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for adding the param name