Skip to content

Commit

Permalink
Fix potential data race in EventLoop (#3289)
Browse files Browse the repository at this point in the history
Fixes #3251
  • Loading branch information
qwwdfsad authored May 23, 2022
1 parent 20d47b7 commit 8cdb4d6
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 4 deletions.
8 changes: 7 additions & 1 deletion kotlinx-coroutines-core/common/src/EventLoop.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,13 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
if (timeNanos < MAX_DELAY_NS) {
val now = nanoTime()
DelayedResumeTask(now + timeNanos, continuation).also { task ->
continuation.disposeOnCancellation(task)
/*
* Order is important here: first we schedule the heap and only then
* publish it to continuation. Otherwise, `DelayedResumeTask` would
* have to know how to be disposed of even when it wasn't scheduled yet.
*/
schedule(now, task)
continuation.disposeOnCancellation(task)
}
}
}
Expand Down Expand Up @@ -410,6 +415,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
*/
@JvmField var nanoTime: Long
) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
@Volatile
private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK

override var heap: ThreadSafeHeap<*>?
Expand Down
2 changes: 0 additions & 2 deletions kotlinx-coroutines-core/common/src/internal/ThreadSafeHeap.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public open class ThreadSafeHeap<T> : SynchronizedObject() where T: ThreadSafeHe
}
}

// @Synchronized // NOTE! NOTE! NOTE! inline fun cannot be @Synchronized
public inline fun removeFirstIf(predicate: (T) -> Boolean): T? = synchronized(this) {
val first = firstImpl() ?: return null
if (predicate(first)) {
Expand All @@ -59,7 +58,6 @@ public open class ThreadSafeHeap<T> : SynchronizedObject() where T: ThreadSafeHe

public fun addLast(node: T): Unit = synchronized(this) { addImpl(node) }

// @Synchronized // NOTE! NOTE! NOTE! inline fun cannot be @Synchronized
// Condition also receives current first node in the heap
public inline fun addLastIf(node: T, cond: (T?) -> Boolean): Boolean = synchronized(this) {
if (cond(firstImpl())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.internal

import kotlinx.coroutines.*
import java.util.concurrent.*
import java.util.concurrent.CancellationException
import kotlin.test.*

class ThreadSafeHeapStressTest : TestBase() {
private class DisposableNode : EventLoopImplBase.DelayedTask(1L) {
override fun run() {
}
}

@Test
fun testConcurrentRemoveDispose() = runTest {
val heap = EventLoopImplBase.DelayedTaskQueue(1)
repeat(10_000 * stressTestMultiplierSqrt) {
withContext(Dispatchers.Default) {
val node = DisposableNode()
val barrier = CyclicBarrier(2)
launch {
heap.addLast(node)
barrier.await()
heap.remove(node)
}
launch {
barrier.await()
Thread.yield()
node.dispose()
}
}
}
}

@Test()
fun testConcurrentAddDispose() = runTest {
repeat(10_000 * stressTestMultiplierSqrt) {
val jobToCancel = Job()
val barrier = CyclicBarrier(2)
val jobToJoin = launch(Dispatchers.Default) {
barrier.await()
jobToCancel.cancelAndJoin()
}

try {
runBlocking { // Use event loop impl
withContext(jobToCancel) {
// This one is to work around heap allocation optimization
launch(start = CoroutineStart.UNDISPATCHED) {
delay(100_000)
}
barrier.await()
delay(100_000)
}
}
} catch (e: CancellationException) {
// Expected exception
}
jobToJoin.join()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,4 @@ class ThreadSafeHeapTest : TestBase() {
assertEquals(set.size, h.size)
}
}
}
}

0 comments on commit 8cdb4d6

Please sign in to comment.