diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py index 706b8c0a8be81..27dafca1b80c0 100644 --- a/python/pyspark/sql/tests/test_dataframe.py +++ b/python/pyspark/sql/tests/test_dataframe.py @@ -440,7 +440,9 @@ def test_extended_hint_types(self): self.assertIsInstance(df.hint("broadcast", ["foo", "bar"]), type(df)) with io.StringIO() as buf, redirect_stdout(buf): - hinted_df.explain(True) + # the plan cache may hold a fully analyzed plan + with self.sql_conf({"spark.connect.session.planCache.enabled": False}): + hinted_df.explain(True) explain_output = buf.getvalue() self.assertGreaterEqual(explain_output.count("1.2345"), 1) self.assertGreaterEqual(explain_output.count("what"), 1) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala index 5b56b7079a897..8141ee4b5db84 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala @@ -450,14 +450,14 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio */ private[connect] def usePlanCache(rel: proto.Relation, cachePlan: Boolean)( transform: proto.Relation => LogicalPlan): LogicalPlan = { - val planCacheEnabled = Option(session) - .forall(_.sessionState.conf.getConf(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, true)) // We only cache plans that have a plan ID. - val hasPlanId = rel.hasCommon && rel.getCommon.hasPlanId + val planCacheEnabled = rel.hasCommon && rel.getCommon.hasPlanId && + Option(session) + .forall(_.sessionState.conf.getConf(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, true)) def getPlanCache(rel: proto.Relation): Option[LogicalPlan] = planCache match { - case Some(cache) if planCacheEnabled && hasPlanId => + case Some(cache) if planCacheEnabled => Option(cache.getIfPresent(rel)) match { case Some(plan) => logDebug(s"Using cached plan for relation '$rel': $plan") @@ -466,21 +466,27 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio } case _ => None } - def putPlanCache(rel: proto.Relation, plan: LogicalPlan): Unit = + def putPlanCache(rel: proto.Relation, plan: LogicalPlan): LogicalPlan = planCache match { - case Some(cache) if planCacheEnabled && hasPlanId => - cache.put(rel, plan) - case _ => + case Some(cache) if cachePlan && planCacheEnabled => + val analyzedPlan = if (plan.analyzed) { + plan + } else { + val qe = session.sessionState.executePlan(plan) + if (qe.isLazyAnalysis) { + // The plan is intended to be lazily analyzed. + plan + } else { + // Make sure that the plan is fully analyzed before being cached. + qe.analyzed + } + } + cache.put(rel, analyzedPlan) + analyzedPlan + case _ => plan } - getPlanCache(rel) - .getOrElse({ - val plan = transform(rel) - if (cachePlan) { - putPlanCache(rel, plan) - } - plan - }) + getPlanCache(rel).getOrElse(putPlanCache(rel, transform(rel))) } // For testing. Expose the plan cache for testing purposes. diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala index 21f84291a2f07..7d53c26ccb918 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala @@ -314,6 +314,8 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession { case Some(expectedCachedRelations) => val cachedRelations = sessionHolder.getPlanCache.get.asMap().keySet().asScala assert(cachedRelations.size == expectedCachedRelations.size) + val cachedLogicalPlans = sessionHolder.getPlanCache.get.asMap().values().asScala + cachedLogicalPlans.foreach(plan => assert(plan.analyzed)) expectedCachedRelations.foreach(relation => assert(cachedRelations.contains(relation))) case None => assert(sessionHolder.getPlanCache.isEmpty) }