From 1c9a0920c34fc4e64fa9ef91aec4cfeb02917b2f Mon Sep 17 00:00:00 2001 From: wangfei Date: Wed, 29 Apr 2015 17:17:57 +0800 Subject: [PATCH] added Substitution bastch --- .../sql/catalyst/analysis/Analyzer.scala | 71 +++++++++++++------ 1 file changed, 48 insertions(+), 23 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 5e42b409dcc59..0fe5e2d1ade65 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -47,12 +47,21 @@ class Analyzer( val fixedPoint = FixedPoint(maxIterations) + /** + * Override to provide additional rules for the "Substitution" batch. + */ + val extendedSubstitutionRules: Seq[Rule[LogicalPlan]] = Nil + /** * Override to provide additional rules for the "Resolution" batch. */ val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil lazy val batches: Seq[Batch] = Seq( + Batch("Substitution", fixedPoint, + CTESubstitution :: + Nil ++ + extendedSubstitutionRules : _*), Batch("Resolution", fixedPoint, ResolveRelations :: ResolveReferences :: @@ -68,6 +77,38 @@ class Analyzer( extendedResolutionRules : _*) ) + /** + * Substitute CTE definitions + */ + object CTESubstitution extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = { + val (realPlan, cteRelations) = plan match { + case With(child, relations) => + (child, relations) + case other => (other, Map.empty[String, LogicalPlan]) + } + substituteCTE(realPlan, cteRelations) + } + + def substituteCTE(plan: LogicalPlan, cteRelations: Map[String, LogicalPlan]): LogicalPlan = { + plan transform { + case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) => + // In hive, if there is same table name in database and CTE definition, + // hive will use the table in database, not the CTE one. + // Taking into account the reasonableness and the implementation complexity, + // here use the CTE definition first, check table name only and ignore database name + val relation = cteRelations.get(u.tableIdentifier.last) + .map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation)) + .getOrElse(u) + i.copy(table = relation) + case u : UnresolvedRelation => + cteRelations.get(u.tableIdentifier.last) + .map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation)) + .getOrElse(u) + } + } + } + /** * Removes no-op Alias expressions from the plan. */ @@ -169,36 +210,20 @@ class Analyzer( * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog. */ object ResolveRelations extends Rule[LogicalPlan] { - def getTable(u: UnresolvedRelation, cteRelations: Map[String, LogicalPlan]): LogicalPlan = { + def getTable(u: UnresolvedRelation): LogicalPlan = { try { - // In hive, if there is same table name in database and CTE definition, - // hive will use the table in database, not the CTE one. - // Taking into account the reasonableness and the implementation complexity, - // here use the CTE definition first, check table name only and ignore database name - cteRelations.get(u.tableIdentifier.last) - .map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation)) - .getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias)) + catalog.lookupRelation(u.tableIdentifier, u.alias) } catch { case _: NoSuchTableException => u.failAnalysis(s"no such table ${u.tableName}") } } - def apply(plan: LogicalPlan): LogicalPlan = { - val (realPlan, cteRelations) = plan match { - // TODO allow subquery to define CTE - // Add cte table to a temp relation map,drop `with` plan and keep its child - case With(child, relations) => (child, relations) - case other => (other, Map.empty[String, LogicalPlan]) - } - - realPlan transform { - case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) => - i.copy( - table = EliminateSubQueries(getTable(u, cteRelations))) - case u: UnresolvedRelation => - getTable(u, cteRelations) - } + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) => + i.copy(table = EliminateSubQueries(getTable(u))) + case u: UnresolvedRelation => + getTable(u, cteRelations) } }