marmbrus committed Feb 12, 2015
1 parent d4e9015 commit 1a797b4
Showing 8 changed files with 41 additions and 39 deletions.
@@ -67,8 +67,8 @@ class Analyzer(catalog: Catalog,
       extendedRules : _*),
     Batch("Check Analysis", Once,
       CheckResolution),
-    Batch("AnalysisOperators", fixedPoint,
-      EliminateAnalysisOperators)
+    Batch("Remove SubQueries", fixedPoint,
+      EliminateSubQueries)
   )

 /**
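
For context, each `Batch` pairs a group of rules with an execution strategy: `Once` makes a single pass, while `fixedPoint` re-applies the batch until the plan stops changing or an iteration cap is reached. A minimal sketch of what the fixed-point strategy amounts to (assumed shapes; the real logic lives in Catalyst's `RuleExecutor`):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule

    // Sketch only: re-run the batch until the plan stabilizes.
    def runToFixedPoint(
        plan: LogicalPlan,
        rules: Seq[Rule[LogicalPlan]],
        maxIterations: Int): LogicalPlan = {
      var current = plan
      var iteration = 0
      var changed = true
      while (changed && iteration < maxIterations) {
        val next = rules.foldLeft(current) { (p, rule) => rule(p) }
        changed = !next.fastEquals(current) // stop once a pass is a no-op
        current = next
        iteration += 1
      }
      current
    }

Renaming the batch to "Remove SubQueries" (and the rule below to `EliminateSubQueries`) describes what the batch actually does, rather than the phase it used to run in.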
@@ -104,25 +104,27 @@ class Analyzer(catalog: Catalog,
             s"of type ${f.condition.dataType.simpleString} is not a boolean.")

         case aggregatePlan @ Aggregate(groupingExprs, aggregateExprs, child) =>
-          def isValidAggregateExpression(expr: Expression): Boolean = expr match {
-            case _: AggregateExpression => true
-            case e: Attribute => groupingExprs.contains(e)
-            case e if groupingExprs.contains(e) => true
-            case e if e.references.isEmpty => true
-            case e => e.children.forall(isValidAggregateExpression)
+          def checkValidAggregateExpression(expr: Expression): Unit = expr match {
+            case _: AggregateExpression => // OK
+            case e: Attribute if !groupingExprs.contains(e) =>
+              failAnalysis(
+                s"expression '${e.prettyString}' is neither present in the group by, " +
+                  s"nor is it an aggregate function. " +
+                  "Add to group by or wrap in first() if you don't care which value you get.")
+            case e: Attribute => // OK
+            case e if groupingExprs.contains(e) => // OK
+            case e if e.references.isEmpty => // OK
+            case e => e.children.foreach(checkValidAggregateExpression)
           }

-          aggregateExprs.find { e =>
-            !isValidAggregateExpression(e.transform {
-              // Should trim aliases around `GetField`s. These aliases are introduced while
-              // resolving struct field accesses, because `GetField` is not a `NamedExpression`.
-              // (Should we just turn `GetField` into a `NamedExpression`?)
-              case Alias(g: GetField, _) => g
-            })
-          }.foreach { e =>
-            failAnalysis(
-              s"expression '${e.prettyString}' is not an aggregate function or in the group by")
-          }
+          val cleaned = aggregateExprs.map(_.transform {
+            // Should trim aliases around `GetField`s. These aliases are introduced while
+            // resolving struct field accesses, because `GetField` is not a `NamedExpression`.
+            // (Should we just turn `GetField` into a `NamedExpression`?)
+            case Alias(g, _) => g
+          })
+
+          cleaned.foreach(checkValidAggregateExpression)

         case o if o.children.nonEmpty && !o.references.subsetOf(o.inputSet) =>
           val missingAttributes = (o.references -- o.inputSet).map(_.prettyString).mkString(",")
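
The substance of this hunk: validation used to compute a boolean over each aggregate expression and then report a generic failure; it now recurses with `checkValidAggregateExpression`, which fails fast and names the offending attribute along with a suggested fix. A hypothetical query that would trip the new check (the table `t(a, b)` and `sqlContext` are assumed):

    // Hypothetical: `b` is neither grouped nor aggregated.
    sqlContext.sql("SELECT a, b FROM t GROUP BY a")
    // Analysis now fails with the pointed message:
    //   expression 'b' is neither present in the group by, nor is it an
    //   aggregate function. Add to group by or wrap in first() if you
    //   don't care which value you get.
    // instead of the old generic:
    //   expression 'b' is not an aggregate function or in the group by

Note also that alias-trimming moved out of the per-expression check into a single `cleaned` pass, and the pattern was broadened from `Alias(g: GetField, _)` to `Alias(g, _)`.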
@@ -247,7 +249,7 @@ class Analyzer(catalog: Catalog,
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       case i @ InsertIntoTable(UnresolvedRelation(tableIdentifier, alias), _, _, _) =>
         i.copy(
-          table = EliminateAnalysisOperators(catalog.lookupRelation(tableIdentifier, alias)))
+          table = EliminateSubQueries(catalog.lookupRelation(tableIdentifier, alias)))
       case UnresolvedRelation(tableIdentifier, alias) =>
         catalog.lookupRelation(tableIdentifier, alias)
     }
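
The unwrap matters here because a `Catalog` conventionally returns the looked-up table wrapped in a `Subquery` that carries the alias for scoping, while the `table` of an `InsertIntoTable` should be the bare relation. A sketch under those assumptions:

    // Sketch (plan shapes assumed):
    val looked = catalog.lookupRelation(Seq("t"), None) // Subquery("t", relation)
    val target = EliminateSubQueries(looked)            // the bare relation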
@@ -494,7 +496,7 @@ class Analyzer(catalog: Catalog,
  * only required to provide scoping information for attributes and can be removed once analysis is
  * complete.
  */
-object EliminateAnalysisOperators extends Rule[LogicalPlan] {
+object EliminateSubQueries extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case Subquery(_, child) => child
   }
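
One subtlety, hedged: `transform` does not re-apply the rule to the node it just produced, so directly nested wrappers take one pass each, which would explain why the batch above runs this rule to `fixedPoint` rather than `Once`. A usage sketch with a hypothetical `relation`:

    // Hypothetical nested plan; `relation` is any resolved LogicalPlan.
    val plan = Subquery("outer", Subquery("inner", relation))
    EliminateSubQueries(plan)                      // Subquery("inner", relation)
    EliminateSubQueries(EliminateSubQueries(plan)) // relation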
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.catalyst.optimizer

-import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.PlanTest
@@ -30,7 +30,7 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("AnalysisNodes", Once,
-        EliminateAnalysisOperators) ::
+        EliminateSubQueries) ::
       Batch("Constant Folding", FixedPoint(50),
         NullPropagation,
         ConstantFolding,
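
The test-suite changes that follow are all the same mechanical rename, but the pattern is worth spelling out: each suite's local `Optimize` executor strips `Subquery` nodes first so that `comparePlans` can structurally match the optimized plan against a hand-built expected plan, which gets the same `EliminateSubQueries` treatment. A hedged sketch of that shape (DSL, relation, and column names assumed):

    // Sketch of the recurring test pattern in these suites (names assumed).
    val testRelation = LocalRelation('a.int, 'b.int)

    val originalQuery = testRelation.where('a === 1).select('b).analyze
    val optimized = Optimize(originalQuery)

    // Both sides must be free of Subquery wrappers before comparison.
    val correctAnswer = testRelation.where('a === 1).select('b).analyze
    comparePlans(optimized, EliminateSubQueries(correctAnswer))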
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.catalyst.optimizer

-import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, EliminateAnalysisOperators}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, EliminateSubQueries}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.PlanTest
@@ -33,7 +33,7 @@ class ConstantFoldingSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("AnalysisNodes", Once,
-        EliminateAnalysisOperators) ::
+        EliminateSubQueries) ::
       Batch("ConstantFolding", Once,
         ConstantFolding,
         BooleanSimplification) :: Nil
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.optimizer

 import org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.expressions.Explode
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter}
@@ -32,7 +32,7 @@ class FilterPushdownSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("Subqueries", Once,
-        EliminateAnalysisOperators) ::
+        EliminateSubQueries) ::
       Batch("Filter Pushdown", Once,
         CombineFilters,
         PushPredicateThroughProject,
@@ -351,7 +351,7 @@ class FilterPushdownSuite extends PlanTest {
     }
     val optimized = Optimize(originalQuery.analyze)

-    comparePlans(analysis.EliminateAnalysisOperators(originalQuery.analyze), optimized)
+    comparePlans(analysis.EliminateSubQueries(originalQuery.analyze), optimized)
   }

test("joins: conjunctive predicates") {
@@ -370,7 +370,7 @@
       left.join(right, condition = Some("x.b".attr === "y.b".attr))
         .analyze

-    comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
+    comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
   }

test("joins: conjunctive predicates #2") {
@@ -389,7 +389,7 @@
       left.join(right, condition = Some("x.b".attr === "y.b".attr))
         .analyze

-    comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
+    comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
   }

test("joins: conjunctive predicates #3") {
@@ -412,7 +412,7 @@
         condition = Some("z.a".attr === "x.b".attr))
         .analyze

-    comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
+    comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
   }

   val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.optimizer

 import scala.collection.immutable.HashSet
-import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.PlanTest
@@ -34,7 +34,7 @@ class OptimizeInSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("AnalysisNodes", Once,
-        EliminateAnalysisOperators) ::
+        EliminateSubQueries) ::
       Batch("ConstantFolding", Once,
         ConstantFolding,
         BooleanSimplification,
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.optimizer

 import org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.rules._
@@ -29,7 +29,7 @@ class UnionPushdownSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("Subqueries", Once,
-        EliminateAnalysisOperators) ::
+        EliminateSubQueries) ::
       Batch("Union Pushdown", Once,
         UnionPushdown) :: Nil
   }
@@ -37,7 +37,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators, OverrideCatalog, OverrideFunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubQueries, OverrideCatalog, OverrideFunctionRegistry}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand, QueryExecutionException}
 import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand}
@@ -104,7 +104,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    */
   @Experimental
   def analyze(tableName: String) {
-    val relation = EliminateAnalysisOperators(catalog.lookupRelation(Seq(tableName)))
+    val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))

     relation match {
       case relation: MetastoreRelation =>
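
The same unwrapping concern on the Hive side: `catalog.lookupRelation` hands back the table behind a `Subquery` wrapper, and the match above dispatches on the concrete `MetastoreRelation`, so without the unwrap it would fall through. A sketch of the failure mode (plan shapes assumed):

    // Assumed shapes: what analyze("t") would see without the unwrap.
    val looked = catalog.lookupRelation(Seq("t")) // Subquery("t", MetastoreRelation(...))
    looked match {
      case m: MetastoreRelation => // never reached while the wrapper is present
      case other => // falls through here instead
    }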
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive.execution

 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -175,7 +175,7 @@ case class CreateMetastoreDataSourceAsSelect(
         val resolved =
           ResolvedDataSource(sqlContext, Some(query.schema), provider, optionsWithPath)
         val createdRelation = LogicalRelation(resolved.relation)
-        EliminateAnalysisOperators(sqlContext.table(tableName).logicalPlan) match {
+        EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match {
           case l @ LogicalRelation(i: InsertableRelation) =>
             if (l.schema != createdRelation.schema) {
               val errorDescription =