[SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session has an identical copy of the SessionState #16826
…ered functions) when forking a new SparkSession.
… overloaded functions for Java bytecode compatibility.
Hey Ryan @zsxwing |
What are the semantics? Do functions/settings on the base SparkSession affect the newly forked session? |
@rxin |
ok to test |
Made one pass. Overall looks good. Could you also add the semantics in the PR description?
} | ||
|
||
val result = new ExperimentalMethods | ||
result.extraStrategies = cloneSeq(extraStrategies) |
You don't need to copy these two `Seq`s since they are not mutable `Seq`s.
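To illustrate the point about immutable sequences, here is a minimal sketch using a toy stand-in class. `Holder` and its `copy()` method are hypothetical names for illustration, not the Spark `ExperimentalMethods` class, and the sketch assumes the sequences are only ever built immutably.

```scala
// Toy stand-in for a holder of strategy lists; NOT the Spark class.
final class Holder {
  // A `var` pointing at an immutable sequence.
  var strategies: List[String] = Nil

  // Sharing the reference is enough: the list itself can never change,
  // so later reassignments on the parent do not leak into the copy.
  def copy(): Holder = {
    val result = new Holder
    result.strategies = strategies
    result
  }
}

object HolderDemo {
  def main(args: Array[String]): Unit = {
    val parent = new Holder
    parent.strategies = List("a", "b")
    val child = parent.copy()
    // Rebinds the parent's var to a new list; the child still sees the old one.
    parent.strategies = parent.strategies :+ "c"
    assert(child.strategies == List("a", "b"))
    assert(parent.strategies == List("a", "b", "c"))
  }
}
```

A defensive element-by-element copy would only be needed if the field held a mutable collection.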
private[sql] class SessionState(sparkSession: SparkSession) { | ||
private[sql] class SessionState( | ||
sparkSession: SparkSession, | ||
existingSessionState: Option[SessionState]) { |
nit: `existingSessionState` -> `parentSessionState` to indicate we should copy its internal states.
lazy val conf: SQLConf = new SQLConf | ||
lazy val conf: SQLConf = { | ||
val result = new SQLConf | ||
if (existingSessionState.nonEmpty) { |
nit:
existingSessionState.foreach(_.conf.getAllConfs.foreach {
case (k, v) => if (v ne null) result.setConfString(k, v)
})
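The suggested idiom can be tried outside Spark with a small self-contained sketch. `InheritConf.inherit` is a hypothetical helper, and plain maps stand in for `SQLConf`; only the `Option.foreach` pattern and the null check come from the suggestion above.

```scala
import scala.collection.mutable

// Hypothetical helper, not Spark code: copies non-null entries
// from an optional parent map into a fresh mutable map.
object InheritConf {
  def inherit(parent: Option[Map[String, String]]): mutable.Map[String, String] = {
    val result = mutable.Map.empty[String, String]
    // Option.foreach runs the body only when a parent exists,
    // replacing an explicit `if (parent.nonEmpty) parent.get...` check.
    parent.foreach(_.foreach {
      case (k, v) => if (v ne null) result(k) = v
    })
    result
  }
}
```

With `None` the result stays empty; with `Some(map)` every entry whose value is not null is copied.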
|
||
/** | ||
* Internal catalog for managing functions registered by the user. | ||
*/ | ||
lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy() | ||
lazy val functionRegistry: FunctionRegistry = { |
It's better to just add a `copy` method to `FunctionRegistry` to simplify this code.
@@ -115,16 +113,22 @@ class TestHiveContext( | |||
private[hive] class TestHiveSparkSession( | |||
@transient private val sc: SparkContext, | |||
@transient private val existingSharedState: Option[SharedState], | |||
existingSessionState: Option[SessionState], |
Looks like you don't need to change this file?
@@ -151,7 +155,7 @@ private[hive] class TestHiveSparkSession( | |||
new TestHiveSessionState(self) | |||
|
|||
override def newSession(): TestHiveSparkSession = { |
You can change it to `override def newSession(inheritSessionState: Boolean)` instead.
* This method will force the initialization of the shared state to ensure that parent | ||
* and child sessions are set up with the same shared state. If the underlying catalog | ||
* implementation is Hive, this will initialize the metastore, which may take some time. | ||
*/ |
nit: please add `@Experimental` and `@InterfaceStability.Evolving`.
why not remove the boolean flag and just call this cloneSession?
That seems cleaner, fixed.
@@ -213,6 +218,24 @@ class SparkSession private( | |||
new SparkSession(sparkContext, Some(sharedState)) | |||
} | |||
|
|||
/** | |||
* Start a new session, sharing the underlying `SparkContext` and cached data. |
nit: add :: Experimental ::
val activeSession = SparkSession | ||
.builder() | ||
.master("local") | ||
.config("spark-configb", "b") |
This is in the shared state. You should use `SparkSession.conf.set` instead.
@kunalkhamar by the way, please add the JIRA number to the title. |
Test build #72606 has finished for PR 16826 at commit
|
@kunalkhamar you should create a JIRA ticket for this. In addition, I'm not a big fan of the design to pass a base session in. It'd be simpler if there is just a clone method on sessionstate and the associated states we store, and then cloning a new sparksession is just creating a new sparksession with cloned sessionstate. |
… cloning SessionState.
Test build #72679 has finished for PR 16826 at commit
|
…ject.clone() issues.
Test build #72727 has finished for PR 16826 at commit
|
Test build #72897 has finished for PR 16826 at commit
|
…tialize all fields directly instead. Same change for HiveSessionState.
Test build #72968 has finished for PR 16826 at commit
|
Test build #72970 has finished for PR 16826 at commit
|
What's WIP about this? |
@rxin Also, currently There are fields inside Any thoughts on this? |
Test build #74039 has finished for PR 16826 at commit
|
Test build #74044 has finished for PR 16826 at commit
|
Test build #74056 has finished for PR 16826 at commit
|
val original = new SessionCatalog(externalCatalog) | ||
val tempTable1 = Range(1, 10, 1, 10) | ||
val db1 = "copytest1" | ||
original.createTempView(db1, tempTable1, overrideIfExists = false) |
`db1`? Let us give it a reasonable view name.
// check if clone and original independent | ||
val db2 = "copytest2" | ||
val tempTable2 = Range(1, 20, 2, 20) | ||
clone.createTempView(db2, tempTable2, overrideIfExists = false) |
the same here.
original.createTempView(db3, tempTable3, overrideIfExists = false) | ||
original.setCurrentDatabase(db3) | ||
assert(clone.getCurrentDatabase == db2) | ||
} |
How about?
test("clone SessionCatalog - current db") {
  val externalCatalog = newEmptyCatalog()
  val db1 = "db1"
  val db2 = "db2"
  val db3 = "db3"
  externalCatalog.createDatabase(newDb(db1), ignoreIfExists = true)
  externalCatalog.createDatabase(newDb(db2), ignoreIfExists = true)
  externalCatalog.createDatabase(newDb(db3), ignoreIfExists = true)
  val original = new SessionCatalog(externalCatalog)
  original.setCurrentDatabase(db1)
  // check if current db copied over
  val clone = original.clone(
    SimpleCatalystConf(caseSensitiveAnalysis = true),
    new Configuration(),
    new SimpleFunctionRegistry,
    CatalystSqlParser)
  assert(original != clone)
  assert(clone.getCurrentDatabase == db1)
  // check if clone and original independent
  clone.setCurrentDatabase(db2)
  assert(original.getCurrentDatabase == db1)
  original.setCurrentDatabase(db3)
  assert(clone.getCurrentDatabase == db2)
}
Done.
* @param analyzer Logical query plan analyzer for resolving unresolved attributes and relations. | ||
* @param streamingQueryManager Interface to start and stop | ||
* [[org.apache.spark.sql.streaming.StreamingQuery]]s. | ||
* @param queryExecutionCreator Lambda to create a [[QueryExecution]] from a [[LogicalPlan]] |
Let us document all params?
done
Hm, most of the param documentation here is actually kind of useless (very little information conveyed).
@rxin Removing the redundant comments in SPARK-20048.
|
||
/** | ||
* SQL-specific key-value configurations. | ||
* Logical query plan optimizer. |
Nit: remove the extra space
@@ -278,6 +278,8 @@ private[hive] class HiveClientImpl( | |||
state.getConf.setClassLoader(clientLoader.classLoader) | |||
// Set the thread local metastore client to the client associated with this HiveClientImpl. | |||
Hive.set(client) | |||
// Replace conf in the thread local Hive with current conf | |||
Hive.get(conf) |
Because `IsolatedClientLoader` is shared? If we do not make this change, does any test case fail?
Because of reusing `IsolatedClientLoader.cachedHive`. Without this line, it may use an out-of-date HiveConf. The failed test is HiveSparkSubmitSuite.test("SPARK-18360: default table path of tables in default database should depend on the " + "location of default database")
Thank you!
functionRegistry: FunctionRegistry, | ||
override val catalog: HiveSessionCatalog, | ||
sqlParser: ParserInterface, | ||
val metadataHive: HiveClient, |
This is for avoiding using `lazy val`?
Avoid using SparkSession in this class.
analyzer: Analyzer, | ||
streamingQueryManager: StreamingQueryManager, | ||
queryExecutionCreator: LogicalPlan => QueryExecution, | ||
val plannerCreator: () => SparkPlanner) |
How about adding `val planner` to `SessionState`? So far, the interface of `HiveSessionState` looks a little bit complex to me.
Just checked how CarbonData extends HiveSessionState. Basically, this PR will break what they completed a few months ago. : )
Right now, SessionState.planner is a method, so it returns a new SparkPlanner using the latest `experimentalMethods.extraStrategies` every time. Changing it to a `val` is a breaking change.
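The difference between the two forms can be seen in a minimal sketch with toy classes. `Experimental`, `Planner`, `State`, and `plannerSnapshot` are hypothetical names for illustration and do not mirror the Spark classes beyond the `def` versus `val` distinction described above.

```scala
// Toy stand-ins, NOT the Spark classes.
final class Experimental { var extraStrategies: List[String] = Nil }
final case class Planner(strategies: List[String])

final class State(experimental: Experimental) {
  // A method: evaluated on every call, so it sees the latest strategies.
  def planner: Planner = Planner(experimental.extraStrategies)

  // A val: evaluated once at construction, so later changes are invisible.
  val plannerSnapshot: Planner = Planner(experimental.extraStrategies)
}

object PlannerDemo {
  def main(args: Array[String]): Unit = {
    val experimental = new Experimental
    val state = new State(experimental)
    experimental.extraStrategies = List("custom")
    assert(state.planner.strategies == List("custom"))
    assert(state.plannerSnapshot.strategies.isEmpty)
  }
}
```

Callers that add strategies after the state is built would silently stop seeing them under the `val` form, which is the behavior change being flagged.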
uh, I see
Test build #74079 has finished for PR 16826 at commit
|
Test build #74124 has finished for PR 16826 at commit
|
Looking close to done!
activeSession.stop() | ||
activeSession = null | ||
} | ||
super.afterAll() | ||
} | ||
|
||
test("fork new session and inherit RuntimeConfig options") { | ||
val key = "spark-config-clone" | ||
activeSession.conf.set(key, "active") |
Should this be inside try {}?
val spark = activeSession | ||
// Cannot use `import activeSession.implicits._` due to the compiler limitation. | ||
import spark.implicits._ | ||
|
||
activeSession |
Should create temp view be inside try block?
original.setCurrentDatabase(db3) | ||
assert(clone.getCurrentDatabase == db2) | ||
} | ||
|
||
test("SPARK-19737: detect undefined functions without triggering relation resolution") { |
Is this supposed to be part of this PR?
* @param conf SQL-specific key-value configurations. | ||
* @param experimentalMethods The experimental methods. | ||
* @param functionRegistry Internal catalog for managing functions registered by the user. | ||
* @param catalog Internal catalog for managing table and database states. |
Add comment on difference from `SessionCatalog`: `HiveSessionCatalog` uses Hive client for interacting with metastore.
Test build #74134 has finished for PR 16826 at commit
|
retest this please |
retest this please. The running build will fail because master was broken. |
Test build #74148 has finished for PR 16826 at commit
|
Test build #74165 has finished for PR 16826 at commit
|
Test build #74215 has finished for PR 16826 at commit
|
LGTM. Merging to master. |
…n listeners

## What changes were proposed in this pull request?

Bugfix from [SPARK-19540](#16826). Cloning SessionState does not clone query execution listeners, so the cloned session is unable to listen to events on queries.

## How was this patch tested?

- Unit test

Author: Kunal Khamar <kkhamar@outlook.com>

Closes #17379 from kunalkhamar/clone-bugfix.
What changes were proposed in this pull request?
Forking a newSession() from SparkSession currently makes a new SparkSession that does not retain SessionState (i.e. temporary tables, SQL config, registered functions, etc.). This change adds a method cloneSession() which creates a new SparkSession with a copy of the parent's SessionState.
Subsequent changes to the base session are not propagated to the cloned session; the clone is independent after creation.
If the base is changed after the clone has been created, say the user registers a new UDF, then the new UDF will not be available inside the clone. The same goes for configs and temp tables.
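The copy-then-diverge semantics described here can be sketched with a toy model. `ToySessionState` and `copyState()` are hypothetical names, and mutable collections stand in for the real config and temp-view registries; this is an illustration of the intended behavior, not the Spark implementation.

```scala
import scala.collection.mutable

// Toy model of per-session state; NOT the Spark SessionState class.
final class ToySessionState(
    val conf: mutable.Map[String, String] = mutable.Map.empty[String, String],
    val tempViews: mutable.Set[String] = mutable.Set.empty[String]) {

  // Copies the current contents; parent and child share nothing afterwards.
  def copyState(): ToySessionState =
    new ToySessionState(conf.clone(), tempViews.clone())
}

object CloneDemo {
  def main(args: Array[String]): Unit = {
    val parent = new ToySessionState()
    parent.conf("key") = "inherited"
    val child = parent.copyState()

    // State present at clone time is visible in the child.
    assert(child.conf("key") == "inherited")

    // Later changes on either side do not propagate to the other.
    parent.conf("key") = "changed"
    parent.tempViews += "late_view"
    child.conf("child_only") = "x"
    assert(child.conf("key") == "inherited")
    assert(!child.tempViews.contains("late_view"))
    assert(!parent.conf.contains("child_only"))
  }
}
```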
How was this patch tested?
Unit tests