# [SPARK-21619][SQL] Fail the execution of canonicalized plans explicitly
## What changes were proposed in this pull request?
Canonicalized plans are not supposed to be executed. I ran into a case in which some code accidentally called `execute` on a canonicalized plan. This patch throws a more explicit exception when that happens.
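
As a minimal sketch of the failure mode (assuming a running `SparkSession` named `spark`, and mirroring the new `SparkPlanSuite` test below):

```scala
// A canonicalized plan has had cosmetic details such as ExprIds normalized
// away; it exists only for plan comparison and must never be run. With this
// patch, executing one fails fast with an explicit error.
val canonical = spark.range(10).queryExecution.executedPlan.canonicalized
canonical.execute() // throws IllegalStateException:
                    // "A canonicalized plan is not supposed to be executed."
```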

## How was this patch tested?
Added a test case in SparkPlanSuite.

Author: Reynold Xin <rxin@databricks.com>

Closes #18828 from rxin/SPARK-21619.
rxin authored and gatorsmile committed Oct 28, 2017
1 parent c42d208 commit d28d573
Showing 14 changed files with 86 additions and 18 deletions.
@@ -438,7 +438,7 @@ case class HiveTableRelation(

def isPartitioned: Boolean = partitionCols.nonEmpty

- override lazy val canonicalized: HiveTableRelation = copy(
+ override def doCanonicalize(): HiveTableRelation = copy(
tableMeta = tableMeta.copy(
storage = CatalogStorageFormat.empty,
createTime = -1
@@ -448,7 +448,8 @@ case class HiveTableRelation(
},
partitionCols = partitionCols.zipWithIndex.map {
case (attr, index) => attr.withExprId(ExprId(index + dataCols.length))
- })
+ }
+ )

override def computeStats(): Statistics = {
tableMeta.stats.map(_.toPlanStats(output)).getOrElse {
@@ -180,6 +180,15 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT

override protected def innerChildren: Seq[QueryPlan[_]] = subqueries

+ /**
+  * A private mutable variable to indicate whether this plan is the result of canonicalization.
+  * This is used solely for making sure we wouldn't execute a canonicalized plan.
+  * See [[canonicalized]] on how this is set.
+  */
+ @transient private var _isCanonicalizedPlan: Boolean = false
+
+ protected def isCanonicalizedPlan: Boolean = _isCanonicalizedPlan

/**
* Returns a plan where a best effort attempt has been made to transform `this` in a way
* that preserves the result but removes cosmetic variations (case sensitivity, ordering for
@@ -188,10 +197,24 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
* Plans where `this.canonicalized == other.canonicalized` will always evaluate to the same
* result.
*
- * Some nodes should overwrite this to provide proper canonicalize logic, but they should remove
- * expressions cosmetic variations themselves.
+ * Plan nodes that require special canonicalization should override [[doCanonicalize()]].
+ * They should remove expressions cosmetic variations themselves.
*/
+ @transient final lazy val canonicalized: PlanType = {
+   var plan = doCanonicalize()
+   // If the plan has not been changed due to canonicalization, make a copy of it so we don't
+   // mutate the original plan's _isCanonicalizedPlan flag.
+   if (plan eq this) {
+     plan = plan.makeCopy(plan.mapProductIterator(x => x.asInstanceOf[AnyRef]))
+   }
+   plan._isCanonicalizedPlan = true
+   plan
+ }
+
+ /**
+  * Defines how the canonicalization should work for the current plan.
+  */
- lazy val canonicalized: PlanType = {
+ protected def doCanonicalize(): PlanType = {
val canonicalizedChildren = children.map(_.canonicalized)
var id = -1
mapExpressions {
@@ -213,7 +236,6 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
}.withNewChildren(canonicalizedChildren)
}


/**
* Returns true when the given query plan will return the same results as this query plan.
*
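Two details of the new `QueryPlan` code are worth noting. `canonicalized` is now a final template method: subclasses customize canonicalization by overriding `doCanonicalize()`, while the flag-setting logic stays in one place. And when `doCanonicalize()` returns `this` unchanged, the plan is first copied via `makeCopy` so that `_isCanonicalizedPlan` is set on the copy, never on the original, which must remain executable. As a hedged illustration of what canonicalization buys (again assuming a running `SparkSession` named `spark`):

```scala
// The same query built twice gets fresh ExprIds for its attributes, so the
// two plans are not structurally equal even though they compute the same thing.
val p1 = spark.range(10).selectExpr("id + 1 AS x").queryExecution.optimizedPlan
val p2 = spark.range(10).selectExpr("id + 1 AS x").queryExecution.optimizedPlan

// doCanonicalize() rewrites ExprIds to positional ids, so the canonical
// forms agree, and sameResult is defined as equality of canonical forms.
assert(p1.canonicalized == p2.canonicalized)
assert(p1.sameResult(p2))
```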
@@ -760,7 +760,7 @@ case class SubqueryAlias(
child: LogicalPlan)
extends UnaryNode {

- override lazy val canonicalized: LogicalPlan = child.canonicalized
+ override def doCanonicalize(): LogicalPlan = child.canonicalized

override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))
}
@@ -41,7 +41,7 @@ case class ResolvedHint(child: LogicalPlan, hints: HintInfo = HintInfo())

override def output: Seq[Attribute] = child.output

- override lazy val canonicalized: LogicalPlan = child.canonicalized
+ override def doCanonicalize(): LogicalPlan = child.canonicalized
}


@@ -139,7 +139,7 @@ case class RowDataSourceScanExec(
}

// Don't care about `rdd` and `tableIdentifier` when canonicalizing.
- override lazy val canonicalized: SparkPlan =
+ override def doCanonicalize(): SparkPlan =
copy(
fullOutput.map(QueryPlan.normalizeExprId(_, fullOutput)),
rdd = null,
@@ -522,7 +522,7 @@ case class FileSourceScanExec(
}
}

- override lazy val canonicalized: FileSourceScanExec = {
+ override def doCanonicalize(): FileSourceScanExec = {
FileSourceScanExec(
relation,
output.map(QueryPlan.normalizeExprId(_, output)),
@@ -111,6 +111,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
* Concrete implementations of SparkPlan should override `doExecute`.
*/
final def execute(): RDD[InternalRow] = executeQuery {
+ if (isCanonicalizedPlan) {
+   throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
+ }
doExecute()
}

@@ -121,6 +124,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
* Concrete implementations of SparkPlan should override `doExecuteBroadcast`.
*/
final def executeBroadcast[T](): broadcast.Broadcast[T] = executeQuery {
+ if (isCanonicalizedPlan) {
+   throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
+ }
doExecuteBroadcast()
}

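Note that the guards live in the `final` `execute()` and `executeBroadcast()` methods rather than in `doExecute()`, so concrete plans cannot bypass them. The other public entry points exercised by the new test (`executeCollect()`, `executeCollectPublic()`, `executeToIterator()`, `executeTake()`) all funnel into `execute()` internally, which is why these two checks are sufficient, as the test at the end of this commit verifies.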
@@ -286,7 +286,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
* Create a [[ClearCacheCommand]] logical plan.
*/
override def visitClearCache(ctx: ClearCacheContext): LogicalPlan = withOrigin(ctx) {
- ClearCacheCommand
+ ClearCacheCommand()
}

/**
@@ -350,7 +350,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

- override lazy val canonicalized: SparkPlan = {
+ override def doCanonicalize(): SparkPlan = {
RangeExec(range.canonicalized.asInstanceOf[org.apache.spark.sql.catalyst.plans.logical.Range])
}

@@ -66,10 +66,13 @@ case class UncacheTableCommand(
/**
* Clear all cached data from the in-memory cache.
*/
- case object ClearCacheCommand extends RunnableCommand {
+ case class ClearCacheCommand() extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
sparkSession.catalog.clearCache()
Seq.empty[Row]
}

+ /** [[org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy()]] does not support 0-arg ctor. */
+ override def makeCopy(newArgs: Array[AnyRef]): ClearCacheCommand = ClearCacheCommand()
}
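
This change is what forces the parser tweak above (`ClearCacheCommand()` in place of the old singleton): `canonicalized` may now call `makeCopy` on any plan whose `doCanonicalize()` returns `this`, and a `case object` has no constructor to copy. Because `TreeNode.makeCopy()`'s reflection-based copying does not handle zero-argument constructors either, the new case class also overrides `makeCopy` by hand.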
@@ -35,7 +35,7 @@ case class LogicalRelation(
extends LeafNode with MultiInstanceRelation {

// Only care about relation when canonicalizing.
- override lazy val canonicalized: LogicalPlan = copy(
+ override def doCanonicalize(): LogicalPlan = copy(
output = output.map(QueryPlan.normalizeExprId(_, output)),
catalogTable = None)

@@ -48,7 +48,7 @@ case class BroadcastExchangeExec(

override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)

- override lazy val canonicalized: SparkPlan = {
+ override def doCanonicalize(): SparkPlan = {
BroadcastExchangeExec(mode.canonicalized, child.canonicalized)
}

@@ -50,7 +50,7 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
extends LeafExecNode {

// Ignore this wrapper for canonicalizing.
- override lazy val canonicalized: SparkPlan = child.canonicalized
+ override def doCanonicalize(): SparkPlan = child.canonicalized

def doExecute(): RDD[InternalRow] = {
child.execute()
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSQLContext

class SparkPlanSuite extends QueryTest with SharedSQLContext {

test("SPARK-21619 execution of a canonicalized plan should fail") {
val plan = spark.range(10).queryExecution.executedPlan.canonicalized

intercept[IllegalStateException] { plan.execute() }
intercept[IllegalStateException] { plan.executeCollect() }
intercept[IllegalStateException] { plan.executeCollectPublic() }
intercept[IllegalStateException] { plan.executeToIterator() }
intercept[IllegalStateException] { plan.executeBroadcast() }
intercept[IllegalStateException] { plan.executeTake(1) }
}

}
@@ -203,11 +203,11 @@ case class HiveTableScanExec(
}
}

- override lazy val canonicalized: HiveTableScanExec = {
+ override def doCanonicalize(): HiveTableScanExec = {
val input: AttributeSeq = relation.output
HiveTableScanExec(
requestedAttributes.map(QueryPlan.normalizeExprId(_, input)),
- relation.canonicalized,
+ relation.canonicalized.asInstanceOf[HiveTableRelation],
QueryPlan.normalizePredicates(partitionPruningPred, input))(sparkSession)
}

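The extra cast here follows from the new API: `canonicalized` is now declared `final` in `QueryPlan` with the generic plan type, so `HiveTableRelation` can no longer narrow the result type the way its old `override lazy val canonicalized: HiveTableRelation` did, and call sites that need the specific type must downcast with `asInstanceOf[HiveTableRelation]`.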
