From 9cec3c4f7c1b467023f0eefff69e8b7c5105417d Mon Sep 17 00:00:00 2001 From: Ziqi Liu Date: Sat, 31 Aug 2024 10:05:18 +0800 Subject: [PATCH] [SPARK-49460][SQL] Remove `cleanupResource()` from EmptyRelationExec ### What changes were proposed in this pull request? Remove `cleanupResources()` from `EmptyRelationExec`. ### Why are the changes needed? The bug was introduced in https://github.com/apache/spark/pull/46830: `cleanupResources` might be executed on an executor, where `logical` is null because it is declared `@transient`, resulting in a NullPointerException. After revisiting the code paths relevant to `cleanupResources`, I think `EmptyRelationExec` doesn't need to do anything here. - For driver-side cleanup, we have [this code path](https://github.com/apache/spark/blob/0602020eb3b346a8c50ad32eeda4e6dabb70c584/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala) to clean up each AQE query stage. - For executor-side cleanup, so far only `SortMergeJoinExec` invokes `cleanupResources` during its execution, so by the time `EmptyRelationExec` is created, the necessary cleanup is guaranteed to have been done. - After all, `EmptyRelationExec` is only a never-executed wrapper for materialized physical query stages; it should not be responsible for any cleanup invocation. So I'm removing the `cleanupResources` implementation from `EmptyRelationExec`. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? New unit test. ### Was this patch authored or co-authored using generative AI tooling? NO Closes #47931 from liuzqt/SPARK-49460. 
Authored-by: Ziqi Liu Signed-off-by: yangjie01 --- .../sql/execution/EmptyRelationExec.scala | 10 ----- .../adaptive/AdaptiveQueryExecSuite.scala | 37 +++++++++++++++++++ 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala index 085c0b22524c9..8a544de7567e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala @@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.adaptive.LogicalQueryStage import org.apache.spark.sql.vectorized.ColumnarBatch /** @@ -81,13 +80,4 @@ case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNo override def doCanonicalize(): SparkPlan = { this.copy(logical = LocalRelation(logical.output).canonicalized) } - - override protected[sql] def cleanupResources(): Unit = { - logical.foreach { - case LogicalQueryStage(_, physical) => - physical.cleanupResources() - case _ => - } - super.cleanupResources() - } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index fc54e7ecd46df..938a96a86b015 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1608,6 +1608,43 @@ class AdaptiveQueryExecSuite } } + test("SPARK-49460: NPE in EmptyRelationExec.cleanupResources") { + withTable("t1left", "t1right", 
"t1empty") { + spark.sql("create table t1left (a int, b int);") + spark.sql("insert into t1left values (1, 1), (2,2), (3,3);") + spark.sql("create table t1right (a int, b int);") + spark.sql("create table t1empty (a int, b int);") + spark.sql("insert into t1right values (2,20), (4, 40);") + + spark.sql(""" + |with leftT as ( + | with erp as ( + | select + | * + | from + | t1left + | join t1empty on t1left.a = t1empty.a + | join t1right on t1left.a = t1right.a + | ) + | SELECT + | CASE + | WHEN COUNT(*) = 0 THEN 4 + | ELSE NULL + | END AS a + | FROM + | erp + | HAVING + | COUNT(*) = 0 + |) + |select + | /*+ MERGEJOIN(t1right) */ + | * + |from + | leftT + | join t1right on leftT.a = t1right.a""".stripMargin).collect() + } + } + test("SPARK-35585: Support propagate empty relation through project/filter") { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {