diff --git a/kotlinx-coroutines-core/concurrent/src/MultithreadedDispatchers.common.kt b/kotlinx-coroutines-core/concurrent/src/MultithreadedDispatchers.common.kt index a2b4241f0f..cc2c16a62e 100644 --- a/kotlinx-coroutines-core/concurrent/src/MultithreadedDispatchers.common.kt +++ b/kotlinx-coroutines-core/concurrent/src/MultithreadedDispatchers.common.kt @@ -2,10 +2,40 @@ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ +@file:JvmMultifileClass +@file:JvmName("ThreadPoolDispatcherKt") package kotlinx.coroutines +import kotlin.jvm.* + +/** + * Creates a coroutine execution context using a single thread with built-in [yield] support. + * **NOTE: The resulting [CloseableCoroutineDispatcher] owns native resources (its thread). + * Resources are reclaimed by [CloseableCoroutineDispatcher.close].** + * + * If the resulting dispatcher is [closed][CloseableCoroutineDispatcher.close] and + * attempt to submit a task is made, then: + * * On the JVM, the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the + * [Dispatchers.IO], so that the affected coroutine can clean up its resources and promptly complete. + * * On Native, the attempt to submit a task throws an exception. + * + * This is a **delicate** API. The result of this method is a closeable resource with the + * associated native resources (threads or native workers). It should not be allocated in place, + * should be closed at the end of its lifecycle, and has non-trivial memory and CPU footprint. + * If you do not need a separate thread-pool, but only have to limit effective parallelism of the dispatcher, + * it is recommended to use [CoroutineDispatcher.limitedParallelism] instead. + * + * If you need a completely separate thread-pool with scheduling policy that is based on the standard + * JDK executors, use the following expression: + * `Executors.newSingleThreadExecutor().asCoroutineDispatcher()`. + * See `Executor.asCoroutineDispatcher` for details. + * + * @param name the base name of the created thread. + */ @ExperimentalCoroutinesApi -public expect fun newSingleThreadContext(name: String): CloseableCoroutineDispatcher +@DelicateCoroutinesApi +public fun newSingleThreadContext(name: String): CloseableCoroutineDispatcher = + newFixedThreadPoolContext(1, name) @ExperimentalCoroutinesApi public expect fun newFixedThreadPoolContext(nThreads: Int, name: String): CloseableCoroutineDispatcher diff --git a/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt b/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt index dc0b7e29c5..0828c0bccf 100644 --- a/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt @@ -2,38 +2,13 @@ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ +@file:JvmMultifileClass +@file:JvmName("ThreadPoolDispatcherKt") package kotlinx.coroutines import java.util.concurrent.* import java.util.concurrent.atomic.AtomicInteger -/** - * Creates a coroutine execution context using a single thread with built-in [yield] support. - * **NOTE: The resulting [ExecutorCoroutineDispatcher] owns native resources (its thread). - * Resources are reclaimed by [ExecutorCoroutineDispatcher.close].** - * - * If the resulting dispatcher is [closed][ExecutorCoroutineDispatcher.close] and - * attempt to submit a continuation task is made, - * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the - * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete. - * - * This is a **delicate** API. The result of this method is a closeable resource with the - * associated native resources (threads). It should not be allocated in place, - * should be closed at the end of its lifecycle, and has non-trivial memory and CPU footprint. - * If you do not need a separate thread-pool, but only have to limit effective parallelism of the dispatcher, - * it is recommended to use [CoroutineDispatcher.limitedParallelism] instead. - * - * If you need a completely separate thread-pool with scheduling policy that is based on the standard - * JDK executors, use the following expression: - * `Executors.newSingleThreadExecutor().asCoroutineDispatcher()`. - * See [Executor.asCoroutineDispatcher] for details. - * - * @param name the base name of the created thread. - */ -@DelicateCoroutinesApi -public actual fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher = - newFixedThreadPoolContext(1, name) - /** * Creates a coroutine execution context with the fixed-size thread-pool and built-in [yield] support. * **NOTE: The resulting [ExecutorCoroutineDispatcher] owns native resources (its threads). diff --git a/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt b/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt index 0012ff65db..007d079a8d 100644 --- a/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt +++ b/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt @@ -9,17 +9,15 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.internal.* import kotlin.coroutines.* import kotlin.native.concurrent.* - -@ExperimentalCoroutinesApi -public actual fun newSingleThreadContext(name: String): CloseableCoroutineDispatcher { - return WorkerDispatcher(name) -} +import kotlin.time.* +import kotlin.time.Duration.Companion.milliseconds public actual fun newFixedThreadPoolContext(nThreads: Int, name: String): CloseableCoroutineDispatcher { require(nThreads >= 1) { "Expected at least one thread, but got: $nThreads" } return MultiWorkerDispatcher(name, nThreads) } +@OptIn(ExperimentalTime::class) internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(), Delay { private val worker = Worker.start(name = name) @@ -52,21 +50,30 @@ internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(), override fun dispose() { disposableHolder.value = null } + + fun isDisposed() = disposableHolder.value == null + } + + fun Worker.runAfterDelay(block: DisposableBlock, targetMoment: TimeMark) { + if (block.isDisposed()) return + val durationUntilTarget = -targetMoment.elapsedNow() + val quantum = 100.milliseconds + if (durationUntilTarget > quantum) { + executeAfter(quantum.inWholeMicroseconds) { runAfterDelay(block, targetMoment) } + } else { + executeAfter(maxOf(0, durationUntilTarget.inWholeMicroseconds), block) + } } val disposableBlock = DisposableBlock(block) - worker.executeAfter(timeMillis.toMicrosSafe(), disposableBlock) + val targetMoment = TimeSource.Monotonic.markNow() + timeMillis.milliseconds + worker.runAfterDelay(disposableBlock, targetMoment) return disposableBlock } override fun close() { worker.requestTermination().result // Note: calling "result" blocks } - - private fun Long.toMicrosSafe(): Long { - val result = this * 1000 - return if (result > this) result else Long.MAX_VALUE - } } private class MultiWorkerDispatcher( diff --git a/kotlinx-coroutines-core/native/test/MultithreadedDispatchersTest.kt b/kotlinx-coroutines-core/native/test/MultithreadedDispatchersTest.kt index ce433cc3e3..501b7b5b4b 100644 --- a/kotlinx-coroutines-core/native/test/MultithreadedDispatchersTest.kt +++ b/kotlinx-coroutines-core/native/test/MultithreadedDispatchersTest.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.internal.* import kotlin.native.concurrent.* import kotlin.test.* +import kotlin.time.Duration.Companion.seconds private class BlockingBarrier(val n: Int) { val counter = atomic(0) @@ -63,4 +64,20 @@ class MultithreadedDispatchersTest { dispatcher.close() } } + + /** + * Test that [newSingleThreadContext] will not wait for the cancelled scheduled coroutines before closing. + */ + @Test + fun timeoutsNotPreventingClosing(): Unit = runBlocking { + val dispatcher = WorkerDispatcher("test") + withContext(dispatcher) { + withTimeout(5.seconds) { + } + } + withTimeout(1.seconds) { + dispatcher.close() // should not wait for the timeout + yield() + } + } }