Add streaming to quill-ndbc-postgres
kampka committed Aug 17, 2022
1 parent 38f7f97 commit 8aa4d3a
Showing 4 changed files with 71 additions and 2 deletions.
16 changes: 15 additions & 1 deletion README.md
@@ -3807,6 +3807,20 @@ Async support via [NDBC driver](https://ndbc.io/) is available with Postgres databases.

### quill-ndbc-postgres

#### running and streaming

NDBC supports both running a query to get all results at once and streaming the results in chunks:

```scala
val q1 = quote { query[Country].filter(_.code == "GBR") }

// Select all at once
run(q1)

// Stream in chunks of 16
stream(q1, 16)
```
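
The stream verb returns an `org.reactivestreams.Publisher[T]`, so the results can be consumed with any Reactive Streams subscriber. As a minimal sketch (the `PrintingSubscriber` below is illustrative, not part of Quill), the streamed rows could be consumed like this:

```scala
import org.reactivestreams.{ Subscriber, Subscription }

// Illustrative subscriber: prints each streamed Country, requesting rows one at a time
class PrintingSubscriber extends Subscriber[Country] {
  private var subscription: Subscription = _
  override def onSubscribe(s: Subscription): Unit = {
    subscription = s
    subscription.request(1) // ask for the first element
  }
  override def onNext(c: Country): Unit = {
    println(c)
    subscription.request(1) // request the next element after handling this one
  }
  override def onError(t: Throwable): Unit = t.printStackTrace()
  override def onComplete(): Unit = println("done streaming")
}

stream(q1, 16).subscribe(new PrintingSubscriber)
```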

#### transactions

Transaction support is provided out of the box by NDBC:
@@ -4785,4 +4799,4 @@ The project was created having Philip Wadler's talk ["A practical theory of lang

* [A Practical Theory of Language-Integrated Query](http://homepages.inf.ed.ac.uk/slindley/papers/practical-theory-of-linq.pdf)
* [Everything old is new again: Quoted Domain Specific Languages](http://homepages.inf.ed.ac.uk/wadler/papers/qdsl/qdsl.pdf)
* [The Flatter, the Better](http://db.inf.uni-tuebingen.de/staticfiles/publications/the-flatter-the-better.pdf)
QueryResultTypeNdbcPostgresSpec.scala
@@ -6,6 +6,10 @@ import scala.BigDecimal
import scala.jdk.CollectionConverters._
import scala.math.BigDecimal.int2bigDecimal
import io.getquill.util.PrintMac
import java.util.LinkedList
import org.scalatest.concurrent.Eventually._
import org.reactivestreams.Subscription
import org.reactivestreams.Subscriber

class QueryResultTypeNdbcPostgresSpec extends QueryResultTypeSpec {

@@ -104,4 +108,31 @@ class QueryResultTypeNdbcPostgresSpec extends QueryResultTypeSpec {
get(context.run(isEmpty)) mustEqual false
}
}

"streaming" - {

class CollectingSubscriber extends Subscriber[Product] {
val received = new LinkedList[Product]()
private var subscription: Subscription = null
override def onSubscribe(s: Subscription): Unit = {
subscription = s
subscription.request(1)
}
override def onComplete(): Unit = {}
override def onNext(p: Product): Unit = {
received.add(p)
subscription.request(1)
}
override def onError(t: Throwable) = fail(t)
}

"select all" in {
val subscriber = new CollectingSubscriber()
context.stream(selectAll).subscribe(subscriber)

eventually {
subscriber.received must contain theSameElementsAs (products)
}
}
}
}
NdbcContext.scala
@@ -7,11 +7,14 @@ import io.trane.future.scala.{ Await, Future, Promise }
import io.trane.ndbc.{ DataSource, PreparedStatement, Row }

import scala.concurrent.duration.Duration
import org.reactivestreams.Publisher
import io.getquill.context.ContextVerbStream

abstract class NdbcContext[I <: SqlIdiom, +N <: NamingStrategy, P <: PreparedStatement, R <: Row](
override val idiom: I, override val naming: N, val dataSource: DataSource[P, R]
)
extends NdbcContextBase[I, N, P, R]
with ContextVerbStream[I, N]
with ContextTranslateProto {

override type Result[T] = Future[T]
@@ -22,6 +25,8 @@ abstract class NdbcContext[I <: SqlIdiom, +N <: NamingStrategy, P <: PreparedSta
override type RunBatchActionResult = List[Long]
override type RunBatchActionReturningResult[T] = List[T]

override type StreamResult[T] = Publisher[T]

override implicit protected val resultEffect: NdbcContextBase.ContextEffect[Future, Unit] = NdbcContext.ContextEffect

override type TranslateResult[T] = Future[T]
@@ -55,6 +60,23 @@ abstract class NdbcContext[I <: SqlIdiom, +N <: NamingStrategy, P <: PreparedSta
because previously it wasn't done here either */
override def withDataSource[T](f: DataSource[P, R] => Future[T]): Future[T] = f(dataSource)

def streamQuery[T](fetchSize: Option[Index], sql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): Publisher[T] = {
  // Can be inlined to (row) => extractor(row, ()) after dropping scala 2.11
  val extract = new java.util.function.Function[R, T] {
    override def apply(row: R): T = {
      extractor(row, ())
    }
  }

  // TODO: Do we need to set ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY?
  val stmt = createPreparedStatement(sql)
  val (params, ps) = prepare(stmt, ())
  logger.logQuery(sql, params)

  dataSource.stream(ps).map(extract)
}

def close(): Unit = {
dataSource.close()
()
NdbcContextBase.scala
@@ -18,6 +18,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.Duration
import scala.language.{ higherKinds, implicitConversions }
import scala.util.Try
import io.getquill.context.ContextVerbStream

object NdbcContextBase {
trait ContextEffect[F[_], FutureExecutionContext_] {
@@ -46,13 +47,14 @@ }
}

trait NdbcContextBase[+Idiom <: SqlIdiom, +Naming <: NamingStrategy, P <: PreparedStatement, R <: Row]
extends SqlContext[Idiom, Naming] {
extends SqlContext[Idiom, Naming] with ContextVerbStream[Idiom, Naming] {

private[getquill] val logger = ContextLogger(classOf[NdbcContext[_, _, _, _]])

final override type PrepareRow = P
final override type ResultRow = R
override type Session = Unit

type Runner = Unit

protected implicit val resultEffect: NdbcContextBase.ContextEffect[Result, _]
