[SPARK-29375][SQL] Exchange reuse across all subquery levels
peter-toth committed Oct 7, 2019
1 parent b103449 · commit e6d4997
Showing 2 changed files with 41 additions and 23 deletions.
@@ -107,35 +107,22 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
     if (!conf.exchangeReuseEnabled) {
       return plan
     }
-    // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
-    val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
+    val exchanges = mutable.HashMap[SparkPlan, Exchange]()
 
     // Replace a Exchange duplicate with a ReusedExchange
-    def reuse: PartialFunction[Exchange, SparkPlan] = {
+    def reuse(plan: SparkPlan): SparkPlan = plan.transform {
       case exchange: Exchange =>
-        val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
-        val samePlan = sameSchema.find { e =>
-          exchange.sameResult(e)
-        }
-        if (samePlan.isDefined) {
-          // Keep the output of this exchange, the following plans require that to resolve
-          // attributes.
-          ReusedExchangeExec(exchange.output, samePlan.get)
+        val newExchange = exchanges.getOrElseUpdate(exchange.canonicalized, exchange)
+        if (newExchange.ne(exchange)) {
+          ReusedExchangeExec(exchange.output, newExchange)
         } else {
-          sameSchema += exchange
           exchange
         }
+      case other => other.transformExpressions {
+        case sub: ExecSubqueryExpression =>
+          sub.withNewPlan(reuse(sub.plan).asInstanceOf[BaseSubqueryExec])
+      }
     }
 
-    plan transformUp {
-      case exchange: Exchange => reuse(exchange)
-    } transformAllExpressions {
-      // Lookup inside subqueries for duplicate exchanges
-      case in: InSubqueryExec =>
-        val newIn = in.plan.transformUp {
-          case exchange: Exchange => reuse(exchange)
-        }
-        in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
-    }
+    reuse(plan)
   }
 }
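
The gist of the change: the old rule only looked inside the plans of InSubqueryExec expressions, one level deep, whereas the new reuse function re-enters every ExecSubqueryExpression at every level with the same hash map, now keyed by the canonicalized plan. Below is a minimal, self-contained sketch of that pattern in plain Scala; the node classes (Op, ExchangeNode, ReusedExchangeNode, SubqueryHolder) are made up for illustration and only stand in for Spark's physical plan types.

import scala.collection.mutable

// Hypothetical, heavily simplified plan model (not Spark's SparkPlan API):
// an operator tree in which exchanges are expensive and worth deduplicating,
// and some operators carry a nested subquery plan.
sealed trait Node
case class Op(name: String, children: Node*) extends Node
case class ExchangeNode(child: Node) extends Node
case class ReusedExchangeNode(original: ExchangeNode) extends Node
case class SubqueryHolder(subplan: Node, child: Node) extends Node

object ExchangeReuseSketch {
  // One traversal over the main plan AND every nested subquery, sharing a
  // single map keyed by a canonical form of each exchange, so a duplicate in
  // a deeply nested subquery can reuse an exchange found at any other level.
  def reuseExchanges(root: Node): Node = {
    // Case-class equality stands in for Spark's plan.canonicalized here.
    val seen = mutable.HashMap[Node, ExchangeNode]()

    def reuse(plan: Node): Node = plan match {
      case ExchangeNode(child) =>
        val rewritten = ExchangeNode(reuse(child))
        val first = seen.getOrElseUpdate(rewritten, rewritten)
        if (first ne rewritten) ReusedExchangeNode(first) else first
      case SubqueryHolder(subplan, child) =>
        // Recurse into the nested plan with the same map: cross-level reuse.
        SubqueryHolder(reuse(subplan), reuse(child))
      case Op(name, children @ _*) =>
        Op(name, children.map(reuse): _*)
      case other =>
        other
    }

    reuse(root)
  }

  def main(args: Array[String]): Unit = {
    // The same scan + exchange appears in the main plan and inside a subquery.
    val scan = Op("scan testData")
    val plan = Op("join",
      ExchangeNode(scan),
      SubqueryHolder(Op("agg", ExchangeNode(scan)), ExchangeNode(scan)))
    // Only the first occurrence survives as an ExchangeNode; the other two
    // collapse into ReusedExchangeNode references to it.
    println(reuseExchanges(plan))
  }
}

The point mirrored from the patch is that a single seen map is threaded through both the main plan and every nested subquery, so later structurally identical exchanges become references to the first one regardless of nesting depth.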
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala (31 additions, 0 deletions)
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort}
 import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, FileSourceScanExec, InputAdapter, ReusedSubqueryExec, ScalarSubquery, SubqueryExec, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.datasources.FileScanRDD
+import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession

@@ -1373,6 +1374,36 @@ class SubquerySuite extends QueryTest with SharedSparkSession {
     }
   }
 
test("Exhange reuse across all subquery levels") {
Seq(true, false).foreach { reuse =>
withSQLConf(SQLConf.EXCHANGE_REUSE_ENABLED.key -> reuse.toString) {
val df = sql(
"""
|SELECT
| (SELECT max(a.key) FROM testData AS a JOIN testData AS b ON b.key = a.key),
| a.key
|FROM testData AS a
|JOIN testData AS b ON b.key = a.key
""".stripMargin)

val plan = df.queryExecution.executedPlan

val countExchange = plan.collectInPlanAndSubqueries({ case _: Exchange => 1 }).sum
val countReusedExchange =
plan.collectInPlanAndSubqueries({ case _: ReusedExchangeExec => 1 }).sum

if (reuse) {
assert(countExchange == 2, "Exchange reusing not working correctly")
assert(countReusedExchange == 3, "Exchange reusing not working correctly")
} else {
assert(countExchange == 5, "expect 4 Exchange when not reusing")
assert(countReusedExchange == 0,
"expect 0 ReusedExchangeExec when not reusing")
}
}
}
}

test("Scalar subquery name should start with scalar-subquery#") {
val df = sql("SELECT a FROM l WHERE a = (SELECT max(c) FROM r WHERE c = 1)".stripMargin)
var subqueryExecs: ArrayBuffer[SubqueryExec] = ArrayBuffer.empty

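For a user-level sanity check of the same behaviour outside the test suite, one can run a query of this shape with exchange reuse enabled and look for ReusedExchange nodes in the printed physical plan. A rough, self-contained sketch follows, assuming a local SparkSession; the object name and the small stand-in testData view are made up for illustration, and spark.sql.exchange.reuse is the SQL conf key behind SQLConf.EXCHANGE_REUSE_ENABLED.

import org.apache.spark.sql.SparkSession

object ExchangeReuseDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("exchange-reuse-demo")
      .getOrCreate()

    // Stand-in for the suite's testData: a small (key, value) table.
    spark.range(1, 101)
      .selectExpr("id AS key", "cast(id AS string) AS value")
      .createOrReplaceTempView("testData")

    // Enable exchange reuse (it is on by default; set explicitly for clarity).
    spark.conf.set("spark.sql.exchange.reuse", "true")

    val df = spark.sql(
      """
        |SELECT
        |  (SELECT max(a.key) FROM testData AS a JOIN testData AS b ON b.key = a.key),
        |  a.key
        |FROM testData AS a
        |JOIN testData AS b ON b.key = a.key
      """.stripMargin)

    // With reuse enabled (and this patch applied), the physical plan printed
    // here should show ReusedExchange nodes in place of repeated Exchange subtrees.
    df.explain()

    spark.stop()
  }
}

With the flag set to false, the plan should instead show the full set of independent Exchange nodes, matching what the new test asserts.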