Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ZStream support #53

Merged
merged 10 commits into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,26 @@ val es: ErrorStrategies =



### Streaming

When the wrapped framework handle streaming, you can convert the framework's stream to a `ZStream` using `tzioStream`.
To provide a `Connection` to the ZIO stream, you can either consume the stream into a ZIO first (then use the same functions as above), or use the `Database`'s streaming methods.

The methods `transactionOrDieStream` and `autoCommitStream` work in the same way as the similar, non-stream method.
Note that for transactions, only the `OrDie` variant exists: this is because ZIO's acquire-release mechanism for stream does not allow to pass errors that occur during the acquire-release phase in the error channel.
Defects in the stream due to connection errors can only be caught after the `ZStream` has been consumed into a `ZIO`.

```scala
import io.github.gaelrenoux.tranzactio.doobie._
import zio._
val queryStream: ZStream[Connection, Error, Person] = tzioStream { sql"""SELECT given_name, family_name FROM person""".query[Person].stream }.mapError(transform)
val zStream: ZStream[Database, DbException, Person] = Database.transactionOrDieStream(queryStream)
```

You can see a full example in the `examples` submodule (in `LayeredAppStreaming`).



### Multiple Databases

Some applications use multiple databases.
Expand All @@ -339,7 +359,6 @@ You only need to provide a different marker type for each database you use.

```scala
import io.github.gaelrenoux.tranzactio.doobie._
import javax.sql.DataSource
import zio._

trait Db1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.github.gaelrenoux.tranzactio

import zio.ZIO.attemptBlocking
import zio._
import zio.stream.ZStream

import java.sql.Connection
import javax.sql.DataSource
Expand All @@ -14,8 +15,14 @@ object ConnectionSource {
def runTransaction[R, E, A](task: Connection => ZIO[R, E, A], commitOnFailure: => Boolean = false)
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R, Either[DbException, E], A]

def runTransactionOrDieStream[R, E, A](task: Connection => ZStream[R, E, A], commitOnFailure: => Boolean = false)
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, E, A]

def runAutoCommit[R, E, A](task: Connection => ZIO[R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R, Either[DbException, E], A]

def runAutoCommitStream[R, E, A](task: Connection => ZStream[R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, Either[DbException, E], A]
}

/** ConnectionSource with standard behavior. Children class need to implement `getConnection`. */
Expand All @@ -39,16 +46,37 @@ object ConnectionSource {
}
}

def runTransactionOrDieStream[R, E, A](task: Connection => ZStream[R, E, A], commitOnFailure: => Boolean = false)
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, E, A] = {
ZStream
.acquireReleaseExitWith(openConnection.tap(c => setAutoCommit(c, autoCommit = false)).orDie) {
case (c, Exit.Success(_)) => commitConnection(c).tapEither(_ => closeConnection(c)).orDie
case (c, Exit.Failure(cause)) if cause.isDie => closeConnection(c).orDie // No commit, no rollback in case of a defect, just close the connection
case (c, Exit.Failure(_)) => (if (commitOnFailure) commitConnection(c) else rollbackConnection(c)).tapEither(_ => closeConnection(c)).orDie
}
.flatMap { (c: Connection) => task(c) }
}

def runAutoCommit[R, E, A](task: Connection => ZIO[R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R, Either[DbException, E], A] =
ZIO.acquireReleaseWith(openConnection.mapError(Left(_)))(closeConnection(_).orDie) { (c: Connection) =>
setAutoCommit(c, autoCommit = true)
.mapError(Left(_))
setAutoCommit(c, autoCommit = true).mapError(Left(_))
.zipRight {
task(c).mapError(Right(_))
}
}

def runAutoCommitStream[R, E, A](task: Connection => ZStream[R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, Either[DbException, E], A] =
ZStream
.acquireReleaseWith(openConnection.mapError(Left(_)))(closeConnection(_).orDie)
.flatMap { (c: Connection) =>
ZStream.fromZIO(setAutoCommit(c, autoCommit = true).mapError(Left(_)))
.crossRight {
task(c).mapError(Right(_))
}
}

// TODO handle error reporting when retrying

private def bottomErrorStrategy(implicit errorStrategies: ErrorStrategiesRef) =
Expand Down Expand Up @@ -116,6 +144,10 @@ object ConnectionSource {
super.runTransaction(task, commitOnFailure)
}

override def runTransactionOrDieStream[R, E, A](task: Connection => ZStream[R, E, A], commitOnFailure: => Boolean = false)
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, E, A] =
super.runTransactionOrDieStream(task, commitOnFailure) // TODO Could not find a way to use the semaphore here

override def runAutoCommit[R, E, A](task: Connection => ZIO[R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R, Either[DbException, E], A] =
semaphore.withPermit {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.gaelrenoux.tranzactio


import zio.stream.ZStream
import zio.{Tag, Trace, ZIO, ZLayer}

import javax.sql.DataSource
Expand All @@ -20,6 +21,15 @@ abstract class DatabaseModuleBase[Connection, Database <: DatabaseOps.ServiceOps
}
}

override def transactionOrDieStream[R, E, A](
stream: => ZStream[Connection with R, E, A],
commitOnFailure: => Boolean = false
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZStream[Database with R, E, A] = {
ZStream.serviceWithStream[Database] { db =>
db.transactionOrDieStream[R, E, A](stream, commitOnFailure)
}
}

override def autoCommit[R, E, A](
zio: => ZIO[Connection with R, E, A]
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZIO[Database with R, Either[DbException, E], A] = {
Expand All @@ -28,6 +38,14 @@ abstract class DatabaseModuleBase[Connection, Database <: DatabaseOps.ServiceOps
}
}

override def autoCommitStream[R, E, A](
stream: => ZStream[Connection with R, E, A]
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZStream[Database with R, Either[DbException, E], A] = {
ZStream.serviceWithStream[Database] { db =>
db.autoCommitStream[R, E, A](stream)
}
}

/** Creates a Database Layer which requires an existing ConnectionSource. */
def fromConnectionSource(implicit dbContext: DbContext, trace: Trace): ZLayer[ConnectionSource, Nothing, Database]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.github.gaelrenoux.tranzactio

import zio.stream.ZStream
import zio.{Cause, Trace, ZIO}

/** Operations for a Database, based on a few atomic operations. Can be used both by the actual DB service, or by the DB
Expand Down Expand Up @@ -58,6 +59,12 @@ trait DatabaseOps[Connection, R0] {
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZIO[R with R0, E, A] =
transactionOrDie[R, E, A](zio, commitOnFailure)

/** As `transactionOrDie`, for ZStream instances instead of ZIO instances. */
def transactionOrDieStream[R <: Any, E, A](
stream: => ZStream[Connection with R, E, A],
commitOnFailure: => Boolean = false
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZStream[R with R0, E, A]

/** Provides that ZIO with a Connection. All DB action in the ZIO will be auto-committed. Failures in the initial
* ZIO will be wrapped in a Right in the error case of the resulting ZIO, with connection errors resulting in a
* failure with the exception wrapped in a Left.
Expand All @@ -74,6 +81,14 @@ trait DatabaseOps[Connection, R0] {
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZIO[R with R0, Either[DbException, E], A] =
autoCommit[R, E, A](zio)

/** As `autoCommit`, for ZStream instances instead of ZIO instances.
*
* This method should be implemented by subclasses, to provide the connection.
*/
def autoCommitStream[R, E, A](
stream: => ZStream[Connection with R, E, A]
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZStream[R with R0, Either[DbException, E], A]

/** As `autoCommit`, but exceptions are simply widened to a common failure type. The resulting failure type is a
* superclass of both DbException and the error type of the inital ZIO. */
final def autoCommitOrWiden[R, E >: DbException, A](
Expand All @@ -87,6 +102,12 @@ trait DatabaseOps[Connection, R0] {
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZIO[R with R0, E, A] =
autoCommitOrWiden[R, E, A](zio)

/** As `autoCommitOrWiden`, for ZStream instances instead of ZIO instances. */
final def autoCommitOrWidenStream[R, E >: DbException, A](
stream: => ZStream[Connection with R, E, A]
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZStream[R with R0, E, A] =
autoCommitStream[R, E, A](stream).mapError(_.merge)

/** As `autoCommit`, but errors when handling the connections are treated as defects instead of failures. */
final def autoCommitOrDie[R, E, A](
zio: => ZIO[Connection with R, E, A]
Expand All @@ -99,6 +120,17 @@ trait DatabaseOps[Connection, R0] {
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZIO[R with R0, E, A] =
autoCommitOrDie[R, E, A](zio)

/** As `autoCommitOrDie`, for ZStream instances instead of ZIO instances. */
final def autoCommitOrDieStream[R, E, A](
stream: => ZStream[Connection with R, E, A]
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZStream[R with R0, E, A] =
autoCommitStream[R, E, A](stream).mapErrorCause { cause =>
cause.flatMap {
case Left(dbError) => Cause.die(dbError, cause.trace)
case Right(error) => Cause.fail(error, cause.trace)
}
}

}

object DatabaseOps {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.gaelrenoux.tranzactio

import zio.{Tag, ZEnvironment, ZIO, Trace}
import zio.stream.ZStream
import zio.{Tag, Trace, ZEnvironment, ZIO}

import java.sql.{Connection => JdbcConnection}

Expand All @@ -23,6 +24,16 @@ abstract class DatabaseServiceBase[Connection: Tag](connectionSource: Connection
}, commitOnFailure)
}

override def transactionOrDieStream[R, E, A](stream: => ZStream[Connection with R, E, A], commitOnFailure: => Boolean = false)
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, E, A] =
ZStream.environmentWithStream[R] { r =>
runTransactionOrDieStream({ (c: JdbcConnection) =>
ZStream.fromZIO(connectionFromJdbc(c))
.map(r ++ ZEnvironment(_))
.flatMap(stream.provideEnvironment(_))
}, commitOnFailure)
}

override def autoCommit[R, E, A](zio: => ZIO[Connection with R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R, Either[DbException, E], A] =
ZIO.environmentWithZIO[R] { r =>
Expand All @@ -33,5 +44,15 @@ abstract class DatabaseServiceBase[Connection: Tag](connectionSource: Connection
}
}

override def autoCommitStream[R, E, A](stream: => ZStream[Connection with R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, Either[DbException, E], A] =
ZStream.environmentWithStream[R] { r =>
runAutoCommitStream { (c: JdbcConnection) =>
ZStream.fromZIO(connectionFromJdbc(c))
.map(r ++ ZEnvironment(_))
.flatMap(stream.provideEnvironment(_))
}
}

}

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.gaelrenoux.tranzactio

import zio.{Trace, ZEnvironment, ZIO, ZLayer, Tag}
import zio.stream.ZStream
import zio.{Tag, Trace, ZEnvironment, ZIO, ZLayer}

/**
* This is a typed database, to use when you have multiple databases in your application. Simply provide a marker type,
Expand All @@ -12,11 +13,17 @@ class DatabaseTBase[M: Tag, Connection](underlying: DatabaseOps.ServiceOps[Conne
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R with Any, Either[DbException, E], A] =
underlying.transaction[R, E, A](task, commitOnFailure = commitOnFailure)

override def transactionOrDieStream[R, E, A](stream: => ZStream[Connection with R, E, A], commitOnFailure: => Boolean)
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R with Any, E, A] =
underlying.transactionOrDieStream[R, E, A](stream, commitOnFailure = commitOnFailure)

override def autoCommit[R, E, A](task: => ZIO[Connection with R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R with Any, Either[DbException, E], A] =
underlying.autoCommit[R, E, A](task)

override def autoCommitStream[R, E, A](stream: => ZStream[Connection with R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R with Any, Either[DbException, E], A] =
underlying.autoCommitStream[R, E, A](stream)
}

object DatabaseTBase {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.github.gaelrenoux.tranzactio

import zio.{Tag, Trace}
import zio.Trace


/** A specific wrapper package for one specific library (e.g. Doobie). */
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.github.gaelrenoux.tranzactio.test

import zio.stream.ZStream

import io.github.gaelrenoux.tranzactio._
import zio.{Tag, ZEnvironment, ZIO, ZLayer, Trace}
import zio.{Tag, Trace, ZEnvironment, ZIO, ZLayer}

/** Testing utilities on the Database module. */
trait DatabaseModuleTestOps[Connection, DbContext] extends DatabaseModuleBase[Connection, DatabaseOps.ServiceOps[Connection], DbContext] {
Expand Down Expand Up @@ -32,13 +34,29 @@ trait DatabaseModuleTestOps[Connection, DbContext] extends DatabaseModuleBase[Co
.mapError(Right(_))
}

override def transactionOrDieStream[R, E, A](stream: => ZStream[Connection with R, E, A], commitOnFailure: => Boolean = false)
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, E, A] = {
ZStream.fromZIO(noConnection).flatMap { c =>
ZStream.environmentWith[R](_ ++ ZEnvironment(c))
.flatMap(stream.provideEnvironment(_))
}
}

override def autoCommit[R, E, A](zio: => ZIO[Connection with R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R, Either[DbException, E], A] =
noConnection.flatMap { c =>
ZIO.environmentWith[R](_ ++ ZEnvironment(c))
.flatMap(zio.provideEnvironment(_))
.mapError(Right(_))
}

override def autoCommitStream[R, E, A](stream: => ZStream[Connection with R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, Either[DbException, E], A] =
ZStream.fromZIO(noConnection).flatMap { c =>
ZStream.environmentWith[R](_ ++ ZEnvironment(c))
.flatMap(stream.provideEnvironment(_))
.mapError(Right(_))
}
}
}
}
Loading
Loading