Skip to content

Commit

Permalink
spark sql support
Browse files Browse the repository at this point in the history
  • Loading branch information
fwbrasil committed Oct 24, 2017
1 parent 62f4e87 commit 1da8512
Show file tree
Hide file tree
Showing 23 changed files with 970 additions and 41 deletions.
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,37 @@ case class MyDao(c: MyContext) extends MySchema {
}
```

## Spark Context

Quill provides a context that allow users to run queries on top of Spark's SQL engine. Example usage:

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import io.getquill.QuillSparkContext._
// Replace by your spark sql context
implicit val sqlContext =
SparkSession
.builder()
.master("local")
.appName("spark test")
.getOrCreate()
.sqlContext
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = List(Person("John", 22)).toQuery
val q = quote {
people.filter(_.name == "John")
}
val filtered: Dataset[Person] = run(q)
```

## SQL Contexts

Example:
Expand Down
34 changes: 25 additions & 9 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@ import com.typesafe.sbt.SbtScalariform.ScalariformKeys
import scalariform.formatter.preferences._
import sbtrelease.ReleasePlugin

lazy val scalaVersionProperty = Option(System.getProperty("scalaVersion"))

lazy val modules = Seq[sbt.ClasspathDep[sbt.ProjectReference]](
`quill-core-jvm`, `quill-core-js`, `quill-sql-jvm`, `quill-sql-js`,
`quill-jdbc`, `quill-finagle-mysql`, `quill-finagle-postgres`, `quill-async`,
`quill-async-mysql`, `quill-async-postgres`, `quill-cassandra`, `quill-orientdb`
) ++
Seq[sbt.ClasspathDep[sbt.ProjectReference]](`quill-spark`)
.filter(_ => scalaVersionProperty.map(_.startsWith("2.11")).getOrElse(true))

lazy val `quill` =
(project in file("."))
.settings(tutSettings ++ commonSettings)
.settings(`tut-settings`:_*)
.dependsOn(
`quill-core-jvm`, `quill-core-js`, `quill-sql-jvm`, `quill-sql-js`,
`quill-jdbc`, `quill-finagle-mysql`, `quill-finagle-postgres`, `quill-async`,
`quill-async-mysql`, `quill-async-postgres`, `quill-cassandra`, `quill-orientdb`
).aggregate(
`quill-core-jvm`, `quill-core-js`, `quill-sql-jvm`, `quill-sql-js`,
`quill-jdbc`, `quill-finagle-mysql`, `quill-finagle-postgres`, `quill-async`,
`quill-async-mysql`, `quill-async-postgres`, `quill-cassandra`, `quill-orientdb`
)
.aggregate(modules.map(_.project): _*)
.dependsOn(modules: _*)

lazy val superPure = new org.scalajs.sbtplugin.cross.CrossType {
def projectDir(crossBase: File, projectType: String): File =
Expand Down Expand Up @@ -74,6 +77,19 @@ lazy val `quill-jdbc` =
)
.dependsOn(`quill-sql-jvm` % "compile->compile;test->test")

lazy val `quill-spark` =
(project in file("quill-spark"))
.settings(commonSettings: _*)
.settings(mimaSettings: _*)
.settings(
crossScalaVersions := Seq("2.11.11"),
fork in Test := true,
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % "2.2.0"
)
)
.dependsOn(`quill-sql-jvm` % "compile->compile;test->test")

lazy val `quill-finagle-mysql` =
(project in file("quill-finagle-mysql"))
.settings(commonSettings: _*)
Expand Down
6 changes: 3 additions & 3 deletions build/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ set -e # Any subsequent(*) commands which fail will cause the shell script to ex
chown root ~/.ssh/config
chmod 644 ~/.ssh/config

SBT_CMD="sbt clean"
SBT_CMD_2_11=" ++2.11.11 coverage test tut coverageReport coverageAggregate checkUnformattedFiles"
SBT_CMD_2_12=" ++2.12.2 test"
SBT_CMD="sbt"
SBT_CMD_2_11=" -DscalaVersion=2.11.11 ++2.11.11 clean coverage test tut coverageReport coverageAggregate checkUnformattedFiles"
SBT_CMD_2_12=" -DscalaVersion=2.12.3 ++2.12.3 clean test"
SBT_PUBLISH=" coverageOff publish"

if [[ $SCALA_VERSION == "2.11" ]]
Expand Down
21 changes: 15 additions & 6 deletions quill-core/src/main/scala/io/getquill/norm/AttachToEntity.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,22 @@ import io.getquill.ast._

object AttachToEntity {

def apply(f: (Query, Ident) => Query, alias: Option[Ident] = None)(q: Query): Query =
private object IsEntity {
def unapply(q: Ast): Option[Ast] =
q match {
case q: Entity => Some(q)
case q: Infix => Some(q)
case _ => None
}
}

def apply(f: (Ast, Ident) => Query, alias: Option[Ident] = None)(q: Ast): Ast =
q match {

case Map(a: Entity, b, c) => Map(f(a, b), b, c)
case FlatMap(a: Entity, b, c) => FlatMap(f(a, b), b, c)
case Filter(a: Entity, b, c) => Filter(f(a, b), b, c)
case SortBy(a: Entity, b, c, d) => SortBy(f(a, b), b, c, d)
case Map(IsEntity(a), b, c) => Map(f(a, b), b, c)
case FlatMap(IsEntity(a), b, c) => FlatMap(f(a, b), b, c)
case Filter(IsEntity(a), b, c) => Filter(f(a, b), b, c)
case SortBy(IsEntity(a), b, c, d) => SortBy(f(a, b), b, c, d)

case Map(_: GroupBy, _, _) | _: Union | _: UnionAll | _: Join | _: FlatJoin => f(q, alias.getOrElse(Ident("x")))

Expand All @@ -24,7 +33,7 @@ object AttachToEntity {
case Aggregation(op, a: Query) => Aggregation(op, apply(f, alias)(a))
case Distinct(a: Query) => Distinct(apply(f, alias)(a))

case e: Entity => f(e, alias.getOrElse(Ident("x")))
case IsEntity(q) => f(q, alias.getOrElse(Ident("x")))

case other => fail(s"Can't find an 'Entity' in '$q'")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,21 @@ private case class AvoidAliasConflict(state: collection.Set[Ident])

object Unaliased {

private def isUnaliased(q: Query): Boolean =
private def isUnaliased(q: Ast): Boolean =
q match {
case Nested(q: Query) => isUnaliased(q)
case Take(q: Query, _) => isUnaliased(q)
case Drop(q: Query, _) => isUnaliased(q)
case Aggregation(_, q: Query) => isUnaliased(q)
case Distinct(q: Query) => isUnaliased(q)
case _: Entity => true
case _: Nested | _: Take | _: Drop | _: Aggregation |
_: Distinct | _: FlatMap | _: Map | _: Filter | _: SortBy |
_: GroupBy | _: Union | _: UnionAll | _: Join | _: FlatJoin =>
false
case _: Entity | _: Infix => true
case _ => false
}

def unapply(q: Ast): Option[Query] =
def unapply(q: Ast): Option[Ast] =
q match {
case q: Query if (isUnaliased(q)) => Some(q)
case _ => None
case q if (isUnaliased(q)) => Some(q)
case _ => None
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ case class Dealias(state: Option[Ident]) extends StatefulTransformer[Option[Iden
val (bn, _) = apply(b)
(UnionAll(an, bn), Dealias(None))
case Join(t, a, b, iA, iB, o) =>
val ((an, iAn, on), ont) = dealias(a, iA, o)((_, _, _))
val ((bn, iBn, onn), _) = ont.dealias(b, iB, on)((_, _, _))
val ((an, iAn, on), _) = dealias(a, iA, o)((_, _, _))
val ((bn, iBn, onn), _) = dealias(b, iB, on)((_, _, _))
(Join(t, an, bn, iAn, iBn, onn), Dealias(None))
case FlatJoin(t, a, iA, o) =>
val ((an, iAn, on), ont) = dealias(a, iA, o)((_, _, _))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@ import io.getquill.ast.Constant
import io.getquill.ast.Ident
import io.getquill.ast.Map
import io.getquill.ast.SortBy
import io.getquill.testContext.implicitOrd
import io.getquill.testContext.qr1
import io.getquill.testContext.qr2
import io.getquill.testContext.quote
import io.getquill.testContext.unquote
import io.getquill.testContext._

class AttachToEntitySpec extends Spec {

Expand Down Expand Up @@ -90,6 +86,84 @@ class AttachToEntitySpec extends Spec {
}
}

val iqr1 = quote {
infix"$qr1".as[Query[TestEntity]]
}

"attaches clause to the root of the query (infix)" - {
"query is the entity" in {
val n = quote {
iqr1.sortBy(x => 1)
}
attachToEntity(iqr1.ast) mustEqual n.ast
}
"query is a composition" - {
"map" in {
val q = quote {
iqr1.filter(t => t.i == 1).map(t => t.s)
}
val n = quote {
iqr1.sortBy(t => 1).filter(t => t.i == 1).map(t => t.s)
}
attachToEntity(q.ast) mustEqual n.ast
}
"flatMap" in {
val q = quote {
iqr1.filter(t => t.i == 1).flatMap(t => qr2)
}
val n = quote {
iqr1.sortBy(t => 1).filter(t => t.i == 1).flatMap(t => qr2)
}
attachToEntity(q.ast) mustEqual n.ast
}
"filter" in {
val q = quote {
iqr1.filter(t => t.i == 1).filter(t => t.s == "s1")
}
val n = quote {
iqr1.sortBy(t => 1).filter(t => t.i == 1).filter(t => t.s == "s1")
}
attachToEntity(q.ast) mustEqual n.ast
}
"sortBy" in {
val q = quote {
iqr1.sortBy(t => t.s)
}
val n = quote {
iqr1.sortBy(t => 1).sortBy(t => t.s)
}
attachToEntity(q.ast) mustEqual n.ast
}
"take" in {
val q = quote {
iqr1.sortBy(b => b.s).take(1)
}
val n = quote {
iqr1.sortBy(b => 1).sortBy(b => b.s).take(1)
}
attachToEntity(q.ast) mustEqual n.ast
}
"drop" in {
val q = quote {
iqr1.sortBy(b => b.s).drop(1)
}
val n = quote {
iqr1.sortBy(b => 1).sortBy(b => b.s).drop(1)
}
attachToEntity(q.ast) mustEqual n.ast
}
"distinct" in {
val q = quote {
iqr1.sortBy(b => b.s).drop(1).distinct
}
val n = quote {
iqr1.sortBy(b => 1).sortBy(b => b.s).drop(1).distinct
}
attachToEntity(q.ast) mustEqual n.ast
}
}
}

"falls back to the query if it's not possible to flatten it" - {
"union" in {
val q = quote {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
package io.getquill.norm.capture

import io.getquill.Spec
import io.getquill.testContext.implicitOrd
import io.getquill.testContext.qr1
import io.getquill.testContext.qr2
import io.getquill.testContext.qr3
import io.getquill.testContext.quote
import io.getquill.testContext.unquote
import io.getquill.testContext._

class AvoidAliasConflictSpec extends Spec {

Expand Down Expand Up @@ -160,6 +155,19 @@ class AvoidAliasConflictSpec extends Spec {
}
}

"considers infix as unaliased" in {
val i = quote {
infix"$qr1".as[Query[TestEntity]]
}
val q = quote {
i.flatMap(a => qr2.flatMap(a => qr3))
}
val n = quote {
i.flatMap(a => qr2.flatMap(a1 => qr3))
}
AvoidAliasConflict(q.ast) mustEqual n.ast
}

"takes in consideration the aliases already defined" - {
"flatMap" in {
val q = quote {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class DealiasSpec extends Spec {
Dealias(q.ast) mustEqual n.ast
}
}
"outer join" - {
"join" - {
"left" in {
val q = quote {
qr1.filter(a => a.s == "s").map(b => b.s).fullJoin(qr1).on((a, b) => a == b.s)
Expand Down Expand Up @@ -147,6 +147,12 @@ class DealiasSpec extends Spec {
}
Dealias(q.ast) mustEqual n.ast
}
"self join" in {
val q = quote {
qr1.join(qr1).on((a, b) => a.i == b.i)
}
Dealias(q.ast) mustEqual q.ast
}
}
"entity" in {
Dealias(qr1.ast) mustEqual qr1.ast
Expand Down
Loading

0 comments on commit 1da8512

Please sign in to comment.