diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 443bdc831be64..1aa90f06ac215 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -54,7 +54,6 @@ class Analyzer(catalog: Catalog,
   lazy val batches: Seq[Batch] = Seq(
     Batch("Resolution", fixedPoint,
-      ResolveWith ::
       ResolveRelations ::
       ResolveReferences ::
       ResolveGroupingAnalytics ::
@@ -167,34 +166,37 @@ class Analyzer(catalog: Catalog,
     }
   }
 
-  object ResolveWith extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-      case origin @ With(child, subQueries)
-          if subQueries.forall(query => !catalog.tableExists(Seq(query.alias))) =>
-        subQueries.foreach(subQuery =>
-          catalog.registerTable(Seq(subQuery.alias), subQuery.child))
-        origin
-    }
-  }
-
   /**
    * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
    */
   object ResolveRelations extends Rule[LogicalPlan] {
-    def getTable(u: UnresolvedRelation) = {
+    def getTable(u: UnresolvedRelation, extraRelations: Option[Seq[Subquery]]) = {
       try {
-        catalog.lookupRelation(u.tableIdentifier, u.alias)
+        extraRelations.fold(catalog.lookupRelation(u.tableIdentifier, u.alias)) { cteRelations =>
+          cteRelations.find(_.alias == u.tableIdentifier)
+            .map(relation => u.alias.map(Subquery(_, relation.child)).getOrElse(relation.child))
+            .getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias))
+        }
       } catch {
         case _: NoSuchTableException =>
           u.failAnalysis(s"no such table ${u.tableIdentifier}")
       }
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-      case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
-        i.copy(
-          table = EliminateSubQueries(getTable(u)))
-      case u: UnresolvedRelation =>
-        getTable(u)
+    def apply(plan: LogicalPlan): LogicalPlan = {
+      val cteRelations = plan match {
+        case With(child, subQueries) =>
+          Some(subQueries)
+        case _ => None
+      }
+
+      plan transform {
+        case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
+          i.copy(
+            table = EliminateSubQueries(getTable(u, cteRelations)))
+        case u: UnresolvedRelation =>
+          getTable(u, cteRelations)
+      }
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1a75fcf3545bd..4599e27f5f4c2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -53,7 +53,14 @@ object DefaultOptimizer extends Optimizer {
       PushPredicateThroughGenerate,
       ColumnPruning) ::
     Batch("LocalRelation", FixedPoint(100),
-      ConvertToLocalRelation) :: Nil
+      ConvertToLocalRelation) ::
+    Batch("Drop With", Once, DropWith) :: Nil
+}
+
+object DropWith extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case With(child, _) => child
+  }
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 8a160f860e712..5281c7502556a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -303,8 +303,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case e @ EvaluatePython(udf, child, _) =>
         BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
       case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil
-      case logical.With(child, subQueries) =>
-        execution.With(planLater(child), subQueries.map(_.alias)) :: Nil
       case _ => Nil
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 67c6d9fce1b25..710268584cff1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -257,16 +257,6 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   }
 }
 
-case class With(child: SparkPlan, subQueries: Seq[String]) extends UnaryNode {
-  override def output = child.output
-
-  override def execute() = {
-    val result = child.execute()
-    subQueries.foreach(name => sqlContext.catalog.unregisterTable(Seq(name)))
-    result
-  }
-}
-
 /**
  * :: DeveloperApi ::
  * A plan node that does nothing but lie about the output of its child. Used to spice a
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index c9b58f96a1f22..e813027d94ba8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -572,17 +572,16 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
     val (fromClause: Option[ASTNode], insertClauses, cteSubqueries) =
       queryArgs match {
         case Token("TOK_FROM", args: Seq[ASTNode]) :: insertClauses =>
-          val (newInsertClauses, cteSubqueries) = insertClauses.last match {
+          insertClauses.last match {
             case Token("TOK_CTE", cteClauses) =>
               val subQueries = cteClauses.map(node => {
                 val relation = nodeToRelation(node)
                 relation.asInstanceOf[Subquery]
               }).toSeq
-              (insertClauses.init, Some(subQueries))
+              (Some(args.head), insertClauses.init, Some(subQueries))
 
-            case _ => (insertClauses, None)
+            case _ => (Some(args.head), insertClauses, None)
           }
-          (Some(args.head), newInsertClauses, cteSubqueries)
 
         case Token("TOK_INSERT", _) :: Nil => (None, queryArgs, None)
       }
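
In short: CTE definitions no longer round-trip through the shared catalog. The old `ResolveWith` rule registered every CTE alias as a temporary table during analysis, and the physical `execution.With` node unregistered them again after `execute()`; with this change, `ResolveRelations` consults the `With` node's subqueries directly when resolving a relation name, and the optimizer's `DropWith` rule strips the now-inert `With` wrapper. The sketch below illustrates that lookup order in isolation — it is a minimal, self-contained stand-in, not the actual Spark code: `Plan`, `Table`, `Subquery`, `UnresolvedRelation`, `lookupInCatalog`, and `CteResolutionSketch` are simplified illustrative names, and relation names are plain strings here rather than identifier sequences.

```scala
object CteResolutionSketch {

  sealed trait Plan
  case class Table(name: String) extends Plan
  case class Subquery(alias: String, child: Plan) extends Plan
  case class UnresolvedRelation(name: String, alias: Option[String]) extends Plan

  // Stand-in for catalog.lookupRelation: resolves a name against a fixed
  // one-table "catalog", applies the optional alias, fails on unknown tables.
  def lookupInCatalog(name: String, alias: Option[String]): Plan = {
    val table =
      if (name == "t") Table("t")
      else sys.error(s"no such table $name")
    alias.map(Subquery(_, table)).getOrElse(table)
  }

  // Mirrors the fold in the patched getTable: prefer an in-scope CTE
  // definition, otherwise fall back to the catalog.
  def getTable(u: UnresolvedRelation, cteRelations: Option[Seq[Subquery]]): Plan =
    cteRelations.fold(lookupInCatalog(u.name, u.alias)) { ctes =>
      ctes.find(_.alias == u.name)
        .map(cte => u.alias.map(Subquery(_, cte.child)).getOrElse(cte.child))
        .getOrElse(lookupInCatalog(u.name, u.alias))
    }

  def main(args: Array[String]): Unit = {
    // WITH s AS (SELECT ... FROM t) ...  -- "s" is visible only via cteRelations.
    val ctes = Some(Seq(Subquery("s", Table("t"))))

    println(getTable(UnresolvedRelation("s", None), ctes))       // Table(t): CTE hit
    println(getTable(UnresolvedRelation("t", Some("x")), ctes))  // Subquery(x,Table(t)): catalog fallback
    println(getTable(UnresolvedRelation("t", None), None))       // Table(t): no CTEs in scope
  }
}
```

Two properties of the new flow are visible in the `find`-before-`getOrElse` chain: a CTE definition shadows a catalog table of the same name for the duration of the query, and no mutable catalog state is left behind if execution fails partway (the old design depended on the physical `With` node running to completion to unregister the aliases). Note also that `cteRelations` is extracted only when `With` is the root of the plan, so in this iteration CTE definitions are visible throughout the tree beneath a top-level `WITH` rather than scoped to arbitrarily nested subplans.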