[SPARK-29375][SPARK-28940][SPARK-32041][SQL] Whole plan exchange and subquery reuse #28885

Closed
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.util

import scala.collection.mutable.{ArrayBuffer, Map}

import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.types.StructType

/**
* Map of canonicalized plans that can be used to find reuse possibilities.
*
* To avoid costly canonicalization of a plan:
* - we use its schema first to check if it can be replaced with a reused one at all
* - we insert it into the map of canonicalized plans only when at least 2 plans have the same schema
*
* @tparam T the type of the node we want to reuse
* @tparam T2 the type of the canonicalized node
*/
class ReuseMap[T <: T2, T2 <: QueryPlan[T2]] {
Contributor Author: It might look like T2 is not required, but it is "silently" used at sameSchema.find(plan.sameResult) and Scala would complain without it.
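A compile-only sketch of that point, using hypothetical names rather than Spark's classes: sameResult is declared against the canonicalized supertype, so the predicate passed to find has the supertype in its argument position, and the N <: C bound is what lets it type check.

object ReuseMapTypeSketch {
  // C plays the role of the canonicalized supertype (QueryPlan[T2] in ReuseMap),
  // N the role of the reusable node type (T).
  abstract class CanonPlan[C <: CanonPlan[C]] {
    def sameResult(other: C): Boolean = this eq other
  }

  class Lookup[N <: C, C <: CanonPlan[C]] {
    // find expects a predicate N => Boolean; plan.sameResult eta-expands to
    // C => Boolean, which is accepted only because N <: C (Function1 is
    // contravariant in its argument type). Without the second type parameter
    // this bound could not be expressed and the call would not compile.
    def findSame(candidates: Seq[N], plan: N): Option[N] =
      candidates.find(plan.sameResult)
  }
}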

private val map = Map[StructType, ArrayBuffer[T]]()

/**
* Find a matching plan with the same canonicalized form in the map or add the new plan to the
* map otherwise.
*
* @param plan the input plan
* @return the matching plan or the input plan
*/
private def lookupOrElseAdd(plan: T): T = {
val sameSchema = map.getOrElseUpdate(plan.schema, ArrayBuffer())
val samePlan = sameSchema.find(plan.sameResult)
if (samePlan.isDefined) {
samePlan.get
} else {
sameSchema += plan
plan
}
}

/**
* Find a matching plan with the same canonicalized form in the map and apply `f` on it or add
* the new plan to the map otherwise.
*
* @param plan the input plan
* @param f the function to apply
* @tparam T2 the type of the reuse node
* @return the matching plan with `f` applied or the input plan
*/
def reuseOrElseAdd[T2 >: T](plan: T, f: T => T2): T2 = {
@attilapiros (Contributor), Jul 16, 2020: Nit: transformReusedOrAdd might be a better name, and/or consider renaming f to funcOnReused. What do you think?

Contributor Author: Thanks @attilapiros for the feedback. I agree and I'm happy to rename the method, but I'd wait a bit for more feedback from others as I've renamed this method a few times.

val found = lookupOrElseAdd(plan)
if (found eq plan) {
plan
} else {
f(found)
}
}
}
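To make the intended use of ReuseMap concrete, here is a simplified sketch of how a physical reuse rule can drive it, loosely modeled on the whole-plan exchange reuse this PR introduces. The rule object and its exact placement are illustrative assumptions; ReuseMap comes from the file above, while SparkPlan, Exchange, ReusedExchangeExec and transformUp are existing Spark APIs. The real rule also handles subqueries and the exchange-reuse config flag, which are omitted here.

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
import org.apache.spark.sql.util.ReuseMap

object ExchangeReuseSketch {
  def apply(plan: SparkPlan): SparkPlan = {
    // One map for the whole plan: the first occurrence of each canonicalized
    // exchange is kept, later equivalent ones are replaced by a ReusedExchangeExec
    // that points at the first occurrence.
    val exchanges = new ReuseMap[Exchange, SparkPlan]()
    plan.transformUp {
      case exchange: Exchange =>
        exchanges.reuseOrElseAdd(exchange, ReusedExchangeExec(exchange.output, _))
    }
  }
}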
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.util

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
import org.apache.spark.sql.types.IntegerType

case class TestNode(children: Seq[TestNode], output: Seq[Attribute]) extends LogicalPlan {
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = copy(children = newChildren.asInstanceOf[Seq[TestNode]])
}
case class TestReuseNode(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output

override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(child = newChild)
}

class ReuseMapSuite extends SparkFunSuite {
private val leafNode1 = TestNode(Nil, Seq(AttributeReference("a", IntegerType)()))
private val leafNode2 = TestNode(Nil, Seq(AttributeReference("b", IntegerType)()))
private val parentNode1 = TestNode(Seq(leafNode1), Seq(AttributeReference("a", IntegerType)()))
private val parentNode2 = TestNode(Seq(leafNode2), Seq(AttributeReference("b", IntegerType)()))

private def reuse(testNode: TestNode) = TestReuseNode(testNode)

test("no reuse if same instance") {
val reuseMap = new ReuseMap[TestNode, LogicalPlan]()

reuseMap.reuseOrElseAdd(leafNode1, reuse)
reuseMap.reuseOrElseAdd(parentNode1, reuse)

assert(reuseMap.reuseOrElseAdd(leafNode1, reuse) == leafNode1)
assert(reuseMap.reuseOrElseAdd(parentNode1, reuse) == parentNode1)
}

test("reuse if different instance with same canonicalized plan") {
val reuseMap = new ReuseMap[TestNode, LogicalPlan]()
reuseMap.reuseOrElseAdd(leafNode1, reuse)
reuseMap.reuseOrElseAdd(parentNode1, reuse)

assert(reuseMap.reuseOrElseAdd(leafNode1.clone.asInstanceOf[TestNode], reuse) ==
reuse(leafNode1))
assert(reuseMap.reuseOrElseAdd(parentNode1.clone.asInstanceOf[TestNode], reuse) ==
reuse(parentNode1))
}

test("no reuse if different canonicalized plan") {
val reuseMap = new ReuseMap[TestNode, LogicalPlan]()
reuseMap.reuseOrElseAdd(leafNode1, reuse)
reuseMap.reuseOrElseAdd(parentNode1, reuse)

assert(reuseMap.reuseOrElseAdd(leafNode2, reuse) == leafNode2)
assert(reuseMap.reuseOrElseAdd(parentNode2, reuse) == parentNode2)
}
}
@@ -17,7 +17,6 @@

package org.apache.spark.sql.execution

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.AnalysisException
@@ -28,11 +27,9 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveS
object ExplainUtils extends AdaptiveSparkPlanHelper {
/**
* Given an input physical plan, performs the following tasks.
* 1. Computes the operator id for current operator and records it in the operator
* by setting a tag.
* 2. Computes the whole stage codegen id for current operator and records it in the
* 1. Computes the whole stage codegen id for current operator and records it in the
* operator by setting a tag.
* 3. Generate the two part explain output for this plan.
* 2. Generate the two part explain output for this plan.
* 1. First part explains the operator tree with each operator tagged with a unique
* identifier.
* 2. Second part explains each operator in a verbose manner.
@@ -41,22 +38,11 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
*
* @param plan Input query plan to process
* @param append function used to append the explain output
* @param startOperatorID The start value of operation id. The subsequent operations will
* be assigned higher value.
*
* @return The last generated operation id for this input plan. This is to ensure we
* always assign incrementing unique id to each operator.
*
*/
private def processPlanSkippingSubqueries[T <: QueryPlan[T]](
plan: => QueryPlan[T],
append: String => Unit,
startOperatorID: Int): Int = {

val operationIDs = new mutable.ArrayBuffer[(Int, QueryPlan[_])]()
var currentOperatorID = startOperatorID
plan: T,
append: String => Unit): Unit = {
try {
currentOperatorID = generateOperatorIDs(plan, currentOperatorID, operationIDs)
generateWholeStageCodegenIds(plan)

QueryPlan.append(
@@ -67,31 +53,36 @@
printOperatorId = true)

append("\n")
var i: Integer = 0
for ((opId, curPlan) <- operationIDs) {
append(curPlan.verboseStringWithOperatorId())
}

val operationsWithID = ArrayBuffer.empty[QueryPlan[_]]
collectOperatorsWithID(plan, operationsWithID)
operationsWithID.foreach(p => append(p.verboseStringWithOperatorId()))

} catch {
case e: AnalysisException => append(e.toString)
}
currentOperatorID
}

/**
* Given an input physical plan, performs the following tasks.
* 1. Generates the explain output for the input plan excluding the subquery plans.
* 2. Generates the explain output for each subquery referenced in the plan.
*/
def processPlan[T <: QueryPlan[T]](
plan: => QueryPlan[T],
append: String => Unit): Unit = {
def processPlan[T <: QueryPlan[T]](plan: T, append: String => Unit): Unit = {
try {
val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)]
var currentOperatorID = 0
currentOperatorID = processPlanSkippingSubqueries(plan, append, currentOperatorID)
currentOperatorID = generateOperatorIDs(plan, currentOperatorID)

val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)]
getSubqueries(plan, subqueries)
var i = 0

subqueries.foldLeft(currentOperatorID) {
(curId, plan) => generateOperatorIDs(plan._3.child, curId)
}

processPlanSkippingSubqueries(plan, append)

var i = 0
for (sub <- subqueries) {
if (i == 0) {
append("\n===== Subqueries =====\n\n")
@@ -104,10 +95,7 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
// the explain output. In case of subquery reuse, we don't print subquery plan more
// than once. So we skip [[ReusedSubqueryExec]] here.
if (!sub._3.isInstanceOf[ReusedSubqueryExec]) {
currentOperatorID = processPlanSkippingSubqueries(
sub._3.child,
append,
currentOperatorID)
processPlanSkippingSubqueries(sub._3.child, append)
}
append("\n")
}
@@ -117,59 +105,85 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
}
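For reference, the two-part output that processPlan produces is what EXPLAIN FORMATTED (or Dataset.explain with the "formatted" mode) prints. The sketch below is a minimal, self-contained way to trigger it; the object name is made up and the plan tree in the comments is illustrative rather than captured output.

import org.apache.spark.sql.SparkSession

object ExplainFormattedSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("explain-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(1, 2, 3).toDF("key").groupBy("key").count()

    // First part: the operator tree, each operator tagged with a unique id, e.g.
    //   HashAggregate (3)
    //   +- Exchange (2)
    //      +- LocalTableScan (1)
    // Second part: one verbose "(n) OperatorName" block per id; subqueries are
    // printed under a "===== Subqueries =====" section, and reused subqueries
    // are not printed more than once.
    df.explain("formatted")

    spark.stop()
  }
}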

/**
* Traverses the supplied input plan in a bottom-up fashion does the following :
* 1. produces a map : operator identifier -> operator
* 2. Records the operator id via setting a tag in the operator.
* Traverses the supplied input plan in a bottom-up fashion and records the operator id via
* setting a tag in the operator.
* Note :
* 1. Operator such as WholeStageCodegenExec and InputAdapter are skipped as they don't
* appear in the explain output.
* 2. operator identifier starts at startOperatorID + 1
- Operators such as WholeStageCodegenExec and InputAdapter are skipped as they don't
* appear in the explain output.
* - Operator identifier starts at startOperatorID + 1
*
* @param plan Input query plan to process
* @param startOperatorID The start value of operation id. The subsequent operations will
* be assigned higher value.
* @param operatorIDs A output parameter that contains a map of operator id and query plan. This
* is used by caller to print the detail portion of the plan.
* @return The last generated operation id for this input plan. This is to ensure we
* always assign incrementing unique id to each operator.
* @param startOperatorID The start value of operation id. The subsequent operations will be
* assigned higher value.
* @return The last generated operation id for this input plan. This is to ensure we always
* assign incrementing unique id to each operator.
*/
private def generateOperatorIDs(
plan: QueryPlan[_],
startOperatorID: Int,
operatorIDs: mutable.ArrayBuffer[(Int, QueryPlan[_])]): Int = {
private def generateOperatorIDs(plan: QueryPlan[_], startOperatorID: Int): Int = {
var currentOperationID = startOperatorID
// Skip the subqueries as they are not printed as part of main query block.
if (plan.isInstanceOf[BaseSubqueryExec]) {
return currentOperationID
}
plan.foreachUp {
case p: WholeStageCodegenExec =>
case p: InputAdapter =>
case other: QueryPlan[_] =>

def setOpId(): Unit = if (other.getTagValue(QueryPlan.OP_ID_TAG).isEmpty) {
currentOperationID += 1
other.setTagValue(QueryPlan.OP_ID_TAG, currentOperationID)
operatorIDs += ((currentOperationID, other))
}
def setOpId(plan: QueryPlan[_]): Unit = if (plan.getTagValue(QueryPlan.OP_ID_TAG).isEmpty) {
currentOperationID += 1
plan.setTagValue(QueryPlan.OP_ID_TAG, currentOperationID)
}

other match {
case p: AdaptiveSparkPlanExec =>
currentOperationID =
generateOperatorIDs(p.executedPlan, currentOperationID, operatorIDs)
setOpId()
case p: QueryStageExec =>
currentOperationID = generateOperatorIDs(p.plan, currentOperationID, operatorIDs)
setOpId()
case _ =>
setOpId()
other.innerChildren.foldLeft(currentOperationID) {
(curId, plan) => generateOperatorIDs(plan, curId, operatorIDs)
}
plan.foreachUp {
case _: WholeStageCodegenExec =>
case _: InputAdapter =>
case p: AdaptiveSparkPlanExec =>
currentOperationID = generateOperatorIDs(p.executedPlan, currentOperationID)
setOpId(p)
case p: QueryStageExec =>
currentOperationID = generateOperatorIDs(p.plan, currentOperationID)
setOpId(p)
case other: QueryPlan[_] =>
setOpId(other)
other.innerChildren.foldLeft(currentOperationID) {
(curId, plan) => generateOperatorIDs(plan, curId)
}
}
currentOperationID
}

/**
* Traverses the supplied input plan in a bottom-up fashion and collects operators with assigned
* ids.
*
* @param plan Input query plan to process
* @param operators An output parameter that contains the operators.
*/
private def collectOperatorsWithID(
plan: QueryPlan[_],
operators: ArrayBuffer[QueryPlan[_]]): Unit = {
// Skip the subqueries as they are not printed as part of main query block.
if (plan.isInstanceOf[BaseSubqueryExec]) {
return
}

def collectOperatorWithID(plan: QueryPlan[_]): Unit = {
if (plan.getTagValue(QueryPlan.OP_ID_TAG).isDefined) {
operators += plan
}
}

plan.foreachUp {
case _: WholeStageCodegenExec =>
case _: InputAdapter =>
case p: AdaptiveSparkPlanExec =>
collectOperatorsWithID(p.executedPlan, operators)
collectOperatorWithID(p)
case p: QueryStageExec =>
collectOperatorsWithID(p.plan, operators)
collectOperatorWithID(p)
case other: QueryPlan[_] =>
collectOperatorWithID(other)
other.innerChildren.foreach(collectOperatorsWithID(_, operators))
}
}

/**
* Traverses the supplied input plan in a top-down fashion and records the
* whole stage code gen id in the plan via setting a tag.