Skip to content

Latest commit

ย 

History

History
869 lines (778 loc) ยท 25.6 KB

asynchronous_flow.md

File metadata and controls

869 lines (778 loc) ยท 25.6 KB

Asynchronous Flow

์—ฌ๋Ÿฌ ๊ฐ’์„ ํ‘œ์‹œํ•˜๋Š” ๋ฐฉ๋ฒ•

Collection

๋ชจ๋“  ์—ฐ์‚ฐ์„ ์ˆ˜ํ–‰ํ•œ ์ˆ˜ ํ•œ๋ฒˆ์— List<Int>๋ฅผ ๋ฐ˜ํ™˜ ํ•œ๋‹ค

fun simple(): List<Int> = listOf(1, 2, 3)

fun main() {
    simple().forEach { value -> println(value) } 
}

Sequences

์ค‘๊ฐ„์— ๋ฆฌ์ŠคํŠธ๋ฅผ ๋งŒ๋“ค์ง€ ์•Š๊ณ  ๊ฐ’์„ ์ˆœ์ฐจ์ ์œผ๋กœ ๊ณ„์‚ฐํ•œ๋‹ค

fun simple(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    simple().forEach { value -> println(value) } 
}
listOf(1, 2, 3, 4)
    .asSequence()
    .map { it * it }
    .find { it > 3 }

// list(1) -> map(1*1) -> find(1>3) -> list(2) -> map(2*2) -> find(4>3) -> ์ข…๋ฃŒ

Kotlin-Sequences

Suspending functions

suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) } 
}

Flows

List๊ฐ€ ๋ชจ๋“  ์—ฐ์‚ฐ์„ ์ˆ˜ํ–‰ํ•œ ํ›„ ํ•œ๋ฒˆ์— ๋ชจ๋“ ๊ฐ’์„ ๋ฐ˜ํ™˜ํ•˜๋Š”๊ฒƒ๊ณผ ๋ฐ˜๋Œ€๋กœ
Flow๋Š” Sequnce์™€ ๊ฐ™์ด ์ˆœ์ฐจ์ ์œผ๋กœ ๊ฐ’์„ ๋‚ด๋ณด๋‚ด๊ณ  ์ •์ƒ์ ์œผ๋กœ ์™„๋ฃŒ ๋˜๋Š” ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜๋Š” ๋น„๋™๊ธฐ ์ŠคํŠธ๋ฆผ์ด๋‹ค.

fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    simple().collect { value -> println(value) } 
}

์œ„ ์ฝ”๋“œ๋Š”

  • Flow ํƒ€์ž…์„ ์ƒ์„ฑ์€ flow { } ๋นŒ๋”๋ฅผ ์ด์šฉํ•œ๋‹ค
  • flow { ... } ๋ธ”๋ก ์•ˆ์˜ ์ฝ”๋“œ๋Š” ์ค‘๋‹จ ๊ฐ€๋Šฅํ•˜๋‹ค
  • simple() ํ•จ์ˆ˜๋Š” ๋”์ด์ƒ suspend ๋กœ ์„ ์–ธํ•˜์ง€ ์•Š๋Š”๋‹ค
  • ๊ฒฐ๊ณผ ๊ฐ’๋“ค์€ flow ์—์„œ emit() ํ•จ์ˆ˜๋ฅผ ์ด์šฉํ•˜์—ฌ ๋ฐฉ์ถœ๋œ๋‹ค
  • flow ์—์„œ ๋ฐฉ์ถœ๋œ ๊ฐ’๋“ค์€ collect ํ•จ์ˆ˜๋ฅผ ์ด์šฉํ•˜์—ฌ ์ˆ˜์ง‘๋œ๋‹ค
  • main thread๋ฅผ ์ฐจ๋‹จํ•˜์ง€ ์•Š๊ณ  println(value)ํ•˜๊ธฐ ์ „์— 100ms๋ฅผ ๋Œ€๊ธฐํ•œ๋‹ค
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

ํ”Œ๋กœ์šฐ๋Š” ์ฐจ๊ฐ‘๋‹ค(Flows are cold)

flow๋Š” sequnce์™€ ๋น„์Šทํ•˜๊ฒŒ cold ์ŠคํŠธ๋ฆผ์ด๋‹ค.
flow{ } ๋นŒ๋”๊ฐ€ collect() ๋ฅผ ํ˜ธ์ถœํ•  ๋•Œ ๊นŒ์ง€ ์‹คํ–‰๋˜์ง€ ์•Š๋Š”๋‹ค.
์ด ๋•Œ๋ฌธ์— suspend ๋กœ ์„ ์–ธํ•˜์ง€ ์•Š์•„๋„ ๋˜๋Š”๊ฒƒ์ด๋‹ค.

ํ”Œ๋กœ์šฐ์˜ ์ทจ์†Œ(Flow cancellation basics)

flow์˜ ์ทจ์†Œ๋Š” coroutine์˜ ์ผ๋ฐ˜์ ์ธ ํ˜‘๋ ฅ์ ์ธ ์ทจ์†Œ๋ฅผ ์ง€ํ‚จ๋‹ค.

fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms 
        simple().collect { value -> println(value) } 
    }
    println("Done")
}

flow๊ฐ€ withTimeoutOrNull{ } ์—์„œ ์‹คํ–‰๋ ๋•Œ

Emitting 1 
1 
Emitting 2 
2 
Done

ํ”Œ๋กœ์šฐ ๋นŒ๋”(Flow Builders)

  • flow { }
    • ๊ธฐ๋ณธ์ ์ธ flow Builder
  • flowOf(...)
    • ๊ณ ์ •๋œ ๊ฐ’์„ ๋ฐฉ์ถœํ•˜๋Š” flow ์ •์˜
  • awFlow()
    • ๋‹ค์–‘ํ•œ Collection, Sequnce๋ฅผ ํ™•์žฅํ•จ์ˆ˜๋ฅผ ํ†ตํ•˜์—ฌ Flow๋กœ ๋ณ€ํ™˜

์ค‘๊ฐ„ ์—ฐ์‚ฐ์ž(Intermedicate flow operators)

Collection์ด๋‚˜ Sequnce์™€ ๋™์ผํ•˜๊ฒŒ ์—ฐ์‚ฐ์ž๋กœ ๋ณ€ํ™˜ํ•  ์ˆ˜ ์žˆ๋‹ค.
ํ•˜์ง€๋งŒ ์ค‘์š”ํ•œ ์ฐจ์ด์ ์€ ์—ฐ์‚ฐ์ž๋กœ ์ˆ˜ํ–‰๋˜๋Š” ์ฝ”๋“œ๋ธ”๋Ÿญ์—์„œ suspend ํ•จ์ˆ˜๋ฅผ ํ˜ธ์ถœํ•  ์ˆ˜ ์žˆ๋‹ค.
์ต์ˆ™ํ•œ map(), filter() ๋“ฑ์ด ๋Œ€ํ‘œ์ ์ธ ์˜ˆ์‹œ์ด๋‹ค.

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }

๋ณ€ํ™˜ ์—ฐ์‚ฐ์ž(Transform operator)

ํ”Œ๋กœ์šฐ ๋ณ€ํ™˜ ์—ฐ์‚ฐ์ž๋“ค ์ค‘์—์„œ ๊ฐ€์žฅ ์ผ๋ฐ˜์ ์ธ ๊ฒƒ์€ transform ์—ฐ์‚ฐ์ž๋‹ค.
์ด ์—ฐ์‚ฐ์ž๋Š” map ์ด๋‚˜ filter() ๊ฐ™์€ ๋‹จ์ˆœํ•œ ๋ณ€ํ™˜์ด๋‚˜ ํ˜น์€ ๋ณต์žกํ•œ ๋‹ค๋ฅธ ๋ณ€ํ™˜๋“ค์„ ๊ตฌํ˜„ํ•˜๊ธฐ ์œ„ํ•ด ๋œ๋‹ค.
transform() ์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์šฐ๋ฆฌ๋Š” ์ž„์˜์˜ ํšŸ์ˆ˜๋กœ ์ž„์˜์˜ ๊ฐ’๋“ค์„ ๋ฐฉ์ถœํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
    .transform { request ->
        emit("Making request $request") 
        emit(performRequest(request)) 
    }
    .collect { response -> println(response) }
}

์˜ˆ๋ฅผ ๋“ค์–ด, transform ์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์˜ค๋ž˜ ๊ฑธ๋ฆฌ๋Š” ๋น„๋™๊ธฐ ์š”์ฒญ์„ ์ˆ˜ํ–‰ํ•˜๊ธฐ ์ „์— ๊ธฐ๋ณธ ๋ฌธ์ž์—ด์„ ๋จผ์ € ๋ฐฉ์ถœํ•˜๊ณ 
์š”์ฒญ์— ๋Œ€ํ•œ ์‘๋‹ต์ด ๋„์ฐฉํ•˜๋ฉด ๊ทธ ๊ฒฐ๊ณผ๋ฅผ ๋ฐฉ์ถœํ•  ์ˆ˜ ์žˆ๋‹ค.

Making request 1 
response 1 
Making request 2 
response 2 
Making request 3 
response 3

ํฌ๊ธฐ ์ œํ•œ ์—ฐ์‚ฐ์ž(Size-limiting operators)

take๊ฐ™์€ ํฌ๊ธฐ ์ œํ•œ ์ค‘๊ฐ„ ์—ฐ์‚ฐ์ž๋Š” ์ •์˜๋œ ์ œํ•œ์น˜์— ๋„๋‹ฌํ•˜๋ฉด ์‹คํ–‰์„ ์ทจ์†Œํ•œ๋‹ค.
์ฝ”๋ฃจํ‹ด์—์„œ ์ทจ์†Œ๋Š” ์–ธ์ œ๋‚˜ ์˜ˆ์™ธ๋ฅผ ๋ฐœ์ƒ์‹œํ‚ค๋Š” ๋ฐฉ์‹์œผ๋กœ ์ˆ˜ํ–‰ ๋˜๋ฉฐ,
์ด๋ฅผ ํ†ตํ•ด try { ... } finally { ... } ๋กœ ์˜ˆ์™ธ ์ฒ˜๋ฆฌ๋“ฑ์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("This line will not execute")
        emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // take only the first two
        .collect { value -> println(value) }
}  
1 
2 
Finally in numbers

ํ”Œ๋กœ์šฐ ์ข…๋‹จ ์—ฐ์‚ฐ์ž(Terminal flow operator)

ํ”Œ๋กœ์šฐ ์ˆ˜์ง‘์„ ์‹œ์ž‘ํ•˜๋Š” ์ค‘๋‹จ ํ•จ์ˆ˜์ด๋‹ค.

  • collect()
  • toList()
  • toSet()
  • reduce()
  • fold()
val sum = (1..5).asFlow()
   .map { it * it } // squares of numbers from 1 to 5                           
   .reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)

ํ”Œ๋กœ์šฐ๋Š” ์ˆœ์ฐจ์ ์ด๋‹ค(Flow are sequential)

Flow๋Š” Sequnce์ฒ˜๋Ÿผ ๊ธฐ๋ณธ์ ์œผ๋กœ collect() ๋“ฑ์˜ ์ข…๋‹จ ์—ฐ์‚ฐ์ž๊ฐ€ ํ˜ธ์ถœ๋  ๋•Œ ์ˆœ์ฐจ์ ์œผ๋กœ ์—ฐ์‚ฐ๋œ๋‹ค.

(1..5).asFlow()
    .filter {
        println("Filter $it")
        it % 2 == 0              
    }              
    .map { 
        println("Map $it")
        "string $it"
    }.collect { 
        println("Collect $it")
    }    
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

ํ”Œ๋กœ์šฐ ์ปจํ…์ŠคํŠธ(Flow Context)

flow์˜ ์ˆ˜์ง‘(์ข…๋‹จํ•จ์ˆ˜ ํ˜ธ์ถœ)์€ ํ•ญ์ƒ CoroutineContext์•ˆ์—์„œ ์ˆ˜ํ–‰๋œ๋‹ค.
์ด๋ฅผ ์ปจํ…์ŠคํŠธ ๋ณด์กด(context preservation) ์ด๋ผ ํ•œ๋‹ค.

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}  

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") } 
}      
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

withContext๋ฅผ ํ†ตํ•œ ์ž˜๋ชป ๋œ ๋ฐฉ์ถœ(Wrong emission withContext)

flow{ } ๋ธ”๋Ÿญ ๋‚ด์—์„œ withContext ๋กœ CoroutineContext๋ฅผ ๋ณ€๊ฒฝํ•˜๋ฉด ์•ˆ๋œ๋‹ค.

fun simple(): Flow<Int> = flow {
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100)
            emit(i) 
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) } 
}   

๋‹ค์Œ๊ณผ ๊ฐ™์€ ์—๋Ÿฌ ๋ฐœ์ƒ

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
        Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
        but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
        Please refer to 'flow' documentation or use 'flowOn' instead
    at ...

flowOn ์—ฐ์‚ฐ์ž(flowOn operator)

flow์—์„œ CoroutineContext๋ฅผ ๋ณ€๊ฒฝํ•˜๋ ค๋ฉด flowOn() ์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•ด์•ผํ•œ๋‹ค.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) 
        log("Emitting $i")
        emit(i) / emit next value
    }
}.flowOn(Dispatchers.Default) 

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value") 
    } 
}       

flow {} ์€ Background์—์„œ ๋™์ž‘ํ•˜๋ฉฐ,
๊ทธ ์ดํ›„ collect() ๋Š” MainThread์—์„œ ์‹คํ–‰๋œ๋‹ค.

[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

flowOn ์—ฐ์‚ฐ์ž๊ฐ€ CoroutineDispatcher๋ฅผ ๋ณ€๊ฒฝํ•  buffering ๋งค์ปค๋‹ˆ์ฆ˜์„ ์‚ฌ์šฉํ•˜๊ฒŒ๋˜์–ด
flow{ } ์˜ ๊ธฐ๋ณธ์ ์ธ ํŠน์„ฑ์ธ ์ˆœ์ฐจ์„ฑ์„ ์žƒ์–ด๋ฒ„๋ฆฌ๊ฒŒ ๋  ์ˆ˜ ์žˆ๋‹ค.

๋ฒ„ํผ๋ง(Buffering)

๋น„๋™๊ธฐ ์ž‘์—…์˜ ๊ฒฝ์šฐ์— buffer๋ฅผ ์‚ฌ์šฉํ•˜์˜€์„ ๋•Œ ์‹œ๊ฐ„์ด ๋‹จ์ถ•๋˜๋Š” ๊ฒฝ์šฐ๋„ ์žˆ๋‹ค.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple().collect { value -> 
            delay(300)
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}

(100ms + 300ms) * 3 = 1200ms ์˜ ์‹œ๊ฐ„์ด ๊ฑธ๋ฆฐ๋‹ค

1
2
3
Collected in 1220 ms

์•„๋ž˜์™€ ๊ฐ™์ด buffer๋ฅผ ์‚ฌ์šฉํ•ด๋ณด๋ฉด

val time = measureTimeMillis {
    simple()
        .buffer() // buffer emissions, don't wait
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")

์ฒซ ๋ฒˆ์งธ ์ˆ˜๋ฅผ ์œ„ํ•ด์„œ 100ms๋ฅผ ๊ธฐ๋‹ค๋ฆฐ ๋’ค ๊ฐ๊ฐ์˜ ์ˆ˜์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•ด์„œ 300ms๋ฅผ ๊ธฐ๋‹ค๋ฆฌ๊ฒŒ ๋œ๋‹ค 100ms + (300ms * 3) = 1000ms

1
2
3
Collected in 1034 ms

๋˜ํ•œ flowOn ์—ฐ์‚ฐ์ž๊ฐ€ flowOn ์—ฐ์‚ฐ์ž๊ฐ€ CoroutineDispatcher๋ฅผ ๋ณ€๊ฒฝํ•  ๊ฒฝ์šฐ ๋™์ผํ•œ ๋ฒ„ํผ๋ง ๋งค์ปค๋‹ˆ์ฆ˜์„ ์‚ฌ์šฉํ•œ๋‹ค.

๋ณ‘ํ•ฉ(Conflation)

flow๊ฐ€ ์—ฐ์‚ฐ์˜ ์ผ๋ถ€๋ถ„์ด๋‚˜, ์ƒํƒœ์˜ ์—…๋ฐ์ดํŠธ๋งŒ์„ ์ฒ˜๋ฆฌํ•ด์•ผ ํ•  ๊ฒฝ์šฐ conflate()๋ฅผ ๋ณ‘ํ•ฉ์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.
conflate() ์„ ์‚ฌ์šฉํ•˜์—ฌ collect() ์˜ ์ฒ˜๋ฆฌ๊ฐ€ ๋„ˆ๋ฌด ๋Š๋ฆด ๊ฒฝ์šฐ ๋ฐฉ์ถœ๋œ ์ค‘๊ฐ„ ๊ฐ’์„ ์Šคํ‚ตํ•  ์ˆ˜ ์žˆ๋‹ค.

val time = measureTimeMillis {
    simple()
        .conflate() // conflate emissions, don't process each one
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")

๋‘ ๋ฒˆ์งธ ์ˆ˜๋ฅผ ์Šคํ‚ตํ•˜๊ณ  ๊ฐ€์žฅ ์ตœ๊ทผ ๊ฐ’์ธ ์„ธ ๋ฒˆ์งธ ์ˆ˜๊ฐ€ collect() ๋กœ ์ „๋‹ฌ๋œ๋‹ค.

1 
3 
Collected in 758 ms

์ตœ์‹  ๊ฐ’ ์ฒ˜๋ฆฌ(Processing the latest value)

์ค‘๊ฐ„๊ฐ’์„ ๋ชจ๋‘ ์‚ญ์ œํ•˜์—ฌ ์ตœ์‹ ์˜ ๊ฐ’๋งŒ์„ ์ฒ˜๋ฆฌํ•˜์—ฌ ์†๋„๋ฅผ ๋†’์ด๋Š” ๋ฐฉ๋ฒ•๋„ ์žˆ๋‹ค.
collectLatest() ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ƒˆ๋กœ์šด ๊ฐ’์ด emit๋  ๋•Œ ๋งˆ๋‹ค ๊ธฐ์กด์˜ collect ์ž‘์—…์„ ์ทจ์†Œํ•˜๊ณ  ์žฌ์‹œ์ž‘ ํ•œ๋‹ค.

val time = measureTimeMillis {
    simple()
        .collectLatest { value -> // cancel & restart on the latest value
            println("Collecting $value") 
            delay(300) // pretend we are processing it for 300 ms
            println("Done $value") 
        } 
}   
println("Collected in $time ms")

๋งˆ์ง€๋ง‰ ๊ฐ’ 3์— ๋Œ€ํ•ด์„œ๋งŒ collect ๋ฅผ ๋๊นŒ์ง€ ์ˆ˜ํ–‰ํ•œ๋‹ค.
์•ฝ (100ms * 3) + 300ms = 600ms ์˜ ์‹œ๊ฐ„์ด ์†Œ์š”๋œ๋‹ค.

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 677 ms

๋‹ค์ค‘ ํ”Œ๋กœ์šฐ ํ•ฉ์„ฑ(Composing multiple flows)

์—ฌ๋Ÿฌ๊ฐ€์ง€ flow๋ฅผ ํ•ฉ์„ฑํ•˜๋Š” ์—ฌ๋Ÿฌ๊ฐ€์ง€ ๋ฐฉ๋ฒ•์ด ์žˆ๋‹ค.

Zip

val nums = (1..3).asFlow() 
val strs = flowOf("one", "two", "three")
nums.zip(strs) { a, b -> "$a -> $b" }
    .collect { println(it) } 

Combine

๋‘ flow์—์„œ ๋ฐฉ์ถœ์ด ์ผ์–ด๋‚ ๋•Œ๋งˆ๋‹ค ๋‹ค๋ฅธ flow์˜ ์ตœ์‹ ๊ฐ’์„ ๊ฐ€์ง€๊ณ  ๋ณ‘ํ•ฉํ•˜์—ฌ ์ถœ๋ ฅ.

val nums = (1..3).asFlow().onEach { delay(300) } 
val strs = flowOf("one", "two", "three").onEach { delay(400) } 

val startTime = System.currentTimeMillis() 
nums.combine(strs) { a, b -> "$a -> $b" } 
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 
1 -> one at 452 ms from start 
2 -> one at 651 ms from start 
2 -> two at 854 ms from start 
3 -> two at 952 ms from start 
3 -> three at 1256 ms from start

ํ”Œ๋กœ์šฐ ํ”Œ๋ž˜ํŠธ๋‹(Flattening flows)

flow{ flow{ } } ์ฒ˜๋Ÿผ flow๊ฐ€ ์ค‘์ฒฉ๋˜๋Š” ๊ฒฝ์šฐ๊ฐ€ ์žˆ๋‹ค. ( Flow<Flow<String>>) ์˜ˆ๋ฅผ๋“ค์–ด

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}
(1..3).asFlow().map { requestFlow(it) }

์ด๋Ÿฌํ•œ ๊ฒฝ์šฐ flattening์ด ์—ฐ์‚ฐ์ž๋กœ flattning์ด ํ•„์š”ํ•˜๋‹ค

flatMapConcat

flatMapConcat()์€ ํ˜„์žฌ flow๊ฐ€ ์™„๋ฃŒ๋œ ํ›„ ๊ทธ ๋‹ค์Œ flow๋ฅผ ์ˆœ์ฐจ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๊ณ  ๊ฒฐ๊ณผ๋กœ Flow๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค

val startTime = System.currentTimeMillis() 
(1..3).asFlow().onEach { delay(100) 
    .flatMapConcat { requestFlow(it) }              
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 
1: First at 121 ms from start 
1: Second at 622 ms from start 
2: First at 727 ms from start 
2: Second at 1227 ms from start 
3: First at 1328 ms from start 
3: Second at 1829 ms from start

flatMapMerge

flatMapMerge()๋Š” ๋‘ flow๋ฅผ ๋™์‹œ์— ์ˆ˜์ง‘ํ•˜๊ณ  ๊ฐ€๋Šฅํ•œ ๋นจ๋ฆฌ ๊ฐ’์„ ๋ฐฉ์ถœํ•˜๋„๋ก ํ•œ๋‹ค.

val startTime = System.currentTimeMillis() 
(1..3).asFlow().onEach { delay(100) }
    .flatMapMerge { requestFlow(it) }
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start

flatMapLatest

flatMapLatest()๋Š” ์ƒˆ๋กœ์šด flow๊ฐ€ ๋ฐฉ์ถœ๋  ๋•Œ๋งˆ๋‹ค ์ง์ „ flow๋ฅผ ์ทจ์†Œํ•œ๋‹ค.

val startTime = System.currentTimeMillis() // remember the start time 
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
    .flatMapLatest { requestFlow(it) }
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 

์ƒˆ ๊ฐ’์ด ๋ฐฉ์ถœ๋˜๋ฉด ํ˜„์žฌ ์ง„ํ–‰์ค‘์ธ { requestFlow(it) } ๋ฅผ ์ทจ์†Œํ•œ๋‹ค.

1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start

ํ”Œ๋กœ์šฐ ์˜ˆ์™ธ(Flow Exception)

flow๋Š” ๋ธ”๋Ÿญ ์•ˆ์—์„œ ์ฝ”๋“œ๊ฐ€ ์˜ˆ์™ธ๋ฅผ ๋ฐœ์ƒ์‹œํ‚ค๋ฉด ์˜ˆ์™ธ ๋ฐœ์ƒ ์ƒํƒœ๋กœ ์ข…๋ฃŒ๋œ๋‹ค.
์˜ˆ์™ธ์ฒ˜๋ฆฌ์— ๋Œ€ํ•˜์—ฌ ์•Œ์•„๋ณด์ž

์ˆ˜์ง‘๊ธฐ์˜ try and catch(Collector try and catch)

collector์—์„œ try{ } catch{ } ๋ฅผ ์‚ฌ์šฉ

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->         
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}   
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

๋ชจ๋“  ์˜ˆ์™ธ์ฒ˜๋ฆฌ(Everthing is cautch)

flow{ } ์ด๋‚˜, ์ค‘๊ฐ„์—ฐ์‚ฐ์ž, ์ข…๋‹จ์—ฐ์‚ฐ์ž ๋“ฑ์—์„œ ๋ฐœ์ƒํ•˜๋Š” ๋ชจ๋“  ์—๋Ÿฌ๋„ try{ } catch{ } ๋กœ ์˜ˆ์™ธ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅ

fun simple(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}     
Emitting 1 
string 1 
Emitting 2 
Caught java.lang.IllegalStateException: Crashed on 2

์˜ˆ์™ธ ํˆฌ๋ช…์„ฑ(Exception transparency)

์œ„์˜ try{ } catch{ } ๋ฅผ ์‚ฌ์šฉํ•˜๋Š”๊ฒƒ์€ ์˜ˆ์™ธํˆฌ๋ช…์„ฑ์„ ์œ„๋ฐ˜ํ•˜๋Š” ๊ฒƒ์ด๋‹ค.
์˜ˆ์™ธ ํˆฌ๋ช…์„ฑ์„ ๋ณด์กดํ•˜๊ธฐ ์œ„ํ•œ ๋ฐฉ๋ฒ•์œผ๋กœ ์•„๋ž˜์™€ ๊ฐ™์€ ๋ฐฉ๋ฒ•์ด ์žˆ๋‹ค.

  • throw ์—ฐ์‚ฐ์ž๋ฅผ ํ†ตํ•œ ์˜ˆ์™ธ ๋‹ค์‹œ ๋˜์ง€๊ธฐ
  • catch() ๋กœ์ง์—์„œ emit() ์„ ์‚ฌ์šฉํ•˜์—ฌ ๊ฐ’ ํƒ€์ž…์œผ๋กœ ๋ฐฉ์ถœ
  • ๋‹ค๋ฅธ ์ฝ”๋“œ๋ฅผ ํ†ตํ•œ ์˜ˆ์™ธ ๋ฌด์‹œ, ๋กœ๊น…, ๊ธฐํƒ€ ์ฒ˜๋ฆฌ

catch ์˜ˆ์™ธ ํˆฌ๋ช…์„ฑ(Transparent catch)

catch()๋Š” ์—…์ŠคํŠธ๋ฆผ์—์„œ ๋ฐœ์ƒํ•œ ์˜ˆ์™ธ๋งŒ์„ ์ฒ˜๋ฆฌํ•œ๋‹ค.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}     
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
 at FileKt$main$1$invokeSuspend$$inlined$collect$1.emit (Collect.kt:136) 
 ....

์„ ์–ต์ ์ธ ์—๋Ÿฌ ์บ์น˜(Catching declaratively)

simple()
    .onEach { value ->
        check(value <= 1) { "Collected $value" }                 
        println(value) 
    }
    .catch { e -> println("Caught $e") }
    .collect()
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2

ํ”Œ๋กœ์šฐ ์™„๋ฃŒ(Flow completion)

flow์˜ ์ˆ˜์ง‘์ด ์ข…๋ฃŒ(์ •์ƒ์ข…๋ฃŒ or ์˜ˆ์™ธ๋ฐœ์ƒ)๋˜์—‡์„ ๋•Œ ๊ทธ ์ดํ›„ ๋™์ž‘์„ ์ฒ˜๋ฆฌํ•ด์•ผ ํ•  ๋•Œ ๊ฐ€ ์žˆ๋‹ค

์„ ์–ธ์ ์ธ ์ฒ˜๋ฆฌ(Declarative handling)

onCompletion() ์ค‘๊ฐ„ ์—ฐ์‚ฐ์ž๋ฅผ ์ถ”๊ฐ€ํ•˜์—ฌ
flow๊ฐ€ ์™„์ „ํžˆ ์ˆ˜์ง‘๋˜์—ˆ์„๋•Œ ์‹คํ–‰ํ•  ๋กœ์ง์„ ์ •์˜ํ•  ์ˆ˜ ์žˆ๋‹ค.

simple()
    .onCompletion { println("Done") }
    .collect { value -> println(value) }

onCompletion์„ ์‚ฌ์šฉํ•จ์œผ๋กœ์จ ์–ป์„ ์ˆ˜ ์žˆ๋Š” ์ตœ๋Œ€์˜ ์ด์ ์€
๋žŒ๋‹ค์— nullable๋กœ ์ •์˜๋˜๋Š” Throwable ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ์ด์šฉํ•ด ์ˆ˜์ง‘์ด ์„ฑ๊ณต์ ์œผ๋กœ ์ข…๋ฃŒ๋˜์—ˆ๋Š”์ง€ ์•Œ ์ˆ˜ ์žˆ๋‹ค๋Š” ์ ์ด๋‹ค.

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}            

onCompletion()์—ฐ์‚ฐ์ž๋Š” catch()์™€ ๋‹ฌ๋ฆฌ ์˜ˆ์™ธ๋ฅผ ์ฒ˜๋ฆฌํ•˜์ง€ ์•Š๋Š”๋‹ค. ์˜ˆ์™ธ๋Š” ๋‹ค์šด์ŠคํŠธ๋ฆผ์œผ๋กœ ๊ณ„์† ์ „๋‹ฌ๋œ๋‹ค.

1
Flow completed exceptionally
Caught exception

๋‹ค์šด์ŠคํŠธ๋ฆผ์—์„œ ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•  ์‹œ Throwable์€ null์ด๋‹ค.

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}
1
Flow completed with 
java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2

ํ”Œ๋กœ์šฐ ์‹คํ–‰(Launching flow)

flow์—์„œ ๋ฐœ์ƒํ•˜๋Š” ์ด๋ฒคํŠธ๋“ค์— ๋Œ€์‘ํ•˜๋Š” ์ฒ˜๋ฆฌ๋ฅผ ๊ฐ๊ฐ ํ•ด์•ผํ•œ๋‹ค๋ฉด ์ค‘๊ฐ„ ์—ฐ์‚ฐ์ž onEach()๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}       
Event: 1
Event: 2
Event: 3
Done

์ค‘๊ฐ„ ์—ฐ์‚ฌ์ž ์ด๋ฏ€๋กœ collect()๋ฅผ ํ˜ธ์ถœํ•˜์ง€ ์•Š์œผ๋ฉด ์ˆ˜์ง‘๋˜์ง€์•Š๋Š”๋‹ค.

ํ•˜์ง€๋งŒ ์ด๋•Œ launchIn()์„ ์‚ฌ์šฉํ•˜๋ฉด
flow์˜ ์ˆ˜์ง‘์„ ๋‹ค๋ฅธ Coroutine์—์„œ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ
์ด๋ฅผ ํ†ตํ•ด ์ดํ›„ ์ž‘์„ฑ๋œ ์ฝ”๋“œ๋“ค์ด ๊ณง๋ฐ”๋กœ ์‹คํ–‰๋˜๋„๋ก ํ•  ์ˆ˜ ์žˆ๋‹ค.

fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
} 
Done
Event: 1
Event: 2
Event: 3

launchIn()์— ๋ฐ˜๋“œ์‹œ ํ•„์š”ํ•œ ์ธ์ž๋Š” CoroutineScope์ด๋‹ค.
๋˜ํ•œ launchIn()์€ Job์„ ๋ฐ˜ํ™˜ํ•œ๋‹ค.

ํ”Œ๋กœ์šฐ ์ทจ์†Œ ํ™•์ธ(Flow cancellation checks)

flow { } ๋Š” ๋‚ด๋ณด๋‚ธ ๊ฐ ๊ฐ’์— ๋Œ€ํ•˜์—ฌ ์ž์ฒด์ ์œผ๋กœ ensureActive ๊ฒ€์‚ฌ๋ฅผ ์ˆ˜ํ–‰ํ•œ๋‹ค.

fun foo(): Flow<Int> = flow { 
    for (i in 1..5) {
        println("Emitting $i") 
        emit(i) 
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

3๊นŒ์ง€ ์ˆซ์ž๋ฅผ ๋ฐฉ์ถœํ•˜์˜€๊ณ  4๋ฒˆ์งธ๋ฅผ ๋ฐฉ์ถœํ•˜๊ณ  ๋‚œ๋’ค์—๋Š” collect ๋ฅผ ์‹คํ–‰ํ•  ์ˆ˜ ์—†์œผ๋ฏ€๋กœ ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค.

Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c

ํ•˜์ง€๋งŒ asFlow() ์™€ ๊ฐ™์€ ๋Œ€๋ถ€๋ถ„์˜ ๋‹ค๋ฅธ ์—ฐ์‚ฐ์ž๋“ค์€
์„ฑ๋Šฅ์ƒ์˜ ์ด์œ ๋กœ ์ž์ฒด์ ์œผ๋กœ ์ถ”๊ฐ€ ์ทจ์†Œ ํ™•์ธ์„ ์ˆ˜ํ–‰ํ•˜์ง€ ์•Š๋Š”๋‹ค.

          
fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

1~5๊นŒ์ง€ ๋ชจ๋“  ์ˆซ์ž๊ฐ€ ์ˆ˜์ง‘๋˜๊ณ  runBlocking ์ด ๋ฐ˜ํ™˜๋˜๊ธฐ ์ „์— ์ทจ์†Œ๊ฐ€ ๊ฐ์ง€๋œ๋‹ค

1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23

๋ฐ”์œ flow๋ฅผ ์ทจ์†Œ ๊ฐ€๋Šฅํ•˜๊ฒŒ ๋งŒ๋“ค๊ธฐ(Making busy flow cancellable)

cancellable() ์„ ์‚ฌ์šฉํ•˜๋ฉด .onEach{ currentCoroutineContext().ensureActive() }๋ฅผ ์ˆ˜ํ–‰ํ•˜์—ฌ 1~3๊นŒ์ง€ ์ˆซ์ž๋งŒ ์ˆ˜์ง‘๋œ๋‹ค

fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365