Skip to content

Commit

Permalink
Simplify FlowScope implementation, stylistic improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
qwwdfsad committed May 31, 2019
1 parent 2064693 commit 004e671
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 83 deletions.
20 changes: 9 additions & 11 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}

/**
* The method that is invoked when the job is cancelled to possible propagate cancellation to the parent.
* The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent.
* Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
*
* Invariant: never returns `true` for instances of [CancellationException], otherwise such exception
* Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
* may leak to the [CoroutineExceptionHandler].
*/
private fun cancelParent(cause: Throwable): Boolean {
Expand Down Expand Up @@ -619,19 +619,17 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
cancelImpl(parentJob)
}

/**
* Returns `true` if job should cancel itself on child [CancellationException].
*/
public open fun cancelOnChildCancellation(cause: CancellationException) = false

/**
* Child was cancelled with a cause.
* In this method parent decides whether it cancels itself (e.g. on a critical failure) and
* whether it handles the exception of the child.
* In this method parent decides whether it cancels itself (e.g. on a critical failure) and whether it handles the exception of the child.
* It is overridden in supervisor implementations to completely ignore any child cancellation.
* Returns `true` if exception is handled, `false` otherwise (then caller is responsible for handling an exception)
*
* Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
* may leak to the [CoroutineExceptionHandler].
*/
public open fun childCancelled(cause: Throwable): Boolean {
if (cause is CancellationException && !cancelOnChildCancellation(cause)) return true
if (cause is CancellationException) return true
return cancelImpl(cause) && handlesException
}

Expand All @@ -643,7 +641,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren

// cause is Throwable or ParentJob when cancelChild was invoked
// returns true is exception was handled, false otherwise
private fun cancelImpl(cause: Any?): Boolean {
internal fun cancelImpl(cause: Any?): Boolean {
if (onCancelComplete) {
// make sure it is completing, if cancelMakeCompleting returns true it means it had make it
// completing and had recorded exception
Expand Down
61 changes: 61 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlinx.coroutines.flow.unsafeFlow as flow

/**
* Creates a [CoroutineScope] and calls the specified suspend block with this scope.
* This builder is similar to [coroutineScope] with the only exception that it *ties* lifecycle of children
* and itself regarding the cancellation, thus being cancelled when one of the children becomes cancelled.
*
* For example:
* ```
* flowScope {
* launch {
* throw CancellationException()
* }
* } // <- CE will be rethrown here
* ```
*/
internal suspend fun <R> flowScope(@BuilderInference block: suspend CoroutineScope.() -> R): R =
suspendCoroutineUninterceptedOrReturn { uCont ->
val coroutine = FlowCoroutine(uCont.context, uCont)
coroutine.startUndispatchedOrReturn(coroutine, block)
}

/**
* Creates a flow that also provides a [CoroutineScope] for each collector
* Shorthand for:
* ```
* flow {
* flowScope {
* ...
* }
* }
* ```
* with additional constraint on cancellation.
* To cancel child without cancelling itself, `cancel(ChildCancelledException())` should be used.
*/
internal fun <R> scopedFlow(@BuilderInference block: suspend CoroutineScope.(FlowCollector<R>) -> Unit): Flow<R> =
flow {
val collector = this
flowScope { block(collector) }
}

internal class FlowCoroutine<T>(context: CoroutineContext, uCont: Continuation<T>) :
ScopeCoroutine<T>(context, uCont) {

public override fun childCancelled(cause: Throwable): Boolean {
if (cause is ChildCancelledException) return true
return cancelImpl(cause)
}
}
58 changes: 0 additions & 58 deletions kotlinx-coroutines-core/common/src/flow/internal/FlowScope.kt

This file was deleted.

10 changes: 5 additions & 5 deletions kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = flow {
*/
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
return scopedFlow {
return scopedFlow { downstream ->
val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
// Channel is not closed deliberately as there is no close with value
val collector = async {
Expand All @@ -79,13 +79,13 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
// set timeout when lastValue != null
onTimeout(timeoutMillis) {
lastValue = null // Consume the value
emit(NULL.unbox(value))
downstream.emit(NULL.unbox(value))
}
}

// Close with value 'idiom'
collector.onAwait {
if (lastValue != null) emit(NULL.unbox(lastValue))
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
isDone = true
}
}
Expand All @@ -111,7 +111,7 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
*/
public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
require(periodMillis > 0) { "Sample period should be positive" }
return scopedFlow {
return scopedFlow { downstream ->
val values = produce<Any?>(capacity = Channel.CONFLATED) {
// Actually Any, KT-30796
collect { value -> send(value ?: NULL) }
Expand All @@ -135,7 +135,7 @@ public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
ticker.onReceive {
val value = lastValue ?: return@onReceive
lastValue = null // Consume the value
emit(NULL.unbox(value))
downstream.emit(NULL.unbox(value))
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public fun <T, R> Flow<T>.flatMapMerge(concurrency: Int = 16, bufferSize: Int =
require(concurrency >= 0) { "Expected non-negative concurrency level, but had $concurrency" }
return scopedFlow {
val semaphore = Channel<Unit>(concurrency)
val flatMap = SerializingFlatMapCollector(this@scopedFlow, bufferSize)
val flatMap = SerializingFlatMapCollector(it, bufferSize)
collect { outerValue ->
// TODO real semaphore (#94)
semaphore.send(Unit) // Acquire concurrency permit
Expand Down Expand Up @@ -108,7 +108,7 @@ public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = 16, bufferSize: Int
* produces `aa bb b_last`
*/
@FlowPreview
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = scopedFlow {
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = scopedFlow { downstream ->
var previousFlow: Job? = null
collect { value ->
// Linearize calls to emit as alternative to the channel. Bonus points for never-overlapping channels.
Expand All @@ -117,7 +117,7 @@ public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): F
// Undispatched to have better user experience in case of synchronous flows
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
transform(value).collect { innerValue ->
emit(innerValue)
downstream.emit(innerValue)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,10 @@
package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.test.*

class FlowScopeTest : TestBase() {

private suspend fun flowScope(block: suspend CoroutineScope.() -> Unit): Unit {
scopedFlow<Unit>(block).singleOrNull()
}

@Test
fun testCancellation() = runTest {
assertFailsWith<CancellationException> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.math.*
import kotlin.test.*

class FlowOnTest : TestBase() {
Expand Down

0 comments on commit 004e671

Please sign in to comment.