Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow scope #1227

Merged
merged 6 commits into from
Jun 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,6 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
public fun fold (Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object;
public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element;
public final fun getCancellationException ()Ljava/util/concurrent/CancellationException;
protected fun getCancelsParent ()Z
public fun getChildJobCancellationCause ()Ljava/util/concurrent/CancellationException;
public final fun getChildren ()Lkotlin/sequences/Sequence;
protected final fun getCompletionCause ()Ljava/lang/Throwable;
Expand All @@ -396,6 +395,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
public final fun isCancelled ()Z
public final fun isCompleted ()Z
public final fun isCompletedExceptionally ()Z
protected fun isScopedCoroutine ()Z
public final fun join (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
protected fun onCancelling (Ljava/lang/Throwable;)V
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/Exceptions.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal expect class JobCancellationException(
internal val job: Job
}

internal expect class CoroutinesInternalError(message: String, cause: Throwable) : Error
internal class CoroutinesInternalError(message: String, cause: Throwable) : Error(message, cause)

internal expect fun Throwable.addSuppressedThrowable(other: Throwable)
// For use in tests
Expand Down
70 changes: 45 additions & 25 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,31 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
}

/**
* The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent.
* Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
*
* Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
* may leak to the [CoroutineExceptionHandler].
*/
private fun cancelParent(cause: Throwable): Boolean {
/* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
* This allow parent to cancel its children (normally) without being cancelled itself, unless
* child crashes and produce some other exception during its completion.
*/
val isCancellation = cause is CancellationException
val parent = parentHandle
// No parent -- ignore CE, report other exceptions.
if (parent === null || parent === NonDisposableHandle) {
return isCancellation
}

// Is scoped coroutine -- don't propagate, will be rethrown
if (isScopedCoroutine) return isCancellation
// Notify parent but don't forget to check cancellation
return parent.childCancelled(cause) || isCancellation
}

private fun NodeList.notifyCompletion(cause: Throwable?) =
notifyHandlers<JobNode<*>>(this, cause)

Expand Down Expand Up @@ -594,21 +619,29 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
cancelImpl(parentJob)
}

// Child was cancelled with cause
// It is overridden in supervisor implementations to ignore child cancellation
public open fun childCancelled(cause: Throwable): Boolean =
cancelImpl(cause) && handlesException
/**
* Child was cancelled with a cause.
* In this method parent decides whether it cancels itself (e.g. on a critical failure) and whether it handles the exception of the child.
* It is overridden in supervisor implementations to completely ignore any child cancellation.
* Returns `true` if exception is handled, `false` otherwise (then caller is responsible for handling an exception)
*
* Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
* may leak to the [CoroutineExceptionHandler].
*/
public open fun childCancelled(cause: Throwable): Boolean {
if (cause is CancellationException) return true
return cancelImpl(cause) && handlesException
}
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved

/**
* Makes this [Job] cancelled with a specified [cause].
* It is used in [AbstractCoroutine]-derived classes when there is an internal failure.
*/
public fun cancelCoroutine(cause: Throwable?) =
cancelImpl(cause)
public fun cancelCoroutine(cause: Throwable?) = cancelImpl(cause)

// cause is Throwable or ParentJob when cancelChild was invoked
// returns true is exception was handled, false otherwise
private fun cancelImpl(cause: Any?): Boolean {
internal fun cancelImpl(cause: Any?): Boolean {
if (onCancelComplete) {
// make sure it is completing, if cancelMakeCompleting returns true it means it had make it
// completing and had recorded exception
Expand Down Expand Up @@ -912,14 +945,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
protected open fun onCancelling(cause: Throwable?) {}

/**
* When this function returns `true` the parent is cancelled on cancellation of this job.
* Note that [CancellationException] is considered "normal" and parent is not cancelled when child produces it.
* This allows parent to cancel its children (normally) without being cancelled itself, unless
* child crashes and produce some other exception during its completion.
*
* @suppress **This is unstable API and it is subject to change.*
* Returns `true` for scoped coroutines.
* Scoped coroutine is a coroutine that is executed sequentially within the enclosing scope without any concurrency.
* Scoped coroutines always handle any exception happened within -- they just rethrow it to the enclosing scope.
* Examples of scoped coroutines are `coroutineScope`, `withTimeout` and `runBlocking`.
*/
protected open val cancelsParent: Boolean get() = true
protected open val isScopedCoroutine: Boolean get() = false

/**
* Returns `true` for jobs that handle their exceptions or integrate them into the job's result via [onCompletionInternal].
Expand All @@ -939,20 +970,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
*
* This method is invoked **exactly once** when the final exception of the job is determined
* and before it becomes complete. At the moment of invocation the job and all its children are complete.
*
* @suppress **This is unstable API and it is subject to change.*
*/
protected open fun handleJobException(exception: Throwable): Boolean = false

private fun cancelParent(cause: Throwable): Boolean {
// CancellationException is considered "normal" and parent is not cancelled when child produces it.
// This allow parent to cancel its children (normally) without being cancelled itself, unless
// child crashes and produce some other exception during its completion.
if (cause is CancellationException) return true
if (!cancelsParent) return false
return parentHandle?.childCancelled(cause) == true
}

/**
* Override for completion actions that need to update some external object depending on job's state,
* right before all the waiters for coroutine's completion are notified.
Expand Down
4 changes: 1 addition & 3 deletions kotlinx-coroutines-core/common/src/Timeout.kt
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ private open class TimeoutCoroutine<U, in T: U>(
override val defaultResumeMode: Int get() = MODE_DIRECT
override val callerFrame: CoroutineStackFrame? get() = (uCont as? CoroutineStackFrame)
override fun getStackTraceElement(): StackTraceElement? = null

override val cancelsParent: Boolean
get() = false // it throws exception to parent instead of cancelling it
override val isScopedCoroutine: Boolean get() = true

@Suppress("LeakingThis", "Deprecation")
override fun run() {
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/channels/Produce.kt
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public fun <E> CoroutineScope.produce(
return coroutine
}

private class ProducerCoroutine<E>(
internal open class ProducerCoroutine<E>(
parentContext: CoroutineContext, channel: Channel<E>
) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E> {
override val isActive: Boolean
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.()
public inline fun <T> callbackFlow(@BuilderInference noinline block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
channelFlow(block)

// ChannelFlow implementation that is the first in the chain of flow operations and introduces (builds) a flow
// ChannelFlow implementation that is the first in the chain of flow operations and introduces (builds) a flow
private class ChannelFlowBuilder<T>(
private val block: suspend ProducerScope<T>.() -> Unit,
context: CoroutineContext = EmptyCoroutineContext,
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/Migration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ public fun <T> Flow<T>.onErrorResume(fallback: Flow<T>): Flow<T> = error("Should
@Deprecated(message = "withContext in flow body is deprecated, use flowOn instead", level = DeprecationLevel.ERROR)
public fun <T, R> FlowCollector<T>.withContext(context: CoroutineContext, block: suspend () -> R): Unit = error("Should not be called")


/**
* `subscribe` is Rx-specific API that has no direct match in flows.
* One can use `launch` instead, for example the following:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ internal abstract class ChannelFlow<T>(
scope.broadcast(context, produceCapacity, start, block = collectToFun)

fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
scope.produce(context, produceCapacity, block = collectToFun)
scope.flowProduce(context, produceCapacity, block = collectToFun)

override suspend fun collect(collector: FlowCollector<T>) =
coroutineScope { // todo: flowScope
coroutineScope {
val channel = produceImpl(this)
channel.consumeEach { collector.emit(it) }
}
Expand Down
89 changes: 89 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlinx.coroutines.flow.unsafeFlow as flow

/**
* Creates a [CoroutineScope] and calls the specified suspend block with this scope.
* This builder is similar to [coroutineScope] with the only exception that it *ties* lifecycle of children
* and itself regarding the cancellation, thus being cancelled when one of the children becomes cancelled.
*
* For example:
* ```
* flowScope {
* launch {
* throw CancellationException()
* }
* } // <- CE will be rethrown here
* ```
*/
internal suspend fun <R> flowScope(@BuilderInference block: suspend CoroutineScope.() -> R): R =
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
suspendCoroutineUninterceptedOrReturn { uCont ->
val coroutine = FlowCoroutine(uCont.context, uCont)
coroutine.startUndispatchedOrReturn(coroutine, block)
}

/**
* Creates a flow that also provides a [CoroutineScope] for each collector
* Shorthand for:
* ```
* flow {
* flowScope {
* ...
* }
* }
* ```
* with additional constraint on cancellation.
* To cancel child without cancelling itself, `cancel(ChildCancelledException())` should be used.
*/
internal fun <R> scopedFlow(@BuilderInference block: suspend CoroutineScope.(FlowCollector<R>) -> Unit): Flow<R> =
flow {
val collector = this
flowScope { block(collector) }
}

/*
* Shortcut for produce { flowScope {block() } }
*/
internal fun <T> CoroutineScope.flowProduce(
context: CoroutineContext,
capacity: Int = 0, @BuilderInference block: suspend ProducerScope<T>.() -> Unit
): ReceiveChannel<T> {
val channel = Channel<T>(capacity)
val newContext = newCoroutineContext(context)
val coroutine = FlowProduceCoroutine(newContext, channel)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine
}

private class FlowCoroutine<T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {

public override fun childCancelled(cause: Throwable): Boolean {
if (cause is ChildCancelledException) return true
return cancelImpl(cause)
}
}

private class FlowProduceCoroutine<T>(
parentContext: CoroutineContext,
channel: Channel<T>
) : ProducerCoroutine<T>(parentContext, channel) {

public override fun childCancelled(cause: Throwable): Boolean {
if (cause is ChildCancelledException) return true
return cancelImpl(cause)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ import kotlinx.coroutines.*
* This exception should never escape outside of operator's implementation.
*/
internal expect class AbortFlowException() : CancellationException

/**
* Exception used to cancel child of [scopedFlow] without cancelling the whole scope.
*/
internal expect class ChildCancelledException() : CancellationException
92 changes: 45 additions & 47 deletions kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -60,34 +60,33 @@ public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = flow {
*/
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
return flow {
coroutineScope {
val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
// Channel is not closed deliberately as there is no close with value
val collector = async {
collect { value -> values.send(value ?: NULL) }
}
return scopedFlow { downstream ->
val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
// Channel is not closed deliberately as there is no close with value
val collector = async {
collect { value -> values.send(value ?: NULL) }
}

var isDone = false
var lastValue: Any? = null
while (!isDone) {
select<Unit> {
values.onReceive {
lastValue = it
}
var isDone = false
var lastValue: Any? = null
while (!isDone) {
select<Unit> {
values.onReceive {
lastValue = it
}

lastValue?.let { value -> // set timeout when lastValue != null
onTimeout(timeoutMillis) {
lastValue = null // Consume the value
emit(NULL.unbox(value))
}
lastValue?.let { value ->
// set timeout when lastValue != null
onTimeout(timeoutMillis) {
lastValue = null // Consume the value
downstream.emit(NULL.unbox(value))
}
}

// Close with value 'idiom'
collector.onAwait {
if (lastValue != null) emit(NULL.unbox(lastValue))
isDone = true
}
// Close with value 'idiom'
collector.onAwait {
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
isDone = true
}
}
}
Expand All @@ -112,32 +111,31 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
*/
public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
require(periodMillis > 0) { "Sample period should be positive" }
return flow {
coroutineScope {
val values = produce<Any?>(capacity = Channel.CONFLATED) { // Actually Any, KT-30796
collect { value -> send(value ?: NULL) }
}
return scopedFlow { downstream ->
val values = produce<Any?>(capacity = Channel.CONFLATED) {
// Actually Any, KT-30796
collect { value -> send(value ?: NULL) }
}

var isDone = false
var lastValue: Any? = null
val ticker = fixedPeriodTicker(periodMillis)
while (!isDone) {
select<Unit> {
values.onReceiveOrNull {
if (it == null) {
ticker.cancel()
isDone = true
} else {
lastValue = it
}
var isDone = false
var lastValue: Any? = null
val ticker = fixedPeriodTicker(periodMillis)
while (!isDone) {
select<Unit> {
values.onReceiveOrNull {
if (it == null) {
ticker.cancel(ChildCancelledException())
isDone = true
} else {
lastValue = it
}
}

// todo: shall be start sampling only when an element arrives or sample aways as here?
ticker.onReceive {
val value = lastValue ?: return@onReceive
lastValue = null // Consume the value
emit(NULL.unbox(value))
}
// todo: shall be start sampling only when an element arrives or sample aways as here?
ticker.onReceive {
val value = lastValue ?: return@onReceive
lastValue = null // Consume the value
downstream.emit(NULL.unbox(value))
}
}
}
Expand Down
Loading