Skip to content

Commit

Permalink
resolve the comments
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed May 11, 2021
1 parent 4ccd4b8 commit 701f1c3
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,10 @@ case class AdaptiveSparkPlanExec(
DisableUnnecessaryBucketedScan
) ++ context.session.sessionState.queryStagePrepRules

@transient private val initialPlan = context.session.withActive {
applyPhysicalRules(
inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
}

// A list of physical optimizer rules to be applied to a new stage before its execution. These
// optimizations should be stage-independent.
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
PlanAdaptiveDynamicPruningFilters(initialPlan),
PlanAdaptiveDynamicPruningFilters(this),
ReuseAdaptiveSubquery(context.subqueryCache),
CoalesceShufflePartitions(context.session),
// The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
Expand Down Expand Up @@ -134,6 +129,11 @@ case class AdaptiveSparkPlanExec(

@transient private val costEvaluator = SimpleCostEvaluator

@transient private val initialPlan = context.session.withActive {
applyPhysicalRules(
inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
}

@volatile private var currentPhysicalPlan = initialPlan

private var isFinalPlan = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelati
/**
* A rule to insert dynamic pruning predicates in order to reuse the results of broadcast.
*/
case class PlanAdaptiveDynamicPruningFilters(rootPlan: SparkPlan) extends Rule[SparkPlan] {
case class PlanAdaptiveDynamicPruningFilters(
rootPlan: AdaptiveSparkPlanExec) extends Rule[SparkPlan] with AdaptiveSparkPlanHelper {
def apply(plan: SparkPlan): SparkPlan = {
if (!conf.dynamicPartitionPruningEnabled) {
return plan
Expand All @@ -44,7 +45,7 @@ case class PlanAdaptiveDynamicPruningFilters(rootPlan: SparkPlan) extends Rule[S
val exchange = BroadcastExchangeExec(mode, adaptivePlan.executedPlan)

val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
rootPlan.find {
find(rootPlan) {
case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
left.sameResult(exchange)
case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1492,12 +1492,11 @@ abstract class DynamicPartitionPruningSuiteBase
// +- Exchange
// +- HashAggregate
// +- Filter
// +- FileScan [PartitionFilters: [isnotnull(store_id#3367),
// dynamicpruningexpression(store_id#3367 IN dynamicpruning#3385)]]
// +- FileScan [PartitionFilters: dynamicpruning#3385]
// +- SubqueryBroadcast dynamicpruning#3385
// +- AdaptiveSparkPlan
// +- BroadcastQueryStage
// +- BroadcastExchange
// +- AdaptiveSparkPlan
// +- BroadcastQueryStage
// +- BroadcastExchange
//
// +- BroadcastQueryStage
// +- ReusedExchange
Expand Down

0 comments on commit 701f1c3

Please sign in to comment.