Skip to content

Commit

Permalink
Fix java.sql.SQLException corner case
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Sep 13, 2019
1 parent 21c30d7 commit f0e72fd
Showing 1 changed file with 30 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package io.getquill.context.monix
import java.io.Closeable
import java.sql.{ Array => _, _ }

import cats.effect.ExitCase
import io.getquill.{ NamingStrategy, ReturnAction }
import io.getquill.context.StreamingContext
import io.getquill.context.jdbc.JdbcContextBase
import io.getquill.context.sql.idiom.SqlIdiom
import io.getquill.util.ContextLogger
import javax.sql.DataSource
import monix.eval.Task
import monix.eval.{ Task, TaskLocal }
import monix.execution.misc.Local
import monix.reactive.Observable

Expand Down Expand Up @@ -119,38 +120,37 @@ abstract class MonixJdbcContext[Dialect <: SqlIdiom, Naming <: NamingStrategy](
wrapClose(conn.setAutoCommit(wasAutocommit))
}

def transaction[A](f: Task[A]): Task[A] = {
val dbEffects = for {
result <- currentConnection() match {
case Some(_) => f // Already in a transaction
case None =>
wrap(dataSource.getConnection).bracket { conn =>
withCloseBracket(conn, conn => {
withAutocommitBracket(conn, conn => {
wrap(conn).flatMap { conn =>
currentConnection.update(Some(conn))
f.onCancelRaiseError(new IllegalStateException(
"The task was cancelled in the middle of a transaction."
)).doOnFinish {
case Some(error) =>
conn.rollback()
Task.raiseError(error)
case None =>
wrap(conn.commit())
}
def transaction[A](f: Task[A]): Task[A] = effect.boundary {
Task.suspend(
// Local read is side-effecting, need suspend
currentConnection().map(_ => f).getOrElse {
effect.wrap {
val c = dataSource.getConnection()
c.setAutoCommit(false)
c
}.bracket { conn =>
TaskLocal.wrap(Task.pure(currentConnection))
.flatMap { tl =>
// set local for the tightest scope possible, and commit/rollback/close
// only when nobody can touch the connection anymore
// also, TaskLocal.bind is more hygienic then manual set/unset
tl.bind(Some(conn))(f).guaranteeCase {
case ExitCase.Completed => effect.wrap(conn.commit())
case ExitCase.Error(e) => effect.wrap {
conn.rollback()
throw e
}
})
})
} { conn =>
wrap(currentConnection.update(None))
case ExitCase.Canceled => effect.wrap(conn.rollback())
}
}
} { conn =>
effect.wrapClose {
conn.setAutoCommit(true) // Do we need this if we're closing anyway?
conn.close()
}
}
}
} yield result

boundary {
dbEffects
.executeWithOptions(_.enableLocalContextPropagation)
}
).executeWithOptions(_.enableLocalContextPropagation)
}

// Override with sync implementation so will actually be able to do it.
Expand Down

0 comments on commit f0e72fd

Please sign in to comment.