
Coroutine commands that result in a Flow can hang #1837

Closed · mattfff opened this issue Aug 30, 2021 · 6 comments

Labels: type: bug (A general bug) · Milestone: 6.0.8

Comments

mattfff commented Aug 30, 2021

Bug Report

We observed that when executing an mget() with a large set of keys against an empty Redis DB, the call would periodically stall. Further, we observed that this only occurred when the size of the requested set of keys was some multiple of Channel.CHANNEL_DEFAULT_CAPACITY (64). This occurs with some degree of randomness, but the attached test can usually trigger it within 15 - 50 calls to mget.

Current Behavior:

The following was collected via kotlinx-coroutines-debug, once a stalled request occurred:

Coroutine DeferredCoroutine{Active}@7dae0690, state: SUSPENDED
at kotlinx.coroutines.reactive.PublisherAsFlow.collectImpl(ReactiveFlow.kt:97)
at kotlinx.coroutines.flow.FlowKt__CollectionKt.toCollection(Collection.kt:32)
....application stack

These are permanently SUSPENDED, and never resume.
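
For reference, dumps like the one above can be captured with kotlinx-coroutines-debug. The following is a minimal sketch assuming the kotlinx-coroutines-debug artifact is on the classpath; the placement of the calls is illustrative, not how the original dump was taken.

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.debug.DebugProbes

@OptIn(ExperimentalCoroutinesApi::class)
fun main() {
    DebugProbes.install()              // install before the coroutines of interest are created
    try {
        // ... run the repro shown below until an mget call hangs ...
    } finally {
        DebugProbes.dumpCoroutines()   // prints each coroutine's state (e.g. SUSPENDED)
                                       // together with its creation/suspension stack
        DebugProbes.uninstall()
    }
}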

Input Code
import io.lettuce.core.ExperimentalLettuceCoroutinesApi
import io.lettuce.core.RedisClient
import io.lettuce.core.RedisURI
import io.lettuce.core.api.coroutines
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

@OptIn(ExperimentalLettuceCoroutinesApi::class)
fun main() {
    val i = AtomicInteger()
    val client = RedisClient.create()
    val connection = RedisURI.builder().apply {
        withHost("localhost")
        withPort(6379)
    }.build().let {
        client.connect(it)
    }.coroutines()

    // The usage of 128 here is meaningful, as mentioned above this is only
    // observed with multiples of 64 in the size of the requested set of keys.
    val array = Array(128) { "111" }

    runBlocking {
        repeat(1000) {
            connection.mget(
                *array
            ).mapNotNull { result ->
                if (result.hasValue()) {
                    result.value as String
                } else null
            }.toList()

            println(i.getAndIncrement())
        }
    }

    Thread.sleep(1000)

    println("finish")
}

Observed Behavior

The run will generally stall before 50 iterations are complete. A temporary fix appears to be applying an UNLIMITED buffer:

connection.mget(*array).buffer(Channel.UNLIMITED).mapNotNull { result ->

The default behavior is Channel.BUFFERED, which triggers the CHANNEL_DEFAULT_CAPACITY buffering in Kotlin's PublisherAsFlow Reactive adapter.
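
For completeness, here is roughly what the repro looks like with the workaround applied, a sketch reusing the connection setup from the code sample above (buffer comes from kotlinx.coroutines.flow and Channel from kotlinx.coroutines.channels):

import io.lettuce.core.ExperimentalLettuceCoroutinesApi
import io.lettuce.core.RedisClient
import io.lettuce.core.RedisURI
import io.lettuce.core.api.coroutines
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalLettuceCoroutinesApi::class)
fun main() {
    val client = RedisClient.create()
    val connection = RedisURI.builder().apply {
        withHost("localhost")
        withPort(6379)
    }.build().let { client.connect(it) }.coroutines()

    val array = Array(128) { "111" }

    runBlocking {
        repeat(1000) {
            connection.mget(*array)
                .buffer(Channel.UNLIMITED)   // workaround: lift the default Channel.BUFFERED capacity
                .mapNotNull { result -> if (result.hasValue()) result.value else null }
                .toList()
        }
    }
}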

Expected Behavior

mget() should not remain permanently SUSPENDED; it should either return results or raise an error when the command completes.

Environment

  • Lettuce: Originally observed on io.lettuce:lettuce-core:6.0.2.RELEASE; we've also reproduced it on 6.1.4.RELEASE.
  • Redis: AWS ElastiCache Redis 6.0.5; the above demo script was run against a local Redis 4 via Docker.
mp911de (Collaborator) commented Aug 31, 2021

Lettuce exposes the Coroutines API over its Reactive API. It suspends reading if there isn't sufficient demand, and this case looks very much like that. Does the same happen if you move the repeat(1000) closure outside of runBlocking, leaving the actual client code within runBlocking?

mattfff (Author) commented Aug 31, 2021

Is the following adjustment what you are suggesting?

Revised Example
@OptIn(ExperimentalLettuceCoroutinesApi::class)
fun main() {
    val i = AtomicInteger()
    val client = RedisClient.create()
    val connection = RedisURI.builder().apply {
        withHost("localhost")
        withPort(6379)
    }.build().let {
        client.connect(it)
    }.coroutines()

    val array = Array(128) { "111" }

    repeat(1000) {
        runBlocking {
            connection.mget(
                *array
            ).toList()

            println("Iteration: ${i.getAndIncrement()}")
        }
    }

    Thread.sleep(1000)

    println("finish")
}

The above sample still stalls randomly after some number of iterations, usually fewer than 50. This is surprising (to me, anyway), as I would expect a request for a specific set of elements in an mget to always return a matching-sized result set, even if that is generated over multiple batches of results consumed from the Producer. The workaround of setting an unlimited buffer seems to sidestep the batching behavior, which appears to be what's at issue...

mp911de (Collaborator) commented Sep 1, 2021

Thanks, that's what I meant. I'm not sure how or whether Kotlin Coroutines optimize when results aren't consumed. However, fully consuming a result and running the same code in a loop should not cause the application to hang. I need to check what's happening.

mp911de self-assigned this Sep 1, 2021
mp911de added the type: bug (A general bug) label and removed the status: waiting-for-triage label Sep 14, 2021
mp911de changed the title from "Coroutine commands that result in a Flow can hang if the flow has a finite buffer" to "Coroutine commands that result in a Flow can hang" Sep 14, 2021
mp911de (Collaborator) commented Sep 14, 2021

I had a look and it seems to be a bug in RedisPublisher where a particular code path wasn't safe when switching from no demand to demand.

mp911de added this to the 6.0.8 milestone Sep 14, 2021
mp911de added a commit that referenced this issue Sep 14, 2021
RedisPublisher's NO_DEMAND.request(…) while switching to DEMAND now calls onDataAvailable(…) on the currently active state to ensure data signalling if there's (still) demand.

Previously, we called onDataAvailable(…) on the current NO_DEMAND state, anticipating that if there is no data yet, someone will read data from the channel and, once it arrives, we will be notified to emit it. However, data can already be sitting in our buffer, in which case no one notifies us after the request. That data lingered in the publisher buffer and was never emitted, which caused downstream subscribers to hang indefinitely.

Data emission (and command completion) happens as responses for a command arrive. The request size doesn't correlate with the response we receive from Redis: Redis can respond with fewer, exactly as many, or more items. If we receive more items from Redis than were requested, we buffer them to comply with Reactive Streams back-pressure semantics. After emitting the last requested item to the subscription, we switch from DEMAND (READING) to NO_DEMAND, and that is exactly where the issue starts.

We now call onDataAvailable(…) on the current state to ensure that, when the state is DEMAND, we emit all data held in our buffers.
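
To make the described transition easier to follow, here is a simplified, single-threaded Kotlin sketch of the demand state machine as the commit message describes it. It is not the actual RedisPublisher code; class and method names are invented for illustration only.

import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference

// Hypothetical sketch, not RedisPublisher: illustrates why notifying the *previous*
// NO_DEMAND state loses buffered data, while notifying the *current* state drains it.
class DemandStateMachineSketch<T>(private val emit: (T) -> Unit) {

    enum class State { NO_DEMAND, DEMAND }

    private val state = AtomicReference(State.NO_DEMAND)
    private val demand = AtomicLong(0)
    private val buffer = ArrayDeque<T>()   // data read from the transport, awaiting demand

    // Data arriving from Redis is buffered and emitted only while demand is outstanding.
    fun onDataReceived(item: T) {
        buffer.addLast(item)
        onDataAvailable(state.get())
    }

    // Subscriber requests n more items: switch NO_DEMAND -> DEMAND.
    fun request(n: Long) {
        demand.addAndGet(n)
        state.compareAndSet(State.NO_DEMAND, State.DEMAND)
        // Before the fix (per the description above), the notification was effectively
        // issued against the previous NO_DEMAND state, which ignores buffered data:
        //     onDataAvailable(State.NO_DEMAND)   // buffered items linger -> downstream hangs
        // After the fix, the currently active state is used, so items that were already
        // buffered are drained immediately:
        onDataAvailable(state.get())
    }

    private fun onDataAvailable(current: State) {
        if (current != State.DEMAND) return   // NO_DEMAND: wait for a request
        while (demand.get() > 0 && buffer.isNotEmpty()) {
            emit(buffer.removeFirst())
            demand.decrementAndGet()
        }
        // Once the requested items have been emitted, fall back to NO_DEMAND.
        if (demand.get() == 0L) state.set(State.NO_DEMAND)
    }
}

In the buggy variant, a sequence like onDataReceived(x) followed by request(1) would leave x stuck in the buffer, because the drain check ran against NO_DEMAND; that matches the hang reported above.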
mp911de closed this as completed Sep 14, 2021
mattfff (Author) commented Sep 14, 2021

Thanks so much for chasing this down! We'll grab the next release and give it a go...

rami51 commented Oct 25, 2021

Hi, I was running mget in a program, stress-testing it with over 6000 requests per minute. After an hour the program just stopped working, and the last log that I have is:

{'level':'DEBUG', 'message': '[channel=0x83706877, /127.0.0.1:55280 -> localhost/127.0.0.1:6379] writing command SubscriptionCommand [type=MGET, output=KeyValueListOutput [output=[], error='null'], commandType=io.lettuce.core.protocol.Command]', 'loggerName': 'io.lettuce.core.protocol.CommandEncoder:101', 'timestamp': '2021-10-25 11:57:58,521', 'thread': 'lettuce-nioEventLoop-4-1'}

As you can see, I was running a local Docker container with my Redis instance and connecting to it via Lettuce.
