[SPARK-29375][SPARK-28940][SQL] Whole plan exchange and subquery reuse
Change-Id: Icb229a5b2c775c8c796420134115e3886c1526fe
peter-toth committed Jun 18, 2020
1 parent 9b79251 commit dbd8606
Showing 9 changed files with 235 additions and 96 deletions.
@@ -336,12 +336,13 @@ object QueryPlan extends PredicateHelper {
* `Attribute`, and replace it with `BoundReference` will cause error.
*/
def normalizeExpressions[T <: Expression](e: T, input: AttributeSeq): T = {
type T2 = QueryPlan[_]
e.transformUp {
case s: PlanExpression[QueryPlan[_] @unchecked] =>
case s: PlanExpression[T2 @unchecked] =>
// Normalize the outer references in the subquery plan.
val normalizedPlan = s.plan.transformAllExpressions {
case OuterReference(r) => OuterReference(QueryPlan.normalizeExpressions(r, input))
}
}.canonicalized.asInstanceOf[T2]
s.withNewPlan(normalizedPlan)

case ar: AttributeReference =>
@@ -35,7 +35,8 @@ import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.execution.exchange.EnsureRequirements
import org.apache.spark.sql.execution.reuse.WholePlanReuse
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
@@ -127,7 +128,7 @@ class QueryExecution(

protected def preparations: Seq[Rule[SparkPlan]] = {
QueryExecution.preparations(sparkSession,
Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))))
Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))), false)
}

private def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive {
@@ -326,7 +327,8 @@ object QueryExecution {
*/
private[execution] def preparations(
sparkSession: SparkSession,
adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = {
adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,
subquery: Boolean): Seq[Rule[SparkPlan]] = {
// `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
// as the original plan is hidden behind `AdaptiveSparkPlanExec`.
adaptiveExecutionRule.toSeq ++
@@ -336,10 +338,12 @@ object QueryExecution {
EnsureRequirements(sparkSession.sessionState.conf),
ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
sparkSession.sessionState.columnarRules),
CollapseCodegenStages(sparkSession.sessionState.conf),
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf)
)
CollapseCodegenStages(sparkSession.sessionState.conf)) ++
(if (subquery) {
Nil
} else {
Seq(WholePlanReuse(sparkSession.sessionState.conf))
})
}

/**
@@ -370,7 +374,7 @@ object QueryExecution {
* Prepare the [[SparkPlan]] for execution.
*/
def prepareExecutedPlan(spark: SparkSession, plan: SparkPlan): SparkPlan = {
prepareForExecution(preparations(spark), plan)
prepareForExecution(preparations(spark, subquery = true), plan)
}

/**
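With the new `subquery` flag, the reuse rule is appended only when the root plan is prepared; subquery plans built through `prepareExecutedPlan` skip it and are picked up later, when `WholePlanReuse` traverses the root plan. A minimal sketch of the difference, assuming code living in the `org.apache.spark.sql.execution` package (`preparations` is `private[execution]`) and an existing `SparkSession` named `spark`:

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.reuse.WholePlanReuse

// Root plan: WholePlanReuse is appended, so exchange and subquery reuse is
// decided once over the whole tree, subqueries included.
val rootRules = QueryExecution.preparations(spark, subquery = false)
assert(rootRules.exists(_.isInstanceOf[WholePlanReuse]))

// Subquery plans (prepareExecutedPlan passes subquery = true): the rule is
// skipped here and applied only from the root traversal.
val subqueryRules = QueryExecution.preparations(spark, subquery = true)
assert(!subqueryRules.exists(_.isInstanceOf[WholePlanReuse]))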
@@ -17,18 +17,12 @@

package org.apache.spark.sql.execution.exchange

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
@@ -95,46 +89,3 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
|""".stripMargin
}
}

/**
* Find out duplicated exchanges in the spark plan, then use the same exchange for all the
* references.
*/
case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {

def apply(plan: SparkPlan): SparkPlan = {
if (!conf.exchangeReuseEnabled) {
return plan
}
// Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()

// Replace a Exchange duplicate with a ReusedExchange
def reuse: PartialFunction[Exchange, SparkPlan] = {
case exchange: Exchange =>
val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
val samePlan = sameSchema.find { e =>
exchange.sameResult(e)
}
if (samePlan.isDefined) {
// Keep the output of this exchange, the following plans require that to resolve
// attributes.
ReusedExchangeExec(exchange.output, samePlan.get)
} else {
sameSchema += exchange
exchange
}
}

plan transformUp {
case exchange: Exchange => reuse(exchange)
} transformAllExpressions {
// Lookup inside subqueries for duplicate exchanges
case in: InSubqueryExec =>
val newIn = in.plan.transformUp {
case exchange: Exchange => reuse(exchange)
}
in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
}
}
}
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.reuse

import scala.collection.mutable.Map

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression, ReusedSubqueryExec, SparkPlan}
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

/**
* Find out duplicated exchanges and subqueries in the whole Spark plan, including subqueries, then
* use the same exchange or subquery for all the references.
*/
case class WholePlanReuse(conf: SQLConf) extends Rule[SparkPlan] {

def apply(plan: SparkPlan): SparkPlan = {
if (conf.exchangeReuseEnabled || conf.subqueryReuseEnabled) {
// To avoid costly canonicalization of an exchange or a subquery:
// - we use its schema first to check if it can be replaced with a reused one at all
// - we insert it into the map of canonicalized plans only when at least 2 have the same
// schema
val exchanges = Map[StructType, (Exchange, Map[SparkPlan, Exchange])]()
val subqueries = Map[StructType, (BaseSubqueryExec, Map[SparkPlan, BaseSubqueryExec])]()

def reuse(plan: SparkPlan): SparkPlan = plan.transformUp {
case exchange: Exchange if conf.exchangeReuseEnabled =>
val (firstSameSchemaExchange, sameResultExchanges) =
exchanges.getOrElseUpdate(exchange.schema, exchange -> Map())
if (firstSameSchemaExchange.ne(exchange)) {
if (sameResultExchanges.isEmpty) {
sameResultExchanges +=
firstSameSchemaExchange.canonicalized -> firstSameSchemaExchange
}
val sameResultExchange =
sameResultExchanges.getOrElseUpdate(exchange.canonicalized, exchange)
if (sameResultExchange.ne(exchange)) {
ReusedExchangeExec(exchange.output, sameResultExchange)
} else {
exchange
}
} else {
exchange
}

case other => other.transformExpressionsUp {
case sub: ExecSubqueryExpression =>
val subquery = reuse(sub.plan).asInstanceOf[BaseSubqueryExec]
if (conf.subqueryReuseEnabled) {
val (firstSameSchemaSubquery, sameResultSubqueries) =
subqueries.getOrElseUpdate(subquery.schema, subquery -> Map())
if (firstSameSchemaSubquery.ne(subquery)) {
if (sameResultSubqueries.isEmpty) {
sameResultSubqueries +=
firstSameSchemaSubquery.canonicalized -> firstSameSchemaSubquery
}
val sameResultSubquery =
sameResultSubqueries.getOrElseUpdate(subquery.canonicalized, subquery)
if (sameResultSubquery.ne(subquery)) {
sub.withNewPlan(ReusedSubqueryExec(sameResultSubquery))
} else {
sub.withNewPlan(subquery)
}
} else {
sub.withNewPlan(subquery)
}
} else {
sub.withNewPlan(subquery)
}
}
}

reuse(plan)
} else {
plan
}
}
}
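The maps above key first on the schema, so that `canonicalized` (which builds a copy of the whole subtree) is computed only when at least two exchanges or subqueries share a schema. A standalone sketch of that two-level lookup (not part of this commit); Node, lookup and the string-based canonical form are simplified stand-ins for a plan node, its schema and its canonicalized form:

import scala.collection.mutable

// Stand-in for a plan node: `schema` is a cheap key, `canonicalized` a costly one.
final case class Node(schema: String, payload: String) {
  lazy val canonicalized: String = payload.trim.toLowerCase
}

object ReuseSketch {
  // schema -> (first node seen with that schema, canonical form -> representative node)
  private val seen = mutable.Map[String, (Node, mutable.Map[String, Node])]()

  // Returns the node to reuse, or the node itself if it is the first of its kind.
  def lookup(node: Node): Node = {
    val (first, byCanonical) = seen.getOrElseUpdate(node.schema, node -> mutable.Map())
    if (first ne node) {
      // Canonicalization is paid for only when a second node shares the schema.
      if (byCanonical.isEmpty) byCanonical += first.canonicalized -> first
      byCanonical.getOrElseUpdate(node.canonicalized, node)
    } else {
      node
    }
  }

  def main(args: Array[String]): Unit = {
    val a = Node("int", "Scan T1")
    val b = Node("int", "scan t1")   // different instance, same canonical form as `a`
    val c = Node("string", "Scan T2")
    assert(lookup(a) eq a)           // first of its schema: kept
    assert(lookup(b) eq a)           // duplicate: replaced by the first occurrence
    assert(lookup(c) eq c)           // different schema: kept
    println("reuse sketch OK")
  }
}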
@@ -17,13 +17,10 @@

package org.apache.spark.sql.execution

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, CreateNamedStruct, Expression, ExprId, InSet, ListQuery, Literal, PlanExpression}
import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, Expression, ExprId, InSet, ListQuery, Literal, PlanExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
@@ -197,30 +194,3 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
}
}
}

/**
* Find out duplicated subqueries in the spark plan, then use the same subquery result for all the
* references.
*/
case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {

def apply(plan: SparkPlan): SparkPlan = {
if (!conf.subqueryReuseEnabled) {
return plan
}
// Build a hash map using schema of subqueries to avoid O(N*N) sameResult calls.
val subqueries = mutable.HashMap[StructType, ArrayBuffer[BaseSubqueryExec]]()
plan transformAllExpressions {
case sub: ExecSubqueryExpression =>
val sameSchema =
subqueries.getOrElseUpdate(sub.plan.schema, ArrayBuffer[BaseSubqueryExec]())
val sameResult = sameSchema.find(_.sameResult(sub.plan))
if (sameResult.isDefined) {
sub.withNewPlan(ReusedSubqueryExec(sameResult.get))
} else {
sameSchema += sub.plan
sub
}
}
}
}
@@ -1235,8 +1235,13 @@ abstract class DynamicPartitionPruningSuiteBase
val plan = df.queryExecution.executedPlan
val countSubqueryBroadcasts =
plan.collectWithSubqueries({ case _: SubqueryBroadcastExec => 1 }).sum
val countReusedSubqueryBroadcasts =
plan.collectWithSubqueries({
case ReusedSubqueryExec(_: SubqueryBroadcastExec) => 1
}).sum

assert(countSubqueryBroadcasts == 2)
assert(countSubqueryBroadcasts == 1)
assert(countReusedSubqueryBroadcasts == 1)
}
}

@@ -1280,6 +1285,53 @@ abstract class DynamicPartitionPruningSuiteBase
)
}
}

test("Subquery reuse across the whole plan") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
withTable("df1", "df2") {
spark.range(1000)
.select(col("id"), col("id").as("k"))
.write
.partitionBy("k")
.format(tableFormat)
.mode("overwrite")
.saveAsTable("df1")

spark.range(100)
.select(col("id"), col("id").as("k"))
.write
.partitionBy("k")
.format(tableFormat)
.mode("overwrite")
.saveAsTable("df2")

val df = sql(
"""
|SELECT df1.id, df2.k
|FROM df1 JOIN df2 ON df1.k = df2.k
|WHERE df2.id < (SELECT max(id) FROM df2 WHERE id <= 2)
|""".stripMargin)

checkPartitionPruningPredicate(df, true, false)

checkAnswer(df, Row(0, 0) :: Row(1, 1) :: Nil)

val plan = df.queryExecution.executedPlan

val subqueryIds = plan.collectWithSubqueries { case s: SubqueryExec => s.id }
val reusedSubqueryIds = plan.collectWithSubqueries {
case rs: ReusedSubqueryExec => rs.child.id
}

assert(subqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
assert(reusedSubqueryIds.size == 1, "Whole plan subquery reusing not working correctly")
assert(reusedSubqueryIds.forall(subqueryIds.contains(_)),
"ReusedSubqueryExec should reuse an existing subquery")
}
}
}
}

class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase {
21 changes: 21 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -1646,4 +1646,25 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
checkAnswer(df, df2)
checkAnswer(df, Nil)
}

test("Subquery reuse across the whole plan") {
val df = sql(
"""
|SELECT (SELECT avg(key) FROM testData), (SELECT (SELECT avg(key) FROM testData))
|FROM testData
|LIMIT 1
""".stripMargin)

val plan = df.queryExecution.executedPlan

val subqueryIds = plan.collectWithSubqueries { case s: SubqueryExec => s.id }
val reusedSubqueryIds = plan.collectWithSubqueries {
case rs: ReusedSubqueryExec => rs.child.id
}

assert(subqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
assert(reusedSubqueryIds.size == 1, "Whole plan subquery reusing not working correctly")
assert(reusedSubqueryIds.forall(subqueryIds.contains(_)),
"ReusedSubqueryExec should reuse an existing subquery")
}
}
