StateFlow with SharingStarted.Lazily parameter does not start background coroutine in specific cases #2488

Closed
denyshorman opened this issue Jan 21, 2021 · 8 comments


denyshorman commented Jan 21, 2021

Consider the following test:

@Test
fun lazyStateFlowTest() = runBlocking(Dispatchers.Default) {
    val scope = CoroutineScope(Dispatchers.Default + SupervisorJob() + CoroutineName("Test"))

    val flow0 = flow {
        var i = 1
        while (true) {
            delay(100)
            emit(i++)
        }
    }.stateIn(scope, SharingStarted.Lazily, initialValue = 1)

    val flow1 = flow0.stateIn(scope, SharingStarted.Lazily, initialValue = 0)

    assertEquals(expected = 0, actual = flow1.first()) // actual is 0
    delay(3000)
    assertNotEquals(illegal = 0, actual = flow1.first()) // actual is 0 but must be non-zero
}

The reference for SharingStarted.Lazily says: "Sharing is started when the first subscriber appears and never stops".
Based on that, when flow1.first() is executed, flow1 is expected to start sharing in the background and never stop.
The test above shows that the actual behavior differs: the second call to flow1.first() returns 0, but it shouldn't, because by then flow0 should have populated flow1 with new values.

This probably happens because the first() operator gets the initial value and cancels its subscription to flow1 before flow1 subscribes to flow0. The failure is intermittent: in some runs the test above passes.

Another example that can reproduce race condition:

@Test
fun lazyStateFlowTest2() = runBlocking(Dispatchers.Default) {
    coroutineScope {
        repeat(1000) {
            launch {
                val x = MutableStateFlow(value = 1)
                val y = MutableStateFlow(value = 1)

                val z = x.combine(y) { a, b -> a + b }
                    .stateIn(CoroutineScope(Dispatchers.Default), SharingStarted.Lazily, initialValue = 0)

                assertEquals(expected = 0, actual = z.first())
                delay(2000)
                assertEquals(expected = 2, actual = z.first())
            }
        }
    }
}

Kotlin: 1.4.21
Coroutines: 1.4.2
Runtime: JVM

@elizarov
Contributor

The root cause is that stateIn does not always subscribe to the upstream flow when its downstream subscriber gets the initial value and cancels too fast. One can argue that it is the right behavior. I wonder how critical that is and what the right behavior should be. Can you please explain how you ran into this problem? What kind of app were you writing? What were you trying to do with flows?

@qwwdfsad
Collaborator

qwwdfsad commented Jan 22, 2021

One can argue that it is the right behavior

Our documentation for SharingStarted.Lazily states: "Sharing is started when the first subscriber appears and never stops."

Now consider a simplified reproducer without flaky timings:

val flow0 = flow {
    println("Flow is started")
    emit(1)
}.stateIn(this, SharingStarted.Lazily, initialValue = 0)

assertEquals(expected = 0, actual = flow0.first())
yield()
assertNotEquals(illegal = 0, actual = flow0.first())

This code will fail, and "Flow is started" won't be printed. This definitely violates the principle of least surprise and should be addressed somehow.
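
For contrast with the reproducer above, here is a minimal sketch (not from the thread) of a subscriber that stays subscribed until an upstream value arrives, so the subscription count remains at 1 long enough for Lazily to observe it. The helper name firstUpstreamValue and the choice of Dispatchers.Default for the sharing scope are assumptions made for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the first value produced by the upstream flow,
// as seen through a StateFlow shared with SharingStarted.Lazily.
suspend fun firstUpstreamValue(): Int {
    val sharingScope = CoroutineScope(Dispatchers.Default)
    try {
        val state = flow { emit(1) }
            .stateIn(sharingScope, SharingStarted.Lazily, initialValue = 0)
        // first { ... } keeps the subscription open until the predicate matches,
        // instead of taking the initial value and cancelling immediately.
        return state.first { it != 0 }
    } finally {
        sharingScope.cancel() // stop the sharing coroutine
    }
}

fun main() = runBlocking {
    println(firstUpstreamValue()) // 1
}
```

This does not address the reported behavior of a plain first(); it only shows that a longer-lived subscriber is noticed.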

@elizarov elizarov self-assigned this Jan 22, 2021
@elizarov
Contributor

Agreed. I'll fix it.

@elizarov
Contributor

Having investigated it, I don't see a way to fix it. The underlying reason for this behavior is that the Flow and SharedFlow architecture is highly asynchronous. In particular, the number of subscriptions is tracked via a conflated StateFlow<Int>. So, when a subscriber appears and disappears very quickly, the change in the number of subscribers from 0 to 1 and back to 0 gets conflated and may go unnoticed as a result. To SharingStarted.Lazily it looks as if nothing had happened at all. The only way to "fix it" that I see so far is to update the documentation to note this fact.
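
The conflation described above can be sketched with an ordinary MutableStateFlow<Int> standing in for the subscription counter. This is an illustration under assumptions, not the library's internal code: the name observedCounts is hypothetical, and the example relies on the single-threaded runBlocking dispatcher so that both updates land before the observer runs again.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the subscription counter: records every value
// a collector actually observes.
suspend fun observedCounts(): List<Int> = coroutineScope {
    val count = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    val observer = launch { count.collect { seen += it } }

    yield()          // let the observer start and see the initial 0
    count.value = 1  // a subscriber appears...
    count.value = 0  // ...and disappears before the observer runs again
    yield()          // observer resumes, finds 0 again, and reports nothing

    observer.cancel()
    seen
}

fun main() = runBlocking {
    println(observedCounts()) // [0]
}
```

The observer never sees 1, which is how a starting policy built on this counter can miss a short-lived subscriber.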

@hrach
Contributor

hrach commented Jan 25, 2021

in the number of subscribers from 0 to 1 and back to 0 can happen fast

Couldn't this be worked around by providing a different initial value (like -1), so that the change to 0 would trigger the lazily started sharing?

@elizarov
Contributor

elizarov commented Jan 25, 2021

There are some other hacks that could make it work specifically for Lazily, but the problem is far more widespread. I can have a custom starting policy that waits for a specified number of subscribers to appear, and it will suffer from the same asynchrony. Frankly, I think this asynchrony should be treated as a feature, not a bug, as it pertains to many aspects of how various Flow operators work and is not limited to shared flows.
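
A rough sketch of such a custom policy, to make the point concrete. StartAfterSubscribers and observedCommands are hypothetical names, and a plain MutableStateFlow is used as a fake subscription counter. Only the SharingStarted.command signature and SharingCommand.START come from the library.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical policy: start sharing once at least `threshold` subscribers
// have been observed, and never stop.
class StartAfterSubscribers(private val threshold: Int) : SharingStarted {
    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
        subscriptionCount
            .filter { it >= threshold }
            .take(1)
            .map { SharingCommand.START }
}

// Feeds a burst of counter changes to the policy and returns the commands it emitted.
suspend fun observedCommands(burst: List<Int>): List<SharingCommand> = coroutineScope {
    val fakeCount = MutableStateFlow(0)
    val seen = mutableListOf<SharingCommand>()
    val job = launch { StartAfterSubscribers(2).command(fakeCount).collect { seen += it } }

    yield()                                 // policy starts observing the counter
    burst.forEach { fakeCount.value = it }  // changes applied back-to-back
    yield()                                 // policy gets a chance to react

    job.cancel()
    seen
}

fun main() = runBlocking {
    println(observedCommands(listOf(1, 2)))       // [START]
    println(observedCommands(listOf(1, 2, 1, 0))) // []
}
```

In the second call the counter reaches the threshold but drops back before the policy runs, so the policy never emits START.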

@denyshorman
Author

As a library user, I would definitely want both the best performance and the expected behavior from a StateFlow created using the stateIn operator with the SharingStarted.Lazily parameter.

StateFlow basically represents a state. If I see a StateFlow, I understand that I can get the current state (.value) or subscribe to state updates (.collect).
On the other hand, an API designer can expose a StateFlow that is either a simple MutableStateFlow or a StateFlow created with the stateIn operator.
The designer can also apply an optimization strategy depending on the nature of the data (Lazily or a custom strategy; Eagerly means no optimization).
If the designer decides to optimize the StateFlow and a user decides to get the data in an unexpected way (e.g. by calling the first operator), then the user risks running into unexpected behavior.

I see 3 solutions for this problem:

  1. Fix the stateIn optimizations to eliminate any unexpected behavior
  2. Disallow optimizations for stateIn (leave only SharingStarted.Eagerly, since the required optimizations can be safely achieved with shareIn)
  3. Leave everything as is, but add a warning that stateIn with optimizations should be used with caution

I would personally strive for the ideal solution, an extensive API with the expected behavior (1), but if that is not possible for some reason (e.g. performance degradation), I would eliminate any controversial API that can lead to unexpected behavior (2), especially if it can be replaced safely with similar code.
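
To illustrate why option (2) sidesteps the problem, here is a small sketch showing that SharingStarted.Eagerly starts the upstream without any subscriber at all. The helper name valueWithoutSubscribers and the CompletableDeferred used as a signal are assumptions for the example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: reads the state after the upstream has emitted,
// without ever collecting from the StateFlow.
suspend fun valueWithoutSubscribers(): Int {
    val sharingScope = CoroutineScope(Dispatchers.Default)
    try {
        val emitted = CompletableDeferred<Unit>()
        val state = flow {
            emit(1)
            emitted.complete(Unit) // signal that the upstream ran and emitted
        }.stateIn(sharingScope, SharingStarted.Eagerly, initialValue = 0)

        emitted.await()            // no collector has subscribed to `state`
        return state.value
    } finally {
        sharingScope.cancel()      // stop the sharing coroutine
    }
}

fun main() = runBlocking {
    println(valueWithoutSubscribers()) // 1
}
```

The trade-off is the one noted in the list: with Eagerly the upstream runs whether or not anyone is listening.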

@LouisCAD
Contributor

LouisCAD commented Mar 27, 2021

Another solution is to make the number of subscriptions a SharedFlow instead of a StateFlow, for the cases that care about history and not just the latest value, ignoring changes reverted very fast.
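
A sketch of the idea, assuming a buffered MutableSharedFlow<Int> as the counter. The name observedHistory and the buffer size are made up for illustration, and this is not meant to represent how the library tracks subscriptions.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical counter backed by a SharedFlow: every change is kept in the
// buffer, so an observer sees the full history rather than the latest value.
suspend fun observedHistory(): List<Int> = coroutineScope {
    val count = MutableSharedFlow<Int>(replay = 1, extraBufferCapacity = 16)
    count.tryEmit(0) // initial count, kept in the replay cache

    val seen = mutableListOf<Int>()
    val observer = launch { count.collect { seen += it } }

    yield()          // observer starts and receives the replayed 0
    count.tryEmit(1) // a subscriber appears...
    count.tryEmit(0) // ...and disappears right away
    yield()          // observer drains both buffered changes

    observer.cancel()
    seen
}

fun main() = runBlocking {
    println(observedHistory()) // [0, 1, 0]
}
```

Unlike a StateFlow, the observer here sees the intermediate 1, at the cost of buffering changes that a state-only consumer would not need.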

qwwdfsad added a commit that referenced this issue Aug 11, 2021
Sharing strategies are too sensitive to conflation around extrema and may miss the necessity to start or not to stop the sharing. For more particular examples see #2863 and #2488

Fixes #2488
Fixes #2863
Fixes #2871
pablobaxter pushed a commit to pablobaxter/kotlinx.coroutines that referenced this issue Sep 14, 2022
…#2872)

* Non-conflating subscription count in SharedFlow and StateFlow

Sharing strategies are too sensitive to conflation around extrema and may miss the necessity to start or not to stop the sharing. For more particular examples see Kotlin#2863 and Kotlin#2488

Fixes Kotlin#2488
Fixes Kotlin#2863
Fixes Kotlin#2871