Support for resolving mixed case fields from a reflected schema using HiveQL.
marmbrus committed Mar 24, 2014
1 parent 5aa5035 commit 15e5265
Showing 8 changed files with 85 additions and 29 deletions.
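In short: Scala case classes with mixed-case field names can now be registered as tables and queried through HiveQL, which treats identifiers case-insensitively. A minimal sketch of the new behavior, mirroring the test this commit adds to HiveResolutionSuite below (TestHive, registerAsTable, and sql come from the Spark SQL test harness of this era):

    import TestHive._

    case class Nested(a: Int, B: Int)
    case class Data(a: Int, B: Int, n: Nested)

    // The reflected schema contains the mixed-case names a, B, n.a, and n.B.
    TestHive.sparkContext.parallelize(Data(1, 2, Nested(1, 2)) :: Nil)
      .registerAsTable("caseSensitivityTest")

    // Before this commit, the upper-case references here failed to resolve.
    sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest")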
@@ -55,7 +55,9 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
       StarExpansion ::
       ResolveFunctions ::
       GlobalAggregates ::
-      typeCoercionRules :_*)
+      typeCoercionRules :_*),
+    Batch("AnalysisOperators", fixedPoint,
+      EliminateAnalysisOperators)
   )
 
   /**
@@ -80,6 +82,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
         case s: Star => s.copy(table = s.table.map(_.toLowerCase))
         case UnresolvedAttribute(name) => UnresolvedAttribute(name.toLowerCase)
         case Alias(c, name) => Alias(c, name.toLowerCase)()
+        case GetField(c, name) => GetField(c, name.toLowerCase)
       }
     }
   }
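The new GetField case extends the existing lowercasing of attribute and alias names to struct-field accesses, so a reference like n.B can match a schema that has been lowered to all-lowercase. A self-contained sketch of the idea, using stand-in types rather than the real Catalyst classes:

    // Simplified model of the normalization (not the actual Catalyst types).
    sealed trait Expr
    case class UnresolvedAttr(name: String) extends Expr
    case class GetField(child: Expr, fieldName: String) extends Expr

    def lowerNames(e: Expr): Expr = e match {
      case UnresolvedAttr(n) => UnresolvedAttr(n.toLowerCase)
      // Previously field names were left untouched, so n.B could not resolve
      // against a schema lowered to b by LowerCaseSchema (added below).
      case GetField(c, f) => GetField(lowerNames(c), f.toLowerCase)
    }

    // lowerNames(GetField(UnresolvedAttr("n"), "B")) == GetField(UnresolvedAttr("n"), "b")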
@@ -184,3 +187,17 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
     exprs.collect { case _: Star => true }.nonEmpty
   }
 }
+
+/**
+ * Removes [[catalyst.plans.logical.Subquery Subquery]] operators from the plan. Subqueries are
+ * only required to provide scoping information for attributes and can be removed once analysis is
+ * complete. Similarly, this rule also removes
+ * [[catalyst.plans.logical.LowerCaseSchema LowerCaseSchema]] operators.
+ */
+object EliminateAnalysisOperators extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Subquery(_, child) => child
+    case LowerCaseSchema(child) => child
+  }
+}
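Because EliminateAnalysisOperators is an object with an apply method, it can also be invoked directly on an already-analyzed plan, which is exactly how the updated HiveMetastoreCatalog and optimizer suites below use it (analyzedPlan here is a placeholder for any analyzed LogicalPlan):

    // Strips the scoping-only wrappers once analysis is complete:
    //   Subquery(_, child)     => child
    //   LowerCaseSchema(child) => child
    val cleaned: LogicalPlan = EliminateAnalysisOperators(analyzedPlan)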

@@ -27,30 +27,16 @@ import org.apache.spark.sql.catalyst.types._
 
 object Optimizer extends RuleExecutor[LogicalPlan] {
   val batches =
-    Batch("Subqueries", Once,
-      EliminateSubqueries) ::
     Batch("ConstantFolding", Once,
       ConstantFolding,
       BooleanSimplification,
       SimplifyCasts) ::
     Batch("Filter Pushdown", Once,
-      EliminateSubqueries,
       CombineFilters,
       PushPredicateThroughProject,
       PushPredicateThroughInnerJoin) :: Nil
 }
 
-/**
- * Removes [[catalyst.plans.logical.Subquery Subquery]] operators from the plan. Subqueries are
- * only required to provide scoping information for attributes and can be removed once analysis is
- * complete.
- */
-object EliminateSubqueries extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case Subquery(_, child) => child
-  }
-}
-
 /**
  * Replaces [[catalyst.expressions.Expression Expressions]] that can be statically evaluated with
  * equivalent [[catalyst.expressions.Literal Literal]] values.
@@ -21,6 +21,7 @@ package plans
 package logical
 
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
 
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
   def output = projectList.map(_.toAttribute)
@@ -86,7 +87,7 @@ case class Join(
 }
 
 case class InsertIntoTable(
-    table: BaseRelation,
+    table: LogicalPlan,
     partition: Map[String, Option[String]],
     child: LogicalPlan,
     overwrite: Boolean)
@@ -141,6 +142,33 @@ case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
   def references = Set.empty
 }
 
+/**
+ * Converts the schema of `child` to all-lowercase; together with LowercaseAttributeReferences,
+ * this allows for optional, case-insensitive attribute resolution. This node can be elided after
+ * analysis.
+ */
+case class LowerCaseSchema(child: LogicalPlan) extends UnaryNode {
+  protected def lowerCaseSchema(dataType: DataType): DataType = dataType match {
+    case StructType(fields) =>
+      StructType(fields.map(f =>
+        StructField(f.name.toLowerCase(), lowerCaseSchema(f.dataType), f.nullable)))
+    case ArrayType(elemType) => ArrayType(lowerCaseSchema(elemType))
+    case otherType => otherType
+  }
+
+  val output = child.output.map {
+    case a: AttributeReference =>
+      AttributeReference(
+        a.name.toLowerCase,
+        lowerCaseSchema(a.dataType),
+        a.nullable)(
+        a.exprId,
+        a.qualifiers)
+  }
+
+  def references = Set.empty
+}
+
 case class Sample(fraction: Double, withReplacement: Boolean, seed: Int, child: LogicalPlan)
   extends UnaryNode {
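For intuition, here is what lowerCaseSchema does to a nested schema, using the StructType/StructField shapes from this diff (field names illustrative; lowerCaseSchema itself is a protected member, so this is conceptual rather than directly callable):

    val nested = StructType(Seq(
      StructField("A", IntegerType, nullable = true),
      StructField("B", IntegerType, nullable = true)))
    val schema = StructType(Seq(
      StructField("B", IntegerType, nullable = true),
      StructField("N", nested, nullable = true)))

    // lowerCaseSchema(schema) would yield fields b and n, with nested fields a and b.
    // Each rewritten AttributeReference keeps its original exprId and qualifiers, so
    // expressions resolved against the lowered schema still line up with the original
    // relation once the LowerCaseSchema wrapper is eliminated.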
@@ -19,21 +19,22 @@ package org.apache.spark.sql
 package catalyst
 package optimizer
 
-import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.types.IntegerType
 
 // For implicit conversions
+import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 
 class ConstantFoldingSuite extends OptimizerTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
-      Batch("Subqueries", Once,
-        EliminateSubqueries) ::
+      Batch("AnalysisNodes", Once,
+        EliminateAnalysisOperators) ::
       Batch("ConstantFolding", Once,
         ConstantFolding,
         BooleanSimplification) :: Nil
@@ -2,6 +2,8 @@ package org.apache.spark.sql
 package catalyst
 package optimizer
 
+
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 
@@ -14,9 +16,8 @@ class FilterPushdownSuite extends OptimizerTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("Subqueries", Once,
-        EliminateSubqueries) ::
+        EliminateAnalysisOperators) ::
       Batch("Filter Pushdown", Once,
-        EliminateSubqueries,
         CombineFilters,
         PushPredicateThroughProject,
         PushPredicateThroughInnerJoin) :: Nil
@@ -155,7 +156,7 @@ class FilterPushdownSuite extends OptimizerTest {
     }
     val optimized = Optimize(originalQuery.analyze)
 
-    comparePlans(optimizer.EliminateSubqueries(originalQuery.analyze), optimized)
+    comparePlans(analysis.EliminateAnalysisOperators(originalQuery.analyze), optimized)
   }
 
   test("joins: conjunctive predicates") {
@@ -174,7 +175,7 @@ class FilterPushdownSuite extends OptimizerTest {
       left.join(right, condition = Some("x.b".attr === "y.b".attr))
         .analyze
 
-    comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
+    comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
   }
 
   test("joins: conjunctive predicates #2") {
@@ -193,7 +194,7 @@ class FilterPushdownSuite extends OptimizerTest {
       left.join(right, condition = Some("x.b".attr === "y.b".attr))
         .analyze
 
-    comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
+    comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
   }
 
   test("joins: conjunctive predicates #3") {
@@ -216,6 +217,6 @@ class FilterPushdownSuite extends OptimizerTest {
         condition = Some("z.a".attr === "x.b".attr))
       .analyze
 
-    comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
+    comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
   }
 }
@@ -32,7 +32,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
 import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan, LowerCaseSchema}
 import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand, ExplainCommand}
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.execution._
@@ -108,7 +108,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
   /* A catalyst metadata catalog that points to the Hive Metastore. */
   @transient
-  override lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog
+  override lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog {
+    override def lookupRelation(
+      databaseName: Option[String],
+      tableName: String,
+      alias: Option[String] = None): LogicalPlan = {
+
+      LowerCaseSchema(super.lookupRelation(databaseName, tableName, alias))
+    }
+  }
 
   /* An analyzer that uses the Hive metastore. */
   @transient
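With this override, every relation fetched from the Hive metastore reaches the analyzer pre-wrapped, so case-insensitive resolution always sees an all-lowercase schema. Conceptually (MetastoreRelation standing in for whatever super.lookupRelation actually returns):

    // catalog.lookupRelation(None, "src") now yields, in effect:
    //   LowerCaseSchema(MetastoreRelation(..., "src", ...))
    // EliminateAnalysisOperators later strips the wrapper, restoring the
    // relation's original schema for planning and execution.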
@@ -27,7 +27,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde2.Deserializer
 
-import org.apache.spark.sql.catalyst.analysis.Catalog
+
+import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateAnalysisOperators}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -96,7 +97,8 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
       createTable(databaseName, tableName, child.output)
 
       InsertIntoTable(
-        lookupRelation(Some(databaseName), tableName, None).asInstanceOf[BaseRelation],
+        EliminateAnalysisOperators(
+          lookupRelation(Some(databaseName), tableName, None)),
         Map.empty,
         child,
         overwrite = false)
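Note the connection to the HiveContext change above: lookupRelation now returns a LowerCaseSchema-wrapped plan, so the insert destination is unwrapped here with EliminateAnalysisOperators before being handed to InsertIntoTable, whose table field was correspondingly widened from BaseRelation to LogicalPlan, removing the asInstanceOf cast.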
@@ -19,6 +19,11 @@ package org.apache.spark.sql
 package hive
 package execution
 
+import TestHive._
+
+case class Data(a: Int, B: Int, n: Nested)
+case class Nested(a: Int, B: Int)
+
 /**
  * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution.
  */
@@ -47,6 +52,14 @@ class HiveResolutionSuite extends HiveComparisonTest {
   createQueryTest("alias.*",
     "SELECT a.* FROM src a ORDER BY key LIMIT 1")
 
+  test("case insensitivity with scala reflection") {
+    // Test resolution with Scala Reflection
+    TestHive.sparkContext.parallelize(Data(1, 2, Nested(1, 2)) :: Nil)
+      .registerAsTable("caseSensitivityTest")
+
+    sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest")
+  }
+
   /**
    * Negative examples. Currently only left here for documentation purposes.
    * TODO(marmbrus): Test that catalyst fails on these queries.
