Skip to content

Commit

Permalink
[SPARK-6199] [SQL] Support CTE in HiveContext and SQLContext
Browse files Browse the repository at this point in the history
Author: haiyang <huhaiyang@huawei.com>

Closes apache#4929 from haiyangsea/cte and squashes the following commits:

220b67d [haiyang] add golden files for cte test
d3c7681 [haiyang] Merge branch 'master' into cte-repair
0ba2070 [haiyang] modify code style
9ce6b58 [haiyang] fix conflict
ff74741 [haiyang] add comment for With plan
0d56af4 [haiyang] code indention
776a440 [haiyang] add comments for resolve relation strategy
2fccd7e [haiyang] add comments for resolve relation strategy
241bbe2 [haiyang] fix cte problem of view
e9e1237 [haiyang] fix test case problem
614182f [haiyang] add test cases for CTE feature
32e415b [haiyang] add comment
1cc8c15 [haiyang] support with
03f1097 [haiyang] support with
e960099 [haiyang] support with
9aaa874 [haiyang] support with
0566978 [haiyang] support with
a99ecd2 [haiyang] support with
c3fa4c2 [haiyang] support with
3b6077f [haiyang] support with
5f8abe3 [haiyang] support with
4572b05 [haiyang] support with
f801f54 [haiyang] support with
  • Loading branch information
haiyang authored and marmbrus committed Apr 12, 2015
1 parent 7dbd371 commit 2f53588
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
protected val UPPER = Keyword("UPPER")
protected val WHEN = Keyword("WHEN")
protected val WHERE = Keyword("WHERE")
protected val WITH = Keyword("WITH")

protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] = {
exprs.zipWithIndex.map {
Expand All @@ -127,6 +128,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
| UNION ~ DISTINCT.? ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
)
| insert
| cte
)

protected lazy val select: Parser[LogicalPlan] =
Expand Down Expand Up @@ -156,6 +158,11 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o)
}

// Parses a WITH clause: one or more comma-separated named subqueries
// (`name AS ( query )`) followed by the main query. Each definition is
// wrapped in a Subquery node carrying its alias.
protected lazy val cte: Parser[LogicalPlan] =
  WITH ~> rep1sep(ident ~ (AS ~ "(" ~> start <~ ")"), ",") ~ start ^^ {
    case namedQueries ~ body =>
      val cteRelations = namedQueries.map { case name ~ plan =>
        name -> Subquery(name, plan)
      }.toMap
      With(body, cteRelations)
  }

protected lazy val projection: Parser[Expression] =
expression ~ (AS.? ~> ident.?) ^^ {
case e ~ a => a.fold(e)(Alias(e, _)())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,21 +169,36 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
*/
object ResolveRelations extends Rule[LogicalPlan] {
def getTable(u: UnresolvedRelation): LogicalPlan = {
def getTable(u: UnresolvedRelation, cteRelations: Map[String, LogicalPlan]) = {
try {
catalog.lookupRelation(u.tableIdentifier, u.alias)
// In hive, if there is same table name in database and CTE definition,
// hive will use the table in database, not the CTE one.
// Taking into account the reasonableness and the implementation complexity,
// here use the CTE definition first, check table name only and ignore database name
cteRelations.get(u.tableIdentifier.last)
.map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation))
.getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias))
} catch {
case _: NoSuchTableException =>
u.failAnalysis(s"no such table ${u.tableName}")
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
i.copy(
table = EliminateSubQueries(getTable(u)))
case u: UnresolvedRelation =>
getTable(u)
// Resolves UnresolvedRelations in the plan. If the plan is topped by a `With`
// node, its CTE definitions are carried along so that relation lookup can
// consult them before falling back to the catalog.
def apply(plan: LogicalPlan): LogicalPlan = {
val (realPlan, cteRelations) = plan match {
// TODO allow subquery to define CTE
// Strip a top-level `With` node: keep its child as the plan to transform,
// and hold on to its CTE relation map for lookups below.
case With(child, relations) => (child, relations)
// No CTEs present: resolve against the catalog only.
case other => (other, Map.empty[String, LogicalPlan])
}

realPlan transform {
case i@InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
i.copy(
table = EliminateSubQueries(getTable(u, cteRelations)))
case u: UnresolvedRelation =>
getTable(u, cteRelations)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,18 @@ case class CreateTableAsSelect[T](
override lazy val resolved: Boolean = databaseName != None && childrenResolved
}

/**
 * A container for holding named common table expressions (CTEs) and a query plan.
 * This operator is removed during analysis; the named relations it carries are
 * substituted into the child plan wherever they are referenced.
 *
 * @param child The final query of this CTE.
 * @param cteRelations Queries defined by this CTE: the key is the alias of a
 *                     CTE definition, and the value is the definition itself,
 *                     wrapped in a [[Subquery]] carrying that alias.
 */
case class With(child: LogicalPlan, cteRelations: Map[String, Subquery]) extends UnaryNode {
override def output = child.output
}

case class WriteToFile(
path: String,
child: LogicalPlan) extends UnaryNode {
Expand Down
14 changes: 14 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,20 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
mapData.collect().take(1).map(Row.fromTuple).toSeq)
}

// Verifies CTE support in the SQL parser: a single CTE referenced by the main
// query, and two CTEs combined with UNION ALL.
test("CTE feature") {
checkAnswer(
sql("with q1 as (select * from testData limit 10) select * from q1"),
testData.take(10).toSeq)

checkAnswer(
sql("""
|with q1 as (select * from testData where key= '5'),
|q2 as (select * from testData where key = '4')
|select * from q1 union all select * from q2""".stripMargin),
Row(5, "5") :: Row(4, "4") :: Nil)

}

test("date row") {
checkAnswer(sql(
"""select cast("2015-01-28" as date) from testData limit 1"""),
Expand Down
27 changes: 21 additions & 6 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -576,11 +576,23 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
case Token("TOK_QUERY", queryArgs)
if Seq("TOK_FROM", "TOK_INSERT").contains(queryArgs.head.getText) =>

val (fromClause: Option[ASTNode], insertClauses) = queryArgs match {
case Token("TOK_FROM", args: Seq[ASTNode]) :: insertClauses =>
(Some(args.head), insertClauses)
case Token("TOK_INSERT", _) :: Nil => (None, queryArgs)
}
// Split the query AST arguments into the FROM clause, the insert clauses, and
// any CTE definitions. When the query has a WITH prefix, the TOK_CTE node
// appears as the last element after TOK_FROM, so it is peeled off here.
val (fromClause: Option[ASTNode], insertClauses, cteRelations) =
queryArgs match {
case Token("TOK_FROM", args: Seq[ASTNode]) :: insertClauses =>
// If the trailing clause is TOK_CTE, extract the named CTE relations.
insertClauses.last match {
case Token("TOK_CTE", cteClauses) =>
// Each CTE clause parses to a Subquery; key the map by its alias.
val cteRelations = cteClauses.map(node => {
val relation = nodeToRelation(node).asInstanceOf[Subquery]
(relation.alias, relation)
}).toMap
(Some(args.head), insertClauses.init, Some(cteRelations))

case _ => (Some(args.head), insertClauses, None)
}

case Token("TOK_INSERT", _) :: Nil => (None, queryArgs, None)
}

// Return one query for each insert clause.
val queries = insertClauses.map { case Token("TOK_INSERT", singleInsert) =>
Expand Down Expand Up @@ -794,7 +806,10 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
}

// If there are multiple INSERTs, just UNION them together into one query.
queries.reduceLeft(Union)
val query = queries.reduceLeft(Union)

// return With plan if there is CTE
cteRelations.map(With(query, _)).getOrElse(query)

case Token("TOK_UNION", left :: right :: Nil) => Union(nodeToPlan(left), nodeToPlan(right))

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
5
5
5
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
val_4
val_5
val_5
val_5
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
4
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,21 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
createQueryTest("select null from table",
"SELECT null FROM src LIMIT 1")

// Comparison tests for CTE support in HiveQl (answers checked against golden
// files): a single CTE, multiple CTEs with UNION ALL, and a CTE consumed via
// Hive's FROM-first syntax.
createQueryTest("CTE feature #1",
"with q1 as (select key from src) select * from q1 where key = 5")

createQueryTest("CTE feature #2",
"""with q1 as (select * from src where key= 5),
|q2 as (select * from src s2 where key = 4)
|select value from q1 union all select value from q2
""".stripMargin)

createQueryTest("CTE feature #3",
"""with q1 as (select key from src)
|from q1
|select * where key = 4
""".stripMargin)

test("predicates contains an empty AttributeSet() references") {
sql(
"""
Expand Down

0 comments on commit 2f53588

Please sign in to comment.