[SPARK-34637] [SQL] Support DPP + AQE when the broadcast exchange can be reused #31756

Closed · wants to merge 6 commits
sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala

@@ -36,7 +36,8 @@ import org.apache.spark.util.ThreadUtils
  *
  * @param index the index of the join key in the list of keys from the build side
  * @param buildKeys the join keys from the build side of the join used
- * @param child the BroadcastExchange from the build side of the join
+ * @param child the BroadcastExchange or the AdaptiveSparkPlan with BroadcastQueryStageExec
+ *              from the build side of the join
  */
 case class SubqueryBroadcastExec(
     name: String,
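Note on the doc change above: with AQE enabled, the build side handed to `SubqueryBroadcastExec` may now be an `AdaptiveSparkPlanExec` whose final plan is a `BroadcastQueryStageExec`, rather than a bare `BroadcastExchangeExec`. Since `SubqueryBroadcastExec` obtains the broadcast result through `SparkPlan.executeBroadcast`, it does not need to care which of the two shapes it received. A minimal consumer-side sketch (the helper `relationFrom` is hypothetical and not part of the patch; `executeBroadcast` and `HashedRelation` are existing Spark APIs):

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.HashedRelation

object SubqueryBroadcastSketch {
  // Hypothetical helper: works for either child shape, because both
  // BroadcastExchangeExec and (after this patch) AdaptiveSparkPlanExec
  // implement doExecuteBroadcast.
  def relationFrom(child: SparkPlan): HashedRelation =
    child.executeBroadcast[HashedRelation]().value
}
```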
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

@@ -26,7 +26,7 @@ import scala.collection.mutable
 import scala.concurrent.ExecutionContext
 import scala.util.control.NonFatal
 
-import org.apache.spark.SparkException
+import org.apache.spark.{broadcast, SparkException}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -94,7 +94,7 @@ case class AdaptiveSparkPlanExec(
   // A list of physical optimizer rules to be applied to a new stage before its execution. These
   // optimizations should be stage-independent.
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
-    PlanAdaptiveDynamicPruningFilters(context.stageCache),
+    PlanAdaptiveDynamicPruningFilters(this),
     ReuseAdaptiveSubquery(context.subqueryCache),
     CoalesceShufflePartitions(context.session),
     // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
@@ -310,6 +310,10 @@ case class AdaptiveSparkPlanExec(
     rdd
   }
 
+  override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
+    getFinalPhysicalPlan().doExecuteBroadcast()
+  }
+
   protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")
 
   override def generateTreeString(
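The `doExecuteBroadcast` override is the key enabler here: `SparkPlan.doExecuteBroadcast` throws `UnsupportedOperationException` by default, so before this change an `AdaptiveSparkPlanExec` placed under a `SubqueryBroadcastExec` could not produce a broadcast at all. The override resolves the final physical plan (a `BroadcastQueryStageExec` in the DPP case) and forwards the call. A self-contained sketch of that forwarding pattern, using illustrative stand-in types rather than Spark's real classes:

```scala
// Illustrative stand-ins, not Spark's classes.
trait PlanNode {
  // Mirrors SparkPlan's default: broadcast is unsupported unless overridden.
  def doExecuteBroadcast[T](): T =
    throw new UnsupportedOperationException(
      s"${getClass.getSimpleName} does not support broadcast")
}

final class BroadcastStage[R](result: R) extends PlanNode {
  override def doExecuteBroadcast[T](): T = result.asInstanceOf[T]
}

// Mirrors the AdaptiveSparkPlanExec override above: resolve the final plan
// lazily, then forward the broadcast call to it.
final class AdaptiveWrapper(resolveFinalPlan: () => PlanNode) extends PlanNode {
  override def doExecuteBroadcast[T](): T =
    resolveFinalPlan().doExecuteBroadcast[T]()
}
```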
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala

@@ -17,19 +17,18 @@
 
 package org.apache.spark.sql.execution.adaptive
 
-import scala.collection.concurrent.TrieMap
-
 import org.apache.spark.sql.catalyst.expressions.{BindReferences, DynamicPruningExpression, Literal}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
-import org.apache.spark.sql.execution.joins.{HashedRelationBroadcastMode, HashJoin}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelationBroadcastMode, HashJoin}
 
 /**
  * A rule to insert dynamic pruning predicates in order to reuse the results of broadcast.
  */
 case class PlanAdaptiveDynamicPruningFilters(
-    stageCache: TrieMap[SparkPlan, QueryStageExec]) extends Rule[SparkPlan] {
+    rootPlan: AdaptiveSparkPlanExec) extends Rule[SparkPlan] with AdaptiveSparkPlanHelper {
   def apply(plan: SparkPlan): SparkPlan = {
     if (!conf.dynamicPartitionPruningEnabled) {
       return plan
@@ -44,12 +43,22 @@ case class PlanAdaptiveDynamicPruningFilters(
         val mode = HashedRelationBroadcastMode(packedKeys)
         // plan a broadcast exchange of the build side of the join
         val exchange = BroadcastExchangeExec(mode, adaptivePlan.executedPlan)
-        val existingStage = stageCache.get(exchange.canonicalized)
-        if (existingStage.nonEmpty && conf.exchangeReuseEnabled) {
-          val name = s"dynamicpruning#${exprId.id}"
-          val reuseQueryStage = existingStage.get.newReuseInstance(0, exchange.output)
-          val broadcastValues =
-            SubqueryBroadcastExec(name, index, buildKeys, reuseQueryStage)
+
+        val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
+          find(rootPlan) {
+            case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
+              left.sameResult(exchange)
+            case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
+              right.sameResult(exchange)
+            case _ => false
+          }.isDefined
+
+        if (canReuseExchange) {
+          exchange.setLogicalLink(adaptivePlan.executedPlan.logicalLink.get)
+          val newAdaptivePlan = adaptivePlan.copy(inputPlan = exchange)
+
+          val broadcastValues = SubqueryBroadcastExec(
+            name, index, buildKeys, newAdaptivePlan)
           DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
         } else {
           DynamicPruningExpression(Literal.TrueLiteral)
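The rule no longer consults the stage cache. Instead it plans a fresh `BroadcastExchangeExec` for the build side, then searches the entire root plan for a `BroadcastHashJoinExec` whose build side `sameResult`s that exchange; only then is the DPP filter planned as a reusable broadcast. `sameResult` rather than reference equality matters because the exchange planned for the DPP subquery is a different object from the join's build side, yet produces identical results, which is exactly the condition under which AQE can later swap in a `ReusedExchangeExec`. A self-contained toy model of that decision (illustrative `Plan` types; `sameResult` is approximated by case-class equality, whereas Spark compares canonicalized plans):

```scala
object ReuseCheckSketch extends App {
  sealed trait Plan {
    def children: Seq[Plan]
    // Stand-in for SparkPlan.sameResult: Spark compares canonicalized plans;
    // structural equality is enough for this toy model.
    def sameResult(other: Plan): Boolean = this == other
  }
  final case class Scan(table: String) extends Plan {
    def children: Seq[Plan] = Nil
  }
  final case class Broadcast(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }
  final case class BroadcastJoin(build: Plan, streamed: Plan) extends Plan {
    def children: Seq[Plan] = Seq(build, streamed)
  }

  // Stand-in for AdaptiveSparkPlanHelper.find: first node matching the predicate.
  def find(p: Plan)(f: Plan => Boolean): Option[Plan] =
    if (f(p)) Some(p) else p.children.view.flatMap(c => find(c)(f)).headOption

  val exchange = Broadcast(Scan("dim"))                              // planned for the DPP subquery
  val rootPlan = BroadcastJoin(Broadcast(Scan("dim")), Scan("fact")) // the probe-side join
  val canReuseExchange = find(rootPlan) {
    case BroadcastJoin(build, _) => build.sameResult(exchange)
    case _ => false
  }.isDefined
  println(canReuseExchange) // true: the join broadcasts the same build side, so reuse is safe
}
```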
sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala

@@ -197,6 +197,17 @@ abstract class DynamicPartitionPruningSuiteBase
           case _ => false
         }.isDefined
         assert(hasReuse, s"$s\nshould have been reused in\n$plan")
+      case a: AdaptiveSparkPlanExec =>
+        val broadcastQueryStage = collectFirst(a) {
+          case b: BroadcastQueryStageExec => b
+        }
+        val broadcastPlan = broadcastQueryStage.get.broadcast
+        val hasReuse = find(plan) {
+          case ReusedExchangeExec(_, e) => e eq broadcastPlan
+          case b: BroadcastExchangeLike => b eq broadcastPlan
+          case _ => false
+        }.isDefined
+        assert(hasReuse, s"$s\nshould have been reused in\n$plan")
       case _ =>
         fail(s"Invalid child node found in\n$s")
     }
@@ -1463,6 +1474,37 @@ abstract class DynamicPartitionPruningSuiteBase
       }
     }
   }
+
+  test("SPARK-34637: DPP side broadcast query stage is created firstly") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
+      val df = sql(
+        """ WITH v as (
+          | SELECT f.store_id FROM fact_stats f WHERE f.units_sold = 70 group by f.store_id
+          | )
+          |
+          | SELECT * FROM v v1 join v v2 WHERE v1.store_id = v2.store_id
+        """.stripMargin)
+
+      // A possible resulting query plan:
+      // BroadcastHashJoin
+      // +- HashAggregate
+      //    +- ShuffleQueryStage
+      //       +- Exchange
+      //          +- HashAggregate
+      //             +- Filter
+      //                +- FileScan [PartitionFilters: dynamicpruning#3385]
+      //                     +- SubqueryBroadcast dynamicpruning#3385
+      //                        +- AdaptiveSparkPlan
+      //                           +- BroadcastQueryStage
+      //                              +- BroadcastExchange
+      //
+      // +- BroadcastQueryStage
+      //    +- ReusedExchange
+
+      checkPartitionPruningPredicate(df, false, true)
+      checkAnswer(df, Row(15, 15) :: Nil)
+    }
+  }
 }
 
 class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase
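For readers who want to observe this reuse path outside the suite, a sketch of the session setup under which it is exercised. The config keys are real Spark SQL configs; the local builder setup is just an assumed environment, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("dpp-aqe-reuse")
  .config("spark.sql.adaptive.enabled", "true")                             // AQE on
  .config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
  .config("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly", "true")
  .config("spark.sql.exchange.reuse", "true")                               // allow exchange reuse
  .getOrCreate()

// After running a fact-dim join keyed on a partition column, df.explain()
// should show a SubqueryBroadcast whose AdaptiveSparkPlan resolves to a
// BroadcastQueryStage, with the probe side reusing it via ReusedExchange.
```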