Skip to content

Commit

Permalink
coroutines rework #2
Browse files Browse the repository at this point in the history
  • Loading branch information
Tapac committed Jul 2, 2019
1 parent 1bd64eb commit 69e5f0d
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 89 deletions.
3 changes: 2 additions & 1 deletion exposed/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ val dialect: String by project
dependencies {
api(kotlin("stdlib"))
api(kotlin("reflect"))
implementation("org.jetbrains.kotlinx", "kotlinx-coroutines-core", "1.0.1")
implementation("org.jetbrains.kotlinx", "kotlinx-coroutines-core", "1.3.0-M1")
api("joda-time", "joda-time", "2.10.2")
api("org.slf4j", "slf4j-api", "1.7.25")
implementation("com.h2database", "h2", "1.4.199")
Expand All @@ -26,6 +26,7 @@ dependencies {
testImplementation("log4j", "log4j", "1.2.17")
testImplementation("junit", "junit", "4.12")
testImplementation("org.hamcrest", "hamcrest-library", "1.3")
testImplementation("org.jetbrains.kotlinx","kotlinx-coroutines-debug", "1.3.0-M1")

testImplementation("com.opentable.components", "otj-pg-embedded", "0.12.0")
testImplementation("mysql", "mysql-connector-mxj", "5.0.12")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.jetbrains.exposed.sql

import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
import org.jetbrains.exposed.dao.Entity
import org.jetbrains.exposed.dao.EntityCache
import org.jetbrains.exposed.dao.EntityHook
Expand Down
Original file line number Diff line number Diff line change
@@ -1,99 +1,107 @@
package org.jetbrains.exposed.sql.transactions.experimental

import kotlinx.coroutines.*
import kotlinx.coroutines.selects.SelectClause1
import kotlinx.coroutines.selects.SelectInstance
import kotlinx.coroutines.sync.withLock
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.transactions.*
import org.jetbrains.exposed.sql.transactions.rollbackLoggingException
import java.lang.Exception
import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext

suspend fun <T> suspendedTransaction(context: CoroutineContext? = null, db: Database? = null, statement: suspend Transaction.() -> T): T {
return suspendedTransactionAsync(context, db, statement = statement).await()
}
internal class TransactionCoroutineElement(val outerTransaction: Transaction?, val newTransaction: Transaction, val manager: TransactionManager) : ThreadContextElement<Transaction> /*by original*/ {
override val key: CoroutineContext.Key<TransactionCoroutineElement> = Companion

suspend fun <T> Transaction.suspendedTransaction(context: CoroutineContext? = null, statement: suspend Transaction.() -> T): T {
val suspendedTransactionAsyncInternal = suspendedTransactionAsyncInternal(context, db, currentTransaction = this, statement = statement)
return suspendedTransactionAsyncInternal.await()
}
private var prevManager : TransactionManager? = null

class TransactionResult<T>(internal val transaction: Transaction,
internal val deferred: Deferred<T>,
private val selectClause: SelectClause1<T>,
internal var closeTransaction : Boolean = true) : Deferred<T> by deferred, SelectClause1<T> by selectClause {
override fun updateThreadContext(context: CoroutineContext): Transaction {
prevManager = TransactionManager.currentThreadManager.get()
(manager as? ThreadLocalTransactionManager)?.let {
it.threadLocal.set(newTransaction)
TransactionManager.currentThreadManager.set(manager)
}
return newTransaction
}

override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle {
if (closeTransaction) {
try {
transaction.commit()
transaction.closeLoggingException { exposedLogger.warn("Transaction close failed: ${it.message}. Statement: ${transaction.currentStatement}", it) }
} catch (e: Exception) {
transaction.rollbackLoggingException { exposedLogger.warn("Transaction rollback failed: ${it.message}. Statement: ${transaction.currentStatement}", it) }
throw e
override fun restoreThreadContext(context: CoroutineContext, oldState: Transaction) {
require(newTransaction == oldState)
if (outerTransaction == null) {
with(newTransaction) {
try {
commit()
try {
currentStatement?.let {
it.close()
currentStatement = null
}
closeExecutedStatements()
} catch (e: Exception) {
exposedLogger.warn("Statements close failed", e)
}
closeLoggingException { exposedLogger.warn("Transaction close failed: ${it.message}. Statement: $currentStatement", it) }
} catch (e: Exception) {
rollbackLoggingException { exposedLogger.warn("Transaction rollback failed: ${it.message}. Statement: $currentStatement", it) }
throw e
}
}
(manager as? ThreadLocalTransactionManager)?.threadLocal?.remove()

} else {
(manager as? ThreadLocalTransactionManager)?.threadLocal?.set(outerTransaction)
}
prevManager?.let {
TransactionManager.currentThreadManager.set(it)
} ?: TransactionManager.currentThreadManager.remove()
}

companion object : CoroutineContext.Key<TransactionCoroutineElement> {
fun forTLManager(manager: TransactionManager, currentTransaction: Transaction?, newTransaction: Transaction) : ThreadContextElement<Transaction> {
return TransactionCoroutineElement(currentTransaction, newTransaction, manager)
}
return deferred.invokeOnCompletion(handler)
}
}

suspend fun <T> suspendedTransaction(context: CoroutineDispatcher? = null, db: Database? = null, statement: suspend Transaction.() -> T): T {
return suspendedTransactionAsyncInternal(context, db, TransactionManager.currentOrNull(), false, statement).await()
}

suspend fun <T> Transaction.runSuspended(context: CoroutineDispatcher? = null, statement: suspend Transaction.() -> T): T {
val tx = this
return suspendedTransactionAsyncInternal(context, db, tx, false, statement = statement).await()
}

class TransactionResult<T>(internal val transaction: Transaction,
internal val deferred: Deferred<T>) : Deferred<T> by deferred

suspend fun <T, R> TransactionResult<T>.andThen(statement: suspend Transaction.(T) -> R) : TransactionResult<R> {
val currentAsync = this
return suspendedTransactionAsyncInternal(currentTransaction = transaction) {
currentAsync.closeTransaction = false
return suspendedTransactionAsyncInternal(currentTransaction = currentAsync.transaction, lazy = false) {
statement(currentAsync.deferred.await())
}
}

suspend fun <T> suspendedTransactionAsync(context: CoroutineContext? = null, db: Database? = null,
suspend fun <T> suspendedTransactionAsync(context: CoroutineDispatcher? = null, db: Database? = null,
useOuterTransactionIfAccessible: Boolean = true, statement: suspend Transaction.() -> T) : TransactionResult<T> {
val currentTransaction = TransactionManager.currentOrNull().takeIf { useOuterTransactionIfAccessible }
return suspendedTransactionAsyncInternal(context, db, currentTransaction, statement)
return suspendedTransactionAsyncInternal(context, db, currentTransaction, false, statement)
}

private suspend fun <T> suspendedTransactionAsyncInternal(context: CoroutineContext? = null, db: Database? = null,
currentTransaction: Transaction?, statement: suspend Transaction.() -> T) : TransactionResult<T> = coroutineScope {
private suspend fun <T> suspendedTransactionAsyncInternal(context: CoroutineDispatcher? = null, db: Database? = null,
currentTransaction: Transaction?, lazy: Boolean,
statement: suspend Transaction.() -> T) : TransactionResult<T> {
val manager = (currentTransaction?.db ?: db)?.let { TransactionManager.managerFor(it) }
?: TransactionManager.manager
val threadLocalManager = manager as? ThreadLocalTransactionManager

if (currentTransaction != null) {
val scope = (if (threadLocalManager != null) {
(context ?: coroutineContext) + threadLocalManager.threadLocal.asContextElement(currentTransaction)
} else (context ?: coroutineContext)) + TransactionManager.currentThreadManager.asContextElement(manager)
val asyncLazy = asyncTxInvocation(scope, currentTransaction, statement)
TransactionResult(currentTransaction, asyncLazy, asyncLazy as SelectClause1<T>, false)
} else {
val tx = manager.newTransaction(manager.defaultIsolationLevel)
val scope = (if (threadLocalManager != null) {
(context ?: coroutineContext) + threadLocalManager.threadLocal.asContextElement(tx)
} else (context ?: coroutineContext)) + TransactionManager.currentThreadManager.asContextElement()
val asyncLazy = asyncTxInvocation(scope, tx, statement)
TransactionResult(tx, asyncLazy, asyncLazy as SelectClause1<T>, false)
}
}

private fun <T> CoroutineScope.asyncTxInvocation(scope: CoroutineContext, tx: Transaction, statement: suspend Transaction.() -> T): Deferred<T> {
return async(context = scope, start = CoroutineStart.LAZY) {
tx.suspendedMutex.withLock {
val tx = currentTransaction ?: manager.newTransaction(manager.defaultIsolationLevel)
val element = TransactionCoroutineElement.forTLManager(manager, currentTransaction, tx)
val contextNew = (context ?: coroutineContext) + element
return withContext(contextNew) {
TransactionResult(tx, async {
try {
tx.statement()
} catch (e: Throwable) {
tx.rollbackLoggingException { exposedLogger.warn("Transaction rollback failed: ${it.message}. Statement: ${tx.currentStatement}", it) }
throw e
} finally {
try {
tx.currentStatement?.let {
it.close()
tx.currentStatement = null
}
tx.closeExecutedStatements()
} catch (e: Exception) {
exposedLogger.warn("Statements close failed", e)
}
}
}
})
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package org.jetbrains.exposed.sql.tests.h2

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.tests.DatabaseTestsBase
import org.jetbrains.exposed.sql.tests.TestDB
import org.jetbrains.exposed.sql.transactions.experimental.suspendedTransaction
import org.junit.Test
import kotlin.test.assertFailsWith

Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,8 @@
package org.jetbrains.exposed.sql.tests.mysql

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.jetbrains.exposed.sql.SchemaUtils
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.insert
import org.jetbrains.exposed.sql.select
import org.jetbrains.exposed.sql.tests.DatabaseTestsBase
import org.jetbrains.exposed.sql.tests.TestDB
import org.jetbrains.exposed.sql.tests.h2.H2Tests
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.experimental.suspendedTransaction
import org.jetbrains.exposed.test.utils.RepeatableTest
import org.jetbrains.exposed.test.utils.RepeatableTestRule
import org.junit.Rule
import org.junit.Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,50 @@ package org.jetbrains.exposed.sql.tests.shared
import kotlinx.coroutines.*
import kotlinx.coroutines.debug.junit4.CoroutinesTimeout
import org.jetbrains.exposed.sql.SchemaUtils
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.insert
import org.jetbrains.exposed.sql.select
import org.jetbrains.exposed.sql.tests.DatabaseTestsBase
import org.jetbrains.exposed.sql.tests.TestDB
import org.jetbrains.exposed.sql.tests.h2.H2Tests
import org.jetbrains.exposed.sql.transactions.experimental.andThen
//import org.jetbrains.exposed.sql.transact/ions.experimental.andThen
import org.jetbrains.exposed.sql.transactions.experimental.runSuspended
import org.jetbrains.exposed.sql.transactions.experimental.suspendedTransaction
import org.jetbrains.exposed.sql.transactions.experimental.suspendedTransactionAsync
import org.jetbrains.exposed.test.utils.RepeatableTest
import org.junit.Rule
import org.junit.Test
import kotlin.test.assertEquals

class CoroutineTests : DatabaseTestsBase() {

@Rule
@JvmField
val timeout = CoroutinesTimeout.seconds(10)
val timeout = CoroutinesTimeout.seconds(100)

@Test @RepeatableTest(10)
fun suspendedTx() {
withDb {
runBlocking {
SchemaUtils.create(H2Tests.Testing)
commit()

val launchResult = suspendedTransaction(Dispatchers.IO) {
val job = suspendedTransaction(Dispatchers.IO) {
H2Tests.Testing.insert{}

launch(Dispatchers.Default) {
suspendedTransaction {
runSuspended {
assertEquals(1, H2Tests.Testing.select { H2Tests.Testing.id.eq(1) }.singleOrNull()?.getOrNull(H2Tests.Testing.id))
}
}
}

val result = suspendedTransaction(Dispatchers.Default) {

val result = runSuspended(Dispatchers.Default) {
job.join()
H2Tests.Testing.select { H2Tests.Testing.id.eq(1) }.single()[H2Tests.Testing.id]
}

assertEquals(1, result)
launchResult.join()

SchemaUtils.drop(H2Tests.Testing)
}
}
Expand All @@ -59,18 +62,22 @@ class CoroutineTests : DatabaseTestsBase() {
H2Tests.Testing.insert{}

launch(Dispatchers.Default) {
suspendedTransaction {
runSuspended {
assertEquals(1, H2Tests.Testing.select { H2Tests.Testing.id.eq(1) }.singleOrNull()?.getOrNull(H2Tests.Testing.id))
}
commit()
}
}

val result = suspendedTransactionAsync(Dispatchers.Default) {
val result = suspendedTransactionAsync(Dispatchers.Default, useOuterTransactionIfAccessible = true) {
launchResult.await().join()
H2Tests.Testing.select { H2Tests.Testing.id.eq(1) }.single()[H2Tests.Testing.id]
}.andThen {
assertEquals(1, it)
true
}

launchResult.await()
assertEquals(1, result.await())
assertEquals(true, result.await())
SchemaUtils.drop(H2Tests.Testing)
}
}
Expand Down
2 changes: 1 addition & 1 deletion spring-transaction/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ dependencies {
api(project(":exposed"))
api("org.springframework", "spring-jdbc", "5.1.7.RELEASE")
api("org.springframework", "spring-context", "5.1.7.RELEASE")
implementation("org.jetbrains.kotlinx", "kotlinx-coroutines-core", "1.0.1")
implementation("org.jetbrains.kotlinx", "kotlinx-coroutines-core", "1.3.0-M1")
implementation("com.h2database", "h2", "1.4.199")

testImplementation(kotlin("test-junit"))
Expand Down

0 comments on commit 69e5f0d

Please sign in to comment.