From 3bc4baf59335f63f003adf74c87ddb2992e515cc Mon Sep 17 00:00:00 2001
From: jiake
Date: Thu, 15 Apr 2021 16:48:35 +0800
Subject: [PATCH 1/6] Support DPP + AQE when the broadcast exchange can be reused

---
 .../adaptive/AdaptiveSparkPlanExec.scala      | 13 ++++---
 .../PlanAdaptiveDynamicPruningFilters.scala   | 36 ++++++++++++-------
 2 files changed, 32 insertions(+), 17 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index fba32a4dd1dab..f116d291c7670 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import scala.concurrent.ExecutionContext
 import scala.util.control.NonFatal
 
-import org.apache.spark.SparkException
+import org.apache.spark.{broadcast, SparkException}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -94,7 +94,7 @@ case class AdaptiveSparkPlanExec(
   // A list of physical optimizer rules to be applied to a new stage before its execution. These
   // optimizations should be stage-independent.
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
-    PlanAdaptiveDynamicPruningFilters(context.stageCache),
+    PlanAdaptiveDynamicPruningFilters(inputPlan),
     ReuseAdaptiveSubquery(context.subqueryCache),
     CoalesceShufflePartitions(context.session),
     // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
@@ -310,6 +310,11 @@ case class AdaptiveSparkPlanExec(
     rdd
   }
 
+  override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
+    val broadcastPlan = getFinalPhysicalPlan()
+    broadcastPlan.doExecuteBroadcast()
+  }
+
   protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")
 
   override def generateTreeString(
@@ -476,7 +481,7 @@ case class AdaptiveSparkPlanExec(
           throw new IllegalStateException(
             "Custom columnar rules cannot transform shuffle node to something else.")
         }
-        ShuffleQueryStageExec(currentStageId, newShuffle, s.canonicalized)
+        ShuffleQueryStageExec(currentStageId, newShuffle, s.child.canonicalized)
       case b: BroadcastExchangeLike =>
         val newBroadcast = applyPhysicalRules(
           b.withNewChildren(Seq(optimizedPlan)),
@@ -486,7 +491,7 @@ case class AdaptiveSparkPlanExec(
           throw new IllegalStateException(
             "Custom columnar rules cannot transform broadcast node to something else.")
         }
-        BroadcastQueryStageExec(currentStageId, newBroadcast, b.canonicalized)
+        BroadcastQueryStageExec(currentStageId, newBroadcast, b.child.canonicalized)
       }
       currentStageId += 1
       setLogicalLinkForNewQueryStage(queryStage, e)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
index 8be348a5f0d14..8cb69cb73d9cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
@@ -17,19 +17,18 @@
 
 package org.apache.spark.sql.execution.adaptive
 
-import scala.collection.concurrent.TrieMap
-
 import org.apache.spark.sql.catalyst.expressions.{BindReferences, DynamicPruningExpression, Literal}
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
-import org.apache.spark.sql.execution.joins.{HashedRelationBroadcastMode, HashJoin}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelationBroadcastMode, HashJoin}
 
 /**
  * A rule to insert dynamic pruning predicates in order to reuse the results of broadcast.
  */
 case class PlanAdaptiveDynamicPruningFilters(
-    stageCache: TrieMap[SparkPlan, QueryStageExec]) extends Rule[SparkPlan] {
+    originalPlan: SparkPlan) extends Rule[SparkPlan] {
   def apply(plan: SparkPlan): SparkPlan = {
     if (!conf.dynamicPartitionPruningEnabled) {
       return plan
@@ -41,15 +40,26 @@ case class PlanAdaptiveDynamicPruningFilters(
         adaptivePlan: AdaptiveSparkPlanExec), exprId, _)) =>
         val packedKeys = BindReferences.bindReferences(
           HashJoin.rewriteKeyExpr(buildKeys), adaptivePlan.executedPlan.output)
-        val mode = HashedRelationBroadcastMode(packedKeys)
-        // plan a broadcast exchange of the build side of the join
-        val exchange = BroadcastExchangeExec(mode, adaptivePlan.executedPlan)
-        val existingStage = stageCache.get(exchange.canonicalized)
-        if (existingStage.nonEmpty && conf.exchangeReuseEnabled) {
-          val name = s"dynamicpruning#${exprId.id}"
-          val reuseQueryStage = existingStage.get.newReuseInstance(0, exchange.output)
-          val broadcastValues =
-            SubqueryBroadcastExec(name, index, buildKeys, reuseQueryStage)
+
+        val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
+          originalPlan.find {
+            case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
+              left.sameResult(adaptivePlan.executedPlan)
+            case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
+              right.sameResult(adaptivePlan.executedPlan)
+            case _ => false
+          }.isDefined
+
+        if(canReuseExchange) {
+          val mode = HashedRelationBroadcastMode(packedKeys)
+          // plan a broadcast exchange of the build side of the join
+          val exchange = BroadcastExchangeExec(mode, adaptivePlan.executedPlan)
+          exchange.setLogicalLink(adaptivePlan.executedPlan.logicalLink.get)
+          val newAdaptivePlan = AdaptiveSparkPlanExec(
+            exchange, adaptivePlan.context, adaptivePlan.preprocessingRules, true)
+
+          val broadcastValues = SubqueryBroadcastExec(
+            name, index, buildKeys, newAdaptivePlan)
           DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
         } else {
           DynamicPruningExpression(Literal.TrueLiteral)
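Note (not part of the patch): the reuse check above no longer probes the stage cache with a
canonicalized exchange; it scans the original plan for a BroadcastHashJoinExec whose build
side produces the same result as the DPP build plan. The comparison leans on sameResult,
which matches canonicalized plans rather than object-identical ones. A minimal sketch of
that behavior, runnable in spark-shell (the plans and assertion are illustrative only):

    // Two separately analyzed plans over the same source differ in expression IDs,
    // but canonicalization lets sameResult treat them as interchangeable.
    val p1 = spark.range(100).selectExpr("id").queryExecution.sparkPlan
    val p2 = spark.range(100).selectExpr("id").queryExecution.sparkPlan
    assert(p1.sameResult(p2))  // true, despite distinct attribute IDs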
From 657c61b7c0e08d085cc505ce0a699fa45fe3e078 Mon Sep 17 00:00:00 2001
From: jiake
Date: Thu, 15 Apr 2021 20:20:14 +0800
Subject: [PATCH 2/6] address review comments and fix the failed test

---
 .../adaptive/AdaptiveSparkPlanExec.scala      | 19 +++++++++----------
 .../PlanAdaptiveDynamicPruningFilters.scala   | 17 ++++++++---------
 .../sql/DynamicPartitionPruningSuite.scala    |  5 +++++
 3 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index f116d291c7670..94d4ee1353956 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -91,10 +91,15 @@ case class AdaptiveSparkPlanExec(
     DisableUnnecessaryBucketedScan
   ) ++ context.session.sessionState.queryStagePrepRules
 
+  @transient private val initialPlan = context.session.withActive {
+    applyPhysicalRules(
+      inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
+  }
+
   // A list of physical optimizer rules to be applied to a new stage before its execution. These
   // optimizations should be stage-independent.
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
-    PlanAdaptiveDynamicPruningFilters(inputPlan),
+    PlanAdaptiveDynamicPruningFilters(initialPlan),
     ReuseAdaptiveSubquery(context.subqueryCache),
     CoalesceShufflePartitions(context.session),
     // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
@@ -129,11 +134,6 @@ case class AdaptiveSparkPlanExec(
 
   @transient private val costEvaluator = SimpleCostEvaluator
 
-  @transient private val initialPlan = context.session.withActive {
-    applyPhysicalRules(
-      inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
-  }
-
   @volatile private var currentPhysicalPlan = initialPlan
 
   private var isFinalPlan = false
@@ -311,8 +311,7 @@ case class AdaptiveSparkPlanExec(
   }
 
   override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
-    val broadcastPlan = getFinalPhysicalPlan()
-    broadcastPlan.doExecuteBroadcast()
+    getFinalPhysicalPlan().doExecuteBroadcast()
   }
 
   protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")
@@ -481,7 +480,7 @@ case class AdaptiveSparkPlanExec(
           throw new IllegalStateException(
             "Custom columnar rules cannot transform shuffle node to something else.")
         }
-        ShuffleQueryStageExec(currentStageId, newShuffle, s.child.canonicalized)
+        ShuffleQueryStageExec(currentStageId, newShuffle, s.canonicalized)
       case b: BroadcastExchangeLike =>
         val newBroadcast = applyPhysicalRules(
           b.withNewChildren(Seq(optimizedPlan)),
@@ -491,7 +490,7 @@ case class AdaptiveSparkPlanExec(
           throw new IllegalStateException(
             "Custom columnar rules cannot transform broadcast node to something else.")
         }
-        BroadcastQueryStageExec(currentStageId, newBroadcast, b.child.canonicalized)
+        BroadcastQueryStageExec(currentStageId, newBroadcast, b.canonicalized)
       }
       currentStageId += 1
       setLogicalLinkForNewQueryStage(queryStage, e)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
index 8cb69cb73d9cb..f293e8493e216 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
@@ -27,8 +27,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelati
 /**
  * A rule to insert dynamic pruning predicates in order to reuse the results of broadcast.
  */
-case class PlanAdaptiveDynamicPruningFilters(
-    originalPlan: SparkPlan) extends Rule[SparkPlan] {
+case class PlanAdaptiveDynamicPruningFilters(rootPlan: SparkPlan) extends Rule[SparkPlan] {
   def apply(plan: SparkPlan): SparkPlan = {
     if (!conf.dynamicPartitionPruningEnabled) {
       return plan
@@ -40,20 +39,20 @@ case class PlanAdaptiveDynamicPruningFilters(
         adaptivePlan: AdaptiveSparkPlanExec), exprId, _)) =>
         val packedKeys = BindReferences.bindReferences(
           HashJoin.rewriteKeyExpr(buildKeys), adaptivePlan.executedPlan.output)
+        val mode = HashedRelationBroadcastMode(packedKeys)
+        // plan a broadcast exchange of the build side of the join
+        val exchange = BroadcastExchangeExec(mode, adaptivePlan.executedPlan)
 
         val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
-          originalPlan.find {
+          rootPlan.find {
             case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
-              left.sameResult(adaptivePlan.executedPlan)
+              left.sameResult(exchange)
             case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
-              right.sameResult(adaptivePlan.executedPlan)
+              right.sameResult(exchange)
             case _ => false
           }.isDefined
 
-        if(canReuseExchange) {
-          val mode = HashedRelationBroadcastMode(packedKeys)
-          // plan a broadcast exchange of the build side of the join
-          val exchange = BroadcastExchangeExec(mode, adaptivePlan.executedPlan)
+        if (canReuseExchange) {
           exchange.setLogicalLink(adaptivePlan.executedPlan.logicalLink.get)
           val newAdaptivePlan = AdaptiveSparkPlanExec(
             exchange, adaptivePlan.context, adaptivePlan.preprocessingRules, true)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index 736432222b0c1..b27ca99bec047 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -197,6 +197,11 @@ abstract class DynamicPartitionPruningSuiteBase
           case _ => false
         }.isDefined
         assert(hasReuse, s"$s\nshould have been reused in\n$plan")
+      case a: AdaptiveSparkPlanExec =>
+        val hasReuse = collect(a) {
+          case r: ReusedExchangeExec => r
+        }.nonEmpty
+        assert(hasReuse, s"$s\nshould have been reused in\n$plan")
       case _ =>
         fail(s"Invalid child node found in\n$s")
     }
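Note (not part of the patch): the hunk that moves initialPlan above queryStageOptimizerRules
matters because Scala evaluates constructor-body vals in declaration order, so a val
referenced before its own declaration is still null at that point. Passing initialPlan to
PlanAdaptiveDynamicPruningFilters only works once it is initialized first. A self-contained
sketch with illustrative names:

    object InitOrderDemo extends App {
      class Wrong {
        val rule = s"rule sees: $plan"      // evaluated first; plan is still null here
        val plan: String = "prepared plan"
      }
      class Right {
        val plan: String = "prepared plan"  // initialized before it is referenced
        val rule = s"rule sees: $plan"
      }
      println(new Wrong().rule)  // rule sees: null
      println(new Right().rule)  // rule sees: prepared plan
    }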
From c7b3735e12e04b68abd33adbace0653a09a7231a Mon Sep 17 00:00:00 2001
From: jiake
Date: Fri, 16 Apr 2021 13:40:31 +0800
Subject: [PATCH 3/6] add a test where the DPP-side broadcast query stage is created first

---
 .../sql/execution/SubqueryBroadcastExec.scala |  3 +-
 .../sql/DynamicPartitionPruningSuite.scala    | 43 +++++++++++++++++--
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala
index 47cb70dde86a8..a8c7b9bfbdeea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala
@@ -36,7 +36,8 @@ import org.apache.spark.util.ThreadUtils
  *
  * @param index the index of the join key in the list of keys from the build side
  * @param buildKeys the join keys from the build side of the join used
- * @param child the BroadcastExchange from the build side of the join
+ * @param child the BroadcastExchange or the AdaptiveSparkPlan with BroadcastQueryStageExec
+ *              from the build side of the join
  */
 case class SubqueryBroadcastExec(
     name: String,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index b27ca99bec047..f8b24f855080c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -198,9 +198,15 @@ abstract class DynamicPartitionPruningSuiteBase
         }.isDefined
         assert(hasReuse, s"$s\nshould have been reused in\n$plan")
       case a: AdaptiveSparkPlanExec =>
-        val hasReuse = collect(a) {
-          case r: ReusedExchangeExec => r
-        }.nonEmpty
+        val broadcastQueryStage = collectFirst(a) {
+          case b: BroadcastQueryStageExec => b
+        }
+        val broadcastPlan = broadcastQueryStage.get.broadcast
+        val hasReuse = find(plan) {
+          case ReusedExchangeExec(_, e) => e eq broadcastPlan
+          case b: BroadcastExchangeLike => b eq broadcastPlan
+          case _ => false
+        }.isDefined
         assert(hasReuse, s"$s\nshould have been reused in\n$plan")
       case _ =>
         fail(s"Invalid child node found in\n$s")
@@ -1468,6 +1474,37 @@ abstract class DynamicPartitionPruningSuiteBase
       }
     }
   }
+
+  test("SPARK-34637: test DPP side broadcast query stage is created firstly") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
+      val df = sql(
+        """ WITH view1 as (
+          | SELECT f.store_id FROM fact_stats f WHERE f.units_sold = 70 group by f.store_id
+          | )
+          |
+          | SELECT * FROM view1 v1 join view1 v2 WHERE v1.store_id = v2.store_id
+        """.stripMargin)
+
+      // A possible resulting query plan:
+      // BroadcastHashJoin
+      // +- HashAggregate
+      //    +- ShuffleQueryStage
+      //       +- Exchange
+      //          +- HashAggregate
+      //             +- Filter
+      //                +- FileScan
+      //                   +- SubqueryBroadcast
+      //                        +- AdaptiveSparkPlan
+      //                           +- BroadcastQueryStage
+      //                              +- BroadcastExchange
+      //
+      // +- BroadcastQueryStage
+      //    +- ReusedExchange
+
+      checkPartitionPruningPredicate(df, false, true)
+      checkAnswer(df, Row(15, 15) :: Nil)
+    }
+  }
 }
 
 class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase
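Note (not part of the patch): the strengthened assertion above checks reuse by reference
identity (eq) rather than structural equality, since reuse is only genuine when the
ReusedExchange points at the very same exchange instance produced inside the DPP subquery's
adaptive plan. A toy illustration of the distinction (illustrative case class, not Spark
code):

    object EqDemo extends App {
      case class Exchange(id: Int)
      val original = Exchange(1)
      val structuralCopy = Exchange(1)
      println(original == structuralCopy)  // true: case-class structural equality
      println(original eq structuralCopy)  // false: different instances, no real reuse
      println(original eq original)        // true: same instance, genuine reuse
    }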
From 6b07c847399250def438e0e1b7bbb5b5effa77af Mon Sep 17 00:00:00 2001
From: jiake
Date: Thu, 6 May 2021 16:41:37 +0800
Subject: [PATCH 4/6] update subquery symbols in the test's plan comment

---
 .../org/apache/spark/sql/DynamicPartitionPruningSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index f8b24f855080c..9480e3b5388d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1493,7 +1493,7 @@ abstract class DynamicPartitionPruningSuiteBase
       //          +- HashAggregate
       //             +- Filter
       //                +- FileScan
-      //                   +- SubqueryBroadcast
+      //                   Dynamicpruning Subquery
       //                        +- AdaptiveSparkPlan
       //                           +- BroadcastQueryStage
      //                              +- BroadcastExchange
From 4ccd4b8b1d2604f05a4564303d0b8aa65ea5dfa3 Mon Sep 17 00:00:00 2001
From: jiake
Date: Tue, 11 May 2021 15:30:17 +0800
Subject: [PATCH 5/6] address review comments

---
 .../PlanAdaptiveDynamicPruningFilters.scala      |  3 +--
 .../spark/sql/DynamicPartitionPruningSuite.scala | 13 +++++++------
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
index f293e8493e216..e42d53a232557 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
@@ -54,8 +54,7 @@ case class PlanAdaptiveDynamicPruningFilters(rootPlan: SparkPlan) extends Rule[S
 
         if (canReuseExchange) {
           exchange.setLogicalLink(adaptivePlan.executedPlan.logicalLink.get)
-          val newAdaptivePlan = AdaptiveSparkPlanExec(
-            exchange, adaptivePlan.context, adaptivePlan.preprocessingRules, true)
+          val newAdaptivePlan = adaptivePlan.copy(inputPlan = exchange)
 
           val broadcastValues = SubqueryBroadcastExec(
             name, index, buildKeys, newAdaptivePlan)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index 9480e3b5388d8..b69a683b5e75e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1475,14 +1475,14 @@ abstract class DynamicPartitionPruningSuiteBase
     }
   }
 
-  test("SPARK-34637: test DPP side broadcast query stage is created firstly") {
+  test("SPARK-34637: DPP side broadcast query stage is created firstly") {
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       val df = sql(
-        """ WITH view1 as (
+        """ WITH v as (
           | SELECT f.store_id FROM fact_stats f WHERE f.units_sold = 70 group by f.store_id
           | )
           |
-          | SELECT * FROM view1 v1 join view1 v2 WHERE v1.store_id = v2.store_id
+          | SELECT * FROM v v1 join v v2 WHERE v1.store_id = v2.store_id
         """.stripMargin)
 
       // A possible resulting query plan:
@@ -1492,9 +1492,10 @@ abstract class DynamicPartitionPruningSuiteBase
       //       +- Exchange
       //          +- HashAggregate
       //             +- Filter
-      //                +- FileScan
-      //                   Dynamicpruning Subquery
-      //                        +- AdaptiveSparkPlan
+      //                +- FileScan [PartitionFilters: [isnotnull(store_id#3367),
+      //                     dynamicpruningexpression(store_id#3367 IN dynamicpruning#3385)]]
+      //                   +- SubqueryBroadcast dynamicpruning#3385
+      //                          +- AdaptiveSparkPlan
       //                           +- BroadcastQueryStage
       //                              +- BroadcastExchange
       //
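Note (not part of the patch): adaptivePlan.copy(inputPlan = exchange) replaces the explicit
constructor call because a Scala case-class copy carries over every field that is not
overridden, so the new adaptive plan inherits the original's context and preprocessing rules
without restating them (and without hard-coding the trailing boolean argument). A minimal
sketch with stand-in fields, not the real AdaptiveSparkPlanExec signature:

    object CopyDemo extends App {
      case class AdaptivePlan(
        inputPlan: String, context: String, rules: Seq[String], isSubquery: Boolean)

      val adaptivePlan = AdaptivePlan("buildSide", "sharedContext", Seq("ruleA"), true)
      // Only inputPlan changes; context, rules and isSubquery carry over unchanged.
      val newAdaptivePlan = adaptivePlan.copy(inputPlan = "broadcastExchange")

      assert(newAdaptivePlan.context == adaptivePlan.context)
      assert(newAdaptivePlan.isSubquery)
    }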
From 701f1c30bbca2bfdffd121cc9c0d6b4a5250f823 Mon Sep 17 00:00:00 2001
From: jiake
Date: Tue, 11 May 2021 17:43:19 +0800
Subject: [PATCH 6/6] address review comments

---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala   | 12 ++++++------
 .../adaptive/PlanAdaptiveDynamicPruningFilters.scala |  5 +++--
 .../spark/sql/DynamicPartitionPruningSuite.scala     |  9 ++++-----
 3 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 94d4ee1353956..256aacb9049d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -91,15 +91,10 @@ case class AdaptiveSparkPlanExec(
     DisableUnnecessaryBucketedScan
   ) ++ context.session.sessionState.queryStagePrepRules
 
-  @transient private val initialPlan = context.session.withActive {
-    applyPhysicalRules(
-      inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
-  }
-
   // A list of physical optimizer rules to be applied to a new stage before its execution. These
   // optimizations should be stage-independent.
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
-    PlanAdaptiveDynamicPruningFilters(initialPlan),
+    PlanAdaptiveDynamicPruningFilters(this),
     ReuseAdaptiveSubquery(context.subqueryCache),
     CoalesceShufflePartitions(context.session),
     // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
@@ -134,6 +129,11 @@ case class AdaptiveSparkPlanExec(
 
   @transient private val costEvaluator = SimpleCostEvaluator
 
+  @transient private val initialPlan = context.session.withActive {
+    applyPhysicalRules(
+      inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
+  }
+
   @volatile private var currentPhysicalPlan = initialPlan
 
   private var isFinalPlan = false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
index e42d53a232557..3ef18108d2b30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelati
 /**
  * A rule to insert dynamic pruning predicates in order to reuse the results of broadcast.
 */
-case class PlanAdaptiveDynamicPruningFilters(rootPlan: SparkPlan) extends Rule[SparkPlan] {
+case class PlanAdaptiveDynamicPruningFilters(
+    rootPlan: AdaptiveSparkPlanExec) extends Rule[SparkPlan] with AdaptiveSparkPlanHelper {
   def apply(plan: SparkPlan): SparkPlan = {
     if (!conf.dynamicPartitionPruningEnabled) {
       return plan
@@ -44,7 +45,7 @@ case class PlanAdaptiveDynamicPruningFilters(
         val exchange = BroadcastExchangeExec(mode, adaptivePlan.executedPlan)
 
         val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
-          rootPlan.find {
+          find(rootPlan) {
             case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
               left.sameResult(exchange)
             case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index b69a683b5e75e..3b88bd58d925f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1492,12 +1492,11 @@ abstract class DynamicPartitionPruningSuiteBase
       //       +- Exchange
       //          +- HashAggregate
       //             +- Filter
-      //                +- FileScan [PartitionFilters: [isnotnull(store_id#3367),
-      //                     dynamicpruningexpression(store_id#3367 IN dynamicpruning#3385)]]
+      //                +- FileScan [PartitionFilters: dynamicpruning#3385]
       //                   +- SubqueryBroadcast dynamicpruning#3385
-      //                          +- AdaptiveSparkPlan
-      //                           +- BroadcastQueryStage
-      //                              +- BroadcastExchange
+      //                      +- AdaptiveSparkPlan
+      //                         +- BroadcastQueryStage
+      //                            +- BroadcastExchange
       //
       // +- BroadcastQueryStage
       //    +- ReusedExchange
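Note (not part of the patch series): the change can be exercised end to end from
spark-shell. A minimal recipe, with the caveat that fact_stats is the partitioned table
created by DynamicPartitionPruningSuiteBase, so substitute any partitioned table of your
own; the conf keys themselves are real Spark settings:

    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
    spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly", "true")

    val df = spark.sql("""
      WITH v AS (
        SELECT f.store_id FROM fact_stats f WHERE f.units_sold = 70 GROUP BY f.store_id
      )
      SELECT * FROM v v1 JOIN v v2 WHERE v1.store_id = v2.store_id
    """)
    df.collect()
    // After execution the adaptive plan is final: expect a SubqueryBroadcast over an
    // AdaptiveSparkPlan on the pruning side, and a ReusedExchange elsewhere in the tree.
    println(df.queryExecution.executedPlan.treeString)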