From 69e5f0d05ecc80e6a985f9e37fed04c86e387d9a Mon Sep 17 00:00:00 2001 From: Tapac Date: Wed, 3 Jul 2019 00:54:47 +0300 Subject: [PATCH] coroutines rework #2 --- exposed/build.gradle.kts | 3 +- .../org/jetbrains/exposed/sql/Transaction.kt | 1 + .../transactions/experimental/Suspended.kt | 130 ++++++++++-------- .../jetbrains/exposed/sql/tests/h2/H2Tests.kt | 4 - .../exposed/sql/tests/mysql/MysqlTests.kt | 10 -- .../sql/tests/shared/CoroutineTests.kt | 31 +++-- spring-transaction/build.gradle.kts | 2 +- 7 files changed, 92 insertions(+), 89 deletions(-) diff --git a/exposed/build.gradle.kts b/exposed/build.gradle.kts index df0eccceac..86aed299ea 100644 --- a/exposed/build.gradle.kts +++ b/exposed/build.gradle.kts @@ -16,7 +16,7 @@ val dialect: String by project dependencies { api(kotlin("stdlib")) api(kotlin("reflect")) - implementation("org.jetbrains.kotlinx", "kotlinx-coroutines-core", "1.0.1") + implementation("org.jetbrains.kotlinx", "kotlinx-coroutines-core", "1.3.0-M1") api("joda-time", "joda-time", "2.10.2") api("org.slf4j", "slf4j-api", "1.7.25") implementation("com.h2database", "h2", "1.4.199") @@ -26,6 +26,7 @@ dependencies { testImplementation("log4j", "log4j", "1.2.17") testImplementation("junit", "junit", "4.12") testImplementation("org.hamcrest", "hamcrest-library", "1.3") + testImplementation("org.jetbrains.kotlinx","kotlinx-coroutines-debug", "1.3.0-M1") testImplementation("com.opentable.components", "otj-pg-embedded", "0.12.0") testImplementation("mysql", "mysql-connector-mxj", "5.0.12") diff --git a/exposed/src/main/kotlin/org/jetbrains/exposed/sql/Transaction.kt b/exposed/src/main/kotlin/org/jetbrains/exposed/sql/Transaction.kt index 951112e9e7..145df157d6 100644 --- a/exposed/src/main/kotlin/org/jetbrains/exposed/sql/Transaction.kt +++ b/exposed/src/main/kotlin/org/jetbrains/exposed/sql/Transaction.kt @@ -1,6 +1,7 @@ package org.jetbrains.exposed.sql import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.Semaphore import org.jetbrains.exposed.dao.Entity import org.jetbrains.exposed.dao.EntityCache import org.jetbrains.exposed.dao.EntityHook diff --git a/exposed/src/main/kotlin/org/jetbrains/exposed/sql/transactions/experimental/Suspended.kt b/exposed/src/main/kotlin/org/jetbrains/exposed/sql/transactions/experimental/Suspended.kt index ce8b14ddc9..b50a8ea702 100644 --- a/exposed/src/main/kotlin/org/jetbrains/exposed/sql/transactions/experimental/Suspended.kt +++ b/exposed/src/main/kotlin/org/jetbrains/exposed/sql/transactions/experimental/Suspended.kt @@ -1,99 +1,107 @@ package org.jetbrains.exposed.sql.transactions.experimental import kotlinx.coroutines.* -import kotlinx.coroutines.selects.SelectClause1 -import kotlinx.coroutines.selects.SelectInstance -import kotlinx.coroutines.sync.withLock import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.transactions.* import org.jetbrains.exposed.sql.transactions.rollbackLoggingException import java.lang.Exception -import java.util.* import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.coroutineContext -suspend fun suspendedTransaction(context: CoroutineContext? = null, db: Database? = null, statement: suspend Transaction.() -> T): T { - return suspendedTransactionAsync(context, db, statement = statement).await() -} +internal class TransactionCoroutineElement(val outerTransaction: Transaction?, val newTransaction: Transaction, val manager: TransactionManager) : ThreadContextElement /*by original*/ { + override val key: CoroutineContext.Key = Companion -suspend fun Transaction.suspendedTransaction(context: CoroutineContext? = null, statement: suspend Transaction.() -> T): T { - val suspendedTransactionAsyncInternal = suspendedTransactionAsyncInternal(context, db, currentTransaction = this, statement = statement) - return suspendedTransactionAsyncInternal.await() -} + private var prevManager : TransactionManager? = null -class TransactionResult(internal val transaction: Transaction, - internal val deferred: Deferred, - private val selectClause: SelectClause1, - internal var closeTransaction : Boolean = true) : Deferred by deferred, SelectClause1 by selectClause { + override fun updateThreadContext(context: CoroutineContext): Transaction { + prevManager = TransactionManager.currentThreadManager.get() + (manager as? ThreadLocalTransactionManager)?.let { + it.threadLocal.set(newTransaction) + TransactionManager.currentThreadManager.set(manager) + } + return newTransaction + } - override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle { - if (closeTransaction) { - try { - transaction.commit() - transaction.closeLoggingException { exposedLogger.warn("Transaction close failed: ${it.message}. Statement: ${transaction.currentStatement}", it) } - } catch (e: Exception) { - transaction.rollbackLoggingException { exposedLogger.warn("Transaction rollback failed: ${it.message}. Statement: ${transaction.currentStatement}", it) } - throw e + override fun restoreThreadContext(context: CoroutineContext, oldState: Transaction) { + require(newTransaction == oldState) + if (outerTransaction == null) { + with(newTransaction) { + try { + commit() + try { + currentStatement?.let { + it.close() + currentStatement = null + } + closeExecutedStatements() + } catch (e: Exception) { + exposedLogger.warn("Statements close failed", e) + } + closeLoggingException { exposedLogger.warn("Transaction close failed: ${it.message}. Statement: $currentStatement", it) } + } catch (e: Exception) { + rollbackLoggingException { exposedLogger.warn("Transaction rollback failed: ${it.message}. Statement: $currentStatement", it) } + throw e + } } + (manager as? ThreadLocalTransactionManager)?.threadLocal?.remove() + + } else { + (manager as? ThreadLocalTransactionManager)?.threadLocal?.set(outerTransaction) + } + prevManager?.let { + TransactionManager.currentThreadManager.set(it) + } ?: TransactionManager.currentThreadManager.remove() + } + + companion object : CoroutineContext.Key { + fun forTLManager(manager: TransactionManager, currentTransaction: Transaction?, newTransaction: Transaction) : ThreadContextElement { + return TransactionCoroutineElement(currentTransaction, newTransaction, manager) } - return deferred.invokeOnCompletion(handler) } } +suspend fun suspendedTransaction(context: CoroutineDispatcher? = null, db: Database? = null, statement: suspend Transaction.() -> T): T { + return suspendedTransactionAsyncInternal(context, db, TransactionManager.currentOrNull(), false, statement).await() +} + +suspend fun Transaction.runSuspended(context: CoroutineDispatcher? = null, statement: suspend Transaction.() -> T): T { + val tx = this + return suspendedTransactionAsyncInternal(context, db, tx, false, statement = statement).await() +} + +class TransactionResult(internal val transaction: Transaction, + internal val deferred: Deferred) : Deferred by deferred + suspend fun TransactionResult.andThen(statement: suspend Transaction.(T) -> R) : TransactionResult { val currentAsync = this - return suspendedTransactionAsyncInternal(currentTransaction = transaction) { - currentAsync.closeTransaction = false + return suspendedTransactionAsyncInternal(currentTransaction = currentAsync.transaction, lazy = false) { statement(currentAsync.deferred.await()) } } -suspend fun suspendedTransactionAsync(context: CoroutineContext? = null, db: Database? = null, +suspend fun suspendedTransactionAsync(context: CoroutineDispatcher? = null, db: Database? = null, useOuterTransactionIfAccessible: Boolean = true, statement: suspend Transaction.() -> T) : TransactionResult { val currentTransaction = TransactionManager.currentOrNull().takeIf { useOuterTransactionIfAccessible } - return suspendedTransactionAsyncInternal(context, db, currentTransaction, statement) + return suspendedTransactionAsyncInternal(context, db, currentTransaction, false, statement) } -private suspend fun suspendedTransactionAsyncInternal(context: CoroutineContext? = null, db: Database? = null, - currentTransaction: Transaction?, statement: suspend Transaction.() -> T) : TransactionResult = coroutineScope { +private suspend fun suspendedTransactionAsyncInternal(context: CoroutineDispatcher? = null, db: Database? = null, + currentTransaction: Transaction?, lazy: Boolean, + statement: suspend Transaction.() -> T) : TransactionResult { val manager = (currentTransaction?.db ?: db)?.let { TransactionManager.managerFor(it) } ?: TransactionManager.manager - val threadLocalManager = manager as? ThreadLocalTransactionManager - - if (currentTransaction != null) { - val scope = (if (threadLocalManager != null) { - (context ?: coroutineContext) + threadLocalManager.threadLocal.asContextElement(currentTransaction) - } else (context ?: coroutineContext)) + TransactionManager.currentThreadManager.asContextElement(manager) - val asyncLazy = asyncTxInvocation(scope, currentTransaction, statement) - TransactionResult(currentTransaction, asyncLazy, asyncLazy as SelectClause1, false) - } else { - val tx = manager.newTransaction(manager.defaultIsolationLevel) - val scope = (if (threadLocalManager != null) { - (context ?: coroutineContext) + threadLocalManager.threadLocal.asContextElement(tx) - } else (context ?: coroutineContext)) + TransactionManager.currentThreadManager.asContextElement() - val asyncLazy = asyncTxInvocation(scope, tx, statement) - TransactionResult(tx, asyncLazy, asyncLazy as SelectClause1, false) - } -} -private fun CoroutineScope.asyncTxInvocation(scope: CoroutineContext, tx: Transaction, statement: suspend Transaction.() -> T): Deferred { - return async(context = scope, start = CoroutineStart.LAZY) { - tx.suspendedMutex.withLock { + val tx = currentTransaction ?: manager.newTransaction(manager.defaultIsolationLevel) + val element = TransactionCoroutineElement.forTLManager(manager, currentTransaction, tx) + val contextNew = (context ?: coroutineContext) + element + return withContext(contextNew) { + TransactionResult(tx, async { try { tx.statement() } catch (e: Throwable) { tx.rollbackLoggingException { exposedLogger.warn("Transaction rollback failed: ${it.message}. Statement: ${tx.currentStatement}", it) } throw e - } finally { - try { - tx.currentStatement?.let { - it.close() - tx.currentStatement = null - } - tx.closeExecutedStatements() - } catch (e: Exception) { - exposedLogger.warn("Statements close failed", e) - } } - } + }) } } \ No newline at end of file diff --git a/exposed/src/test/kotlin/org/jetbrains/exposed/sql/tests/h2/H2Tests.kt b/exposed/src/test/kotlin/org/jetbrains/exposed/sql/tests/h2/H2Tests.kt index 14e6691f6e..64b997d22b 100644 --- a/exposed/src/test/kotlin/org/jetbrains/exposed/sql/tests/h2/H2Tests.kt +++ b/exposed/src/test/kotlin/org/jetbrains/exposed/sql/tests/h2/H2Tests.kt @@ -1,12 +1,8 @@ package org.jetbrains.exposed.sql.tests.h2 -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.tests.DatabaseTestsBase import org.jetbrains.exposed.sql.tests.TestDB -import org.jetbrains.exposed.sql.transactions.experimental.suspendedTransaction import org.junit.Test import kotlin.test.assertFailsWith diff --git a/exposed/src/test/kotlin/org/jetbrains/exposed/sql/tests/mysql/MysqlTests.kt b/exposed/src/test/kotlin/org/jetbrains/exposed/sql/tests/mysql/MysqlTests.kt index cebebee85a..6cdce79f7e 100644 --- a/exposed/src/test/kotlin/org/jetbrains/exposed/sql/tests/mysql/MysqlTests.kt +++ b/exposed/src/test/kotlin/org/jetbrains/exposed/sql/tests/mysql/MysqlTests.kt @@ -1,18 +1,8 @@ package org.jetbrains.exposed.sql.tests.mysql -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import org.jetbrains.exposed.sql.SchemaUtils -import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq -import org.jetbrains.exposed.sql.insert -import org.jetbrains.exposed.sql.select import org.jetbrains.exposed.sql.tests.DatabaseTestsBase import org.jetbrains.exposed.sql.tests.TestDB -import org.jetbrains.exposed.sql.tests.h2.H2Tests import org.jetbrains.exposed.sql.transactions.TransactionManager -import org.jetbrains.exposed.sql.transactions.experimental.suspendedTransaction -import org.jetbrains.exposed.test.utils.RepeatableTest import org.jetbrains.exposed.test.utils.RepeatableTestRule import org.junit.Rule import org.junit.Test diff --git a/exposed/src/test/kotlin/org/jetbrains/exposed/sql/tests/shared/CoroutineTests.kt b/exposed/src/test/kotlin/org/jetbrains/exposed/sql/tests/shared/CoroutineTests.kt index 715b4bf23d..7042ae9d0d 100644 --- a/exposed/src/test/kotlin/org/jetbrains/exposed/sql/tests/shared/CoroutineTests.kt +++ b/exposed/src/test/kotlin/org/jetbrains/exposed/sql/tests/shared/CoroutineTests.kt @@ -3,47 +3,50 @@ package org.jetbrains.exposed.sql.tests.shared import kotlinx.coroutines.* import kotlinx.coroutines.debug.junit4.CoroutinesTimeout import org.jetbrains.exposed.sql.SchemaUtils -import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq import org.jetbrains.exposed.sql.insert import org.jetbrains.exposed.sql.select import org.jetbrains.exposed.sql.tests.DatabaseTestsBase -import org.jetbrains.exposed.sql.tests.TestDB import org.jetbrains.exposed.sql.tests.h2.H2Tests +import org.jetbrains.exposed.sql.transactions.experimental.andThen +//import org.jetbrains.exposed.sql.transact/ions.experimental.andThen +import org.jetbrains.exposed.sql.transactions.experimental.runSuspended import org.jetbrains.exposed.sql.transactions.experimental.suspendedTransaction import org.jetbrains.exposed.sql.transactions.experimental.suspendedTransactionAsync import org.jetbrains.exposed.test.utils.RepeatableTest import org.junit.Rule import org.junit.Test -import kotlin.test.assertEquals class CoroutineTests : DatabaseTestsBase() { @Rule @JvmField - val timeout = CoroutinesTimeout.seconds(10) + val timeout = CoroutinesTimeout.seconds(100) @Test @RepeatableTest(10) fun suspendedTx() { withDb { runBlocking { SchemaUtils.create(H2Tests.Testing) + commit() - val launchResult = suspendedTransaction(Dispatchers.IO) { + val job = suspendedTransaction(Dispatchers.IO) { H2Tests.Testing.insert{} launch(Dispatchers.Default) { - suspendedTransaction { + runSuspended { assertEquals(1, H2Tests.Testing.select { H2Tests.Testing.id.eq(1) }.singleOrNull()?.getOrNull(H2Tests.Testing.id)) } } } - val result = suspendedTransaction(Dispatchers.Default) { + + val result = runSuspended(Dispatchers.Default) { + job.join() H2Tests.Testing.select { H2Tests.Testing.id.eq(1) }.single()[H2Tests.Testing.id] } assertEquals(1, result) - launchResult.join() + SchemaUtils.drop(H2Tests.Testing) } } @@ -59,18 +62,22 @@ class CoroutineTests : DatabaseTestsBase() { H2Tests.Testing.insert{} launch(Dispatchers.Default) { - suspendedTransaction { + runSuspended { assertEquals(1, H2Tests.Testing.select { H2Tests.Testing.id.eq(1) }.singleOrNull()?.getOrNull(H2Tests.Testing.id)) } + commit() } } - val result = suspendedTransactionAsync(Dispatchers.Default) { + val result = suspendedTransactionAsync(Dispatchers.Default, useOuterTransactionIfAccessible = true) { + launchResult.await().join() H2Tests.Testing.select { H2Tests.Testing.id.eq(1) }.single()[H2Tests.Testing.id] + }.andThen { + assertEquals(1, it) + true } - launchResult.await() - assertEquals(1, result.await()) + assertEquals(true, result.await()) SchemaUtils.drop(H2Tests.Testing) } } diff --git a/spring-transaction/build.gradle.kts b/spring-transaction/build.gradle.kts index cedb2bdab0..4a7459e4f9 100644 --- a/spring-transaction/build.gradle.kts +++ b/spring-transaction/build.gradle.kts @@ -15,7 +15,7 @@ dependencies { api(project(":exposed")) api("org.springframework", "spring-jdbc", "5.1.7.RELEASE") api("org.springframework", "spring-context", "5.1.7.RELEASE") - implementation("org.jetbrains.kotlinx", "kotlinx-coroutines-core", "1.0.1") + implementation("org.jetbrains.kotlinx", "kotlinx-coroutines-core", "1.3.0-M1") implementation("com.h2database", "h2", "1.4.199") testImplementation(kotlin("test-junit"))