From d4b962f19718aa183ecfd43bb5e9e21a181c11b8 Mon Sep 17 00:00:00 2001 From: wangfei Date: Fri, 8 May 2015 10:42:44 +0800 Subject: [PATCH] added WindowsSubstitution --- .../sql/catalyst/analysis/Analyzer.scala | 38 +++++++++++-------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index eeba787e196d4..bb7913e186a85 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -57,6 +57,7 @@ class Analyzer( lazy val batches: Seq[Batch] = Seq( Batch("Substitution", fixedPoint, CTESubstitution :: + WindowsSubstitution :: Nil : _*), Batch("Resolution", fixedPoint, ResolveRelations :: @@ -101,6 +102,28 @@ class Analyzer( } } + /** + * Substitute child plan with WindowSpecDefinitions. + */ + object WindowsSubstitution extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + // Lookup WindowSpecDefinitions. This rule works with unresolved children. + case WithWindowDefinition(windowDefinitions, child) => + child.transform { + case plan => plan.transformExpressions { + case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) => + val errorMessage = + s"Window specification $windowName is not defined in the WINDOW clause." + val windowSpecDefinition = + windowDefinitions + .get(windowName) + .getOrElse(failAnalysis(errorMessage)) + WindowExpression(c, windowSpecDefinition) + } + } + } + } + /** * Removes no-op Alias expressions from the plan. */ @@ -678,21 +701,6 @@ class Analyzer( // We have to use transformDown at here to make sure the rule of // "Aggregate with Having clause" will be triggered. def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { - // Lookup WindowSpecDefinitions. This rule works with unresolved children. - case WithWindowDefinition(windowDefinitions, child) => - child.transform { - case plan => plan.transformExpressions { - case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) => - val errorMessage = - s"Window specification $windowName is not defined in the WINDOW clause." - val windowSpecDefinition = - windowDefinitions - .get(windowName) - .getOrElse(failAnalysis(errorMessage)) - WindowExpression(c, windowSpecDefinition) - } - } - // Aggregate with Having clause. This rule works with an unresolved Aggregate because // a resolved Aggregate will not have Window Functions. case f @ Filter(condition, a @ Aggregate(groupingExprs, aggregateExprs, child))