From 82bb58429f9459a540113d4c3e3076642b760a1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ga=C3=ABl=20Renoux?= Date: Sun, 24 Mar 2024 16:48:55 +0100 Subject: [PATCH] Implement ZStream support (#53) Co-authored-by: Alex Ritter --- README.md | 21 ++- .../tranzactio/ConnectionSource.scala | 36 +++- .../tranzactio/DatabaseModuleBase.scala | 18 ++ .../gaelrenoux/tranzactio/DatabaseOps.scala | 32 ++++ .../tranzactio/DatabaseServiceBase.scala | 23 ++- .../gaelrenoux/tranzactio/DatabaseTBase.scala | 9 +- .../gaelrenoux/tranzactio/Wrapper.scala | 2 +- .../test/DatabaseModuleTestOps.scala | 20 +- .../tranzactio/ConnectionSourceTest.scala | 97 +++++++++- .../tranzactio/DatabaseOpsCompileTest.scala | 81 +++++++- .../tranzactio/integration/DoobieIT.scala | 36 ++-- .../integration/DoobieStreamIT.scala | 174 ++++++++++++++++++ .../samples/doobie/LayeredAppStreaming.scala | 19 +- .../scala/samples/doobie/PersonQueries.scala | 10 + 14 files changed, 543 insertions(+), 35 deletions(-) create mode 100644 examples/src/test/scala/io/github/gaelrenoux/tranzactio/integration/DoobieStreamIT.scala diff --git a/README.md b/README.md index ce827cf..6cae1ae 100644 --- a/README.md +++ b/README.md @@ -329,6 +329,26 @@ val es: ErrorStrategies = +### Streaming + +When the wrapped framework handle streaming, you can convert the framework's stream to a `ZStream` using `tzioStream`. +To provide a `Connection` to the ZIO stream, you can either consume the stream into a ZIO first (then use the same functions as above), or use the `Database`'s streaming methods. + +The methods `transactionOrDieStream` and `autoCommitStream` work in the same way as the similar, non-stream method. +Note that for transactions, only the `OrDie` variant exists: this is because ZIO's acquire-release mechanism for stream does not allow to pass errors that occur during the acquire-release phase in the error channel. +Defects in the stream due to connection errors can only be caught after the `ZStream` has been consumed into a `ZIO`. + +```scala +import io.github.gaelrenoux.tranzactio.doobie._ +import zio._ +val queryStream: ZStream[Connection, Error, Person] = tzioStream { sql"""SELECT given_name, family_name FROM person""".query[Person].stream }.mapError(transform) +val zStream: ZStream[Database, DbException, Person] = Database.transactionOrDieStream(queryStream) +``` + +You can see a full example in the `examples` submodule (in `LayeredAppStreaming`). + + + ### Multiple Databases Some applications use multiple databases. @@ -339,7 +359,6 @@ You only need to provide a different marker type for each database you use. ```scala import io.github.gaelrenoux.tranzactio.doobie._ -import javax.sql.DataSource import zio._ trait Db1 diff --git a/core/src/main/scala/io/github/gaelrenoux/tranzactio/ConnectionSource.scala b/core/src/main/scala/io/github/gaelrenoux/tranzactio/ConnectionSource.scala index f17ed65..70246f5 100644 --- a/core/src/main/scala/io/github/gaelrenoux/tranzactio/ConnectionSource.scala +++ b/core/src/main/scala/io/github/gaelrenoux/tranzactio/ConnectionSource.scala @@ -2,6 +2,7 @@ package io.github.gaelrenoux.tranzactio import zio.ZIO.attemptBlocking import zio._ +import zio.stream.ZStream import java.sql.Connection import javax.sql.DataSource @@ -14,8 +15,14 @@ object ConnectionSource { def runTransaction[R, E, A](task: Connection => ZIO[R, E, A], commitOnFailure: => Boolean = false) (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R, Either[DbException, E], A] + def runTransactionOrDieStream[R, E, A](task: Connection => ZStream[R, E, A], commitOnFailure: => Boolean = false) + (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, E, A] + def runAutoCommit[R, E, A](task: Connection => ZIO[R, E, A]) (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R, Either[DbException, E], A] + + def runAutoCommitStream[R, E, A](task: Connection => ZStream[R, E, A]) + (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, Either[DbException, E], A] } /** ConnectionSource with standard behavior. Children class need to implement `getConnection`. */ @@ -39,16 +46,37 @@ object ConnectionSource { } } + def runTransactionOrDieStream[R, E, A](task: Connection => ZStream[R, E, A], commitOnFailure: => Boolean = false) + (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, E, A] = { + ZStream + .acquireReleaseExitWith(openConnection.tap(c => setAutoCommit(c, autoCommit = false)).orDie) { + case (c, Exit.Success(_)) => commitConnection(c).tapEither(_ => closeConnection(c)).orDie + case (c, Exit.Failure(cause)) if cause.isDie => closeConnection(c).orDie // No commit, no rollback in case of a defect, just close the connection + case (c, Exit.Failure(_)) => (if (commitOnFailure) commitConnection(c) else rollbackConnection(c)).tapEither(_ => closeConnection(c)).orDie + } + .flatMap { (c: Connection) => task(c) } + } + def runAutoCommit[R, E, A](task: Connection => ZIO[R, E, A]) (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R, Either[DbException, E], A] = ZIO.acquireReleaseWith(openConnection.mapError(Left(_)))(closeConnection(_).orDie) { (c: Connection) => - setAutoCommit(c, autoCommit = true) - .mapError(Left(_)) + setAutoCommit(c, autoCommit = true).mapError(Left(_)) .zipRight { task(c).mapError(Right(_)) } } + def runAutoCommitStream[R, E, A](task: Connection => ZStream[R, E, A]) + (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, Either[DbException, E], A] = + ZStream + .acquireReleaseWith(openConnection.mapError(Left(_)))(closeConnection(_).orDie) + .flatMap { (c: Connection) => + ZStream.fromZIO(setAutoCommit(c, autoCommit = true).mapError(Left(_))) + .crossRight { + task(c).mapError(Right(_)) + } + } + // TODO handle error reporting when retrying private def bottomErrorStrategy(implicit errorStrategies: ErrorStrategiesRef) = @@ -116,6 +144,10 @@ object ConnectionSource { super.runTransaction(task, commitOnFailure) } + override def runTransactionOrDieStream[R, E, A](task: Connection => ZStream[R, E, A], commitOnFailure: => Boolean = false) + (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, E, A] = + super.runTransactionOrDieStream(task, commitOnFailure) // TODO Could not find a way to use the semaphore here + override def runAutoCommit[R, E, A](task: Connection => ZIO[R, E, A]) (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R, Either[DbException, E], A] = semaphore.withPermit { diff --git a/core/src/main/scala/io/github/gaelrenoux/tranzactio/DatabaseModuleBase.scala b/core/src/main/scala/io/github/gaelrenoux/tranzactio/DatabaseModuleBase.scala index 97c0823..651bb55 100644 --- a/core/src/main/scala/io/github/gaelrenoux/tranzactio/DatabaseModuleBase.scala +++ b/core/src/main/scala/io/github/gaelrenoux/tranzactio/DatabaseModuleBase.scala @@ -1,6 +1,7 @@ package io.github.gaelrenoux.tranzactio +import zio.stream.ZStream import zio.{Tag, Trace, ZIO, ZLayer} import javax.sql.DataSource @@ -20,6 +21,15 @@ abstract class DatabaseModuleBase[Connection, Database <: DatabaseOps.ServiceOps } } + override def transactionOrDieStream[R, E, A]( + stream: => ZStream[Connection with R, E, A], + commitOnFailure: => Boolean = false + )(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZStream[Database with R, E, A] = { + ZStream.serviceWithStream[Database] { db => + db.transactionOrDieStream[R, E, A](stream, commitOnFailure) + } + } + override def autoCommit[R, E, A]( zio: => ZIO[Connection with R, E, A] )(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZIO[Database with R, Either[DbException, E], A] = { @@ -28,6 +38,14 @@ abstract class DatabaseModuleBase[Connection, Database <: DatabaseOps.ServiceOps } } + override def autoCommitStream[R, E, A]( + stream: => ZStream[Connection with R, E, A] + )(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZStream[Database with R, Either[DbException, E], A] = { + ZStream.serviceWithStream[Database] { db => + db.autoCommitStream[R, E, A](stream) + } + } + /** Creates a Database Layer which requires an existing ConnectionSource. */ def fromConnectionSource(implicit dbContext: DbContext, trace: Trace): ZLayer[ConnectionSource, Nothing, Database] diff --git a/core/src/main/scala/io/github/gaelrenoux/tranzactio/DatabaseOps.scala b/core/src/main/scala/io/github/gaelrenoux/tranzactio/DatabaseOps.scala index b3a6ddc..0f8d086 100644 --- a/core/src/main/scala/io/github/gaelrenoux/tranzactio/DatabaseOps.scala +++ b/core/src/main/scala/io/github/gaelrenoux/tranzactio/DatabaseOps.scala @@ -1,5 +1,6 @@ package io.github.gaelrenoux.tranzactio +import zio.stream.ZStream import zio.{Cause, Trace, ZIO} /** Operations for a Database, based on a few atomic operations. Can be used both by the actual DB service, or by the DB @@ -58,6 +59,12 @@ trait DatabaseOps[Connection, R0] { )(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZIO[R with R0, E, A] = transactionOrDie[R, E, A](zio, commitOnFailure) + /** As `transactionOrDie`, for ZStream instances instead of ZIO instances. */ + def transactionOrDieStream[R <: Any, E, A]( + stream: => ZStream[Connection with R, E, A], + commitOnFailure: => Boolean = false + )(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZStream[R with R0, E, A] + /** Provides that ZIO with a Connection. All DB action in the ZIO will be auto-committed. Failures in the initial * ZIO will be wrapped in a Right in the error case of the resulting ZIO, with connection errors resulting in a * failure with the exception wrapped in a Left. @@ -74,6 +81,14 @@ trait DatabaseOps[Connection, R0] { )(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZIO[R with R0, Either[DbException, E], A] = autoCommit[R, E, A](zio) + /** As `autoCommit`, for ZStream instances instead of ZIO instances. + * + * This method should be implemented by subclasses, to provide the connection. + */ + def autoCommitStream[R, E, A]( + stream: => ZStream[Connection with R, E, A] + )(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZStream[R with R0, Either[DbException, E], A] + /** As `autoCommit`, but exceptions are simply widened to a common failure type. The resulting failure type is a * superclass of both DbException and the error type of the inital ZIO. */ final def autoCommitOrWiden[R, E >: DbException, A]( @@ -87,6 +102,12 @@ trait DatabaseOps[Connection, R0] { )(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZIO[R with R0, E, A] = autoCommitOrWiden[R, E, A](zio) + /** As `autoCommitOrWiden`, for ZStream instances instead of ZIO instances. */ + final def autoCommitOrWidenStream[R, E >: DbException, A]( + stream: => ZStream[Connection with R, E, A] + )(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZStream[R with R0, E, A] = + autoCommitStream[R, E, A](stream).mapError(_.merge) + /** As `autoCommit`, but errors when handling the connections are treated as defects instead of failures. */ final def autoCommitOrDie[R, E, A]( zio: => ZIO[Connection with R, E, A] @@ -99,6 +120,17 @@ trait DatabaseOps[Connection, R0] { )(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZIO[R with R0, E, A] = autoCommitOrDie[R, E, A](zio) + /** As `autoCommitOrDie`, for ZStream instances instead of ZIO instances. */ + final def autoCommitOrDieStream[R, E, A]( + stream: => ZStream[Connection with R, E, A] + )(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZStream[R with R0, E, A] = + autoCommitStream[R, E, A](stream).mapErrorCause { cause => + cause.flatMap { + case Left(dbError) => Cause.die(dbError, cause.trace) + case Right(error) => Cause.fail(error, cause.trace) + } + } + } object DatabaseOps { diff --git a/core/src/main/scala/io/github/gaelrenoux/tranzactio/DatabaseServiceBase.scala b/core/src/main/scala/io/github/gaelrenoux/tranzactio/DatabaseServiceBase.scala index 2616a00..57abc72 100644 --- a/core/src/main/scala/io/github/gaelrenoux/tranzactio/DatabaseServiceBase.scala +++ b/core/src/main/scala/io/github/gaelrenoux/tranzactio/DatabaseServiceBase.scala @@ -1,6 +1,7 @@ package io.github.gaelrenoux.tranzactio -import zio.{Tag, ZEnvironment, ZIO, Trace} +import zio.stream.ZStream +import zio.{Tag, Trace, ZEnvironment, ZIO} import java.sql.{Connection => JdbcConnection} @@ -23,6 +24,16 @@ abstract class DatabaseServiceBase[Connection: Tag](connectionSource: Connection }, commitOnFailure) } + override def transactionOrDieStream[R, E, A](stream: => ZStream[Connection with R, E, A], commitOnFailure: => Boolean = false) + (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, E, A] = + ZStream.environmentWithStream[R] { r => + runTransactionOrDieStream({ (c: JdbcConnection) => + ZStream.fromZIO(connectionFromJdbc(c)) + .map(r ++ ZEnvironment(_)) + .flatMap(stream.provideEnvironment(_)) + }, commitOnFailure) + } + override def autoCommit[R, E, A](zio: => ZIO[Connection with R, E, A]) (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R, Either[DbException, E], A] = ZIO.environmentWithZIO[R] { r => @@ -33,5 +44,15 @@ abstract class DatabaseServiceBase[Connection: Tag](connectionSource: Connection } } + override def autoCommitStream[R, E, A](stream: => ZStream[Connection with R, E, A]) + (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, Either[DbException, E], A] = + ZStream.environmentWithStream[R] { r => + runAutoCommitStream { (c: JdbcConnection) => + ZStream.fromZIO(connectionFromJdbc(c)) + .map(r ++ ZEnvironment(_)) + .flatMap(stream.provideEnvironment(_)) + } + } + } diff --git a/core/src/main/scala/io/github/gaelrenoux/tranzactio/DatabaseTBase.scala b/core/src/main/scala/io/github/gaelrenoux/tranzactio/DatabaseTBase.scala index 6c73a78..ecc6023 100644 --- a/core/src/main/scala/io/github/gaelrenoux/tranzactio/DatabaseTBase.scala +++ b/core/src/main/scala/io/github/gaelrenoux/tranzactio/DatabaseTBase.scala @@ -1,6 +1,7 @@ package io.github.gaelrenoux.tranzactio -import zio.{Trace, ZEnvironment, ZIO, ZLayer, Tag} +import zio.stream.ZStream +import zio.{Tag, Trace, ZEnvironment, ZIO, ZLayer} /** * This is a typed database, to use when you have multiple databases in your application. Simply provide a marker type, @@ -12,11 +13,17 @@ class DatabaseTBase[M: Tag, Connection](underlying: DatabaseOps.ServiceOps[Conne (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R with Any, Either[DbException, E], A] = underlying.transaction[R, E, A](task, commitOnFailure = commitOnFailure) + override def transactionOrDieStream[R, E, A](stream: => ZStream[Connection with R, E, A], commitOnFailure: => Boolean) + (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R with Any, E, A] = + underlying.transactionOrDieStream[R, E, A](stream, commitOnFailure = commitOnFailure) override def autoCommit[R, E, A](task: => ZIO[Connection with R, E, A]) (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R with Any, Either[DbException, E], A] = underlying.autoCommit[R, E, A](task) + override def autoCommitStream[R, E, A](stream: => ZStream[Connection with R, E, A]) + (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R with Any, Either[DbException, E], A] = + underlying.autoCommitStream[R, E, A](stream) } object DatabaseTBase { diff --git a/core/src/main/scala/io/github/gaelrenoux/tranzactio/Wrapper.scala b/core/src/main/scala/io/github/gaelrenoux/tranzactio/Wrapper.scala index 4d9a0ba..27bb9f3 100644 --- a/core/src/main/scala/io/github/gaelrenoux/tranzactio/Wrapper.scala +++ b/core/src/main/scala/io/github/gaelrenoux/tranzactio/Wrapper.scala @@ -1,6 +1,6 @@ package io.github.gaelrenoux.tranzactio -import zio.{Tag, Trace} +import zio.Trace /** A specific wrapper package for one specific library (e.g. Doobie). */ diff --git a/core/src/main/scala/io/github/gaelrenoux/tranzactio/test/DatabaseModuleTestOps.scala b/core/src/main/scala/io/github/gaelrenoux/tranzactio/test/DatabaseModuleTestOps.scala index 00c7553..c6d4716 100644 --- a/core/src/main/scala/io/github/gaelrenoux/tranzactio/test/DatabaseModuleTestOps.scala +++ b/core/src/main/scala/io/github/gaelrenoux/tranzactio/test/DatabaseModuleTestOps.scala @@ -1,7 +1,9 @@ package io.github.gaelrenoux.tranzactio.test +import zio.stream.ZStream + import io.github.gaelrenoux.tranzactio._ -import zio.{Tag, ZEnvironment, ZIO, ZLayer, Trace} +import zio.{Tag, Trace, ZEnvironment, ZIO, ZLayer} /** Testing utilities on the Database module. */ trait DatabaseModuleTestOps[Connection, DbContext] extends DatabaseModuleBase[Connection, DatabaseOps.ServiceOps[Connection], DbContext] { @@ -32,6 +34,14 @@ trait DatabaseModuleTestOps[Connection, DbContext] extends DatabaseModuleBase[Co .mapError(Right(_)) } + override def transactionOrDieStream[R, E, A](stream: => ZStream[Connection with R, E, A], commitOnFailure: => Boolean = false) + (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, E, A] = { + ZStream.fromZIO(noConnection).flatMap { c => + ZStream.environmentWith[R](_ ++ ZEnvironment(c)) + .flatMap(stream.provideEnvironment(_)) + } + } + override def autoCommit[R, E, A](zio: => ZIO[Connection with R, E, A]) (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R, Either[DbException, E], A] = noConnection.flatMap { c => @@ -39,6 +49,14 @@ trait DatabaseModuleTestOps[Connection, DbContext] extends DatabaseModuleBase[Co .flatMap(zio.provideEnvironment(_)) .mapError(Right(_)) } + + override def autoCommitStream[R, E, A](stream: => ZStream[Connection with R, E, A]) + (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, Either[DbException, E], A] = + ZStream.fromZIO(noConnection).flatMap { c => + ZStream.environmentWith[R](_ ++ ZEnvironment(c)) + .flatMap(stream.provideEnvironment(_)) + .mapError(Right(_)) + } } } } diff --git a/core/src/test/scala/io/github/gaelrenoux/tranzactio/ConnectionSourceTest.scala b/core/src/test/scala/io/github/gaelrenoux/tranzactio/ConnectionSourceTest.scala index 9c4cd8f..4c6b7cb 100644 --- a/core/src/test/scala/io/github/gaelrenoux/tranzactio/ConnectionSourceTest.scala +++ b/core/src/test/scala/io/github/gaelrenoux/tranzactio/ConnectionSourceTest.scala @@ -1,5 +1,6 @@ package io.github.gaelrenoux.tranzactio +import zio.stream.ZStream import zio.test._ import zio.{test => _, _} @@ -31,6 +32,9 @@ object ConnectionSourceTest extends ZIOSpec[TestEnvironment] { } } + def connectionCountQueryStream(c: Connection): ZStream[Any, Throwable, Int] = + ZStream.fromZIO(connectionCountQuery(c)) + def spec: MySpec = suite("Single connection ConnectionSource Tests")( testRunTransactionFailureOnOpen, testRunTransactionFailureOnAutoCommit, @@ -40,7 +44,16 @@ object ConnectionSourceTest extends ZIOSpec[TestEnvironment] { testRunTransactionFailureOnClose, testRunAutoCommitFailureOnOpen, testRunAutoCommitFailureOnAutoCommit, - testRunAutoCommitFailureOnClose + testRunAutoCommitFailureOnClose, + testRunTransactionStreamFailureOnOpen, + testRunTransactionStreamFailureOnAutoCommit, + testRunTransactionStreamFailureOnCommit, + testRunTransactionStreamFailureOnCommitAfterFailure, + testRunTransactionStreamFailureOnRollback, + testRunTransactionStreamFailureOnClose, + testRunAutoCommitStreamFailureOnOpen, + testRunAutoCommitStreamFailureOnAutoCommit, + testRunAutoCommitStreamFailureOnClose ) private val testRunTransactionFailureOnOpen = test("runTransaction failure > on open") { @@ -122,4 +135,86 @@ object ConnectionSourceTest extends ZIOSpec[TestEnvironment] { case Cause.Die(ex, _) => assertTrue(ex == DbException.Wrapped(FailingConnectionSource.CloseException)) } } + + private val testRunTransactionStreamFailureOnOpen = test("runTransactionOrDieStream failure > on open") { + val cs = new FailingConnectionSource(errorStrategies)(failOnOpen = true) + val zio: ZStream[Any, Throwable, Int] = cs.runTransactionOrDieStream(connectionCountQueryStream) + zio.runCollect.cause.map { + case Cause.Die(ex, _) => assertTrue(ex == DbException.Wrapped(FailingConnectionSource.OpenException)) + } + } + + private val testRunTransactionStreamFailureOnAutoCommit = test("runTransactionOrDieStream failure > on auto-commit") { + val cs = new FailingConnectionSource(errorStrategies)(failOnAutoCommit = true) + val zio: ZStream[Any, Throwable, Int] = cs.runTransactionOrDieStream(connectionCountQueryStream) + zio.runCollect.cause.map { + case Cause.Die(ex, _) => assertTrue(ex == DbException.Wrapped(FailingConnectionSource.AutoCommitException)) + } + } + + private val testRunTransactionStreamFailureOnCommit = test("runTransactionOrDieStream failure > on commit (after success)") { + val cs = new FailingConnectionSource(errorStrategies)(failOnCommit = true) + val zio: ZStream[Any, Throwable, Int] = cs.runTransactionOrDieStream(connectionCountQueryStream) + zio.runCollect.cause.map { + case Cause.Die(ex, _) => assertTrue(ex == DbException.Wrapped(FailingConnectionSource.CommitException)) + } + } + + private val testRunTransactionStreamFailureOnCommitAfterFailure = test("runTransactionOrDieStream failure > on commit (after failure)") { + val cs = new FailingConnectionSource(errorStrategies)(failOnCommit = true) + val zio: ZStream[Any, Exception, Int] = cs.runTransactionOrDieStream(_ => ZStream.fail(TestException("Not a good query")), commitOnFailure = true) + zio.runCollect.cause.map { + case Cause.Then(Cause.Fail(firstError, _), Cause.Die(secondError, _)) => + assertTrue( + firstError == TestException("Not a good query"), + secondError == DbException.Wrapped(FailingConnectionSource.CommitException) + ) + } + } + + private val testRunTransactionStreamFailureOnRollback = test("runTransactionOrDieStream failure > on rollback") { + val cs = new FailingConnectionSource(errorStrategies)(failOnRollback = true) + val zio: ZStream[Any, Exception, Int] = cs.runTransactionOrDieStream(_ => ZStream.fail(TestException("Not a good query"))) + zio.runCollect.cause.map { + case Cause.Then(Cause.Fail(firstError, _), Cause.Die(secondError, _)) => + assertTrue( + firstError == TestException("Not a good query"), + secondError == DbException.Wrapped(FailingConnectionSource.RollbackException) + ) + } + } + + private val testRunTransactionStreamFailureOnClose = test("runTransactionOrDieStream failure > on close") { + val cs = new FailingConnectionSource(errorStrategies)(failOnClose = true) + val zio: ZStream[Any, Throwable, Int] = cs.runTransactionOrDieStream(connectionCountQueryStream) + zio.runCollect.cause.map { + case Cause.Die(ex, _) => assertTrue(ex == DbException.Wrapped(FailingConnectionSource.CloseException)) + } + } + + private val testRunAutoCommitStreamFailureOnOpen = test("runAutoCommitStream failure > on open") { + val cs = new FailingConnectionSource(errorStrategies)(failOnOpen = true) + val zio: ZStream[Any, Either[DbException, Throwable], Int] = cs.runAutoCommitStream(connectionCountQueryStream) + zio.runCollect.flip.map { e => + assertTrue(e == Left(DbException.Wrapped(FailingConnectionSource.OpenException))) + } + } + + private val testRunAutoCommitStreamFailureOnAutoCommit = test("runAutoCommitStream failure > on auto-commit") { + val cs = new FailingConnectionSource(errorStrategies)(failOnAutoCommit = true) + val zio: ZStream[Any, Either[DbException, Throwable], Int] = cs.runAutoCommitStream(connectionCountQueryStream) + zio.runCollect.flip.map { e => + assertTrue(e == Left(DbException.Wrapped(FailingConnectionSource.AutoCommitException))) + } + } + + private val testRunAutoCommitStreamFailureOnClose = test("runAutoCommitStream failure > on close") { + val cs = new FailingConnectionSource(errorStrategies)(failOnClose = true) + val zio: ZStream[Any, Either[DbException, Throwable], Int] = cs.runAutoCommitStream(connectionCountQueryStream) + zio.runCollect.cause.map { + case Cause.Die(ex, _) => assertTrue(ex == DbException.Wrapped(FailingConnectionSource.CloseException)) + } + } + + case class TestException(content: String) extends Exception(content) } diff --git a/core/src/test/scala/io/github/gaelrenoux/tranzactio/DatabaseOpsCompileTest.scala b/core/src/test/scala/io/github/gaelrenoux/tranzactio/DatabaseOpsCompileTest.scala index 90a3744..632ef30 100644 --- a/core/src/test/scala/io/github/gaelrenoux/tranzactio/DatabaseOpsCompileTest.scala +++ b/core/src/test/scala/io/github/gaelrenoux/tranzactio/DatabaseOpsCompileTest.scala @@ -1,6 +1,7 @@ package io.github.gaelrenoux.tranzactio import zio.ZIO +import zio.stream.ZStream /** This is not a runnable test. It is here to check the type inference produces the expected types. */ trait DatabaseOpsCompileTest { @@ -9,6 +10,8 @@ trait DatabaseOpsCompileTest { def z[R, E]: ZIO[R, E, Int] = ZIO.succeed(42) + def zs[R, E]: ZStream[R, E, Int] = ZStream.succeed(42) + val serviceOperations: DatabaseOps.ServiceOps[Connection] object ServicesCheck { @@ -57,7 +60,7 @@ trait DatabaseOpsCompileTest { serviceOperations.transactionOrDie(z[Connection, String], commitOnFailure = true) } - object AutoCommitR { + object AutoCommit { val a: ZIO[Environment, Either[DbException, String], Int] = serviceOperations.autoCommit(z[Connection with Environment, String]) @@ -85,6 +88,46 @@ trait DatabaseOpsCompileTest { serviceOperations.autoCommitOrDie(z[Connection, String]) } + object TransactionOrDieStream { + val a: ZStream[Environment, String, Int] = + serviceOperations.transactionOrDieStream(zs[Connection with Environment, String]) + val b: ZStream[Environment, String, Int] = + serviceOperations.transactionOrDieStream(zs[Connection with Environment, String], commitOnFailure = true) + + val c: ZStream[Any, String, Int] = + serviceOperations.transactionOrDieStream(zs[Connection, String]) + val d: ZStream[Any, String, Int] = + serviceOperations.transactionOrDieStream(zs[Connection, String], commitOnFailure = true) + } + + object AutoCommitStream { + val a: ZStream[Environment, Either[DbException, String], Int] = + serviceOperations.autoCommitStream(zs[Connection with Environment, String]) + + val b: ZStream[Any, Either[DbException, String], Int] = + serviceOperations.autoCommitStream(zs[Connection, String]) + } + + object AutoCommitOrWidenStream { + val a: ZStream[Environment, Exception, Int] = + serviceOperations.autoCommitOrWidenStream(zs[Connection with Environment, IllegalArgumentException]) + val b: ZStream[Environment, DbException, Int] = + serviceOperations.autoCommitOrWidenStream(zs[Connection with Environment, DbException]) + + val c: ZStream[Any, Exception, Int] = + serviceOperations.autoCommitOrWidenStream(zs[Connection, IllegalArgumentException]) + val d: ZStream[Any, DbException, Int] = + serviceOperations.autoCommitOrWidenStream(zs[Connection, DbException]) + } + + object AutoCommitOrDieStream { + val a: ZStream[Environment, String, Int] = + serviceOperations.autoCommitOrDieStream(zs[Connection with Environment, String]) + + val b: ZStream[Any, String, Int] = + serviceOperations.autoCommitOrDieStream(zs[Connection, String]) + } + } @@ -148,6 +191,42 @@ trait DatabaseOpsCompileTest { moduleOperations.autoCommitOrDie(z[Connection, String]) } + object TransactionOrDieStream { + val a: ZStream[Database with Environment, String, Int] = + moduleOperations.transactionOrDieStream(zs[Connection with Environment, String]) + + val b: ZStream[Database, String, Int] = + moduleOperations.transactionOrDieStream(zs[Connection, String]) + } + + object AutoCommitStream { + val a: ZStream[Database with Environment, Either[DbException, String], Int] = + moduleOperations.autoCommitStream(zs[Connection with Environment, String]) + + val b: ZStream[Database, Either[DbException, String], Int] = + moduleOperations.autoCommitStream(zs[Connection, String]) + } + + object AutoCommitOrWidenStream { + val a: ZStream[Database with Environment, Exception, Int] = + moduleOperations.autoCommitOrWidenStream(zs[Connection with Environment, IllegalArgumentException]) + val b: ZStream[Database with Environment, DbException, Int] = + moduleOperations.autoCommitOrWidenStream(zs[Connection with Environment, DbException]) + + val c: ZStream[Database, Exception, Int] = + moduleOperations.autoCommitOrWidenStream(zs[Connection, IllegalArgumentException]) + val d: ZStream[Database, DbException, Int] = + moduleOperations.autoCommitOrWidenStream(zs[Connection, DbException]) + } + + object AutoCommitOrDieStream { + val a: ZStream[Database with Environment, String, Int] = + moduleOperations.autoCommitOrDieStream(zs[Connection with Environment, String]) + + val b: ZStream[Database, String, Int] = + moduleOperations.autoCommitOrDieStream(zs[Connection, String]) + } + } } diff --git a/examples/src/test/scala/io/github/gaelrenoux/tranzactio/integration/DoobieIT.scala b/examples/src/test/scala/io/github/gaelrenoux/tranzactio/integration/DoobieIT.scala index 63831a9..d2ca3ee 100644 --- a/examples/src/test/scala/io/github/gaelrenoux/tranzactio/integration/DoobieIT.scala +++ b/examples/src/test/scala/io/github/gaelrenoux/tranzactio/integration/DoobieIT.scala @@ -6,9 +6,8 @@ import io.github.gaelrenoux.tranzactio.doobie._ import io.github.gaelrenoux.tranzactio.{ConnectionSource, JdbcLayers} import samples.Person import samples.doobie.PersonQueries -import zio.test.Assertion._ import zio.test._ -import zio.{Scope, ZIO, ZLayer} +import zio.{Chunk, Scope, ZIO, ZLayer} /** Integration tests for Doobie */ @@ -45,7 +44,7 @@ object DoobieIT extends ITSpec { _ <- Database.transaction(PersonQueries.setup) _ <- Database.transaction(PersonQueries.insert(buffy)) persons <- Database.transaction(PersonQueries.list) - } yield assert(persons)(equalTo(List(buffy))) + } yield assertTrue(persons == List(buffy)) } } @@ -55,7 +54,7 @@ object DoobieIT extends ITSpec { _ <- Database.transaction(PersonQueries.setup) _ <- Database.transaction(PersonQueries.insert(buffy)) connectionCount <- Database.transaction(connectionCountQuery) - } yield assert(connectionCount)(equalTo(1)) // only the current connection + } yield assertTrue(connectionCount == 1) // only the current connection } } @@ -65,7 +64,7 @@ object DoobieIT extends ITSpec { _ <- Database.transaction(PersonQueries.setup) _ <- Database.transaction(PersonQueries.insert(buffy) <*> PersonQueries.failing).flip persons <- Database.transaction(PersonQueries.list) - } yield assert(persons)(equalTo(Nil)) + } yield assertTrue(persons.isEmpty) } } @@ -75,7 +74,7 @@ object DoobieIT extends ITSpec { _ <- Database.transaction(PersonQueries.setup) _ <- Database.transaction(PersonQueries.insert(buffy) <*> PersonQueries.failing, commitOnFailure = true).flip persons <- Database.transaction(PersonQueries.list) - } yield assert(persons)(equalTo(List(buffy))) + } yield assertTrue(persons == List(buffy)) } } @@ -85,8 +84,8 @@ object DoobieIT extends ITSpec { _ <- Database.transaction(PersonQueries.setup) _ <- Database.transaction(PersonQueries.insert(buffy) <*> PersonQueries.failing).flip connectionCount <- Database.transaction(connectionCountQuery) - } yield assert(connectionCount)(equalTo(1)) - } // only the current connection + } yield assertTrue(connectionCount == 1) // only the current connection + } } private val testDataCommittedOnAutoCommitSuccess: MySpec = test("data committed on autoCommit success") { @@ -95,7 +94,7 @@ object DoobieIT extends ITSpec { _ <- Database.autoCommit(PersonQueries.setup) _ <- Database.autoCommit(PersonQueries.insert(buffy)) persons <- Database.autoCommit(PersonQueries.list) - } yield assert(persons)(equalTo(List(buffy))) + } yield assertTrue(persons == List(buffy)) } } @@ -105,17 +104,17 @@ object DoobieIT extends ITSpec { _ <- Database.autoCommit(PersonQueries.setup) _ <- Database.autoCommit(PersonQueries.insert(buffy)) connectionCount <- Database.autoCommit(connectionCountQuery) - } yield assert(connectionCount)(equalTo(1)) - } // only the current connection + } yield assertTrue(connectionCount == 1) // only the current connection + } } - private val testDataRollbackedOnAutoCommitFailure: MySpec = test("data rollbacked on autoCommit failure") { + private val testDataRollbackedOnAutoCommitFailure: MySpec = test("data committed on autoCommit failure") { wrap { for { _ <- Database.autoCommit(PersonQueries.setup) _ <- Database.autoCommit(PersonQueries.insert(buffy) <*> PersonQueries.failing).flip persons <- Database.autoCommit(PersonQueries.list) - } yield assert(persons)(equalTo(List(buffy))) + } yield assertTrue(persons == List(buffy)) } } @@ -125,7 +124,7 @@ object DoobieIT extends ITSpec { _ <- Database.autoCommit(PersonQueries.setup) _ <- Database.autoCommit(PersonQueries.insert(buffy)) connectionCount <- Database.autoCommit(connectionCountQuery) - } yield assert(connectionCount)(equalTo(1)) // only the current connection + } yield assertTrue(connectionCount == 1) // only the current connection } } @@ -135,16 +134,17 @@ object DoobieIT extends ITSpec { _ <- Database.autoCommit(PersonQueries.setup) _ <- Database.autoCommit(PersonQueries.insert(buffy)) _ <- Database.autoCommit(PersonQueries.insert(giles)) - result <- Database.autoCommit { - val doobieStream = sql"""SELECT given_name, family_name FROM person""".query[Person] + failingOnSecondStream = tzioStream { + sql"""SELECT given_name, family_name FROM person""".query[Person] .streamWithChunkSize(1) // make sure it's read one by one .map { p => if (p.givenName == "Rupert") throw new IllegalStateException // fail on the second one, if it's ever read else p } - tzioStream(doobieStream).take(1).runHead // only keep one } - } yield assert(result)(isSome(equalTo(buffy))) + zio = failingOnSecondStream.take(1).runCollect // only keep one + result <- Database.autoCommit(zio) + } yield assertTrue(result == Chunk(buffy)) } } diff --git a/examples/src/test/scala/io/github/gaelrenoux/tranzactio/integration/DoobieStreamIT.scala b/examples/src/test/scala/io/github/gaelrenoux/tranzactio/integration/DoobieStreamIT.scala new file mode 100644 index 0000000..dab6c55 --- /dev/null +++ b/examples/src/test/scala/io/github/gaelrenoux/tranzactio/integration/DoobieStreamIT.scala @@ -0,0 +1,174 @@ +package io.github.gaelrenoux.tranzactio.integration + +import doobie.implicits._ +import doobie.util.fragment.Fragment +import io.github.gaelrenoux.tranzactio.doobie._ +import io.github.gaelrenoux.tranzactio.{ConnectionSource, JdbcLayers} +import samples.Person +import samples.doobie.PersonQueries +import zio.stream.ZStream +import zio.test._ +import zio.{Chunk, Scope, ZIO, ZLayer} + + +/** Integration tests for Doobie */ +// scalastyle:off magic.number +object DoobieStreamIT extends ITSpec { + + /** Layer is recreated on each test, to have a different database every time. */ + def myLayer: ZLayer[Scope, Nothing, Database with PersonQueries] = + PersonQueries.live ++ (JdbcLayers.datasourceU >>> ConnectionSource.fromDatasource >>> Database.fromConnectionSource) + + val buffy: Person = Person("Buffy", "Summers") + val giles: Person = Person("Rupert", "Giles") + + def stream[R, E, A](zio: ZIO[R, E, A], num: Int = 1): ZStream[R, E, A] = + if (num == 0) ZStream.never + else if (num == 1) ZStream.fromZIO(zio) + else ZStream.fromZIO(zio).forever.take(num) + + val connectionCountQuery: TranzactIO[Int] = tzio(Fragment.const(connectionCountSql).query[Int].unique) + + private def wrap[E, A](z: ZIO[Database with PersonQueries, E, A]): ZIO[Scope, E, A] = + z.provideSome[Scope](myLayer) + + def spec: MySpec = suite("Doobie-Stream Integration Tests")( + testDataCommittedOnTransactionSuccess, + testConnectionClosedOnTransactionSuccess, + testDataRollbackedOnTransactionFailure, + testDataCommittedOnTransactionFailure, + testConnectionClosedOnTransactionFailure, + testMultipleTransactions, + testDataCommittedOnAutoCommitSuccess, + testConnectionClosedOnAutoCommitSuccess, + testDataRollbackedOnAutoCommitFailure, + testConnectionClosedOnAutoCommitFailure, + testStreamDoesNotLoadAllValues + ) + + private val testDataCommittedOnTransactionSuccess: MySpec = test("data committed on transaction success") { + wrap { + for { + _ <- Database.transaction(PersonQueries.setup) + _ <- Database.transactionOrDieStream(stream(PersonQueries.insert(buffy), 3)).runDrain + persons <- Database.transactionOrDieStream(PersonQueries.listStream).runCollect + } yield assertTrue(persons == Chunk(buffy, buffy, buffy)) + } + } + + private val testConnectionClosedOnTransactionSuccess: MySpec = test("connection closed on transaction success") { + wrap { + for { + _ <- Database.transaction(PersonQueries.setup) + _ <- Database.transactionOrDieStream(stream(PersonQueries.insert(buffy), 3)).runDrain + connectionCount <- Database.transaction(connectionCountQuery) + } yield assertTrue(connectionCount == 1) // only the current connection + } + } + + private val testDataRollbackedOnTransactionFailure: MySpec = test("data rollbacked on transaction failure if commitOnFailure=false") { + wrap { + for { + _ <- Database.transaction(PersonQueries.setup) + _ <- Database.transactionOrDieStream(stream(PersonQueries.insert(buffy), 2) ++ PersonQueries.failingStream ++ stream(PersonQueries.insert(buffy), 2)).runDrain.flip + persons <- Database.transactionOrDieStream(PersonQueries.listStream).runCollect + } yield assertTrue(persons.isEmpty) + } + } + + private val testDataCommittedOnTransactionFailure: MySpec = test("data committed on transaction failure if commitOnFailure=true") { + wrap { + for { + _ <- Database.transaction(PersonQueries.setup) + _ <- Database.transactionOrDieStream(stream(PersonQueries.insert(buffy), 2) ++ PersonQueries.failingStream ++ stream(PersonQueries.insert(buffy), 2), commitOnFailure = true).runDrain.flip + persons <- Database.transactionOrDieStream(PersonQueries.listStream).runCollect + } yield assertTrue(persons == Chunk(buffy, buffy)) + } + } + + private val testConnectionClosedOnTransactionFailure: MySpec = test("connection closed on transaction failure") { + wrap { + for { + _ <- Database.transaction(PersonQueries.setup) + _ <- Database.transactionOrDieStream(stream(PersonQueries.insert(buffy), 2) ++ PersonQueries.failingStream ++ stream(PersonQueries.insert(buffy), 2)).runDrain.flip + connectionCount <- Database.transaction(connectionCountQuery) + } yield assertTrue(connectionCount == 1) // only the current connection + } + } + + private val testMultipleTransactions: MySpec = test("multiple transactions") { + wrap { + for { + _ <- Database.transaction(PersonQueries.setup) + _ <- { + val stream1 = Database.transactionOrDieStream(stream(PersonQueries.insert(buffy), 2)) + val stream2 = Database.transactionOrDieStream(PersonQueries.failingStream) + val stream3 = Database.transactionOrDieStream(stream(PersonQueries.insert(giles), 2)) + (stream1 ++ stream2 ++ stream3).runDrain.flip + } + persons <- Database.transactionOrDieStream(PersonQueries.listStream).runCollect + } yield assertTrue(persons == Chunk(buffy, buffy)) + } + } + + private val testDataCommittedOnAutoCommitSuccess: MySpec = test("data committed on autoCommit success") { + wrap { + for { + _ <- Database.autoCommit(PersonQueries.setup) + _ <- Database.autoCommitStream(stream(PersonQueries.insert(buffy), 3)).runDrain + persons <- Database.autoCommitStream(PersonQueries.listStream).runCollect + } yield assertTrue(persons == Chunk(buffy, buffy, buffy)) + } + } + + private val testConnectionClosedOnAutoCommitSuccess: MySpec = test("connection closed on autoCommit success") { + wrap { + for { + _ <- Database.autoCommit(PersonQueries.setup) + _ <- Database.autoCommitStream(stream(PersonQueries.insert(buffy), 3)).runDrain + connectionCount <- Database.autoCommit(connectionCountQuery) + } yield assertTrue(connectionCount == 1) // only the current connection + } + } + + private val testDataRollbackedOnAutoCommitFailure: MySpec = test("data committed on autoCommit failure") { + wrap { + for { + _ <- Database.autoCommit(PersonQueries.setup) + _ <- Database.autoCommitStream(stream(PersonQueries.insert(buffy), 2) ++ PersonQueries.failingStream ++ stream(PersonQueries.insert(buffy), 2)).runDrain.flip + persons <- Database.autoCommitStream(PersonQueries.listStream).runCollect + } yield assertTrue(persons == Chunk(buffy, buffy)) + } + } + + private val testConnectionClosedOnAutoCommitFailure: MySpec = test("connection closed on autoCommit failure") { + wrap { + for { + _ <- Database.autoCommit(PersonQueries.setup) + _ <- Database.autoCommitStream(stream(PersonQueries.insert(buffy), 2) ++ PersonQueries.failingStream ++ stream(PersonQueries.insert(buffy), 2)).runDrain.flip + connectionCount <- Database.autoCommit(connectionCountQuery) + } yield assertTrue(connectionCount == 1) // only the current connection + } + } + + private val testStreamDoesNotLoadAllValues: MySpec = test("stream does not load all values when reading") { + wrap { + for { + _ <- Database.autoCommit(PersonQueries.setup) + _ <- Database.autoCommit(PersonQueries.insert(buffy)) + _ <- Database.autoCommit(PersonQueries.insert(giles)) + failingOnSecondStream = tzioStream { + sql"""SELECT given_name, family_name FROM person""".query[Person] + .streamWithChunkSize(1) // make sure it's read one by one + .map { p => + if (p.givenName == "Rupert") throw new IllegalStateException // fail on the second one, if it's ever read + else p + } + } + dbStream = Database.autoCommitStream(failingOnSecondStream).take(1) // only keep one + result <- dbStream.runCollect + } yield assertTrue(result == Chunk(buffy)) + } + } + +} diff --git a/examples/src/test/scala/samples/doobie/LayeredAppStreaming.scala b/examples/src/test/scala/samples/doobie/LayeredAppStreaming.scala index b81e741..fa6aa2c 100644 --- a/examples/src/test/scala/samples/doobie/LayeredAppStreaming.scala +++ b/examples/src/test/scala/samples/doobie/LayeredAppStreaming.scala @@ -6,7 +6,7 @@ import samples.{Conf, ConnectionPool, Person} import zio._ import zio.stream._ -/** Same as LayeredApp, but using Doobie's stream (converted into ZIO strem). */ +/** Same as LayeredApp, but using Doobie's stream (converted into ZIO stream). */ // scalastyle:off magic.number object LayeredAppStreaming extends zio.ZIOAppDefault { @@ -28,7 +28,7 @@ object LayeredAppStreaming extends zio.ZIOAppDefault { /** Main code for the application. Results in a big ZIO depending on the AppEnv. */ def myApp(): ZIO[AppEnv, DbException, List[Person]] = { - val queries: ZIO[Connection with AppEnv, DbException, List[Person]] = for { + val initQueries: ZIO[Connection with AppEnv, DbException, Unit] = for { _ <- Console.printLine("Creating the table").orDie _ <- PersonQueries.setup _ <- Console.printLine("Inserting the trio").orDie @@ -37,17 +37,20 @@ object LayeredAppStreaming extends zio.ZIOAppDefault { _ <- PersonQueries.insert(Person("Alexander", "Harris")) _ <- PersonQueries.insert(Person("Rupert", "Giles")) // insert one more! _ <- Console.printLine("Reading the trio").orDie - trio <- { - val stream: ZStream[PersonQueries with Connection, DbException, Person] = PersonQueries.listStream.take(3) - stream.run(ZSink.foldLeft(List[Person]()) { (ps, p) => p :: ps }) - } - } yield trio.reverse + } yield () + + val resultQueryStream: ZStream[Connection with AppEnv, DbException, Person] = PersonQueries.listStream.take(3) // take only the first 3 ZIO.serviceWithZIO[Conf] { conf => // if this implicit is not provided, tranzactio will use Conf.dbRecovery instead implicit val errorRecovery: ErrorStrategiesRef = conf.alternateDbRecovery - Database.transactionOrWiden(queries) + val init: ZIO[Database with AppEnv, DbException, Unit] = Database.transactionOrWiden(initQueries) + val resultsStream: ZStream[Database with AppEnv, DbException, Person] = Database.transactionOrDieStream(resultQueryStream) + val results: ZIO[Database with AppEnv, DbException, List[Person]] = resultsStream.runCollect.map(_.toList) + init *> results } + + } } diff --git a/examples/src/test/scala/samples/doobie/PersonQueries.scala b/examples/src/test/scala/samples/doobie/PersonQueries.scala index e58b367..e5609d5 100644 --- a/examples/src/test/scala/samples/doobie/PersonQueries.scala +++ b/examples/src/test/scala/samples/doobie/PersonQueries.scala @@ -19,6 +19,8 @@ object PersonQueries { def insert(p: Person): TranzactIO[Unit] val failing: TranzactIO[Unit] + + val failingStream: TranzactIOStream[Unit] } val live: ULayer[PersonQueries] = ZLayer.succeed(new Service { @@ -49,6 +51,10 @@ object PersonQueries { sql"""INSERT INTO nonexisting (stuff) VALUES (1)""" .update.run.map(_ => ()) } + + val failingStream: TranzactIOStream[Unit] = tzioStream { + sql"""SELECT * FROM nonexisting""".query[Unit].stream + } }) val test: ULayer[PersonQueries] = ZLayer.succeed(new Service { @@ -62,6 +68,8 @@ object PersonQueries { def insert(p: Person): TranzactIO[Unit] = ZIO.succeed(()) val failing: TranzactIO[Unit] = ZIO.fail(DbException.Wrapped(new RuntimeException)) + + val failingStream: TranzactIOStream[Unit] = ZStream.fail(DbException.Wrapped(new RuntimeException)) }) def setup: ZIO[PersonQueries with Connection, DbException, Unit] = ZIO.serviceWithZIO[PersonQueries](_.setup) @@ -74,5 +82,7 @@ object PersonQueries { val failing: ZIO[PersonQueries with Connection, DbException, Unit] = ZIO.serviceWithZIO[PersonQueries](_.failing) + val failingStream: ZStream[PersonQueries with Connection, DbException, Unit] = ZStream.serviceWithStream[PersonQueries](_.failingStream) + }