Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use produceIn instead toReceiveChannel of custom operator #53

Merged
merged 1 commit into from
Aug 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,15 @@

package soil.query.core

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.select
import kotlin.time.Duration

Expand All @@ -33,7 +29,7 @@ internal fun <T> Flow<T>.chunkedWithTimeout(
val ticker = MutableSharedFlow<Unit>(extraBufferCapacity = size)
val tickerTimeout = ticker
.debounce(duration)
.toReceiveChannel(this)
.produceIn(this)
try {
while (isActive) {
var isTimeout = false
Expand Down Expand Up @@ -63,15 +59,3 @@ internal fun <T> Flow<T>.chunkedWithTimeout(
}
}
}

private fun <T> Flow<T>.toReceiveChannel(scope: CoroutineScope): ReceiveChannel<T> {
val channel = Channel<T>(Channel.BUFFERED)
scope.launch {
try {
collect { value -> channel.send(value) }
} finally {
channel.close()
}
}
return channel
}