diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 1dbae4d37d8f5..b87bbb4874670 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -438,7 +438,7 @@ case class HiveTableRelation(
 
   def isPartitioned: Boolean = partitionCols.nonEmpty
 
-  override lazy val canonicalized: HiveTableRelation = copy(
+  override def doCanonicalize(): HiveTableRelation = copy(
     tableMeta = tableMeta.copy(
       storage = CatalogStorageFormat.empty,
       createTime = -1
@@ -448,7 +448,8 @@ case class HiveTableRelation(
     },
     partitionCols = partitionCols.zipWithIndex.map { case (attr, index) =>
       attr.withExprId(ExprId(index + dataCols.length))
-    })
+    }
+  )
 
   override def computeStats(): Statistics = {
     tableMeta.stats.map(_.toPlanStats(output)).getOrElse {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index c7952e3ff8280..d21b4afa2f06c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -180,6 +180,15 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
 
   override protected def innerChildren: Seq[QueryPlan[_]] = subqueries
 
+  /**
+   * A private mutable variable to indicate whether this plan is the result of canonicalization.
+   * This is used solely for making sure we don't execute a canonicalized plan.
+   * See [[canonicalized]] on how this is set.
+   */
+  @transient private var _isCanonicalizedPlan: Boolean = false
+
+  protected def isCanonicalizedPlan: Boolean = _isCanonicalizedPlan
+
   /**
    * Returns a plan where a best effort attempt has been made to transform `this` in a way
    * that preserves the result but removes cosmetic variations (case sensitivity, ordering for
@@ -188,10 +197,24 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
    * Plans where `this.canonicalized == other.canonicalized` will always evaluate to the same
    * result.
    *
-   * Some nodes should overwrite this to provide proper canonicalize logic, but they should remove
-   * expressions cosmetic variations themselves.
+   * Plan nodes that require special canonicalization should override [[doCanonicalize()]].
+   * They are responsible for removing cosmetic variations in expressions themselves.
+   */
+  @transient final lazy val canonicalized: PlanType = {
+    var plan = doCanonicalize()
+    // If the plan has not been changed by canonicalization, make a copy of it so we don't
+    // mutate the original plan's _isCanonicalizedPlan flag.
+    if (plan eq this) {
+      plan = plan.makeCopy(plan.mapProductIterator(x => x.asInstanceOf[AnyRef]))
+    }
+    plan._isCanonicalizedPlan = true
+    plan
+  }
+
+  /**
+   * Defines how the canonicalization should work for the current plan.
    */
-  lazy val canonicalized: PlanType = {
+  protected def doCanonicalize(): PlanType = {
     val canonicalizedChildren = children.map(_.canonicalized)
     var id = -1
     mapExpressions {
@@ -213,7 +236,6 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
     }.withNewChildren(canonicalizedChildren)
   }
 
-
   /**
    * Returns true when the given query plan will return the same results as this query plan.
    *
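
The QueryPlan change above is the heart of the patch: `canonicalized` becomes a final template method that always stamps its result, and plan nodes override `doCanonicalize()` instead of `canonicalized` itself. Keeping the entry point final means no subclass can bypass the flag, which is what makes the execute-time check in SparkPlan (further down) reliable. A standalone sketch of the pattern, using hypothetical `Plan`/`Named` stand-ins (with `copySelf()` in place of `TreeNode.makeCopy`), not the real Spark classes:

    abstract class Plan {
      @transient private var _isCanonicalizedPlan: Boolean = false
      protected def isCanonicalizedPlan: Boolean = _isCanonicalizedPlan

      // Final entry point: every caller receives a plan with the flag set.
      // When doCanonicalize() returns `this` unchanged, copy first so the
      // flag is never flipped on the original plan.
      @transient final lazy val canonicalized: Plan = {
        var plan = doCanonicalize()
        if (plan eq this) plan = copySelf()
        plan._isCanonicalizedPlan = true
        plan
      }

      // Subclasses override this, not `canonicalized`, to strip cosmetics.
      protected def doCanonicalize(): Plan = this
      protected def copySelf(): Plan
    }

    case class Named(name: String) extends Plan {
      override protected def doCanonicalize(): Plan = Named("") // name is cosmetic
      override protected def copySelf(): Plan = copy()
    }
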
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 80243d3d356ca..c2750c3079814 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -760,7 +760,7 @@ case class SubqueryAlias(
     child: LogicalPlan)
   extends UnaryNode {
 
-  override lazy val canonicalized: LogicalPlan = child.canonicalized
+  override def doCanonicalize(): LogicalPlan = child.canonicalized
 
   override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
index 29a43528124d8..cbb626590d1d7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
@@ -41,7 +41,7 @@ case class ResolvedHint(child: LogicalPlan, hints: HintInfo = HintInfo())
 
   override def output: Seq[Attribute] = child.output
 
-  override lazy val canonicalized: LogicalPlan = child.canonicalized
+  override def doCanonicalize(): LogicalPlan = child.canonicalized
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 8d0fc32feac99..e9f65031143b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -139,7 +139,7 @@ case class RowDataSourceScanExec(
   }
 
   // Don't care about `rdd` and `tableIdentifier` when canonicalizing.
-  override lazy val canonicalized: SparkPlan =
+  override def doCanonicalize(): SparkPlan =
     copy(
       fullOutput.map(QueryPlan.normalizeExprId(_, fullOutput)),
       rdd = null,
@@ -522,7 +522,7 @@ case class FileSourceScanExec(
     }
   }
 
-  override lazy val canonicalized: FileSourceScanExec = {
+  override def doCanonicalize(): FileSourceScanExec = {
     FileSourceScanExec(
       relation,
       output.map(QueryPlan.normalizeExprId(_, output)),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 2ffd948f984bf..657b265260135 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -111,6 +111,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * Concrete implementations of SparkPlan should override `doExecute`.
    */
   final def execute(): RDD[InternalRow] = executeQuery {
+    if (isCanonicalizedPlan) {
+      throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
+    }
     doExecute()
   }
 
@@ -121,6 +124,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * Concrete implementations of SparkPlan should override `doExecuteBroadcast`.
    */
   final def executeBroadcast[T](): broadcast.Broadcast[T] = executeQuery {
+    if (isCanonicalizedPlan) {
+      throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
+    }
     doExecuteBroadcast()
   }
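
Only these two methods need the guard: the public `executeCollect()`, `executeCollectPublic()`, `executeToIterator()` and `executeTake()` paths all funnel through the final `execute()`, and broadcast consumption funnels through `executeBroadcast()`, which is exactly what the new test suite below exercises. An illustrative session, assuming a live `SparkSession` named `spark`:

    val plan = spark.range(10).queryExecution.executedPlan.canonicalized
    // A canonicalized plan exists only for comparison; fields irrelevant to
    // equivalence are nulled out (e.g. rdd = null in RowDataSourceScanExec),
    // so running it would fail obscurely. Instead it now fails fast with
    // java.lang.IllegalStateException: A canonicalized plan is not supposed
    // to be executed.
    plan.execute()
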
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 6de9ea0efd2c6..29b584b55972c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -286,7 +286,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    * Create a [[ClearCacheCommand]] logical plan.
    */
   override def visitClearCache(ctx: ClearCacheContext): LogicalPlan = withOrigin(ctx) {
-    ClearCacheCommand
+    ClearCacheCommand()
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index d15ece304cac4..e58c3cec2df15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -350,7 +350,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
-  override lazy val canonicalized: SparkPlan = {
+  override def doCanonicalize(): SparkPlan = {
     RangeExec(range.canonicalized.asInstanceOf[org.apache.spark.sql.catalyst.plans.logical.Range])
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 140f920eaafae..687994d82a003 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -66,10 +66,13 @@ case class UncacheTableCommand(
 /**
  * Clear all cached data from the in-memory cache.
  */
-case object ClearCacheCommand extends RunnableCommand {
+case class ClearCacheCommand() extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     sparkSession.catalog.clearCache()
     Seq.empty[Row]
   }
+
+  /** [[org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy()]] does not support 0-arg ctor. */
+  override def makeCopy(newArgs: Array[AnyRef]): ClearCacheCommand = ClearCacheCommand()
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 3e98cb28453a2..236995708a12f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -35,7 +35,7 @@ case class LogicalRelation(
   extends LeafNode with MultiInstanceRelation {
 
   // Only care about relation when canonicalizing.
-  override lazy val canonicalized: LogicalPlan = copy(
+  override def doCanonicalize(): LogicalPlan = copy(
     output = output.map(QueryPlan.normalizeExprId(_, output)),
     catalogTable = None)
 
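
The `ClearCacheCommand` change above is a knock-on effect of the new `canonicalized` implementation: when `doCanonicalize()` returns a plan unchanged, the plan is rebuilt via `makeCopy()`, which instantiates the node reflectively through its constructor. A `case object` singleton has no such constructor, and even the zero-arg case class needs the explicit `makeCopy` override because, as the patch's own comment notes, `TreeNode.makeCopy()` does not support a 0-arg constructor. A rough illustration of the distinction, with hypothetical command classes:

    // A case object is a singleton: there is no constructor for
    // reflection-based copying to invoke.
    case object SingletonCommand

    // A zero-arg case class can be re-instantiated, but the copy machinery
    // still needs an explicit override for the 0-arg case:
    case class ZeroArgCommand() {
      def makeCopy(newArgs: Array[AnyRef]): ZeroArgCommand = ZeroArgCommand()
    }
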
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 880e18c6808b0..daea6c39624d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -48,7 +48,7 @@ case class BroadcastExchangeExec(
 
   override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
 
-  override lazy val canonicalized: SparkPlan = {
+  override def doCanonicalize(): SparkPlan = {
     BroadcastExchangeExec(mode.canonicalized, child.canonicalized)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
index 4b52f3e4c49b0..09f79a2de0ba0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
@@ -50,7 +50,7 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   extends LeafExecNode {
 
   // Ignore this wrapper for canonicalizing.
-  override lazy val canonicalized: SparkPlan = child.canonicalized
+  override def doCanonicalize(): SparkPlan = child.canonicalized
 
   def doExecute(): RDD[InternalRow] = {
     child.execute()
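
Both exchange operators keep their previous semantics: they canonicalize straight through to their (canonicalized) child, so exchange reuse still sees through the wrapper. Since `sameResult` compares canonical forms (per the QueryPlan doc comment, plans with equal `canonicalized` values evaluate to the same result), a pass-through wrapper remains interchangeable with what it wraps. A self-contained sketch of that property, with hypothetical `Node`/`Scan`/`Wrapper` classes rather than the real operators:

    abstract class Node {
      final lazy val canonical: Node = doCanon()
      protected def doCanon(): Node = this
      final def sameResult(other: Node): Boolean = canonical == other.canonical
    }
    case class Scan(table: String) extends Node
    case class Wrapper(child: Node) extends Node {
      override protected def doCanon(): Node = child.canonical // drop the wrapper
    }

    object Demo extends App {
      val scan = Scan("t")
      assert(Wrapper(scan).sameResult(scan)) // wrapper is transparent
    }
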
+ */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.test.SharedSQLContext + +class SparkPlanSuite extends QueryTest with SharedSQLContext { + + test("SPARK-21619 execution of a canonicalized plan should fail") { + val plan = spark.range(10).queryExecution.executedPlan.canonicalized + + intercept[IllegalStateException] { plan.execute() } + intercept[IllegalStateException] { plan.executeCollect() } + intercept[IllegalStateException] { plan.executeCollectPublic() } + intercept[IllegalStateException] { plan.executeToIterator() } + intercept[IllegalStateException] { plan.executeBroadcast() } + intercept[IllegalStateException] { plan.executeTake(1) } + } + +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 4f8dab9cd6172..7dcaf170f9693 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -203,11 +203,11 @@ case class HiveTableScanExec( } } - override lazy val canonicalized: HiveTableScanExec = { + override def doCanonicalize(): HiveTableScanExec = { val input: AttributeSeq = relation.output HiveTableScanExec( requestedAttributes.map(QueryPlan.normalizeExprId(_, input)), - relation.canonicalized, + relation.canonicalized.asInstanceOf[HiveTableRelation], QueryPlan.normalizePredicates(partitionPruningPred, input))(sparkSession) }