Skip to content

Commit

Permalink
Code style + rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
qwwdfsad committed Jun 5, 2019
1 parent 602ec61 commit 50787bf
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ internal abstract class ChannelFlow<T>(
scope.broadcast(context, produceCapacity, start, block = collectToFun)

fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
scope.produce(context, produceCapacity, block = collectToFun)
scope.flowProduce(context, produceCapacity, block = collectToFun)

override suspend fun collect(collector: FlowCollector<T>) =
coroutineScope { // todo: flowScope
coroutineScope {
val channel = produceImpl(this)
channel.consumeEach { collector.emit(it) }
}
Expand Down
21 changes: 14 additions & 7 deletions kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -55,26 +55,33 @@ internal fun <R> scopedFlow(@BuilderInference block: suspend CoroutineScope.(Flo
/*
* Shortcut for produce { flowScope {block() } }
*/
internal fun <T> CoroutineScope.flowProduce(capacity: Int = 0, @BuilderInference block: suspend ProducerScope<T>.() -> Unit): ReceiveChannel<T> {
internal fun <T> CoroutineScope.flowProduce(
context: CoroutineContext,
capacity: Int = 0, @BuilderInference block: suspend ProducerScope<T>.() -> Unit
): ReceiveChannel<T> {
val channel = Channel<T>(capacity)
val newContext = newCoroutineContext(EmptyCoroutineContext) // To have a default dispatcher and coroutine id
val newContext = newCoroutineContext(context)
val coroutine = FlowProduceCoroutine(newContext, channel)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine

}

private class FlowCoroutine<T>(context: CoroutineContext, uCont: Continuation<T>) :
ScopeCoroutine<T>(context, uCont) {
private class FlowCoroutine<T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {

public override fun childCancelled(cause: Throwable): Boolean {
if (cause is ChildCancelledException) return true
return cancelImpl(cause)
}
}

private class FlowProduceCoroutine<T>(parentContext: CoroutineContext, channel: Channel<T>) :
ProducerCoroutine<T>(parentContext, channel) {
private class FlowProduceCoroutine<T>(
parentContext: CoroutineContext,
channel: Channel<T>
) : ProducerCoroutine<T>(parentContext, channel) {

public override fun childCancelled(cause: Throwable): Boolean {
if (cause is ChildCancelledException) return true
return cancelImpl(cause)
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private class ChannelFlowMerge<T>(
override suspend fun flowCollect(collector: FlowCollector<T>) {
// this function should not have been invoked when channel was explicitly requested
check(capacity == OPTIONAL_CHANNEL)
coroutineScope { // todo: flowScope
flowScope {
mergeImpl(this, collector.asConcurrentFlowCollector())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,3 @@ class FlowCallbackTest : TestBase() {
finish(3)
}
}

0 comments on commit 50787bf

Please sign in to comment.