diff --git a/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PlanDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PlanDynamicPruningFilters.scala
index 031c3b1aa0d50..9331cea08255e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PlanDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PlanDynamicPruningFilters.scala
@@ -55,7 +55,7 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession)
     plan transformAllExpressions {
       case DynamicPruningSubquery(
           value, buildPlan, buildKeys, broadcastKeyIndex, onlyInBroadcast, exprId) =>
-        val qe = new QueryExecution(sparkSession, buildPlan)
+        val qe = new QueryExecution(sparkSession, buildPlan, subQuery = true)
         // Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is
         // the first to be applied (apart from `InsertAdaptiveSparkPlan`).
         val canReuseExchange = reuseBroadcast && buildKeys.nonEmpty &&
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index ee57eaeccc951..261785593e993 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -104,6 +104,10 @@ case class ScalarSubquery(
     require(updated, s"$this has not finished")
     Literal.create(result, dataType).doGenCode(ctx, ev)
   }
+
+  override lazy val canonicalized: ScalarSubquery = {
+    copy(plan = plan.canonicalized.asInstanceOf[BaseSubqueryExec], exprId = ExprId(0))
+  }
 }

 /**