From 15e5265c46c0087fb08cc4a194a7d4f641c268f4 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Fri, 21 Mar 2014 18:42:59 -0700
Subject: [PATCH] Support for resolving mixed case fields from a reflected
 schema using HiveQL.

---
 .../sql/catalyst/analysis/Analyzer.scala      | 19 +++++++++++-
 .../sql/catalyst/optimizer/Optimizer.scala    | 14 ---------
 .../plans/logical/basicOperators.scala        | 30 ++++++++++++++++++-
 .../optimizer/ConstantFoldingSuite.scala      |  7 +++--
 .../optimizer/FilterPushdownSuite.scala       | 13 ++++----
 .../apache/spark/sql/hive/HiveContext.scala   | 12 ++++++--
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  6 ++--
 .../hive/execution/HiveResolutionSuite.scala  | 13 ++++++++
 8 files changed, 85 insertions(+), 29 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index fc76e76617670..161d28eba070e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -55,7 +55,9 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
       StarExpansion ::
       ResolveFunctions ::
       GlobalAggregates ::
-      typeCoercionRules :_*)
+      typeCoercionRules :_*),
+    Batch("AnalysisOperators", fixedPoint,
+      EliminateAnalysisOperators)
   )

 /**
@@ -80,6 +82,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
       case s: Star => s.copy(table = s.table.map(_.toLowerCase))
       case UnresolvedAttribute(name) => UnresolvedAttribute(name.toLowerCase)
       case Alias(c, name) => Alias(c, name.toLowerCase)()
+      case GetField(c, name) => GetField(c, name.toLowerCase)
     }
   }
 }
@@ -184,3 +187,17 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
     exprs.collect { case _: Star => true }.nonEmpty
   }
 }
+
+/**
+ * Removes [[catalyst.plans.logical.Subquery Subquery]] operators from the plan. Subqueries are
+ * only required to provide scoping information for attributes and can be removed once analysis is
+ * complete. Similarly, this rule also removes
+ * [[catalyst.plans.logical.LowerCaseSchema LowerCaseSchema]] operators.
+ */
+object EliminateAnalysisOperators extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Subquery(_, child) => child
+    case LowerCaseSchema(child) => child
+  }
+}
+
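The new EliminateAnalysisOperators rule is worth pausing on: Subquery and LowerCaseSchema exist only to scope and normalize attribute names during analysis, so once every attribute is resolved the rule can splice their children back into the plan. Below is a minimal self-contained sketch of the same idea over a toy plan type; the names are illustrative stand-ins, not Catalyst's actual LogicalPlan, whose transform applies the rewrite bottom-up across the whole tree.

    object WrapperElimination {
      // Toy plan ADT standing in for Catalyst's LogicalPlan.
      sealed trait Plan
      case class Table(name: String) extends Plan
      case class Subquery(alias: String, child: Plan) extends Plan
      case class LowerCaseSchema(child: Plan) extends Plan

      // Strip scoping-only wrappers, mirroring the rule's two transform cases.
      def eliminate(plan: Plan): Plan = plan match {
        case Subquery(_, child)     => eliminate(child)
        case LowerCaseSchema(child) => eliminate(child)
        case other                  => other
      }
    }

    // WrapperElimination.eliminate(LowerCaseSchema(Subquery("t", Table("src")))) returns Table("src")
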
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index c1201971d9c0a..07ebbc90fce71 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -27,30 +27,16 @@ import org.apache.spark.sql.catalyst.types._

 object Optimizer extends RuleExecutor[LogicalPlan] {
   val batches =
-    Batch("Subqueries", Once,
-      EliminateSubqueries) ::
     Batch("ConstantFolding", Once,
       ConstantFolding,
       BooleanSimplification,
       SimplifyCasts) ::
     Batch("Filter Pushdown", Once,
-      EliminateSubqueries,
       CombineFilters,
       PushPredicateThroughProject,
       PushPredicateThroughInnerJoin) :: Nil
 }

-/**
- * Removes [[catalyst.plans.logical.Subquery Subquery]] operators from the plan. Subqueries are
- * only required to provide scoping information for attributes and can be removed once analysis is
- * complete.
- */
-object EliminateSubqueries extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case Subquery(_, child) => child
-  }
-}
-
 /**
  * Replaces [[catalyst.expressions.Expression Expressions]] that can be statically evaluated with
  * equivalent [[catalyst.expressions.Literal Literal]] values.

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 6480cca30049e..61481de65e76e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -21,6 +21,7 @@ package plans
 package logical

 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._

 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
   def output = projectList.map(_.toAttribute)
@@ -86,7 +87,7 @@ case class Join(
 }

 case class InsertIntoTable(
-    table: BaseRelation,
+    table: LogicalPlan,
     partition: Map[String, Option[String]],
     child: LogicalPlan,
     overwrite: Boolean)
@@ -141,6 +142,33 @@ case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
   def references = Set.empty
 }

+/**
+ * Converts the schema of `child` to all lowercase. Together with LowercaseAttributeReferences,
+ * this allows for optional case-insensitive attribute resolution. This node can be elided after
+ * analysis.
+ */
+case class LowerCaseSchema(child: LogicalPlan) extends UnaryNode {
+  protected def lowerCaseSchema(dataType: DataType): DataType = dataType match {
+    case StructType(fields) =>
+      StructType(fields.map(f =>
+        StructField(f.name.toLowerCase(), lowerCaseSchema(f.dataType), f.nullable)))
+    case ArrayType(elemType) => ArrayType(lowerCaseSchema(elemType))
+    case otherType => otherType
+  }
+
+  val output = child.output.map {
+    case a: AttributeReference =>
+      AttributeReference(
+        a.name.toLowerCase,
+        lowerCaseSchema(a.dataType),
+        a.nullable)(
+        a.exprId,
+        a.qualifiers)
+  }
+
+  def references = Set.empty
+}
+
 case class Sample(fraction: Double, withReplacement: Boolean, seed: Int, child: LogicalPlan)
     extends UnaryNode {
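LowerCaseSchema must lowercase names recursively: a reflected case class can nest another case class (or an array of one), so mixed-case field names can appear at any depth of an attribute's DataType, not just at the top level. A rough self-contained sketch of that recursion over a toy type tree (toy types, not Catalyst's StructType/ArrayType):

    object SchemaLowering {
      // Toy type ADT mirroring the StructType/ArrayType nesting handled above.
      sealed trait DType
      case object IntT extends DType
      case class StructT(fields: Seq[(String, DType)]) extends DType
      case class ArrayT(elem: DType) extends DType

      // Lowercase every field name at every nesting level, as lowerCaseSchema does.
      def lower(t: DType): DType = t match {
        case StructT(fields) => StructT(fields.map { case (n, ft) => (n.toLowerCase, lower(ft)) })
        case ArrayT(elem)    => ArrayT(lower(elem))
        case other           => other
      }
    }

    // SchemaLowering.lower(StructT(Seq("BigY" -> IntT))) returns StructT(Seq("bigy" -> IntT))
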
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
index 2c107b865af19..53f760fb4ceb2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
@@ -19,21 +19,22 @@ package org.apache.spark.sql
 package catalyst
 package optimizer

-import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.types.IntegerType

 // For implicit conversions
+import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.dsl.expressions._

 class ConstantFoldingSuite extends OptimizerTest {

   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
-      Batch("Subqueries", Once,
-        EliminateSubqueries) ::
+      Batch("AnalysisNodes", Once,
+        EliminateAnalysisOperators) ::
       Batch("ConstantFolding", Once,
         ConstantFolding,
         BooleanSimplification) :: Nil

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index cfbef53de16b4..02dc04a8d6bb5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -2,6 +2,8 @@ package org.apache.spark.sql
 package catalyst
 package optimizer

+
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -14,9 +16,8 @@ class FilterPushdownSuite extends OptimizerTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("Subqueries", Once,
-        EliminateSubqueries) ::
+        EliminateAnalysisOperators) ::
       Batch("Filter Pushdown", Once,
-        EliminateSubqueries,
         CombineFilters,
         PushPredicateThroughProject,
         PushPredicateThroughInnerJoin) :: Nil
@@ -155,7 +156,7 @@ class FilterPushdownSuite extends OptimizerTest {
     }

     val optimized = Optimize(originalQuery.analyze)
-    comparePlans(optimizer.EliminateSubqueries(originalQuery.analyze), optimized)
+    comparePlans(analysis.EliminateAnalysisOperators(originalQuery.analyze), optimized)
   }

   test("joins: conjunctive predicates") {
@@ -174,7 +175,7 @@ class FilterPushdownSuite extends OptimizerTest {
       left.join(right, condition = Some("x.b".attr === "y.b".attr))
         .analyze

-    comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
+    comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
   }

   test("joins: conjunctive predicates #2") {
@@ -193,7 +194,7 @@ class FilterPushdownSuite extends OptimizerTest {
       left.join(right, condition = Some("x.b".attr === "y.b".attr))
         .analyze

-    comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
+    comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
   }

   test("joins: conjunctive predicates #3") {
@@ -216,6 +217,6 @@ class FilterPushdownSuite extends OptimizerTest {
         condition = Some("z.a".attr === "x.b".attr))
         .analyze

-    comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
+    comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
   }
 }
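Both suites now normalize with EliminateAnalysisOperators in place of the deleted EliminateSubqueries, and each builds a private Optimize executor out of Once batches: a batch applies its rules in order, and Once means a single pass (by contrast, the Analyzer above registers the rule in a fixedPoint batch, which reruns until the plan stops changing). A simplified sketch of that execution model, using toy stand-ins rather than Catalyst's RuleExecutor:

    object MiniExecutor {
      type Rule[P] = P => P
      case class Batch[P](name: String, rules: Rule[P]*)

      // Run every batch once, threading the plan through each batch's rules in order.
      def execute[P](plan: P, batches: Seq[Batch[P]]): P =
        batches.foldLeft(plan)((p, b) => b.rules.foldLeft(p)((cur, rule) => rule(cur)))
    }
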
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 79dc72233a181..af35c919df308 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -32,7 +32,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
 import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan, LowerCaseSchema}
 import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand, ExplainCommand}
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.execution._
@@ -108,7 +108,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {

   /* A catalyst metadata catalog that points to the Hive Metastore. */
   @transient
-  override lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog
+  override lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog {
+    override def lookupRelation(
+      databaseName: Option[String],
+      tableName: String,
+      alias: Option[String] = None): LogicalPlan = {
+
+      LowerCaseSchema(super.lookupRelation(databaseName, tableName, alias))
+    }
+  }

   /* An analyzer that uses the Hive metastore. */
   @transient

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index a5db283765c27..1667a217297b1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -27,7 +27,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde2.Deserializer

-import org.apache.spark.sql.catalyst.analysis.Catalog
+
+import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateAnalysisOperators}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -96,7 +97,8 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
       createTable(databaseName, tableName, child.output)

       InsertIntoTable(
-        lookupRelation(Some(databaseName), tableName, None).asInstanceOf[BaseRelation],
+        EliminateAnalysisOperators(
+          lookupRelation(Some(databaseName), tableName, None)),
         Map.empty,
         child,
         overwrite = false)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
index 996bd4efecd4c..4bdea214677ad 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
@@ -19,6 +19,11 @@ package org.apache.spark.sql
 package hive
 package execution

+import TestHive._
+
+case class Data(a: Int, B: Int, n: Nested)
+case class Nested(a: Int, B: Int)
+
 /**
  * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution.
  */
@@ -47,6 +52,14 @@ class HiveResolutionSuite extends HiveComparisonTest {
   createQueryTest("alias.*",
     "SELECT a.* FROM src a ORDER BY key LIMIT 1")

+  test("case insensitivity with scala reflection") {
+    // Test resolution with Scala Reflection
+    TestHive.sparkContext.parallelize(Data(1, 2, Nested(1,2)) :: Nil)
+      .registerAsTable("caseSensitivityTest")
+
+    sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest")
+  }
+
   /**
    * Negative examples. Currently only left here for documentation purposes.
    * TODO(marmbrus): Test that catalyst fails on these queries.
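The new test is the end-to-end check for the patch: a case class with mixed-case fields, both top-level and nested, is registered as a table, and every case variant of every column and struct field must then resolve. An illustrative variant of the same usage, assuming the suite's import TestHive._ is in scope; the Inner/Record names and the mixedCase table below are hypothetical, not part of the patch:

    // Hypothetical mixed-case schema, analogous to Data/Nested above.
    case class Inner(x: Int, BigY: Int)
    case class Record(id: Int, FirstName: String, n: Inner)

    TestHive.sparkContext
      .parallelize(Record(1, "ada", Inner(1, 2)) :: Nil)
      .registerAsTable("mixedCase")

    // With LowerCaseSchema wrapping the relation and LowercaseAttributeReferences
    // lowercasing the query side, every spelling resolves to the same attribute.
    sql("SELECT id, FirstName, firstname, n.BigY, n.bigy FROM mixedCase")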