Skip to content

Commit

Permalink
[SPARK-29375][SQL] Exchange reuse across all subquery levels
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Nov 18, 2019
1 parent 50f6d93 commit 3f9e240
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ import org.apache.spark.util.Utils
class QueryExecution(
val sparkSession: SparkSession,
val logical: LogicalPlan,
val tracker: QueryPlanningTracker = new QueryPlanningTracker) {
val tracker: QueryPlanningTracker = new QueryPlanningTracker,
val subQuery: Boolean = false) {

// TODO: Move the planner an optimizer into here from SessionState.
protected def planner = sparkSession.sessionState.planner
Expand Down Expand Up @@ -127,9 +128,9 @@ class QueryExecution(
EnsureRequirements(sparkSession.sessionState.conf),
ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
sparkSession.sessionState.columnarRules),
CollapseCodegenStages(sparkSession.sessionState.conf),
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf))
CollapseCodegenStages(sparkSession.sessionState.conf)) ++
(if (subQuery) Nil else Seq(ReuseExchange(sparkSession.sessionState.conf))) :+
ReuseSubquery(sparkSession.sessionState.conf)

def simpleString: String = simpleString(false)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.exchange

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
Expand Down Expand Up @@ -107,35 +106,39 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
if (!conf.exchangeReuseEnabled) {
return plan
}
// Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()

// Replace an Exchange duplicate with a ReusedExchange
def reuse: PartialFunction[Exchange, SparkPlan] = {
case exchange: Exchange =>
val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
val samePlan = sameSchema.find { e =>
exchange.sameResult(e)
}
if (samePlan.isDefined) {
// Keep the output of this exchange, the following plans require that to resolve
// attributes.
ReusedExchangeExec(exchange.output, samePlan.get)
} else {
sameSchema += exchange
exchange
// To avoid costly canonicalization of an exchange:
// - we use its schema first to check if it can be replaced with a reused exchange at all
// - we insert an exchange into the map of canonicalized plans only when at least 2 exchanges
// have the same schema
val exchanges = mutable.Map[StructType, (Exchange, mutable.Map[SparkPlan, Exchange])]()

def reuse(plan: SparkPlan): SparkPlan = {
plan.transformUp {
case exchange: Exchange =>
val (firstSameSchemaExchange, sameResultExchanges) =
exchanges.getOrElseUpdate(exchange.schema, (exchange, mutable.Map()))
if (firstSameSchemaExchange.ne(exchange)) {
if (sameResultExchanges.isEmpty) {
sameResultExchanges +=
firstSameSchemaExchange.canonicalized -> firstSameSchemaExchange
}
val sameResultExchange =
sameResultExchanges.getOrElseUpdate(exchange.canonicalized, exchange)
if (sameResultExchange.ne(exchange)) {
ReusedExchangeExec(exchange.output, sameResultExchange)
} else {
exchange
}
} else {
exchange
}
case other => other.transformExpressions {
case sub: ExecSubqueryExpression =>
sub.withNewPlan(reuse(sub.plan).asInstanceOf[BaseSubqueryExec])
}
}
}

plan transformUp {
case exchange: Exchange => reuse(exchange)
} transformAllExpressions {
// Lookup inside subqueries for duplicate exchanges
case in: InSubqueryExec =>
val newIn = in.plan.transformUp {
case exchange: Exchange => reuse(exchange)
}
in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
}
reuse(plan)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
def apply(plan: SparkPlan): SparkPlan = {
plan.transformAllExpressions {
case subquery: expressions.ScalarSubquery =>
val executedPlan = new QueryExecution(sparkSession, subquery.plan).executedPlan
val executedPlan =
new QueryExecution(sparkSession, subquery.plan, subQuery = true).executedPlan
ScalarSubquery(
SubqueryExec(s"scalar-subquery#${subquery.exprId.id}", executedPlan),
subquery.exprId)
Expand All @@ -192,7 +193,7 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
}
)
}
val executedPlan = new QueryExecution(sparkSession, query).executedPlan
val executedPlan = new QueryExecution(sparkSession, query, subQuery = true).executedPlan
InSubqueryExec(expr, SubqueryExec(s"subquery#${exprId.id}", executedPlan), exprId)
}
}
Expand Down
57 changes: 57 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort}
import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, FileSourceScanExec, InputAdapter, ReusedSubqueryExec, ScalarSubquery, SubqueryExec, WholeStageCodegenExec}
import org.apache.spark.sql.execution.datasources.FileScanRDD
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

Expand Down Expand Up @@ -1389,6 +1390,62 @@ class SubquerySuite extends QueryTest with SharedSparkSession {
}
}

test("Exchange reuse across all subquery levels") {
Seq(true, false).foreach { reuse =>
withSQLConf(SQLConf.EXCHANGE_REUSE_ENABLED.key -> reuse.toString) {
val df = sql(
"""
|SELECT
| (SELECT max(a.key) FROM testData AS a JOIN testData AS b ON b.key = a.key),
| a.key
|FROM testData AS a
|JOIN testData AS b ON b.key = a.key
""".stripMargin)

val plan = df.queryExecution.executedPlan

val exchangeIds = plan.collectInPlanAndSubqueries { case e: Exchange => e.id }
val reusedExchangeIds = plan.collectInPlanAndSubqueries {
case re: ReusedExchangeExec => re.child.id
}

if (reuse) {
assert(exchangeIds.size == 2, "Exchange reusing not working correctly")
assert(reusedExchangeIds.size == 3, "Exchange reusing not working correctly")
assert(reusedExchangeIds.forall(exchangeIds.contains(_)),
"ReusedExchangeExec should reuse an existing exchange")
} else {
assert(exchangeIds.size == 5, "expect 5 Exchange when not reusing")
assert(reusedExchangeIds.size == 0, "expect 0 ReusedExchangeExec when not reusing")
}

val df2 = sql(
"""
SELECT
(SELECT min(a.key) FROM testData AS a JOIN testData AS b ON b.key = a.key),
(SELECT max(a.key) FROM testData AS a JOIN testData2 AS b ON b.a = a.key)
""".stripMargin)

val plan2 = df2.queryExecution.executedPlan

val exchangeIds2 = plan2.collectInPlanAndSubqueries { case e: Exchange => e.id }
val reusedExchangeIds2 = plan2.collectInPlanAndSubqueries {
case re: ReusedExchangeExec => re.child.id
}

if (reuse) {
assert(exchangeIds2.size == 4, "Exchange reusing not working correctly")
assert(reusedExchangeIds2.size == 2, "Exchange reusing not working correctly")
assert(reusedExchangeIds2.forall(exchangeIds2.contains(_)),
"ReusedExchangeExec should reuse an existing exchange")
} else {
assert(exchangeIds2.size == 6, "expect 6 Exchange when not reusing")
assert(reusedExchangeIds2.size == 0, "expect 0 ReusedExchangeExec when not reusing")
}
}
}
}

test("Scalar subquery name should start with scalar-subquery#") {
val df = sql("SELECT a FROM l WHERE a = (SELECT max(c) FROM r WHERE c = 1)".stripMargin)
var subqueryExecs: ArrayBuffer[SubqueryExec] = ArrayBuffer.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ class PlannerSuite extends SharedSparkSession {
Inner,
None,
shuffle,
shuffle)
shuffle.copy())

val outputPlan = ReuseExchange(spark.sessionState.conf).apply(inputPlan)
if (outputPlan.collect { case e: ReusedExchangeExec => true }.size != 1) {
Expand Down

0 comments on commit 3f9e240

Please sign in to comment.