Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[SPARK-21619][SQL] Fail the execution of canonicalized plans explicitly #18828

Closed
wants to merge 12 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ case class CatalogRelation(
Objects.hashCode(tableMeta.identifier, output)
}

override lazy val canonicalized: LogicalPlan = copy(
override def doCanonicalize(): LogicalPlan = copy(
tableMeta = tableMeta.copy(
storage = CatalogStorageFormat.empty,
createTime = -1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,16 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT

override def innerChildren: Seq[QueryPlan[_]] = subqueries

/**
* A private mutable variable to indicate whether this plan is the result of canonicalization.
* This is used solely for making sure we wouldn't execute a canonicalized plan.
* See [[canonicalized]] on how this is set.
*/
@transient
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess plans are already not valid on executors, but why @transient?

private var _isCanonicalizedPlan: Boolean = false

// Read-only accessor for the flag above; used by execution entry points
// (e.g. SparkPlan.execute/executeBroadcast) to refuse running a canonicalized plan.
protected def isCanonicalizedPlan: Boolean = _isCanonicalizedPlan

/**
* Returns a plan where a best effort attempt has been made to transform `this` in a way
* that preserves the result but removes cosmetic variations (case sensitivity, ordering for
Expand All @@ -188,10 +198,21 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
* Plans where `this.canonicalized == other.canonicalized` will always evaluate to the same
* result.
*
* Some nodes should overwrite this to provide proper canonicalize logic, but they should remove
* expressions cosmetic variations themselves.
* Plan nodes that require special canonicalization should override [[doCanonicalize()]].
* They should remove expressions cosmetic variations themselves.
*/
lazy val canonicalized: PlanType = {
final lazy val canonicalized: PlanType = {
val plan = doCanonicalize()
// Mark only the root node as canonicalized: it is unlikely that code would reach into the
// subtree (below the root) and try to execute that part on its own.
plan._isCanonicalizedPlan = true
plan
}

/**
* Defines how the canonicalization should work for the current plan.
*/
protected def doCanonicalize(): PlanType = {
val canonicalizedChildren = children.map(_.canonicalized)
var id = -1
mapExpressions {
Expand All @@ -213,7 +234,6 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
}.withNewChildren(canonicalizedChildren)
}


/**
* Returns true when the given query plan will return the same results as this query plan.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ case class SubqueryAlias(
child: LogicalPlan)
extends UnaryNode {

override lazy val canonicalized: LogicalPlan = child.canonicalized
override def doCanonicalize(): LogicalPlan = child.canonicalized

override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ case class ResolvedHint(child: LogicalPlan, hints: HintInfo = HintInfo())

override def output: Seq[Attribute] = child.output

override lazy val canonicalized: LogicalPlan = child.canonicalized
override def doCanonicalize(): LogicalPlan = child.canonicalized
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ case class RowDataSourceScanExec(
}

// Don't care about `rdd` and `tableIdentifier` when canonicalizing.
override lazy val canonicalized: SparkPlan =
override def doCanonicalize(): SparkPlan =
copy(
fullOutput.map(QueryPlan.normalizeExprId(_, fullOutput)),
rdd = null,
Expand Down Expand Up @@ -517,7 +517,7 @@ case class FileSourceScanExec(
}
}

override lazy val canonicalized: FileSourceScanExec = {
override def doCanonicalize(): FileSourceScanExec = {
FileSourceScanExec(
relation,
output.map(QueryPlan.normalizeExprId(_, output)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
* Concrete implementations of SparkPlan should override `doExecute`.
*/
final def execute(): RDD[InternalRow] = executeQuery {
if (isCanonicalizedPlan) {
// Canonicalized plans are built only for plan comparison and must never be run
// (see QueryPlan.canonicalized), so fail loudly instead of producing bogus results.
throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
}
doExecute()
}

Expand All @@ -119,6 +122,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
* Concrete implementations of SparkPlan should override `doExecuteBroadcast`.
*/
final def executeBroadcast[T](): broadcast.Broadcast[T] = executeQuery {
if (isCanonicalizedPlan) {
// Same guard as execute(): a canonicalized plan exists only for comparison, never execution.
throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
}
doExecuteBroadcast()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
// SQL metric tracking the number of rows this operator produces.
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

override lazy val canonicalized: SparkPlan = {
// Canonicalize by canonicalizing the wrapped logical Range and rebuilding this exec node.
override def doCanonicalize(): SparkPlan = {
RangeExec(range.canonicalized.asInstanceOf[org.apache.spark.sql.catalyst.plans.logical.Range])
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ case class LogicalRelation(
}

// Only care about relation when canonicalizing.
override lazy val canonicalized: LogicalPlan = copy(
override def doCanonicalize(): LogicalPlan = copy(
// Normalize expression IDs so two relations with equivalent output compare equal.
output = output.map(QueryPlan.normalizeExprId(_, output)),
// Drop the catalog table: only the relation matters when canonicalizing.
catalogTable = None)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ case class BroadcastExchangeExec(

override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)

override lazy val canonicalized: SparkPlan = {
// Canonicalize both the broadcast mode and the child plan.
override def doCanonicalize(): SparkPlan = {
BroadcastExchangeExec(mode.canonicalized, child.canonicalized)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
extends LeafExecNode {

// Ignore this wrapper for canonicalizing.
override lazy val canonicalized: SparkPlan = child.canonicalized
override def doCanonicalize(): SparkPlan = child.canonicalized

def doExecute(): RDD[InternalRow] = {
child.execute()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSQLContext

class SparkPlanSuite extends QueryTest with SharedSQLContext {

test("SPARK-21619 execution of a canonicalized plan should fail") {
val plan = spark.range(10).queryExecution.executedPlan.canonicalized

intercept[IllegalStateException] { plan.execute() }
intercept[IllegalStateException] { plan.executeCollect() }
intercept[IllegalStateException] { plan.executeCollectPublic() }
intercept[IllegalStateException] { plan.executeToIterator() }
intercept[IllegalStateException] { plan.executeBroadcast() }
intercept[IllegalStateException] { plan.executeTake(1) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit. There is an inconsistent corner case in plan.executeTake.

plan.executeTake(1)  -> raise exception
plan.executeTake(0)  -> no exception
plan.executeTake(-1)  -> raise exception

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not an issue with this test, is it? It's just how execution is done.

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ case class HiveTableScanExec(
}
}

override lazy val canonicalized: HiveTableScanExec = {
override def doCanonicalize(): HiveTableScanExec = {
val input: AttributeSeq = relation.output
HiveTableScanExec(
requestedAttributes.map(QueryPlan.normalizeExprId(_, input)),
Expand Down