Skip to content

Commit

Permalink
Explicit naming for CancellableContinuation modes, drop suspendAtomic…
Browse files Browse the repository at this point in the history
…CancellableCoroutine

* MODE_ATOMIC_DEFAULT split into MODE_ATOMIC (for dispatch) and MODE_ATOMIC_REUSABLE (for suspendCancellableCoroutineReusable only).
  Dispatch modes are orthogonal to additional REUSE capability now.
* Better documentation for MODE_XXX constants.
* suspendCancellableCoroutineReusable does not have a default mode anymore, so its use is more explicit.
* Completely drop (inline) suspendAtomicCancellableCoroutine. Any kind of legacy code where this call might have been inlined still works because the constant value of MODE_ATOMIC = 0 is retained and carries its legacy meaning (no continuation reuse).
* Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE and fixed test for #1123 bug with job.join (working in MODE_CANCELLABLE) that was not properly failing in the absence of the proper code in CancellableContinuationImpl.getResult
  • Loading branch information
elizarov committed Apr 8, 2020
1 parent 1952649 commit 5cd0c24
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ class NonCancellableChannel : SimpleChannel() {
}

class CancellableChannel : SimpleChannel() {
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutine {
override suspend fun suspendReceive(): Int = suspendCancellableCoroutine {
consumer = it.intercepted()
COROUTINE_SUSPENDED
}

override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutine<Unit> {
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutine<Unit> {
enqueuedValue = element
producer = it.intercepted()
COROUTINE_SUSPENDED
Expand All @@ -84,13 +84,13 @@ class CancellableChannel : SimpleChannel() {

class CancellableReusableChannel : SimpleChannel() {
@Suppress("INVISIBLE_MEMBER")
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutineReusable {
override suspend fun suspendReceive(): Int = suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) {
consumer = it.intercepted()
COROUTINE_SUSPENDED
}

@Suppress("INVISIBLE_MEMBER")
override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutineReusable<Unit> {
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutineReusable<Unit>(MODE_ATOMIC_REUSABLE) {
enqueuedValue = element
producer = it.intercepted()
COROUTINE_SUSPENDED
Expand Down
3 changes: 0 additions & 3 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/

public final class kotlinx/coroutines/CancellableContinuationKt {
public static final fun disposeOnCancellation (Lkotlinx/coroutines/CancellableContinuation;Lkotlinx/coroutines/DisposableHandle;)V
public static final fun suspendAtomicCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun suspendAtomicCancellableCoroutine (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun suspendAtomicCancellableCoroutine$default (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun suspendCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down
47 changes: 11 additions & 36 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -204,30 +204,18 @@ public suspend inline fun <T> suspendCancellableCoroutine(
}

/**
* Suspends the coroutine like [suspendCancellableCoroutine], but with *atomic cancellation*.
* Suspends the coroutine similar to [suspendCancellableCoroutine], but an instance of
* [CancellableContinuationImpl] is reused.
*
* When the suspended function throws a [CancellationException], it means that the continuation was not resumed.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when the continuation
* was already resumed and was posted for execution to the thread's queue.
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_ATOMIC_DEFAULT)
block(cancellable)
cancellable.getResult()
}

/**
* Suspends coroutine similar to [suspendAtomicCancellableCoroutine], but an instance of [CancellableContinuationImpl] is reused if possible.
* * when [resumeMode] is [MODE_CANCELLABLE_REUSABLE] works like [suspendCancellableCoroutine].
* * when [resumeMode] is [MODE_ATOMIC_REUSABLE] it has *atomic cancellation*.
* When the suspended function throws a [CancellationException], it means that the continuation was not resumed.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when the continuation
* was already resumed and was posted for execution to the thread's queue.
*/
internal suspend inline fun <T> suspendAtomicCancellableCoroutineReusable(
resumeMode: Int = MODE_ATOMIC_DEFAULT,
internal suspend inline fun <T> suspendCancellableCoroutineReusable(
resumeMode: Int,
crossinline block: (CancellableContinuation<T>) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted(), resumeMode)
Expand All @@ -238,6 +226,7 @@ internal suspend inline fun <T> suspendAtomicCancellableCoroutineReusable(
internal fun <T> getOrCreateCancellableContinuation(
delegate: Continuation<T>, resumeMode: Int
): CancellableContinuationImpl<T> {
assert { resumeMode.isReusableMode }
// If used outside of our dispatcher
if (delegate !is DispatchedContinuation<T>) {
return CancellableContinuationImpl(delegate, resumeMode)
Expand All @@ -260,20 +249,6 @@ internal fun <T> getOrCreateCancellableContinuation(
?: return CancellableContinuationImpl(delegate, resumeMode)
}

/**
* @suppress **Deprecated**
*/
@Deprecated(
message = "holdCancellability parameter is deprecated and is no longer used",
replaceWith = ReplaceWith("suspendAtomicCancellableCoroutine(block)")
)
@InternalCoroutinesApi
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
holdCancellability: Boolean = false,
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendAtomicCancellableCoroutine(block)

/**
* Removes the specified [node] on cancellation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ internal open class CancellableContinuationImpl<in T>(
private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this)

/**
* Resets cancellability state in order to [suspendAtomicCancellableCoroutineReusable] to work.
* Invariant: used only by [suspendAtomicCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
* Resets cancellability state in order to [suspendCancellableCoroutineReusable] to work.
* Invariant: used only by [suspendCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
*/
@JvmName("resetState") // Prettier stack traces
internal fun resetState(resumeMode: Int): Boolean {
Expand Down Expand Up @@ -174,7 +174,7 @@ internal open class CancellableContinuationImpl<in T>(
if (state is CancelHandler) invokeHandlerSafely { state.invoke(cause) }
// Complete state update
detachChildIfNonResuable()
dispatchResume(mode = MODE_ATOMIC_DEFAULT)
dispatchResume(mode = MODE_ATOMIC) // no need for additional cancellation checks
return true
}
}
Expand Down Expand Up @@ -232,10 +232,10 @@ internal open class CancellableContinuationImpl<in T>(
val state = this.state
if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
// if the parent job was already cancelled, then throw the corresponding cancellation exception
// otherwise, there is a race is suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
// otherwise, there is a race if suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
// before the block returns. This getResult would return a result as opposed to cancellation
// exception that should have happened if the continuation is dispatched for execution later.
if (resumeMode == MODE_CANCELLABLE) {
if (resumeMode.isCancellableMode) {
val job = context[Job]
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
Expand Down
12 changes: 6 additions & 6 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
return closed.sendException
}

private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutineReusable sc@ { cont ->
private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) sc@ { cont ->
loop@ while (true) {
if (isFullImpl) {
val send = SendElement(element, cont)
Expand Down Expand Up @@ -543,13 +543,13 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
@Suppress("UNCHECKED_CAST")
if (result !== POLL_FAILED && result !is Closed<*>) return result as E
// slow-path does suspend
return receiveSuspend(RECEIVE_THROWS_ON_CLOSE, MODE_ATOMIC_DEFAULT)
return receiveSuspend(RECEIVE_THROWS_ON_CLOSE, MODE_ATOMIC_REUSABLE)
}

@Suppress("UNCHECKED_CAST")
private suspend fun <R> receiveSuspend(
receiveMode: Int, resumeMode: Int
): R = suspendAtomicCancellableCoroutineReusable(resumeMode) sc@ { cont ->
): R = suspendCancellableCoroutineReusable(resumeMode) sc@ { cont ->
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, receiveMode)
while (true) {
if (enqueueReceive(receive)) {
Expand Down Expand Up @@ -583,7 +583,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
@Suppress("UNCHECKED_CAST")
if (result !== POLL_FAILED && result !is Closed<*>) return result as E
// slow-path does suspend
return receiveSuspend(RECEIVE_NULL_ON_CLOSE, MODE_ATOMIC_DEFAULT)
return receiveSuspend(RECEIVE_NULL_ON_CLOSE, MODE_ATOMIC_REUSABLE)
}

@Suppress("UNCHECKED_CAST")
Expand All @@ -601,7 +601,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
val result = pollInternal()
if (result !== POLL_FAILED) return result.toResult()
// slow-path does suspend
return receiveSuspend(RECEIVE_RESULT, if (atomic) MODE_ATOMIC_DEFAULT else MODE_CANCELLABLE_REUSABLE)
return receiveSuspend(RECEIVE_RESULT, if (atomic) MODE_ATOMIC_REUSABLE else MODE_CANCELLABLE_REUSABLE)
}

@Suppress("UNCHECKED_CAST")
Expand Down Expand Up @@ -816,7 +816,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
return true
}

private suspend fun hasNextSuspend(): Boolean = suspendAtomicCancellableCoroutineReusable sc@ { cont ->
private suspend fun hasNextSuspend(): Boolean = suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) sc@ { cont ->
val receive = ReceiveHasNext(this, cont)
while (true) {
if (channel.enqueueReceive(receive)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal val REUSABLE_CLAIMED = Symbol("REUSABLE_CLAIMED")
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
) : DispatchedTask<T>(MODE_ATOMIC), CoroutineStackFrame, Continuation<T> by continuation {
@JvmField
@Suppress("PropertyName")
internal var _state: Any? = UNDEFINED
Expand All @@ -43,7 +43,7 @@ internal class DispatchedContinuation<in T>(
* }
* // state == CC
* ```
* 4) [Throwable] continuation was cancelled with this cause while being in [suspendAtomicCancellableCoroutineReusable],
* 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable],
* [CancellableContinuationImpl.getResult] will check for cancellation later.
*
* [REUSABLE_CLAIMED] state is required to prevent the lost resume in the channel.
Expand Down Expand Up @@ -83,7 +83,7 @@ internal class DispatchedContinuation<in T>(
}

/**
* Claims the continuation for [suspendAtomicCancellableCoroutineReusable] block,
* Claims the continuation for [suspendCancellableCoroutineReusable] block,
* so all cancellations will be postponed.
*/
@Suppress("UNCHECKED_CAST")
Expand Down Expand Up @@ -180,10 +180,10 @@ internal class DispatchedContinuation<in T>(
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC_DEFAULT
resumeMode = MODE_ATOMIC
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
executeUnconfined(state, MODE_ATOMIC) {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
Expand Down
45 changes: 36 additions & 9 deletions kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,44 @@ import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*

/**
* Non-cancellable dispatch mode.
*
* **DO NOT CHANGE THE CONSTANT VALUE**. It might be inlined into legacy user code that was calling
* inline `suspendAtomicCancellableCoroutine` function and did not support reuse.
*/
internal const val MODE_ATOMIC = 0

/**
* Cancellable dispatch mode. It is used by user-facing [suspendCancellableCoroutine].
* Note, that implementation of cancellability checks mode via [Int.isCancellableMode] extension.
*
* **DO NOT CHANGE THE CONSTANT VALUE**. It is being into the user code from [suspendCancellableCoroutine].
*/
@PublishedApi
internal const val MODE_ATOMIC_DEFAULT = 0 // schedule non-cancellable dispatch for suspendCoroutine
@PublishedApi
internal const val MODE_CANCELLABLE = 1 // schedule cancellable dispatch for suspendCancellableCoroutine
internal const val MODE_CANCELLABLE = 1

internal const val MODE_CANCELLABLE_REUSABLE = 2 // same as MODE_CANCELLABLE but supports reused
internal const val MODE_UNDISPATCHED = 3 // when the thread is right, but need to mark it with current coroutine
/**
* Atomic dispatch mode for [suspendCancellableCoroutineReusable].
* Note, that implementation of reuse checks mode via [Int.isReusableMode] extension.
*/
internal const val MODE_ATOMIC_REUSABLE = 2

/**
* Cancellable dispatch mode for [suspendCancellableCoroutineReusable].
* Note, that implementation of cancellability checks mode via [Int.isCancellableMode] extension;
* implementation of reuse checks mode via [Int.isReusableMode] extension.
*/
internal const val MODE_CANCELLABLE_REUSABLE = 3

/**
* Undispatched mode for [CancellableContinuation.resumeUndispatched].
* It is used when the thread is right, but it needs to be mark it with the current coroutine.
*/
internal const val MODE_UNDISPATCHED = 4

internal val Int.isCancellableMode get() = this == MODE_CANCELLABLE || this == MODE_CANCELLABLE_REUSABLE
internal val Int.isDispatchedMode get() = this != MODE_UNDISPATCHED
internal val Int.isReusableMode get() = this == MODE_ATOMIC_DEFAULT || this == MODE_CANCELLABLE_REUSABLE
internal val Int.isReusableMode get() = this == MODE_ATOMIC_REUSABLE || this == MODE_CANCELLABLE_REUSABLE

internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
Expand Down Expand Up @@ -103,7 +130,7 @@ internal abstract class DispatchedTask<in T>(

internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
val delegate = this.delegate
if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
if (mode != MODE_UNDISPATCHED && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
// dispatch directly using this instance's Runnable implementation
val dispatcher = delegate.dispatcher
val context = delegate.context
Expand All @@ -124,7 +151,7 @@ internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: In
val exception = getExceptionalResult(state)?.let { recoverStackTrace(it, delegate) }
val result = if (exception != null) Result.failure(exception) else Result.success(state as T)
when (useMode) {
MODE_ATOMIC_DEFAULT -> delegate.resumeWith(result)
MODE_ATOMIC, MODE_ATOMIC_REUSABLE -> delegate.resumeWith(result)
MODE_CANCELLABLE, MODE_CANCELLABLE_REUSABLE -> delegate.resumeCancellableWith(result)
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWith(result)
else -> error("Invalid mode $useMode")
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/sync/Mutex.kt
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
return lockSuspend(owner)
}

private suspend fun lockSuspend(owner: Any?) = suspendAtomicCancellableCoroutineReusable<Unit> sc@ { cont ->
private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable<Unit>(MODE_ATOMIC_REUSABLE) sc@ { cont ->
val waiter = LockCont(owner, cont)
_state.loop { state ->
when (state) {
Expand Down
3 changes: 1 addition & 2 deletions kotlinx-coroutines-core/common/src/sync/Semaphore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*
import kotlin.math.*
import kotlin.native.concurrent.*

Expand Down Expand Up @@ -136,7 +135,7 @@ private class SemaphoreImpl(
cur + 1
}

private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutineReusable<Unit> sc@ { cont ->
private suspend fun addToQueueAndSuspend() = suspendCancellableCoroutineReusable<Unit>(MODE_ATOMIC_REUSABLE) sc@ { cont ->
val last = this.tail
val enqIdx = enqIdx.getAndIncrement()
val segment = getSegment(last, enqIdx / SEGMENT_SIZE)
Expand Down
41 changes: 36 additions & 5 deletions kotlinx-coroutines-core/jvm/test/JobStructuredJoinStressTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,60 @@
package kotlinx.coroutines

import org.junit.*
import kotlin.coroutines.*

/**
* Test a race between job failure and join.
*
* See [#1123](https://github.com/Kotlin/kotlinx.coroutines/issues/1123).
*/
class JobStructuredJoinStressTest : TestBase() {
private val nRepeats = 1_000 * stressTestMultiplier
private val nRepeats = 10_000 * stressTestMultiplier

@Test
fun testStress() {
repeat(nRepeats) {
fun testStressRegularJoin() {
stress(Job::join)
}

@Test
fun testStressSuspendCancellable() {
stress { job ->
suspendCancellableCoroutine { cont ->
job.invokeOnCompletion { cont.resume(Unit) }
}
}
}

@Test
fun testStressSuspendCancellableReusable() {
stress { job ->
suspendCancellableCoroutineReusable(MODE_CANCELLABLE_REUSABLE) { cont ->
job.invokeOnCompletion { cont.resume(Unit) }
}
}
}

private fun stress(join: suspend (Job) -> Unit) {
expect(1)
repeat(nRepeats) { index ->
assertFailsWith<TestException> {
runBlocking {
// launch in background
val job = launch(Dispatchers.Default) {
throw TestException("OK") // crash
}
assertFailsWith<CancellationException> {
job.join()
try {
join(job)
error("Should not complete successfully")
} catch (e: CancellationException) {
// must always crash with cancellation exception
expect(2 + index)
} catch (e: Throwable) {
error("Unexpected exception", e)
}
}
}
}
finish(2 + nRepeats)
}
}
Loading

0 comments on commit 5cd0c24

Please sign in to comment.