[SPARK-34637] [SQL] Support DPP + AQE when the broadcast exchange can be reused #31756

Status: Closed (wants to merge 6 commits)
Changes from 3 commits
Changes to SubqueryBroadcastExec:

@@ -36,7 +36,8 @@ import org.apache.spark.util.ThreadUtils
  *
  * @param index the index of the join key in the list of keys from the build side
  * @param buildKeys the join keys from the build side of the join used
- * @param child the BroadcastExchange from the build side of the join
+ * @param child the BroadcastExchange or the AdaptiveSparkPlan with BroadcastQueryStageExec
+ *              from the build side of the join
  */
 case class SubqueryBroadcastExec(
     name: String,
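For orientation, the declaration being re-documented above continues roughly as follows (abridged from the Spark source of this era; a sketch for context, not part of the diff):

    // Sketch (abridged; not part of this diff). The `child` parameter is the
    // plan whose broadcast result the DPP filter consumes: before this PR a
    // BroadcastExchangeExec, after it possibly an AdaptiveSparkPlanExec whose
    // final plan contains a BroadcastQueryStageExec.
    case class SubqueryBroadcastExec(
        name: String,
        index: Int,
        buildKeys: Seq[Expression],
        child: SparkPlan) extends BaseSubqueryExec with UnaryExecNode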
Changes to AdaptiveSparkPlanExec:

@@ -26,7 +26,7 @@ import scala.collection.mutable
 import scala.concurrent.ExecutionContext
 import scala.util.control.NonFatal
 
-import org.apache.spark.SparkException
+import org.apache.spark.{broadcast, SparkException}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -91,10 +91,15 @@ case class AdaptiveSparkPlanExec(
     DisableUnnecessaryBucketedScan
   ) ++ context.session.sessionState.queryStagePrepRules
 
+  @transient private val initialPlan = context.session.withActive {
+    applyPhysicalRules(
+      inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
+  }
+
   // A list of physical optimizer rules to be applied to a new stage before its execution. These
   // optimizations should be stage-independent.
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
-    PlanAdaptiveDynamicPruningFilters(context.stageCache),
+    PlanAdaptiveDynamicPruningFilters(initialPlan),
Contributor (Author): We need to use the initialPlan, not the inputPlan, because the inputPlan has not yet had the queryStagePreparationRules (e.g. EnsureRequirements) applied.
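For context, a schematic of the difference (illustration only, not actual plan output):

    // Schematic only. EnsureRequirements, which runs as part of the
    // queryStagePreparationRules, is what inserts the exchange operators:
    //
    //   inputPlan:    BroadcastHashJoin(buildSide, streamedSide)
    //   initialPlan:  BroadcastHashJoin(BroadcastExchange(buildSide), streamedSide)
    //
    // PlanAdaptiveDynamicPruningFilters matches BroadcastExchange nodes via
    // sameResult(), so it must inspect the plan after these rules have run.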

Contributor: I think it's better to pass this as the root plan. AdaptiveSparkPlanExec keeps changing as more and more query stages complete, so it's better that PlanAdaptiveDynamicPruningFilters always looks at the latest plan.
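A sketch of that suggestion (hypothetical shape, not code from this commit):

    // Hypothetical sketch of the reviewer's suggestion: pass the enclosing
    // AdaptiveSparkPlanExec itself instead of a snapshot of the plan.
    //
    //   PlanAdaptiveDynamicPruningFilters(this)  // inside AdaptiveSparkPlanExec
    //
    //   case class PlanAdaptiveDynamicPruningFilters(rootPlan: AdaptiveSparkPlanExec)
    //     extends Rule[SparkPlan]
    //
    // The rule would then read rootPlan's current physical plan on each
    // application, seeing every query stage completed so far.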

     ReuseAdaptiveSubquery(context.subqueryCache),
     CoalesceShufflePartitions(context.session),
     // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
@@ -129,11 +134,6 @@ case class AdaptiveSparkPlanExec(

   @transient private val costEvaluator = SimpleCostEvaluator
 
-  @transient private val initialPlan = context.session.withActive {
-    applyPhysicalRules(
-      inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
-  }
-
   @volatile private var currentPhysicalPlan = initialPlan
 
   private var isFinalPlan = false
@@ -310,6 +310,10 @@ case class AdaptiveSparkPlanExec(
     rdd
   }
 
+  override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
+    getFinalPhysicalPlan().doExecuteBroadcast()
+  }
+
   protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")
 
   override def generateTreeString(
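The doExecuteBroadcast override added above is what lets an AdaptiveSparkPlanExec sit where a broadcast exchange used to: SubqueryBroadcastExec asks its child for the broadcast result. A simplified sketch of the call path (paraphrased, not verbatim Spark code):

    // Simplified call path, paraphrased:
    // 1. SubqueryBroadcastExec requests the broadcast relation from its child:
    //      val relation = child.executeBroadcast[HashedRelation]()
    // 2. executeBroadcast() dispatches to the child's doExecuteBroadcast().
    // 3. For an AdaptiveSparkPlanExec child, the override above first runs
    //    getFinalPhysicalPlan(), i.e. finishes adaptive re-optimization,
    //    and then delegates the broadcast to the final plan.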
Changes to PlanAdaptiveDynamicPruningFilters:

@@ -17,19 +17,17 @@

 package org.apache.spark.sql.execution.adaptive
 
-import scala.collection.concurrent.TrieMap
-
 import org.apache.spark.sql.catalyst.expressions.{BindReferences, DynamicPruningExpression, Literal}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
-import org.apache.spark.sql.execution.joins.{HashedRelationBroadcastMode, HashJoin}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelationBroadcastMode, HashJoin}

 /**
  * A rule to insert dynamic pruning predicates in order to reuse the results of broadcast.
  */
-case class PlanAdaptiveDynamicPruningFilters(
-    stageCache: TrieMap[SparkPlan, QueryStageExec]) extends Rule[SparkPlan] {
+case class PlanAdaptiveDynamicPruningFilters(rootPlan: SparkPlan) extends Rule[SparkPlan] {
   def apply(plan: SparkPlan): SparkPlan = {
     if (!conf.dynamicPartitionPruningEnabled) {
       return plan
@@ -44,12 +42,23 @@ case class PlanAdaptiveDynamicPruningFilters(
         val mode = HashedRelationBroadcastMode(packedKeys)
         // plan a broadcast exchange of the build side of the join
         val exchange = BroadcastExchangeExec(mode, adaptivePlan.executedPlan)
-        val existingStage = stageCache.get(exchange.canonicalized)
-        if (existingStage.nonEmpty && conf.exchangeReuseEnabled) {
-          val name = s"dynamicpruning#${exprId.id}"
-          val reuseQueryStage = existingStage.get.newReuseInstance(0, exchange.output)
-          val broadcastValues =
-            SubqueryBroadcastExec(name, index, buildKeys, reuseQueryStage)
+
+        val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
+          rootPlan.find {
+            case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
+              left.sameResult(exchange)
+            case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
+              right.sameResult(exchange)
+            case _ => false
+          }.isDefined
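For readers unfamiliar with sameResult: it is a canonicalized-plan comparison (standard Catalyst behavior, not something added in this PR):

    // sameResult(other) is essentially canonicalized == other.canonicalized.
    // Canonicalization normalizes cosmetic details such as expression IDs, so
    // the freshly planned BroadcastExchangeExec above can match the build side
    // of a BroadcastHashJoinExec that the main query will execute anyway,
    // which is exactly the condition under which the broadcast can be reused.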

+        if (canReuseExchange) {
+          exchange.setLogicalLink(adaptivePlan.executedPlan.logicalLink.get)
+          val newAdaptivePlan = AdaptiveSparkPlanExec(
+            exchange, adaptivePlan.context, adaptivePlan.preprocessingRules, true)
Contributor: ditto: adaptivePlan.copy(inputPlan = exchange)

Contributor (Author): updated.
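An annotation on the constructor call above (my reading of AdaptiveSparkPlanExec's parameter list, not part of the diff):

    // Assumed parameter order:
    //   AdaptiveSparkPlanExec(inputPlan, context, preprocessingRules, isSubquery)
    // The trailing `true` marks the new adaptive plan as a subquery plan; the
    // reviewer's adaptivePlan.copy(inputPlan = exchange) would carry the other
    // fields over implicitly instead of spelling them out.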


+          val broadcastValues = SubqueryBroadcastExec(
+            name, index, buildKeys, newAdaptivePlan)
           DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
         } else {
           DynamicPruningExpression(Literal.TrueLiteral)
Changes to DynamicPartitionPruningSuiteBase:

@@ -197,6 +197,17 @@ abstract class DynamicPartitionPruningSuiteBase
           case _ => false
         }.isDefined
         assert(hasReuse, s"$s\nshould have been reused in\n$plan")
+      case a: AdaptiveSparkPlanExec =>
+        val broadcastQueryStage = collectFirst(a) {
+          case b: BroadcastQueryStageExec => b
+        }
+        val broadcastPlan = broadcastQueryStage.get.broadcast
+        val hasReuse = find(plan) {
+          case ReusedExchangeExec(_, e) => e eq broadcastPlan
+          case b: BroadcastExchangeLike => b eq broadcastPlan
+          case _ => false
+        }.isDefined
+        assert(hasReuse, s"$s\nshould have been reused in\n$plan")
       case _ =>
         fail(s"Invalid child node found in\n$s")
     }
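One detail worth noting in the new test-helper branch (my annotation, not in the diff):

    // The `eq` checks are reference equality: the assertion demands that the
    // main plan reuses the very same BroadcastExchange instance that backs the
    // DPP subquery's BroadcastQueryStageExec, not merely an equivalent plan.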
@@ -1463,6 +1474,37 @@ abstract class DynamicPartitionPruningSuiteBase
       }
     }
   }
+
+  test("SPARK-34637: test DPP side broadcast query stage is created firstly") {
Contributor: nit: SPARK-34637: DPP ... remove the test

Contributor (Author): updated.

+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
+      val df = sql(
+        """ WITH view1 as (
Contributor: view1 -> v?

Contributor (Author): updated.

+          | SELECT f.store_id FROM fact_stats f WHERE f.units_sold = 70 group by f.store_id
+          | )
+          |
+          | SELECT * FROM view1 v1 join view1 v2 WHERE v1.store_id = v2.store_id
+        """.stripMargin)

+      // A possible resulting query plan:
+      // BroadcastHashJoin
+      // +- HashAggregate
+      //    +- ShuffleQueryStage
+      //       +- Exchange
+      //          +- HashAggregate
+      //             +- Filter
+      //                +- FileScan
+      //                   +- SubqueryBroadcast
Contributor: subquery has different symbols in the tree string format. Please try to explain some plans locally and update this comment.

+      //                      +- AdaptiveSparkPlan
+      //                         +- BroadcastQueryStage
Contributor: there is no other place to reuse this broadcast, right?

Contributor (Author): This broadcast is only reused on the build side.

Contributor (tgravescs, Apr 28, 2021): How is the broadcast before the FileScan? What is being broadcast?

Contributor (Author): This broadcast is in the DPP subquery of the FileScan. It will broadcast the results of the build side and then prune the dataset.

Contributor: Can you make it clearer that this is the subquery in the file scan node, not the child of it?

+      //                            +- BroadcastExchange
+      //
+      // +- BroadcastQueryStage
+      //    +- ReusedExchange

+      checkPartitionPruningPredicate(df, false, true)
+      checkAnswer(df, Row(15, 15) :: Nil)
+    }
+  }
 }
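On the final assertions (paraphrasing the suite's helper; my gloss, not part of the diff):

    // checkPartitionPruningPredicate(df, withSubquery = false, withBroadcast = true)
    // asserts that the pruning predicate reuses a broadcast exchange rather than
    // running a standalone DPP subquery, and checkAnswer verifies the join result.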

class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase