Skip to content

Commit

Permalink
Impl
Browse files (browse the repository at this point in the history)
  • Loading branch information
changgyoopark-db committed Jan 23, 2025
1 parent 1a49237 commit 43657b2
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 17 deletions.
4 changes: 3 additions & 1 deletion python/pyspark/sql/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,9 @@ def test_extended_hint_types(self):
self.assertIsInstance(df.hint("broadcast", ["foo", "bar"]), type(df))

with io.StringIO() as buf, redirect_stdout(buf):
hinted_df.explain(True)
# the plan cache may hold a fully analyzed plan
with self.sql_conf({"spark.connect.session.planCache.enabled": False}):
hinted_df.explain(True)
explain_output = buf.getvalue()
self.assertGreaterEqual(explain_output.count("1.2345"), 1)
self.assertGreaterEqual(explain_output.count("what"), 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,14 +450,14 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
*/
private[connect] def usePlanCache(rel: proto.Relation, cachePlan: Boolean)(
transform: proto.Relation => LogicalPlan): LogicalPlan = {
val planCacheEnabled = Option(session)
.forall(_.sessionState.conf.getConf(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, true))
// We only cache plans that have a plan ID.
val hasPlanId = rel.hasCommon && rel.getCommon.hasPlanId
val planCacheEnabled = rel.hasCommon && rel.getCommon.hasPlanId &&
Option(session)
.forall(_.sessionState.conf.getConf(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, true))

def getPlanCache(rel: proto.Relation): Option[LogicalPlan] =
planCache match {
case Some(cache) if planCacheEnabled && hasPlanId =>
case Some(cache) if planCacheEnabled =>
Option(cache.getIfPresent(rel)) match {
case Some(plan) =>
logDebug(s"Using cached plan for relation '$rel': $plan")
Expand All @@ -466,21 +466,27 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
}
case _ => None
}
def putPlanCache(rel: proto.Relation, plan: LogicalPlan): Unit =
def putPlanCache(rel: proto.Relation, plan: LogicalPlan): LogicalPlan =
planCache match {
case Some(cache) if planCacheEnabled && hasPlanId =>
cache.put(rel, plan)
case _ =>
case Some(cache) if cachePlan && planCacheEnabled =>
val analyzedPlan = if (plan.analyzed) {
plan
} else {
val qe = session.sessionState.executePlan(plan)
if (qe.isLazyAnalysis) {
// The plan is intended to be lazily analyzed.
plan
} else {
// Make sure that the plan is fully analyzed before being cached.
qe.analyzed
}
}
cache.put(rel, analyzedPlan)
analyzedPlan
case _ => plan
}

getPlanCache(rel)
.getOrElse({
val plan = transform(rel)
if (cachePlan) {
putPlanCache(rel, plan)
}
plan
})
getPlanCache(rel).getOrElse(putPlanCache(rel, transform(rel)))
}

// For testing. Expose the plan cache for testing purposes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
case Some(expectedCachedRelations) =>
val cachedRelations = sessionHolder.getPlanCache.get.asMap().keySet().asScala
assert(cachedRelations.size == expectedCachedRelations.size)
val cachedLogicalPlans = sessionHolder.getPlanCache.get.asMap().values().asScala
cachedLogicalPlans.foreach(plan => assert(plan.analyzed))
expectedCachedRelations.foreach(relation => assert(cachedRelations.contains(relation)))
case None => assert(sessionHolder.getPlanCache.isEmpty)
}
Expand Down

0 comments on commit 43657b2

Please sign in to comment.