Skip to content

Commit

Permalink
[SPARK-49460][SQL] Remove cleanupResource() from EmptyRelationExec
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Remove cleanupResource() from`EmptyRelationExec`

### Why are the changes needed?

This bug was introduced in apache#46830 : `cleanupResources` might be executed on the executor where `logical` is null.

After revisiting cleanupResources relevant code paths, I think `EmptyRelationExec` doesn't need to anything here.

- for driver side cleanup, we have [this code path](https://github.com/apache/spark/blob/0602020eb3b346a8c50ad32eeda4e6dabb70c584/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala) to cleanup each AQE query stage.
- for executor side cleanup, so far we only have SortMergeJoinExec which invoke cleanupResource during its execution, so upon the time when EmptyRelationExec is created, it's guaranteed necessary cleanup has been done.
-
After all, `EmptyRelationExec` is only a never-execute wrapper for materialized physical query stages, it should not be responsible for any cleanup invocation.

So I'm removing `cleanupResources` implementation from `EmptyRelationExec`.

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
New unit test.

### Was this patch authored or co-authored using generative AI tooling?
NO

Closes apache#47931 from liuzqt/SPARK-49460.

Authored-by: Ziqi Liu <ziqi.liu@databricks.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
  • Loading branch information
liuzqt authored and LuciferYang committed Aug 31, 2024
1 parent 7b54d9c commit 9cec3c4
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.adaptive.LogicalQueryStage
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
Expand Down Expand Up @@ -81,13 +80,4 @@ case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNo
override def doCanonicalize(): SparkPlan = {
this.copy(logical = LocalRelation(logical.output).canonicalized)
}

override protected[sql] def cleanupResources(): Unit = {
logical.foreach {
case LogicalQueryStage(_, physical) =>
physical.cleanupResources()
case _ =>
}
super.cleanupResources()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1608,6 +1608,43 @@ class AdaptiveQueryExecSuite
}
}

test("SPARK-49460: NPE in EmptyRelationExec.cleanupResources") {
withTable("t1left", "t1right", "t1empty") {
spark.sql("create table t1left (a int, b int);")
spark.sql("insert into t1left values (1, 1), (2,2), (3,3);")
spark.sql("create table t1right (a int, b int);")
spark.sql("create table t1empty (a int, b int);")
spark.sql("insert into t1right values (2,20), (4, 40);")

spark.sql("""
|with leftT as (
| with erp as (
| select
| *
| from
| t1left
| join t1empty on t1left.a = t1empty.a
| join t1right on t1left.a = t1right.a
| )
| SELECT
| CASE
| WHEN COUNT(*) = 0 THEN 4
| ELSE NULL
| END AS a
| FROM
| erp
| HAVING
| COUNT(*) = 0
|)
|select
| /*+ MERGEJOIN(t1right) */
| *
|from
| leftT
| join t1right on leftT.a = t1right.a""".stripMargin).collect()
}
}

test("SPARK-35585: Support propagate empty relation through project/filter") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
Expand Down

0 comments on commit 9cec3c4

Please sign in to comment.