Skip to content

Commit

Permalink
Documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
gaelrenoux committed Mar 24, 2024
1 parent 22fa5e4 commit 93e2985
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 9 deletions.
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,26 @@ val es: ErrorStrategies =



### Streaming

When the wrapped framework handle streaming, you can convert the framework's stream to a `ZStream` using `tzioStream`.
To provide a `Connection` to the ZIO stream, you can either consume the stream into a ZIO first (then use the same functions as above), or use the `Database`'s streaming methods.

The methods `transactionOrDieStream` and `autoCommitStream` work in the same way as the similar, non-stream method.
Note that for transactions, only the `OrDie` variant exists: this is because ZIO's acquire-release mechanism for stream does not allow to pass errors that occur during the acquire-release phase in the error channel.
Defects in the stream due to connection errors can only be caught after the `ZStream` has been consumed into a `ZIO`.

```scala
import io.github.gaelrenoux.tranzactio.doobie._
import zio._
val queryStream: ZStream[Connection, Error, Person] = tzioStream { sql"""SELECT given_name, family_name FROM person""".query[Person].stream }.mapError(transform)
val zStream: ZStream[Database, DbException, Person] = Database.transactionOrDieStream(queryStream)
```

You can see a full example in the `examples` submodule (in `LayeredAppStreaming`).



### Multiple Databases

Some applications use multiple databases.
Expand All @@ -339,7 +359,6 @@ You only need to provide a different marker type for each database you use.

```scala
import io.github.gaelrenoux.tranzactio.doobie._
import javax.sql.DataSource
import zio._

trait Db1
Expand Down
19 changes: 11 additions & 8 deletions examples/src/test/scala/samples/doobie/LayeredAppStreaming.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import samples.{Conf, ConnectionPool, Person}
import zio._
import zio.stream._

/** Same as LayeredApp, but using Doobie's stream (converted into ZIO strem). */
/** Same as LayeredApp, but using Doobie's stream (converted into ZIO stream). */
// scalastyle:off magic.number
object LayeredAppStreaming extends zio.ZIOAppDefault {

Expand All @@ -28,7 +28,7 @@ object LayeredAppStreaming extends zio.ZIOAppDefault {

/** Main code for the application. Results in a big ZIO depending on the AppEnv. */
def myApp(): ZIO[AppEnv, DbException, List[Person]] = {
val queries: ZIO[Connection with AppEnv, DbException, List[Person]] = for {
val initQueries: ZIO[Connection with AppEnv, DbException, Unit] = for {
_ <- Console.printLine("Creating the table").orDie
_ <- PersonQueries.setup
_ <- Console.printLine("Inserting the trio").orDie
Expand All @@ -37,17 +37,20 @@ object LayeredAppStreaming extends zio.ZIOAppDefault {
_ <- PersonQueries.insert(Person("Alexander", "Harris"))
_ <- PersonQueries.insert(Person("Rupert", "Giles")) // insert one more!
_ <- Console.printLine("Reading the trio").orDie
trio <- {
val stream: ZStream[PersonQueries with Connection, DbException, Person] = PersonQueries.listStream.take(3)
stream.run(ZSink.foldLeft(List[Person]()) { (ps, p) => p :: ps })
}
} yield trio.reverse
} yield ()

val resultQueryStream: ZStream[Connection with AppEnv, DbException, Person] = PersonQueries.listStream.take(3) // take only the first 3

ZIO.serviceWithZIO[Conf] { conf =>
// if this implicit is not provided, tranzactio will use Conf.dbRecovery instead
implicit val errorRecovery: ErrorStrategiesRef = conf.alternateDbRecovery
Database.transactionOrWiden(queries)
val init: ZIO[Database with AppEnv, DbException, Unit] = Database.transactionOrWiden(initQueries)
val resultsStream: ZStream[Database with AppEnv, DbException, Person] = Database.transactionOrDieStream(resultQueryStream)
val results: ZIO[Database with AppEnv, DbException, List[Person]] = resultsStream.runCollect.map(_.toList)
init *> results
}


}

}

0 comments on commit 93e2985

Please sign in to comment.