Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-774] Fallback broadcast exchange for DPP to reuse (#787)
Browse files Browse the repository at this point in the history
* Correct the assert statement in testing broadcast exchange reuse across subqueries

* Fall back on reused broadcast exchange

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* Fix reused columnar broadcast exchange

* Add guard logic

Co-authored-by: Yuan Zhou <yuan.zhou@intel.com>
  • Loading branch information
PHILO-HE and zhouyuan authored Mar 23, 2022
1 parent 0cee28a commit 69363f3
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,15 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
right)
case plan: BroadcastQueryStageExec =>
logDebug(
s"Columnar Processing for ${plan.getClass} is currently supported, actual plan is ${plan.plan.getClass}.")
plan
s"Columnar Processing for ${plan.getClass} is currently supported, actual plan is ${plan.plan}.")
plan.plan match {
case ReusedExchangeExec(_, originalBroadcastPlan: ColumnarBroadcastExchangeAdaptor) =>
val newBroadcast = BroadcastExchangeExec(
originalBroadcastPlan.mode,
DataToArrowColumnarExec(plan.plan, 1))
SparkShimLoader.getSparkShims.newBroadcastQueryStageExec(plan.id, newBroadcast)
case other => plan
}
case plan: BroadcastExchangeExec =>
val child = replaceWithColumnarPlan(plan.child)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
Expand Down Expand Up @@ -368,6 +375,17 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] {
var isSupportAdaptive: Boolean = true

def replaceWithColumnarPlan(plan: SparkPlan): SparkPlan = plan match {
// To get ColumnarBroadcastExchangeExec back from the fallback that was made for DPP reuse.
case RowToColumnarExec(broadcastQueryStageExec: BroadcastQueryStageExec)
if (broadcastQueryStageExec.plan match {
case BroadcastExchangeExec(_, _: DataToArrowColumnarExec) => true
case _ => false
}) =>
logDebug(s"Due to a fallback of BHJ inserted into plan." +
s" See above override in BroadcastQueryStageExec")
val localBroadcastXchg = broadcastQueryStageExec.plan.asInstanceOf[BroadcastExchangeExec]
val dataToArrowColumnar = localBroadcastXchg.child.asInstanceOf[DataToArrowColumnarExec]
ColumnarBroadcastExchangeExec(localBroadcastXchg.mode, dataToArrowColumnar)
case plan: RowToColumnarExec =>
val child = replaceWithColumnarPlan(plan.child)
if (columnarConf.enableArrowRowToColumnar) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListe
import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeAdaptor, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, REPARTITION, REPARTITION_WITH_NUM, ReusedExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike}
import org.apache.spark.sql.execution.exchange.{Exchange, REPARTITION, REPARTITION_WITH_NUM, ReusedExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike}
import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
Expand Down Expand Up @@ -537,7 +537,7 @@ class AdaptiveQueryExecSuite
// Even with local shuffle reader, the query stage reuse can also work.
val ex = findReusedExchange(adaptivePlan)
assert(ex.nonEmpty)
assert(ex.head.child.isInstanceOf[BroadcastExchangeExec])
assert(ex.head.child.isInstanceOf[ColumnarBroadcastExchangeAdaptor])
val sub = findReusedSubquery(adaptivePlan)
assert(sub.isEmpty)
}
Expand Down

0 comments on commit 69363f3

Please sign in to comment.