Skip to content

Commit

Permalink
Merge pull request #53 from soil-kt/tweak_flow_ext
Browse files Browse the repository at this point in the history
Use `produceIn` instead `toReceiveChannel` of custom operator
  • Loading branch information
ogaclejapan committed Aug 4, 2024
2 parents ad20221 + 47949e5 commit 0a16ba4
Showing 1 changed file with 1 addition and 17 deletions.
18 changes: 1 addition & 17 deletions soil-query-core/src/commonMain/kotlin/soil/query/core/FlowExt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,15 @@

package soil.query.core

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.select
import kotlin.time.Duration

Expand All @@ -33,7 +29,7 @@ internal fun <T> Flow<T>.chunkedWithTimeout(
val ticker = MutableSharedFlow<Unit>(extraBufferCapacity = size)
val tickerTimeout = ticker
.debounce(duration)
.toReceiveChannel(this)
.produceIn(this)
try {
while (isActive) {
var isTimeout = false
Expand Down Expand Up @@ -63,15 +59,3 @@ internal fun <T> Flow<T>.chunkedWithTimeout(
}
}
}

private fun <T> Flow<T>.toReceiveChannel(scope: CoroutineScope): ReceiveChannel<T> {
val channel = Channel<T>(Channel.BUFFERED)
scope.launch {
try {
collect { value -> channel.send(value) }
} finally {
channel.close()
}
}
return channel
}

0 comments on commit 0a16ba4

Please sign in to comment.