Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow 中的三个数据相关的操作符:debounce、buffer 和 conflate #56

Open
cnwutianhao opened this issue Aug 11, 2024 · 0 comments
Labels

Comments

@cnwutianhao
Copy link
Owner

在 Kotlin 中,Flow 是一种处理异步数据流的 API,它类似于 RxJava 中的 Observable。

debounce 操作符

debounceFlow 中的一个操作符,用于过滤快速连续发射的数据项,只保留在指定时间段内最后一个数据项。这在处理类似搜索输入、按钮点击这类短时间内可能会触发多次的事件时非常有用。

  1. 作用

    减少频繁的数据发射。它等待指定的一段时间,如果在这段时间内没有新的数据项发射出来,那么它就会发射最新的数据项。如果在这段时间内有新的数据项发射出来,它会重新开始等待。

  2. 用法

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    // 创建一个流,每0.5秒发射一次数据
    fun main() = runBlocking {
        val flow = (1..5).asFlow()
            .onEach { delay(500) }   // 模拟延迟
            .debounce(1000)          // 只保留最后一个在1秒内发射的数据项
    
        flow.collect { value ->
            println(value)           // 预期输出:5
        }
    }

    在上面的例子中,debounce 操作符将1秒内发射的所有数据项过滤掉,只保留最后一个。由于每个数据项之间的间隔时0.5秒,因此只有最后一个数据项被保留。

  3. 实际应用示例

    以下是一个实际应用示例,展示了如何使用 debounce 操作符来处理搜索输入:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val searchFlow = MutableStateFlow("")
    
        // 模拟用户输入
        CoroutineScope(Dispatchers.Default).launch {
            delay(100)
            searchFlow.value = "T"
            delay(200)
            searchFlow.value = "Ty"
            delay(300)
            searchFlow.value = "Tyh"
            delay(400)
            searchFlow.value = "Tyho"
            delay(500)
            searchFlow.value = "Tyhoo"
        }
    
        // 收集搜索输入,并在0.5秒没有变化时执行搜索
        searchFlow
            .debounce(500)
            .filter { it.isNotBlank() }
            .collectLatest { query ->
                performanceSearch(query)
            }
    }
    
    suspend fun performanceSearch(query: String) {
        println("Searching for $query")
        // 模拟搜索延迟
        delay(1000)
        println("Search result for $query")
    }

    在这个示例中,searchFlow 是一个 MutableStateFlow,它使用 debounce 操作符来缓解快速的输入变化,只在停止输入0.5秒后才执行搜索操作。

  4. 小结

    debounce 操作符用于过滤频繁发射的数据项,只保留最后一个在指定时间内发射的数据项。常用于处理用户输入、按钮点击等可能频繁触发的事件,避免不必要的操作频繁发生。

buffer 操作符

buffer 操作符允许在流的上下游之间引入缓存,从而减少背压的影响。

背压:是指由于上下游处理速度不一致而导致的阻塞现象。

  1. 作用

    通过在数据流动过程中引入缓冲区,从而使得较慢的消费者不会过多影响生产者的效率。

  2. 用法

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val flow = flow {
            for (i in 1..5) {
                delay(100)      // 模拟生产者速度
                emit(i)
            }
        }
    
        flow.buffer(2)
            .collect { value ->
                delay(300)      // 模拟消费者速度
                println(value)
            }
    }

    在这个例子中,生产者每0.1秒发射一个值,而消费者每0.3秒消耗一个值。如果不使用 buffer,生产者将会被阻塞。但是通过引入一个大小为2的缓冲区,可以使得生产者和消费者更多地并行运行。

conflate 操作符

conflate 操作符和 buffer 操作符相比则直接跳过中间的缓冲阶段,只保留最新的数据。

  1. 作用

    当生产速度比消费速度快的时候,这个操作符有助于减轻背压问题。

  2. 用法

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val flow = flow {
            for (i in 1..5) {
                delay(100)      // 模拟生产者速度
                emit(i)
            }
        }
    
        flow.conflate()
            .collect { value ->
                delay(300)      // 模拟消费者速度
                println(value)
            }
    }

    在这个例子中,由于使用了 conflate 操作符,流会跳过中间的值,只保留最新的。尽管生产者每0.1秒生成一个值,但消费者每0.3秒消耗一个值,由于 conflate 的存在,很多中间值会被抛弃。

bufferconflate 的对比

  • buffer 引入了一个具体大小的缓存,允许生产和消费在一定程度上的异步处理。
  • conflate 不保存中间值,只保留最新的值,适合需要减少处理负担且关心最新数据的场景。

小结

  • bufferconflate 都是用于处理流的性能优化操作符。
  • buffer 通过引入缓冲区降低背压,让上下游可以并发运行。
  • conflate 则直接跳过中间值,只保留最新的,大幅度减少处理频率,适用于对最新数据更敏感的场景。
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant