Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

一篇文章带你了解 Flow #46

Open
cnwutianhao opened this issue May 13, 2024 · 0 comments
Open

一篇文章带你了解 Flow #46

cnwutianhao opened this issue May 13, 2024 · 0 comments
Labels

Comments

@cnwutianhao
Copy link
Owner

数据流(flow)以协程(coroutines)为基础构建,可提供多个值。数据流使用挂起函数通过异步方式生成和使用值,这就是说,例如,数据流可安全地发出网络请求以生成下一个值,而不会阻塞主线程。

一、为什么需要 Flow

首先我们来回顾下 Kotlin 中我们如何使用挂起函数,我们在 main 方法中,调用挂起函数返回一组数据,代码如下所示:

suspend fun loadData(): List<Int> {
    delay(1000)
    return listOf(1, 2, 3)
}

fun main() {
    runBlocking {
        loadData().forEach { value -> println(value) }
    }
}

运行 main 函数,结果为 1 秒后输出:

1
2
3

那么我们想一下,如果 loadData 中的数据集合,并不是一起返回的,比如从网络中先获取到了 1 再获取到了 2 最后再获取到了 3 ,那么这样如果我们仍然在返回最后一个结果(其实也不知道)时一并返回数据,会造成资源浪费并且用户体验不好,那么我们如何解决这个问题呢?

上面挂起函数的返回类型是 List 类型,那么就必定只能一次性返回数据,此时,Flow 就出场了~

Flow 包含三个实体

  • 提供方(producer)会生成添加到数据流中的数据。得益于协程,数据流还可以异步生成数据。
  • (可选)中介(Intermediaries)可以修改发送到数据流的值,或修正数据流本身。
  • 使用方(consumer)则使用数据流中的值。

二、Flow 的基础使用

  1. 构建器

    我们改写 loadData 方法,返回类型修改为 Flow,并构造一个 flow,在 flow 中,每隔一秒,发送一个数据用来模拟延迟获取值,代码如下所示:

    fun loadData() = flow {
        for (i in 1..3) {
            delay(1000)
            println("emit $i")
            emit(i)
        }
    }
    
    fun main() {
        runBlocking {
            loadData().collect {
                println("collect $it")
            }
        }
    }

    运行结果即是,每隔 1 秒钟,打印出来一个数字:

    emit 1
    collect 1
    emit 2
    collect 2
    emit 3
    collect 3
    

    emit 方法用于发射值,collect 方法是收集值,这里需要注意的是,我们可以看到在 main 方法协程中,我们可以直接调用 loadData 的方法,这是因为 flow 构建块中的代码就是一个 suspend 函数。这样一来我们就实现了对数据的逐步加载,而不需要等待所有的数据返回。

    接下来我们在 main 方法中调用多次 loadData 方法而不调用 collect,看会有什么现象。修改代码如下所示:

    fun loadData() = flow {
        println("进入加载数据的方法")
        for (i in 1..3) {
            delay(1000)
            println("emit $i")
            emit(i)
        }
    }
    
    fun main() {
        runBlocking {
            println("第一次准备调用加载数据的方法")
            val first = loadData()
            println("第二次准备调用加载数据的方法")
            val second = loadData()
            second.collect {
                println("collect $it")
            }
        }
    }

    然后我们运行 main 方法,打印结果如下所示:

    第一次准备调用加载数据的方法
    第二次准备调用加载数据的方法
    进入加载数据的方法
    emit 1
    collect 1
    emit 2
    collect 2
    emit 3
    collect 3
    

    我们会发现,如果我们没有调用 flow 的 collect 方法,其实不会进入 flow 的代码块中,也就是说 flow 中的代码直到被 collect 调用的时候才会运行,否则会立即返回。

  2. Flow 的取消

    如果我们需要定时取消 flow 中代码块的执行,只需要使用 withTimeoutOrNull 函数添加超时时间即可,比如上述方法我们是在 3 秒内返回1、2、3,我们限定其在 2500 毫秒内执行完毕:

    fun loadData() = flow {
        for (i in 1..3) {
            delay(1000)
            println("emit $i")
            emit(i)
        }
    }
    
    fun main() {
        runBlocking {
            withTimeoutOrNull(2500) {
                loadData().collect {
                    println("collect $it")
                }
            }
        }
    }

    我们运行 main 方法,则只有 1、2 两个数字进行了打印:

    emit 1
    collect 1
    emit 2
    collect 2
    

三、Flow 的操作符

  1. map

    使用 map 我们可以将最终结果映射为其他类型,代码如下所示:

    fun loadData() = flow {
        for (i in 1..3) {
            delay(1000)
            println("emit $i")
            emit(i)
        }
    }
    
    fun changeData(value: Int): String {
        return "打印的结果是:$value"
    }
    
    fun main() {
        runBlocking {
            loadData().map {
                changeData(it)
            }.collect {
                println("collect $it")
            }
        }
    }

    我们通过 map 操作符将结果映射为字符串的形式,运行 main 打印结果如下所示:

    emit 1
    collect 打印的结果是:1
    emit 2
    collect 打印的结果是:2
    emit 3
    collect 打印的结果是:3
    
  2. filter

    通过 filter 我们可以对结果集添加过滤条件,如下所示,我们仅打印出大于 1 的值:

    fun loadData() = flow {
        for (i in 1..3) {
            delay(1000)
            println("emit $i")
            emit(i)
        }
    }
    
    fun main() {
        runBlocking {
            loadData().filter {
                it > 1
            }.collect {
                println("collect $it")
            }
        }
    }

    打印结果如下所示:

    emit 1
    emit 2
    collect 2
    emit 3
    collect 3
    
  3. toList

    使用 toList 可以转换为list集合:

    fun loadData() = flow {
        for (i in 1..3) {
            delay(1000)
            println("emit $i")
            emit(i)
        }
    }
    
    fun main() {
        runBlocking {
            val list = loadData().toList()
            println("toList: $list")
        }
    }

    打印结果如下所示:

    emit 1
    emit 2
    emit 3
    toList: [1, 2, 3]
    
  4. reduce

    使用 reduce 可以将所有元素组合到一个单一的结果中:

    fun loadData() = flow {
        for (i in 1..3) {
            delay(1000)
            println("emit $i")
            emit(i)
        }
    }
    
    fun main() {
        runBlocking {
            val data = loadData().reduce { a, b ->
                a + b
            }
            println("data: $data")
        }
    }

    打印结果如下所示:

    emit 1
    emit 2
    emit 3
    data: 6
    
  5. fold

    使用 fold 可以将所有元素组合到一个单一的结果中。和 reduce 不同的是,fold 允许你提供一个初始值作为组合操作的起点:

    fun loadData() = flow {
        for (i in 1..3) {
            delay(1000)
            println("emit $i")
            emit(i)
        }
    }
    
    fun main() {
        runBlocking {
            val data = loadData().fold(10) { a, b ->
                a + b
            }
            println("data: $data")
        }
    }

    打印结果如下所示:

    emit 1
    emit 2
    emit 3
    data: 16
    
  6. flowOn

    flow 的代码块是执行在执行时的上下文中,比如我们不能通过在 flow 中指定线程来运行 flow 代码中的代码,如下所示:

    fun loadData() = flow {
        withContext(Dispatchers.Default) {
            for (i in 1..3) {
                delay(1000)
                println("emit $i")
                emit(i)
            }
        }
    }
    
    fun main() {
        runBlocking {
            loadData().collect {
                println("collect $it")
            }
        }
    }

    此种运行方式,将会抛出异常:

    Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
            Flow was collected in [BlockingCoroutine{Active}@654ef498, BlockingEventLoop@39459354],
            but emission happened in [DispatchedCoroutine{Active}@47445fb2, Dispatchers.Default].
    

    那么我们如何指定 flow 代码块中的上下文呢,我们需要使用 flowOn 操作符,我们将 flow 代码块中的代码指定在 IO 线程中,代码如下所示:

    fun loadData() = flow {
        for (i in 1..3) {
            delay(1000)
            println("emit $i")
            emit(i)
        }
    }.flowOn(Dispatchers.IO)
    
    fun main() {
        runBlocking {
            loadData().collect {
                println("collect $it")
            }
        }
    }

    这样我们就把 flow 代码块中的事情放到了 IO 线程中。

  7. buffer

    协程可以提升并发请求的效率,而在 flow 代码块中,每当有一个处理结果,我们就可以收到,但如果处理结果也是耗时操作,我们来看下需要多长时间来处理,我们在打印前间隔 2 秒,并记录开始和完成的时间,代码如下所示:

    var startTime: Long = 0L
    var endTime: Long = 0L
    
    fun loadData() = flow {
        startTime = System.currentTimeMillis()
        for (i in 1..3) {
            delay(1000)
            println("emit $i")
            emit(i)
        }
    }
    
    fun main() {
        runBlocking {
            loadData().collect {
                delay(2000)
                println("collect $it")
            }
            endTime = System.currentTimeMillis()
            println("处理时间:${endTime - startTime}ms")
        }
    }

    运行 main 方法得到结果如下:

    emit 1
    collect 1
    emit 2
    collect 2
    emit 3
    collect 3
    处理时间:9021ms
    

    我们可以看到,处理三个数据,一共使用了 9 秒钟的时间。

    buffer 操作符可以使发射和收集的代码并发运行,从而提高效率,我们添加 buffer 代码如下所示:

    var startTime: Long = 0L
    var endTime: Long = 0L
    
    fun loadData() = flow {
        startTime = System.currentTimeMillis()
        for (i in 1..3) {
            delay(1000)
            println("emit $i")
            emit(i)
        }
    }
    
    fun main() {
        runBlocking {
            loadData().buffer().collect {
                delay(2000)
                println("collect $it")
            }
            endTime = System.currentTimeMillis()
            println("处理时间:${endTime - startTime}ms")
        }
    }

    再次运行 main 方法,结果如下所示:

    emit 1
    emit 2
    collect 1
    emit 3
    collect 2
    collect 3
    处理时间:7025ms
    

    由此看出,时间较少了将近 2 秒,不要小看这小小的 2 秒,运行在手机上还是相当重要的~

  8. zip

    使用 zip 可以合并两个 flow,代码如下所示:

    fun loadData1() = flow {
        for (i in 1..3) {
            delay(1000)
            emit("data1:$i")
        }
    }
    
    fun loadData2() = flow {
        for (i in 1..3) {
            delay(1000)
            emit("data2: $i")
        }
    }
    
    fun main() {
        runBlocking {
            loadData1().zip(loadData2()) { a, b ->
                "$a, $b"
            }.collect {
                delay(2000)
                println("collect $it")
            }
        }
    }

    运行结果如下所示:

    collect data1:1, data2: 1
    collect data1:2, data2: 2
    collect data1:3, data2: 3
    
  9. combine

    使用 combine 可以将两个或更多的 flow 组合成一个单一的值,代码如下所示:

    fun loadData1() = flow {
        for (i in 1..3) {
            delay(1000)
            emit("data1:$i")
        }
    }
    
    fun loadData2() = flow {
        for (i in 1..3) {
            delay(1000)
            emit("data2: $i")
        }
    }
    
    fun main() {
        runBlocking {
            loadData1().combine(loadData2()) { a, b ->
                "$a, $b"
            }.collect {
                delay(2000)
                println("collect $it")
            }
        }
    }

    运行结果如下所示:

    collect data1:1, data2: 1
    collect data1:3, data2: 2
    collect data1:3, data2: 3
    

    注意,combine 与 zip 不同,zip 会等待每个流发出一个新的值,然后将这两个值组合在一起。而 combine 只要任何一个流发出新的值,就会使用最新的值进行组合。因此,如果两个流的发射速率不同,combine 的结果可能会包含相同的值。

  10. flatMapConcat

    使用 flatMapConcat 可以将每个原始流中的值转换为一个新的流,并将这些新流的值连接到一个单一的结果流中,代码如下所示:

    fun loadData(i: Int) = flow {
        for (j in 1..3) {
            delay(1000)
            emit("data $i: $j")
        }
    }
    
    fun main() {
        runBlocking {
            flowOf(1, 2, 3).flatMapConcat { i ->
                loadData(i)
            }.collect {
                println("collect $it")
            }
        }
    }

    运行结果如下所示:

    collect data 1: 1
    collect data 1: 2
    collect data 1: 3
    collect data 2: 1
    collect data 2: 2
    collect data 2: 3
    collect data 3: 1
    collect data 3: 2
    collect data 3: 3
    
  11. flatMapMerge

    使用 flatMapMerge 可以将每个原始流中的值转换为一个新的流,并将这些新流的值合并到一个单一的结果流中,代码如下所示:

    fun loadData(i: Int) = flow {
        for (j in 1..3) {
            delay(1000)
            emit("data $i: $j")
        }
    }
    
    fun main() {
        runBlocking {
            flowOf(1, 2, 3).flatMapMerge { i ->
                loadData(i)
            }.collect {
                println("collect $it")
            }
        }
    }

    运行结果如下所示:

    collect data 1: 1
    collect data 2: 1
    collect data 3: 1
    collect data 1: 2
    collect data 2: 2
    collect data 3: 2
    collect data 1: 3
    collect data 2: 3
    collect data 3: 3
    
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant