Skip to content

Commit

Permalink
[SQL] Minor fixes.
Browse files Browse the repository at this point in the history
Author: Michael Armbrust <michael@databricks.com>

Closes #315 from marmbrus/minorFixes and squashes the following commits:

b23a15d [Michael Armbrust] fix scaladoc
11062ac [Michael Armbrust] Fix registering "SELECT *" queries as tables and caching them.  Add some tests for this and self-joins.
3997dc9 [Michael Armbrust] Move Row extractor to catalyst.
208bf5e [Michael Armbrust] More idiomatic naming of DSL functions. * subquery => as * for join condition => on, i.e., `r.join(s, condition = 'a == 'b)` =>`r.join(s, on = 'a == 'b)`
87211ce [Michael Armbrust] Correctly handle self joins of in-memory cached tables.
69e195e [Michael Armbrust] Change != to !== in the DSL since != will always translate to != on Any.
01f2dd5 [Michael Armbrust] Correctly assign aliases to tables in SqlParser.
  • Loading branch information
marmbrus authored and rxin committed Apr 5, 2014
1 parent 198892f commit d956cc2
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ class SqlParser extends StandardTokenParsers {

protected lazy val relationFactor: Parser[LogicalPlan] =
ident ~ (opt(AS) ~> opt(ident)) ^^ {
case ident ~ alias => UnresolvedRelation(alias, ident)
case tableName ~ alias => UnresolvedRelation(None, tableName, alias)
} |
"(" ~> query ~ ")" ~ opt(AS) ~ ident ^^ { case s ~ _ ~ _ ~ a => Subquery(a, s) }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ package object dsl {
def > (other: Expression) = GreaterThan(expr, other)
def >= (other: Expression) = GreaterThanOrEqual(expr, other)
def === (other: Expression) = Equals(expr, other)
def != (other: Expression) = Not(Equals(expr, other))
def !== (other: Expression) = Not(Equals(expr, other))

def like(other: Expression) = Like(expr, other)
def rlike(other: Expression) = RLike(expr, other)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.types.NativeType

object Row {
  /**
   * Extractor that lets a [[Row]] be destructured field-by-field in a pattern match.
   * The match on the row itself always succeeds (a `Some` is always returned); the
   * individual field patterns then decide whether the case applies. Example:
   * {{{
   *   import org.apache.spark.sql._
   *
   *   val pairs = sql("SELECT key, value FROM src").rdd.map {
   *     case Row(key: Int, value: String) =>
   *       key -> value
   *   }
   * }}}
   *
   * @param row the row whose fields should be exposed to the pattern
   * @return the row itself, viewed as a sequence of its field values
   */
  def unapplySeq(row: Row): Some[Seq[Any]] = {
    // The row is passed through unchanged, typed as a Seq[Any].
    val fields: Seq[Any] = row
    Some(fields)
  }
}

/**
* Represents one row of output from a relational operator. Allows both generic access by ordinal,
* which will incur boxing overhead for primitives, as well as native primitive access.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ case class LowerCaseSchema(child: LogicalPlan) extends UnaryNode {
a.nullable)(
a.exprId,
a.qualifiers)
case other => other
}

def references = Set.empty
Expand Down
15 changes: 1 addition & 14 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,5 @@ package object sql {

type Row = catalyst.expressions.Row

object Row {
/**
* This method can be used to extract fields from a [[Row]] object in a pattern match. Example:
* {{{
* import org.apache.spark.sql._
*
* val pairs = sql("SELECT key, value FROM src").rdd.map {
* case Row(key: Int, value: String) =>
* key -> value
* }
* }}}
*/
def unapplySeq(row: Row): Some[Seq[Any]] = Some(row)
}
val Row = catalyst.expressions.Row
}
16 changes: 8 additions & 8 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,17 +148,17 @@ class SchemaRDD(
*
* @param otherPlan the [[SchemaRDD]] that should be joined with this one.
* @param joinType One of `Inner`, `LeftOuter`, `RightOuter`, or `FullOuter`. Defaults to `Inner`.
* @param condition An optional condition for the join operation. This is equivalent to the `ON`
* clause in standard SQL. In the case of `Inner` joins, specifying a
* `condition` is equivalent to adding `where` clauses after the `join`.
* @param on An optional condition for the join operation. This is equivalent to the `ON`
* clause in standard SQL. In the case of `Inner` joins, specifying an
* `on` condition is equivalent to adding `where` clauses after the `join`.
*
* @group Query
*/
def join(
otherPlan: SchemaRDD,
joinType: JoinType = Inner,
condition: Option[Expression] = None): SchemaRDD =
new SchemaRDD(sqlContext, Join(logicalPlan, otherPlan.logicalPlan, joinType, condition))
on: Option[Expression] = None): SchemaRDD =
new SchemaRDD(sqlContext, Join(logicalPlan, otherPlan.logicalPlan, joinType, on))

/**
* Sorts the results by the given expressions.
Expand Down Expand Up @@ -195,14 +195,14 @@ class SchemaRDD(
* with the same name, for example, when performing self-joins.
*
* {{{
* val x = schemaRDD.where('a === 1).subquery('x)
* val y = schemaRDD.where('a === 2).subquery('y)
* val x = schemaRDD.where('a === 1).as('x)
* val y = schemaRDD.where('a === 2).as('y)
* x.join(y).where("x.a".attr === "y.a".attr)
* }}}
*
* @group Query
*/
def subquery(alias: Symbol) =
def as(alias: Symbol) =
new SchemaRDD(sqlContext, Subquery(alias.name, logicalPlan))

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.plans.{QueryPlan, logical}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.columnar.InMemoryColumnarTableScan

abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
self: Product =>
Expand Down Expand Up @@ -69,6 +70,8 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
SparkLogicalPlan(
alreadyPlanned match {
case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
case InMemoryColumnarTableScan(output, child) =>
InMemoryColumnarTableScan(output.map(_.newInstance), child)
case _ => sys.error("Multiple instance of the same relation detected.")
}).asInstanceOf[this.type]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,17 @@ class CachedTableSuite extends QueryTest {
TestSQLContext.uncacheTable("testData")
}
}

// Regression test for registering a "SELECT *" query as a table and caching it.
// No result is collected or compared here, so the test passes as long as none of
// the calls below throws.
test("SELECT Star Cached Table") {
// Register the star query over testData under the name "selectStar".
TestSQLContext.sql("SELECT * FROM testData").registerAsTable("selectStar")
TestSQLContext.cacheTable("selectStar")
// Build a query against the now-cached table.
TestSQLContext.sql("SELECT * FROM selectStar")
// Undo the caching so the cached state does not carry over to later tests.
TestSQLContext.uncacheTable("selectStar")
}

// Regression test for self-joining a cached table: testData is cached and then
// joined with itself under the aliases `a` and `b`. No result is collected or
// compared, so the test passes as long as none of the calls below throws.
test("Self-join cached") {
TestSQLContext.cacheTable("testData")
// Both sides of the join refer to the same cached relation.
TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key")
// Undo the caching so the cached state does not carry over to later tests.
TestSQLContext.uncacheTable("testData")
}
}
16 changes: 8 additions & 8 deletions sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ class DslQuerySuite extends QueryTest {
}

test("inner join, where, multiple matches") {
val x = testData2.where('a === 1).subquery('x)
val y = testData2.where('a === 1).subquery('y)
val x = testData2.where('a === 1).as('x)
val y = testData2.where('a === 1).as('y)
checkAnswer(
x.join(y).where("x.a".attr === "y.a".attr),
(1,1,1,1) ::
Expand All @@ -131,17 +131,17 @@ class DslQuerySuite extends QueryTest {
}

test("inner join, no matches") {
val x = testData2.where('a === 1).subquery('x)
val y = testData2.where('a === 2).subquery('y)
val x = testData2.where('a === 1).as('x)
val y = testData2.where('a === 2).as('y)
checkAnswer(
x.join(y).where("x.a".attr === "y.a".attr),
Nil)
}

test("big inner join, 4 matches per row") {
val bigData = testData.unionAll(testData).unionAll(testData).unionAll(testData)
val bigDataX = bigData.subquery('x)
val bigDataY = bigData.subquery('y)
val bigDataX = bigData.as('x)
val bigDataY = bigData.as('y)

checkAnswer(
bigDataX.join(bigDataY).where("x.key".attr === "y.key".attr),
Expand Down Expand Up @@ -181,8 +181,8 @@ class DslQuerySuite extends QueryTest {
}

test("full outer join") {
val left = upperCaseData.where('N <= 4).subquery('left)
val right = upperCaseData.where('N >= 3).subquery('right)
val left = upperCaseData.where('N <= 4).as('left)
val right = upperCaseData.where('N >= 3).as('right)

checkAnswer(
left.join(right, FullOuter, Some("left.N".attr === "right.N".attr)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
}

test("self-join parquet files") {
val x = ParquetTestData.testData.subquery('x)
val y = ParquetTestData.testData.subquery('y)
val x = ParquetTestData.testData.as('x)
val y = ParquetTestData.testData.as('y)
val query = x.join(y).where("x.myint".attr === "y.myint".attr)

// Check to make sure that the attributes from either side of the join have unique expression
Expand Down

0 comments on commit d956cc2

Please sign in to comment.