Implement ZStream support #49

Closed
wants to merge 1 commit into from

xQwexx
Contributor

@xQwexx xQwexx commented Nov 7, 2023

Hi, I'm proposing this solution for #35, to enable tranzactio to work with ZStream more easily.
What are your thoughts about it? Could this be okay?
I added some tests to make sure the transaction boundaries are correct.
For the API, I used the ZIO one as inspiration, so all the same methods are there, just working with ZStream.

Owner

@gaelrenoux gaelrenoux left a comment

Took me long enough, but I've finally gotten around to taking a look at this. Thanks a lot for providing it!

There are a few minor changes I'll add later (basic reformatting, adding some comments and documentation). However, there are a few more important points I'd like your input on.

    def runTransactionStream[R, E, A](task: Connection => ZStream[R, E, A], commitOnFailure: => Boolean = false)
      (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, Either[DbException, E], A] = {

      ZStream.acquireReleaseWith(openConnection.tap(c => setAutoCommit(c, autoCommit = false)).mapError(Left(_)))
Owner

By setting the auto-commit in a tap in the acquisition step (instead of doing it later, as is done in runTransaction), you're introducing a potential issue: if setting the auto-commit fails, the connection won't get closed, as the acquisition step won't have ended with a success.
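
The concern can be reproduced with a minimal sketch that uses only core ZIO 2 operators. The `open`, `setAutoCommit`, and `close` effects below are hypothetical stand-ins that just append to a log; they are not tranzactio's actual functions, and the failing `setAutoCommit` is an assumption made to trigger the problem.

```scala
import zio._

object AcquireTapSketch extends ZIOAppDefault {

  // Returns the sequence of steps recorded when setAutoCommit fails,
  // either inside the acquire step or inside the use step.
  def logged(autoCommitInAcquire: Boolean): UIO[Vector[String]] =
    Ref.make(Vector.empty[String]).flatMap { log =>
      // Hypothetical stand-ins for openConnection / setAutoCommit / closeConnection
      val open          = log.update(_ :+ "open").as("conn")
      val setAutoCommit = log.update(_ :+ "setAutoCommit") *> ZIO.fail("auto-commit failed")
      val close         = log.update(_ :+ "close")

      val program: IO[String, Unit] =
        if (autoCommitInAcquire)
          // Acquire fails as a whole, so the release step is never registered
          ZIO.acquireReleaseWith(open.tap(_ => setAutoCommit))(_ => close)(_ => ZIO.unit)
        else
          // Acquire succeeds, so the release step runs even though use fails
          ZIO.acquireReleaseWith(open)(_ => close)(_ => setAutoCommit)

      program.either *> log.get
    }

  def run =
    for {
      inAcquire <- logged(autoCommitInAcquire = true)
      inUse     <- logged(autoCommitInAcquire = false)
      _         <- Console.printLine(s"in acquire: ${inAcquire.mkString(", ")}")
      _         <- Console.printLine(s"in use:     ${inUse.mkString(", ")}")
    } yield ()
}
```

In the first variant the log has no `close` entry, which is the leak described above; in the second the connection is closed even though setting the auto-commit failed.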

Comment on lines +53 to +55
    { c => commitConnection(c).tapEither(_ => closeConnection(c)).orDie }
      .flatMap(c => task(c).mapError(Right(_))
        .tapError(_ => (if (commitOnFailure) commitConnection(c) else rollbackConnection(c)).mapError(Left(_))))
Owner

I'm not sure why you use a different approach over the one in runTransaction here?

As far as I can see, it behaves the same in case of a success (the transaction is committed). However, on a failure, the transaction is first rolled back (line 55), then committed (line 53). It shouldn't have any adverse effect semantically, but it can impact performance.

=> Unless there's an advantage I'm missing, I prefer my approach: commit on success, rollback on failure (using tapBoth instead of tapError), and close the connection in the release step.

Owner

Never mind, I think I get it: the tap argument is run for every entry in the stream, not at the end of the stream, duh.

I'm playing a bit with it, and I think I actually like having two levels of acquireReleaseWith: one to get the connections, handling only opening and closing the connection, and a second one to either commit or rollback depending on what happened.

    { c => commitConnection(c).tapEither(_ => closeConnection(c)).orDie }
      .flatMap(c => task(c).mapError(Right(_))
        .tapError(_ => (if (commitOnFailure) commitConnection(c) else rollbackConnection(c)).mapError(Left(_))))
      .catchSomeCause { case Cause.Die(error: DbException, _) => ZStream.fail(Left(error)) }
Owner

Why do you want to catch the Die here? As far as I can see, at this point, the only reason to have the stream die with a DbException is if the connection could not be closed.

If that happens, your whole application is pretty much screwed. There's nothing you can do to recover from that apart from stopping the process and restarting. If there's a rare corner case where someone might want to catch that and do something, they can catch the Die themselves, but that shouldn't be the default behavior.

Contributor Author

@xQwexx xQwexx Jan 9, 2024

It was a bug in the ZIO framework, where it did not work correctly with the resource.
I had to play with it until I realised that. I also turned the Connection into a private layer which can be used for both ZIO and ZStream:

    private def connectionLayer(commitOnFailure: => Boolean = false)(implicit
        errorStrategies: ErrorStrategiesRef,
        trace: Trace
    ): ZLayer[Any, DbException, Connection] =
      ZLayer
        .scoped {
          ZIO.acquireRelease(openConnection)(c => closeConnection(c).orDie)
        }
        .flatMap(env =>
          ZLayer
            .scoped {
              ZIO
                .acquireReleaseExit(
                  setAutoCommit(env.get, autoCommit = false).as(env.get)
                ) {
                  case (c, Exit.Success(_)) => commitConnection(c).orDie
                  case (c, Exit.Failure(_)) =>
                    (if (commitOnFailure) commitConnection(c)
                     else rollbackConnection(c)).orDie
                }
            }
        )
      ZStream
        .serviceWithStream[Connection](c => task(c).mapError(Right(_)))
        .provideSomeLayer[R](connectionLayer(commitOnFailure).mapError(Left(_)))
      ZIO
        .serviceWith[Connection](c => task(c).mapError(Right(_)))
        .provideSomeLayer[R](connectionLayer(commitOnFailure).mapError(Left(_)))

Owner

Hmm, I like the idea of having a single transaction-handler for both the ZIO and ZStream methods 👍. I want to add an additional distinction though: if the ZIO or ZStream dies (not just an error), we shouldn't try to either commit or rollback. Only closing the connection is the way to go I think.

I'll try and integrate that next week-end.
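
A minimal sketch of that distinction, assuming a two-level bracket where the outer level only opens and closes, and the inner level inspects the `Exit` it receives. The `connection` and `transaction` helpers are hypothetical and only append to a log; this is not the code that was eventually merged.

```scala
import zio._

object CommitRollbackSketch extends ZIOAppDefault {

  // Outer bracket (hypothetical): only opens and closes the connection.
  def connection(log: Ref[Vector[String]]): ZIO[Scope, Nothing, Unit] =
    ZIO.acquireRelease(log.update(_ :+ "open"))(_ => log.update(_ :+ "close"))

  // Inner bracket (hypothetical): decides what to do from the way the scope exited.
  def transaction(log: Ref[Vector[String]]): ZIO[Scope, Nothing, Unit] =
    ZIO.acquireReleaseExit(log.update(_ :+ "begin")) {
      case (_, Exit.Success(_)) =>
        log.update(_ :+ "commit")
      case (_, Exit.Failure(cause)) if cause.failureOption.isDefined =>
        log.update(_ :+ "rollback") // typed error
      case (_, Exit.Failure(_)) =>
        ZIO.unit // defect or interruption: neither commit nor rollback
    }

  // Runs a body inside both brackets and returns the recorded steps.
  def outcome(body: IO[String, Unit]): UIO[Vector[String]] =
    for {
      log     <- Ref.make(Vector.empty[String])
      _       <- ZIO.scoped(connection(log) *> transaction(log) *> body).exit
      entries <- log.get
    } yield entries

  def run =
    for {
      ok     <- outcome(ZIO.unit)
      failed <- outcome(ZIO.fail("boom"))
      died   <- outcome(ZIO.die(new RuntimeException("defect")))
      _      <- Console.printLine(s"success: ${ok.mkString(", ")}")
      _      <- Console.printLine(s"failure: ${failed.mkString(", ")}")
      _      <- Console.printLine(s"defect:  ${died.mkString(", ")}")
    } yield ()
}
```

Finalizers run in reverse order of acquisition, so the commit or rollback happens before the close, and the defect case goes straight to the close.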

      (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, Either[DbException, E], A] =
      ZStream.acquireReleaseWith(openConnection.mapError(Left(_)))(closeConnection(_).orDie).flatMap { (c: Connection) =>
        ZStream.fromZIO(setAutoCommit(c, autoCommit = true).mapError(Left(_)))
          .zipRight {task(c).mapError(Right(_))}
Owner

Should be crossRight or we'll only keep the first element.

Contributor Author

I think executing the setAutoCommit to true once is enough; it should stay the same for the whole connection.

Owner

Yes on the auto-commit side, but it also only keeps one element on the task side (so only the first entry in the ZStream you've passed as a parameter). I've added some tests, and with zipRight they fail because only the first entry in the stream is actually found in the result.

Contributor Author

Okay, then you are right. I thought zipRight also merged the two streams, just executing the first one once before the stream starts rather than before each element. In that case, what do you think about a flatMap? I think executing the auto-commit command once would be enough.
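
The difference between the three operators discussed in this thread can be seen in a small sketch. `setup` and `rows` are hypothetical stand-ins for the one-off `setAutoCommit` stream and for `task(c)`.

```scala
import zio._
import zio.stream._

object ZipVsCrossSketch extends ZIOAppDefault {

  // Stand-in for the one-off setAutoCommit call: a stream with exactly one element.
  val setup: ZStream[Any, Nothing, Unit] = ZStream.fromZIO(ZIO.unit)

  // Stand-in for task(c): every element of this stream should survive.
  val rows: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3)

  // zipRight pairs elements positionally and stops when the shorter side ends.
  val zipped: UIO[Chunk[Int]] = setup.zipRight(rows).runCollect

  // crossRight runs the right-hand stream once per left-hand element.
  val crossed: UIO[Chunk[Int]] = setup.crossRight(rows).runCollect

  // flatMap has the same effect here, since the left side has a single element.
  val flatMapped: UIO[Chunk[Int]] = setup.flatMap(_ => rows).runCollect

  def run =
    for {
      z <- zipped
      c <- crossed
      f <- flatMapped
      _ <- Console.printLine(s"zipRight: $z, crossRight: $c, flatMap: $f")
    } yield ()
}
```

Because `setup` emits a single element, `zipRight` yields only `1`, while `crossRight` and `flatMap` both yield all three rows and run the setup effect once.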

@gaelrenoux
Owner

I'll try to finish working on this next weekend, I don't have much time during the week.

@gaelrenoux
Owner

OK, so I've spent a lot of time trying to make this work, and kept running into issues when adding tests. I think I finally understand why: providing a Connection within the stream cannot actually work. Here's the problem.

When I try to provide a Connection to a ZStream, I must use the acquire/release mechanism, to make sure the transaction is committed or rolled back, and the connection is closed. However, as indicated in ZIO's documentation, the release is performed "after the stream is consumed". This means that the actual commit will NOT be done until the stream is actually consumed, taking into account everything that happened in the stream. Take this example:

    val queryStreamA: ZStream[Connection, DbException, String] = ???
    val streamA: ZStream[Database, DbException, String] = Database.transactionOrWidenStream(queryStreamA)

    val queryStreamB: ZStream[Connection, DbException, String] = ???
    val streamB: ZStream[Database, DbException, String] = Database.transactionOrWidenStream(queryStreamB)

    val result: ZIO[Database, DbException, Chunk[String]] = (streamA ++ streamB).runCollect

If any failure happens when running streamB, then both transactions A and B will be rolled back, as the commit for transaction A would only happen when consuming the stream in runCollect. This is definitely NOT what the developer is trying to do here.

Therefore, I don't think there is any purpose in having a way to convert a ZStream[Connection, E, A] into a ZStream[Database, E, A]. I can't do anything that has different semantics from just consuming the stream into a ZIO, then using the existing methods for ZIO[Connection, E, A]. Maybe in the future, if ZStream allows for release effects to happen in the middle of the stream, not just at the end?

Don't hesitate to correct me if you think I'm missing something. I'm going to keep my experimentation branch for a little while in case someone comes up with an idea, but in the meantime I'm not going to spend more time on this.

@xQwexx
Contributor Author

xQwexx commented Feb 21, 2024

@gaelrenoux Can you add the full test? For me it works if I replace the runDrain with runCollect.
The result is that the first stream is committed and the second one is not.

    test("Test transaction boundary - fail outside - commit progress")(
      for {
        database <- ZIO.service[Database]
        streamPersons: ZStream[PersonQueries, Either[DbException, DbException], Person] = database.transactionStream(PersonQueries.listStream)
        insertStream = database.transactionStream(ZStream.fromZIO(insertRow).forever).mapError(_.merge)
        stream = insertStream.take(2) ++ failingStream ++ insertStream
        _ <- (insertStream.take(2) ++ failingStream ++ insertStream).runCollect.exit
//        _ <- stream.runDrain.exit

        persons <- streamPersons.run(ZSink.foldLeft(List[Person]()) { (ps, p) => p :: ps }).mapError(_.merge)
      } yield assertTrue(persons.length == 2)
    ),

@gaelrenoux
Owner

I've looked at it again, and actually the issue was coming from one of my changes. I went back to your initial version for runTransactionStream, and the tests that were failing are now working. However, I still have other tests failing, even with your version, and some issues I don't know how to solve. Essentially, I can't wrap my head around how acquireReleaseWith is working and what its semantics are.

Let me recap.

On the branch zstream-support-0

It's mostly your version. I've done some reformatting, but mostly I've added tests and fixed the crossRight/zipRight bug mentioned above. All tests pass, but there are still some issues (see next point).

On the branch zstream-support-1

It starts on branch zstream-support-0. The first commit (991fdec) adds some more tests, which are failing. Errors aren't being reported as they should: it looks like catchSomeCause isn't catching everything we think it should. I've tried some changes, which didn't solve the issue (following commits).

On the last commit of the branch, I've added a separate test (BasicTest) which contains, side by side, some ConnectionSource tests and an attempt to replicate them using just ZStreams. It seems that defects in the release step are not always caught.

On the branch zstream-support-2

I still want to get the auto-commit part out of the acquire step of getting the connection. Currently, if the auto-commit fails, it will prevent the acquire step from succeeding, which means there won't be a release step (so the connection will not be closed).

On the branch zstream-support-2, I've tried to move to two levels of acquireReleaseWith: one to handle opening/closing the connection, and one to handle the commit/rollback part. However, that's when the other tests started breaking. Not sure why, but this can wait until the issue on zstream-support-1 is solved.

@xQwexx
Contributor Author

xQwexx commented Mar 6, 2024

So I had some time today and looked at the zstream-support-1 branch.
I think the problem is that the ZStream cannot catch (or remap) the dies that happen in the resource-closing part. I think those dies can only be handled at the call site, so I don't think it's possible to commit at the resource level with streams.
(I also think catchSomeCause was only needed because of a bug in the ZIO framework that made it unable to compile before I added that change, but it is fixed now.)

I found that this solution makes all of the ConnectionSourceTest tests pass:

    def runTransactionStream[R, E, A](task: Connection => ZStream[R, E, A], commitOnFailure: => Boolean = false)
      (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, Either[DbException, E], A] = {
      ZStream
        .acquireReleaseWith(openConnection.tap(c => setAutoCommit(c, autoCommit = false)).mapError(Left(_))) (
          closeConnection(_).orDie
        )
        .flatMap { (c: Connection) =>
          // No commit, no rollback in case of a defect, just close the connection
          task(c).mapError(Right(_)).tapBoth(
              error => (if (commitOnFailure) commitConnection(c) else rollbackConnection(c))
                .mapErrorCause(db => Cause.Then(Cause.fail(error), db.map(Left(_)))),
              _ => commitConnection(c).mapError(Left(_))
            )
        }
    }

EDIT: I think this is more precise:

    def runTransactionStream[R, E, A](task: Connection => ZStream[R, E, A], commitOnFailure: => Boolean = false)
                                     (implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, Either[DbException, E], A] = {
      ZStream
        .acquireReleaseWith(openConnection.tap(c => setAutoCommit(c, autoCommit = false)).mapError(Left(_))) (
          closeConnection(_).orDie
        )
        .flatMap { (c: Connection) =>
          // No commit, no rollback in case of a defect, just close the connection
          ZStream.fromIterableZIO(task(c).runCollect).mapError(Right(_)).tapBoth(
            error => (if (commitOnFailure) commitConnection(c) else rollbackConnection(c))
              .mapErrorCause(db => Cause.Then(Cause.fail(error), db.map(Left(_)))),
            _ => commitConnection(c).mapError(Left(_))
          )
        }
    }

@gaelrenoux
Owner

Your first proposal doesn't work: with tapBoth, the commit is done on every single element of the stream, instead of once at the end (see the failing test data rollbacked on transaction failure if commitOnFailure=false in DoobieStreamIT).
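
A minimal sketch of that per-element behaviour, using `tap` on the assumption that the success callback of `tapBoth` fires on the same schedule. The two counters are hypothetical stand-ins for `commitConnection`, and the "once at the end" variant only covers the success path.

```scala
import zio._
import zio.stream._

object PerElementTapSketch extends ZIOAppDefault {

  // Returns (commits triggered by tap, commits triggered by an effect appended to the stream).
  def commitCounts: UIO[(Int, Int)] =
    for {
      perElement <- Ref.make(0)
      atEnd      <- Ref.make(0)
      rows        = ZStream(1, 2, 3)
      // The tapped effect runs once for every element that flows through
      _          <- rows.tap(_ => perElement.update(_ + 1)).runDrain
      // An effect concatenated after the stream runs once, after the last element
      _          <- (rows ++ ZStream.fromZIO(atEnd.update(_ + 1)).drain).runDrain
      tapped     <- perElement.get
      appended   <- atEnd.get
    } yield (tapped, appended)

  def run =
    commitCounts.flatMap { case (tapped, appended) =>
      Console.printLine(s"tap: $tapped commits, appended effect: $appended commit")
    }
}
```

With three rows, the tapped variant performs three "commits" while the appended effect performs one.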

On your second proposal, doing the runCollect and then wrapping it back into a stream defeats the entire purpose of runTransactionStream. It's completely equivalent to just using runTransaction(queriesStream.runCollect) (and then wrapping it again in a stream if you want).

@gaelrenoux
Owner

I think I have a working compromise, though. Given that ZStream's API doesn't allow us to catch the defects, let's embrace that. In branch zstream-support-1bis, I've simply implemented transactionOrDieStream, instead of trying to implement a transactionStream that would return connection errors as Lefts. Then, it's up to the caller to catch defects that are DbExceptions if that's relevant to them.

I've asked on the ZIO Discord for clarification on how ZStream.acquireReleaseWith handles errors in the release clause. If there isn't a good solution for what we were trying to do, I'll go with the transactionOrDieStream solution.
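
For a caller who does want connection errors back as values, one possible shape is to recover from the defect after running the stream. This is only a sketch: `DbException` here is a local stand-in rather than tranzactio's own type, `rows` merely imitates a stream that dies, and the defect is raised in the stream body, not in a release step (which, as noted earlier in this thread, was not always catchable at the stream level).

```scala
import zio._
import zio.stream._

object CatchDefectSketch extends ZIOAppDefault {

  // Hypothetical stand-in for the library's DbException.
  final case class DbException(message: String) extends RuntimeException(message)

  // Imitates an "or die" stream: a connection problem surfaces as a defect.
  val rows: ZStream[Any, Nothing, Int] =
    ZStream(1, 2) ++ ZStream.die(DbException("connection lost"))

  // The caller opts in to seeing DbException defects as Left values;
  // any other defect keeps propagating.
  val recovered: UIO[Either[DbException, Chunk[Int]]] =
    rows.runCollect
      .map[Either[DbException, Chunk[Int]]](Right(_))
      .catchSomeDefect { case e: DbException => ZIO.succeed(Left(e)) }

  def run =
    recovered.flatMap(result => Console.printLine(result.toString))
}
```

Keeping this at the call site leaves the default behaviour (die on connection problems) untouched for everyone else.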

@gaelrenoux
Owner

Superseded by #53, which I've merged in. Thank you for your help @xQwexx!

@gaelrenoux gaelrenoux closed this Mar 24, 2024
@xQwexx xQwexx deleted the zstream-support branch March 27, 2024 08:50
@xQwexx
Contributor Author

xQwexx commented Mar 27, 2024

It's awesome 🚀 thank you for the work 😄
