Skip to content

Commit

Permalink
resolve the comments and fix the failed test
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Apr 15, 2021
1 parent 3bc4baf commit 657c61b
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,15 @@ case class AdaptiveSparkPlanExec(
DisableUnnecessaryBucketedScan
) ++ context.session.sessionState.queryStagePrepRules

@transient private val initialPlan = context.session.withActive {
applyPhysicalRules(
inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
}

// A list of physical optimizer rules to be applied to a new stage before its execution. These
// optimizations should be stage-independent.
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
PlanAdaptiveDynamicPruningFilters(inputPlan),
PlanAdaptiveDynamicPruningFilters(initialPlan),
ReuseAdaptiveSubquery(context.subqueryCache),
CoalesceShufflePartitions(context.session),
// The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
Expand Down Expand Up @@ -129,11 +134,6 @@ case class AdaptiveSparkPlanExec(

@transient private val costEvaluator = SimpleCostEvaluator

@transient private val initialPlan = context.session.withActive {
applyPhysicalRules(
inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
}

@volatile private var currentPhysicalPlan = initialPlan

private var isFinalPlan = false
Expand Down Expand Up @@ -311,8 +311,7 @@ case class AdaptiveSparkPlanExec(
}

override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
val broadcastPlan = getFinalPhysicalPlan()
broadcastPlan.doExecuteBroadcast()
getFinalPhysicalPlan().doExecuteBroadcast()
}

protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")
Expand Down Expand Up @@ -481,7 +480,7 @@ case class AdaptiveSparkPlanExec(
throw new IllegalStateException(
"Custom columnar rules cannot transform shuffle node to something else.")
}
ShuffleQueryStageExec(currentStageId, newShuffle, s.child.canonicalized)
ShuffleQueryStageExec(currentStageId, newShuffle, s.canonicalized)
case b: BroadcastExchangeLike =>
val newBroadcast = applyPhysicalRules(
b.withNewChildren(Seq(optimizedPlan)),
Expand All @@ -491,7 +490,7 @@ case class AdaptiveSparkPlanExec(
throw new IllegalStateException(
"Custom columnar rules cannot transform broadcast node to something else.")
}
BroadcastQueryStageExec(currentStageId, newBroadcast, b.child.canonicalized)
BroadcastQueryStageExec(currentStageId, newBroadcast, b.canonicalized)
}
currentStageId += 1
setLogicalLinkForNewQueryStage(queryStage, e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelati
/**
* A rule to insert dynamic pruning predicates in order to reuse the results of broadcast.
*/
case class PlanAdaptiveDynamicPruningFilters(
originalPlan: SparkPlan) extends Rule[SparkPlan] {
case class PlanAdaptiveDynamicPruningFilters(rootPlan: SparkPlan) extends Rule[SparkPlan] {
def apply(plan: SparkPlan): SparkPlan = {
if (!conf.dynamicPartitionPruningEnabled) {
return plan
Expand All @@ -40,20 +39,20 @@ case class PlanAdaptiveDynamicPruningFilters(
adaptivePlan: AdaptiveSparkPlanExec), exprId, _)) =>
val packedKeys = BindReferences.bindReferences(
HashJoin.rewriteKeyExpr(buildKeys), adaptivePlan.executedPlan.output)
val mode = HashedRelationBroadcastMode(packedKeys)
// plan a broadcast exchange of the build side of the join
val exchange = BroadcastExchangeExec(mode, adaptivePlan.executedPlan)

val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
originalPlan.find {
rootPlan.find {
case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
left.sameResult(adaptivePlan.executedPlan)
left.sameResult(exchange)
case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
right.sameResult(adaptivePlan.executedPlan)
right.sameResult(exchange)
case _ => false
}.isDefined

if(canReuseExchange) {
val mode = HashedRelationBroadcastMode(packedKeys)
// plan a broadcast exchange of the build side of the join
val exchange = BroadcastExchangeExec(mode, adaptivePlan.executedPlan)
if (canReuseExchange) {
exchange.setLogicalLink(adaptivePlan.executedPlan.logicalLink.get)
val newAdaptivePlan = AdaptiveSparkPlanExec(
exchange, adaptivePlan.context, adaptivePlan.preprocessingRules, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ abstract class DynamicPartitionPruningSuiteBase
case _ => false
}.isDefined
assert(hasReuse, s"$s\nshould have been reused in\n$plan")
case a: AdaptiveSparkPlanExec =>
val hasReuse = collect(a) {
case r: ReusedExchangeExec => r
}.nonEmpty
assert(hasReuse, s"$s\nshould have been reused in\n$plan")
case _ =>
fail(s"Invalid child node found in\n$s")
}
Expand Down

0 comments on commit 657c61b

Please sign in to comment.