Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Timeout.cancel for waitUntilNotified #1391

Merged
merged 2 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions okio/api/okio.api
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ public class okio/Timeout {
public static final field NONE Lokio/Timeout;
public fun <init> ()V
public final fun awaitSignal (Ljava/util/concurrent/locks/Condition;)V
public fun cancel ()V
public fun clearDeadline ()Lokio/Timeout;
public fun clearTimeout ()Lokio/Timeout;
public final fun deadline (JLjava/util/concurrent/TimeUnit;)Lokio/Timeout;
Expand Down
48 changes: 37 additions & 11 deletions okio/src/jvmMain/kotlin/okio/Timeout.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import java.io.IOException
import java.io.InterruptedIOException
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.Condition
import kotlin.concurrent.Volatile
import kotlin.time.Duration
import kotlin.time.DurationUnit
import kotlin.time.toTimeUnit
Expand All @@ -32,6 +33,12 @@ actual open class Timeout {
private var deadlineNanoTime = 0L
private var timeoutNanos = 0L

/**
* A sentinel that is updated to a new object on each call to [cancel]. Sample this property
* before and after an operation to test if the timeout was canceled during the operation.
*/
@Volatile private var cancelMark: Any? = null
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also considered doing cancelCount: Int but I think I like this better. It doesn’t have an overflow problem!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would have worked. Concurrent read, add, writes would still have changed the value. Overflowing still results in a new value. The only problem would have been if you called cancel 2^32 times during an operation it wouldn't be detected (but that's probably okay because it's impossible).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That’s exactly the case I was considering. It’s practically impossible but ugly


/**
* Wait at most `timeout` time before aborting an operation. Using a per-operation timeout means
* that as long as forward progress is being made, no sequence of operations will fail.
Expand Down Expand Up @@ -107,6 +114,20 @@ actual open class Timeout {
}
}

/**
* Prevent all current applications of this timeout from firing. Use this when a time-limited
* operation should no longer be time-limited because the nature of the operation has changed.
*
* This function does not mutate the [deadlineNanoTime] or [timeoutNanos] properties of this
* timeout. It only applies to active operations that are limited by this timeout, and applies by
* allowing those operations to run indefinitely.
*
* Subclasses that override this method must call `super.cancel()`.
*/
open fun cancel() {
cancelMark = Any()
}

/**
* Waits on `monitor` until it is signaled. Throws [InterruptedIOException] if either the thread
* is interrupted or if this timeout elapses before `monitor` is signaled.
Expand Down Expand Up @@ -245,18 +266,23 @@ actual open class Timeout {
timeoutNanos
}

// Attempt to wait that long. This will break out early if the monitor is notified.
var elapsedNanos = 0L
if (waitNanos > 0L) {
val waitMillis = waitNanos / 1000000L
(monitor as Object).wait(waitMillis, (waitNanos - waitMillis * 1000000L).toInt())
elapsedNanos = System.nanoTime() - start
}
if (waitNanos <= 0) throw InterruptedIOException("timeout")

// Throw if the timeout elapsed before the monitor was notified.
if (elapsedNanos >= waitNanos) {
throw InterruptedIOException("timeout")
}
val cancelMarkBefore = cancelMark

// Attempt to wait that long. This will return early if the monitor is notified.
val waitMillis = waitNanos / 1000000L
(monitor as Object).wait(waitMillis, (waitNanos - waitMillis * 1000000L).toInt())
val elapsedNanos = System.nanoTime() - start

// If there's time remaining, we probably got the call we were waiting for.
if (elapsedNanos < waitNanos) return

// Return without throwing if this timeout was canceled while we were waiting. Note that this
// return is a 'spurious wakeup' because Object.notify() was not called.
if (cancelMark !== cancelMarkBefore) return

throw InterruptedIOException("timeout")
} catch (e: InterruptedException) {
Thread.currentThread().interrupt() // Retain interrupted status.
throw InterruptedIOException("interrupted")
Expand Down
53 changes: 53 additions & 0 deletions okio/src/jvmTest/kotlin/okio/WaitUntilNotifiedTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,49 @@ class WaitUntilNotifiedTest {
}
}

@Test
@Synchronized
fun cancelBeforeWaitDoesNothing() {
assumeNotWindows()
val timeout = Timeout()
timeout.timeout(1000, TimeUnit.MILLISECONDS)
timeout.cancel()
val start = now()
try {
timeout.waitUntilNotified(this)
fail()
} catch (expected: InterruptedIOException) {
assertEquals("timeout", expected.message)
}
assertElapsed(1000.0, start)
}

@Test
@Synchronized
fun canceledTimeoutDoesNotThrowWhenNotNotifiedOnTime() {
val timeout = Timeout()
timeout.timeout(1000, TimeUnit.MILLISECONDS)
timeout.cancelLater(500)

val start = now()
timeout.waitUntilNotified(this) // Returns early but doesn't throw.
assertElapsed(1000.0, start)
}

@Test
@Synchronized
fun multipleCancelsAreIdempotent() {
val timeout = Timeout()
timeout.timeout(1000, TimeUnit.MILLISECONDS)
timeout.cancelLater(250)
timeout.cancelLater(500)
timeout.cancelLater(750)

val start = now()
timeout.waitUntilNotified(this) // Returns early but doesn't throw.
assertElapsed(1000.0, start)
}

/** Returns the nanotime in milliseconds as a double for measuring timeouts. */
private fun now(): Double {
return System.nanoTime() / 1000000.0
Expand All @@ -178,4 +221,14 @@ class WaitUntilNotifiedTest {
private fun assertElapsed(duration: Double, start: Double) {
assertEquals(duration, now() - start - 200.0, 250.0)
}

private fun Timeout.cancelLater(delay: Long) {
executorService.schedule(
{
cancel()
},
delay,
TimeUnit.MILLISECONDS,
)
}
}