From 587de13138c3c3da0a6405eaf908345387766f54 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Wed, 2 Aug 2017 19:25:34 -0700
Subject: [PATCH 01/10] [SPARK-21619][SQL] Fail the execution of canonicalized plans explicitly

---
 .../sql/catalyst/catalog/interface.scala           |  2 +-
 .../spark/sql/catalyst/plans/QueryPlan.scala       | 27 ++++++++++++++++---
 .../plans/logical/basicLogicalOperators.scala      |  2 +-
 .../sql/catalyst/plans/logical/hints.scala         |  2 +-
 .../sql/execution/DataSourceScanExec.scala         |  4 +--
 .../spark/sql/execution/SparkPlan.scala            |  6 +++++
 .../execution/basicPhysicalOperators.scala         |  2 +-
 .../datasources/LogicalRelation.scala              |  2 +-
 .../exchange/BroadcastExchangeExec.scala           |  2 +-
 .../sql/execution/exchange/Exchange.scala          |  2 +-
 10 files changed, 38 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 9531456434a15..896e969bfd1c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -426,7 +426,7 @@ case class CatalogRelation(
     Objects.hashCode(tableMeta.identifier, output)
   }
 
-  override lazy val canonicalized: LogicalPlan = copy(
+  override def doCanonicalize(): LogicalPlan = copy(
     tableMeta = tableMeta.copy(
       storage = CatalogStorageFormat.empty,
       createTime = -1
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 7addbaaa9afa5..30762fe93d0e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -180,6 +180,15 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
 
   override def innerChildren: Seq[QueryPlan[_]] = subqueries
 
+  /**
+   * A private mutable variable to indicate whether this plan is the result of canonicalization.
+   * This is used solely for making sure we wouldn't execute a canonicalized plan.
+   * See [[canonicalized]] on how this is set.
+   */
+  private var _isCanonicalizedPlan: Boolean = false
+
+  protected def isCanonicalizedPlan: Boolean = _isCanonicalizedPlan
+
   /**
    * Returns a plan where a best effort attempt has been made to transform `this` in a way
    * that preserves the result but removes cosmetic variations (case sensitivity, ordering for
@@ -188,10 +197,21 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
    * Plans where `this.canonicalized == other.canonicalized` will always evaluate to the same
    * result.
    *
-   * Some nodes should overwrite this to provide proper canonicalize logic, but they should remove
-   * expressions cosmetic variations themselves.
+   * Plan nodes that require special canonicalization should override [[doCanonicalize()]].
+   * They should remove their expressions' cosmetic variations themselves.
    */
-  lazy val canonicalized: PlanType = {
+  final lazy val canonicalized: PlanType = {
+    val plan = doCanonicalize()
+    // Change only the root node, since it is unlikely some code would go into the subtree (not
+    // the root) and try to execute that part.
+    plan._isCanonicalizedPlan = true
+    plan
+  }
+
+  /**
+   * Defines how the canonicalization should work for the current plan.
+   */
+  protected def doCanonicalize(): PlanType = {
     val canonicalizedChildren = children.map(_.canonicalized)
     var id = -1
     mapExpressions {
@@ -213,7 +233,6 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
     }.withNewChildren(canonicalizedChildren)
   }
 
-
   /**
    * Returns true when the given query plan will return the same results as this query plan.
    *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 0bd3166352d35..b28769c52dc73 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -687,7 +687,7 @@ case class SubqueryAlias(
     child: LogicalPlan)
   extends UnaryNode {
 
-  override lazy val canonicalized: LogicalPlan = child.canonicalized
+  override def doCanonicalize(): LogicalPlan = child.canonicalized
 
   override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
index 29a43528124d8..cbb626590d1d7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
@@ -41,7 +41,7 @@ case class ResolvedHint(child: LogicalPlan, hints: HintInfo = HintInfo())
 
   override def output: Seq[Attribute] = child.output
 
-  override lazy val canonicalized: LogicalPlan = child.canonicalized
+  override def doCanonicalize(): LogicalPlan = child.canonicalized
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 588c937a13e45..6c7da53c37397 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -139,7 +139,7 @@ case class RowDataSourceScanExec(
   }
 
   // Don't care about `rdd` and `tableIdentifier` when canonicalizing.
-  override lazy val canonicalized: SparkPlan =
+  override def doCanonicalize(): SparkPlan =
     copy(
       fullOutput.map(QueryPlan.normalizeExprId(_, fullOutput)),
       rdd = null,
@@ -517,7 +517,7 @@ case class FileSourceScanExec(
     }
   }
 
-  override lazy val canonicalized: FileSourceScanExec = {
+  override def doCanonicalize(): FileSourceScanExec = {
     FileSourceScanExec(
       relation,
       output.map(QueryPlan.normalizeExprId(_, output)),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index c7277c21cebb2..c20762bbd1bb7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -109,6 +109,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * Concrete implementations of SparkPlan should override `doExecute`.
    */
   final def execute(): RDD[InternalRow] = executeQuery {
+    if (isCanonicalizedPlan) {
+      throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
+    }
     doExecute()
   }
 
@@ -119,6 +122,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * Concrete implementations of SparkPlan should override `doExecuteBroadcast`.
    */
   final def executeBroadcast[T](): broadcast.Broadcast[T] = executeQuery {
+    if (isCanonicalizedPlan) {
+      throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
+    }
     doExecuteBroadcast()
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 2151c339b9b87..1517a47beeec5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -343,7 +343,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
-  override lazy val canonicalized: SparkPlan = {
+  override def doCanonicalize(): SparkPlan = {
     RangeExec(range.canonicalized.asInstanceOf[org.apache.spark.sql.catalyst.plans.logical.Range])
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 699f1bad9c4ed..6f41b0921740f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -44,7 +44,7 @@ case class LogicalRelation(
   }
 
   // Only care about relation when canonicalizing.
-  override lazy val canonicalized: LogicalPlan = copy(
+  override def doCanonicalize(): LogicalPlan = copy(
     output = output.map(QueryPlan.normalizeExprId(_, output)),
     catalogTable = None)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 9c859e41f8762..08dcc0e25a744 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -48,7 +48,7 @@ case class BroadcastExchangeExec(
 
   override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
 
-  override lazy val canonicalized: SparkPlan = {
+  override def doCanonicalize(): SparkPlan = {
     BroadcastExchangeExec(mode.canonicalized, child.canonicalized)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
index 4b52f3e4c49b0..09f79a2de0ba0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
@@ -50,7 +50,7 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   extends LeafExecNode {
 
   // Ignore this wrapper for canonicalizing.
-  override lazy val canonicalized: SparkPlan = child.canonicalized
+  override def doCanonicalize(): SparkPlan = child.canonicalized
 
   def doExecute(): RDD[InternalRow] = {
     child.execute()
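
Before the test-case commit, it is worth isolating the pattern patch 01 establishes, since every later commit refines it: `canonicalized` becomes a final template method that delegates to an overridable `doCanonicalize()` and marks the result, while the final `execute()`/`executeBroadcast()` entry points refuse marked plans. Stripped of Spark's plan hierarchy, the shape is roughly the following sketch (`Plan`, `run`, and `doRun` are illustrative stand-ins, not Spark API):

    abstract class Plan[T <: Plan[T]] { self: T =>
      // Set only on plans produced by `canonicalized`, never on user-built plans.
      private var isCanonical = false

      // Final template method: subclasses customize doCanonicalize(), never this.
      final lazy val canonicalized: T = {
        val plan = doCanonicalize()
        plan.isCanonical = true // only the root is tagged, as in the patch
        plan
      }

      protected def doCanonicalize(): T

      // Final entry point: fails fast instead of running a normalized plan.
      final def run(): Unit = {
        if (isCanonical) {
          throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
        }
        doRun()
      }

      protected def doRun(): Unit
    }

Making both `canonicalized` and the execute methods final is the point of the refactoring: previously any subclass overriding `lazy val canonicalized` could bypass whatever invariant the base class wanted to enforce.
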
+ */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.test.SharedSQLContext + +class SparkPlanSuite extends QueryTest with SharedSQLContext { + + test("SPARK-21619 execution of a canonicalized plan should fail") { + val plan = spark.range(10).queryExecution.executedPlan.canonicalized + + intercept[IllegalStateException] { plan.execute() } + intercept[IllegalStateException] { plan.executeCollect() } + intercept[IllegalStateException] { plan.executeCollectPublic() } + intercept[IllegalStateException] { plan.executeToIterator() } + intercept[IllegalStateException] { plan.executeBroadcast() } + intercept[IllegalStateException] { plan.executeTake(1) } + } + +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index e191071efbf18..26f36ca84eae1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -201,7 +201,7 @@ case class HiveTableScanExec( } } - override lazy val canonicalized: HiveTableScanExec = { + override def doCanonicalize(): HiveTableScanExec = { val input: AttributeSeq = relation.output HiveTableScanExec( requestedAttributes.map(QueryPlan.normalizeExprId(_, input)), From ab9cc31fa57f0ea9b2c87ba6b9d4958eb9eb3b87 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 4 Aug 2017 18:24:32 -0700 Subject: [PATCH 03/10] copy --- .../org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index a3d2b9b98739b..9bf6989ce3ff3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -202,7 +202,10 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT * They should remove expressions cosmetic variations themselves. */ final lazy val canonicalized: PlanType = { - val plan = doCanonicalize() + var plan = doCanonicalize() + if (plan eq this) { + plan = plan.makeCopy(plan.mapProductIterator(x => x.isInstanceOf[AnyRef])) + } // Change only the root node, since it is unlikely some code would go into the subtree (not // the root) and try execute that part. 
From ab9cc31fa57f0ea9b2c87ba6b9d4958eb9eb3b87 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Fri, 4 Aug 2017 18:24:32 -0700
Subject: [PATCH 03/10] copy

---
 .../org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index a3d2b9b98739b..9bf6989ce3ff3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -202,7 +202,10 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
    */
   final lazy val canonicalized: PlanType = {
-    val plan = doCanonicalize()
+    var plan = doCanonicalize()
+    if (plan eq this) {
+      plan = plan.makeCopy(plan.mapProductIterator(x => x.isInstanceOf[AnyRef]))
+    }
     // Change only the root node, since it is unlikely some code would go into the subtree (not
     // the root) and try to execute that part.
     plan._isCanonicalizedPlan = true
     plan

From 273f22abe927abd83454cf059fd0f715a93c7113 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Fri, 4 Aug 2017 18:25:17 -0700
Subject: [PATCH 04/10] comment

---
 .../scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 9bf6989ce3ff3..708642903f822 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -203,11 +203,11 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
    */
   final lazy val canonicalized: PlanType = {
     var plan = doCanonicalize()
+    // If the plan has not been changed due to canonicalization, make a copy of it so we don't
+    // mutate the original plan's _isCanonicalizedPlan flag.
     if (plan eq this) {
       plan = plan.makeCopy(plan.mapProductIterator(x => x.isInstanceOf[AnyRef]))
     }
-    // Change only the root node, since it is unlikely some code would go into the subtree (not
-    // the root) and try to execute that part.
     plan._isCanonicalizedPlan = true
     plan
   }

From 8f2f91d9dfd0e19e0f60fc70f3f10ef73a2d0019 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Fri, 4 Aug 2017 18:25:38 -0700
Subject: [PATCH 05/10] fix typo

---
 .../scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 708642903f822..ff003211b6f1d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -206,7 +206,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
     // If the plan has not been changed due to canonicalization, make a copy of it so we don't
     // mutate the original plan's _isCanonicalizedPlan flag.
     if (plan eq this) {
-      plan = plan.makeCopy(plan.mapProductIterator(x => x.isInstanceOf[AnyRef]))
+      plan = plan.makeCopy(plan.mapProductIterator(x => x.asInstanceOf[AnyRef]))
     }
     plan._isCanonicalizedPlan = true
     plan
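
Patches 03 through 05 close a loophole in the original commit: when `doCanonicalize()` returns `this` — typical for leaves that need no rewriting — tagging the result would have flipped `_isCanonicalizedPlan` on the original, executable plan. The remedy routes such plans through `TreeNode.makeCopy`, rebuilding the node from its own constructor arguments. The `mapProductIterator(x => x.asInstanceOf[AnyRef])` idiom simply collects a case class's fields into a boxed array; the `isInstanceOf` slip in patch 03 instead produced an array of Booleans, which patch 05 corrects. In plain Scala, with no Spark types, the two expressions differ like this:

    case class Point(x: Int, y: Int, label: String)
    val p = Point(1, 2, "origin")

    // Intended: the constructor arguments, boxed, in declaration order --
    // exactly what a reflective makeCopy needs to rebuild the node.
    val args = p.productIterator.map(_.asInstanceOf[AnyRef]).toArray
    // args: Array(1, 2, origin)

    // The patch-03 slip: Booleans answering a type test, not the fields.
    val oops = p.productIterator.map(_.isInstanceOf[AnyRef]).toArray
    // oops: Array(true, true, true)
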
From 84e76b938517363de7d20adb9f2b87486293edac Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Fri, 4 Aug 2017 21:05:59 -0700
Subject: [PATCH 06/10] transient

---
 .../org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index ff003211b6f1d..c1e4649343cdf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -185,8 +185,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
    * This is used solely for making sure we wouldn't execute a canonicalized plan.
    * See [[canonicalized]] on how this is set.
    */
-  @transient
-  private var _isCanonicalizedPlan: Boolean = false
+  @transient private var _isCanonicalizedPlan: Boolean = false
 
   protected def isCanonicalizedPlan: Boolean = _isCanonicalizedPlan
 
@@ -200,7 +200,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
    * Plan nodes that require special canonicalization should override [[doCanonicalize()]].
    * They should remove their expressions' cosmetic variations themselves.
    */
-  final lazy val canonicalized: PlanType = {
+  @transient final lazy val canonicalized: PlanType = {
     var plan = doCanonicalize()
     // If the plan has not been changed due to canonicalization, make a copy of it so we don't
     // mutate the original plan's _isCanonicalizedPlan flag.

From a09e502a7c0f88086ac32e6c4cb845d17d4c2f44 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Fri, 4 Aug 2017 21:07:10 -0700
Subject: [PATCH 07/10] Clear cache.

---
 .../scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 2 +-
 .../scala/org/apache/spark/sql/execution/command/cache.scala  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index d4414b6f78ca2..62fd2d9cdd1b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -276,7 +276,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    * Create a [[ClearCacheCommand]] logical plan.
    */
   override def visitClearCache(ctx: ClearCacheContext): LogicalPlan = withOrigin(ctx) {
-    ClearCacheCommand
+    ClearCacheCommand()
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 47952f2f227a3..31ff083284963 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -67,7 +67,7 @@ case class UncacheTableCommand(
 /**
  * Clear all cached data from the in-memory cache.
  */
-case object ClearCacheCommand extends RunnableCommand {
+case class ClearCacheCommand() extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     sparkSession.catalog.clearCache()
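
Patch 07's parser and command changes are a consequence of the copy-on-self logic above: `ClearCacheCommand` was a `case object`, and a singleton cannot be rebuilt through a constructor the way `makeCopy` rebuilds case classes — so canonicalizing it has nothing to copy, and mutating a flag on it would be process-global, shared by every cached `CLEAR CACHE` plan. Converting it to a zero-argument `case class` gives each occurrence a real constructor and a fresh instance per parse. The distinction, in a standalone sketch (hypothetical `Cmd` names, not Spark code):

    case object SingletonCmd // one shared instance, private constructor
    case class FreshCmd()    // a real primary constructor: FreshCmd()

    // A makeCopy-style reflective rebuild only has something to call
    // for the case class:
    SingletonCmd.getClass.getConstructors.length // 0 -- nothing public to invoke
    classOf[FreshCmd].getConstructors.length     // 1 -- FreshCmd() can be re-invoked
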
From 861f255638e0b301e3a21b5a0bc491fbfc9537d4 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Fri, 4 Aug 2017 21:07:50 -0700
Subject: [PATCH 08/10] clear cache

---
 .../scala/org/apache/spark/sql/execution/command/cache.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 31ff083284963..7cb6e3ec5ee71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SQLExecution
 
 case class CacheTableCommand(
     tableIdent: TableIdentifier,
@@ -73,4 +72,7 @@ case class ClearCacheCommand() extends RunnableCommand {
     sparkSession.catalog.clearCache()
     Seq.empty[Row]
   }
+
+  /** [[org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy()]] does not support 0-arg ctor. */
+  override def makeCopy(newArgs: Array[AnyRef]): ClearCacheCommand = ClearCacheCommand()
 }

From f8238b103382b34aa637492b57793fa5e557313c Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Fri, 27 Oct 2017 20:02:18 -0700
Subject: [PATCH 09/10] fix merge conflict

---
 .../apache/spark/sql/catalyst/catalog/interface.scala | 9 ---------
 1 file changed, 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index c09a5f5599207..6d4c59fcca7c1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -438,15 +438,6 @@ case class HiveTableRelation(
 
   def isPartitioned: Boolean = partitionCols.nonEmpty
 
-  override def equals(relation: Any): Boolean = relation match {
-    case other: CatalogRelation => tableMeta == other.tableMeta && output == other.output
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    Objects.hashCode(tableMeta.identifier, output)
-  }
-
   override def doCanonicalize(): LogicalPlan = copy(
     tableMeta = tableMeta.copy(
       storage = CatalogStorageFormat.empty,
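
Patch 08 then papers over the remaining wrinkle: `TreeNode.makeCopy` matches `newArgs` against the primary constructor's parameter list and, per the new comment, does not support zero-argument constructors — hence the explicit override that ignores `newArgs` and rebuilds the command directly. Any parameterless plan node needs the same treatment; schematically (a sketch, `Copyable` is a stand-in for `TreeNode`):

    trait Copyable[T] { def makeCopy(newArgs: Array[AnyRef]): T }

    case class NoArgCommand() extends Copyable[NoArgCommand] {
      // Nothing to feed a reflective constructor lookup: rebuild directly.
      override def makeCopy(newArgs: Array[AnyRef]): NoArgCommand = NoArgCommand()
    }

Patch 09, by contrast, is rebase fallout rather than new behavior: upstream had renamed `CatalogRelation` to `HiveTableRelation`, and the hand-written `equals`/`hashCode` — which still pattern-matched on `CatalogRelation` — are dropped in favor of the case-class defaults.
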
From 8346e083ce0f7afbdb5b14371725f74c87b1f8e2 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Fri, 27 Oct 2017 20:15:17 -0700
Subject: [PATCH 10/10] fix compile

---
 .../scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 2 +-
 .../org/apache/spark/sql/hive/execution/HiveTableScanExec.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 6d4c59fcca7c1..b87bbb4874670 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -438,7 +438,7 @@ case class HiveTableRelation(
 
   def isPartitioned: Boolean = partitionCols.nonEmpty
 
-  override def doCanonicalize(): LogicalPlan = copy(
+  override def doCanonicalize(): HiveTableRelation = copy(
     tableMeta = tableMeta.copy(
       storage = CatalogStorageFormat.empty,
       createTime = -1
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index a9cb92d9a8f8b..7dcaf170f9693 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -207,7 +207,7 @@ case class HiveTableScanExec(
     val input: AttributeSeq = relation.output
     HiveTableScanExec(
       requestedAttributes.map(QueryPlan.normalizeExprId(_, input)),
-      relation.canonicalized,
+      relation.canonicalized.asInstanceOf[HiveTableRelation],
       QueryPlan.normalizePredicates(partitionPruningPred, input))(sparkSession)
   }
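
Assembled from the hunks above, the heart of the final `QueryPlan` reads, in condensed form (scaladoc trimmed, default `doCanonicalize()` body elided):

    abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
      @transient private var _isCanonicalizedPlan: Boolean = false

      protected def isCanonicalizedPlan: Boolean = _isCanonicalizedPlan

      @transient final lazy val canonicalized: PlanType = {
        var plan = doCanonicalize()
        // Copy plans that canonicalize to themselves so the flag never
        // lands on the original, executable plan.
        if (plan eq this) {
          plan = plan.makeCopy(plan.mapProductIterator(x => x.asInstanceOf[AnyRef]))
        }
        plan._isCanonicalizedPlan = true
        plan
      }

      // Default implementation (exprId normalization over the children) elided.
      protected def doCanonicalize(): PlanType
    }

Both members are marked `@transient`, which keeps the cached canonical form and the flag out of the serialized plan: a canonical form is cheap to recompute and is only a comparison key, never something to ship to executors.
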