
Channel closed issue during consumption since #312 #317

Closed
FlorentinDUBOIS opened this issue Jul 25, 2024 · 13 comments
@FlorentinDUBOIS
Collaborator

Hello @cirias and @BewareMyPower 👋,

Since the introduction of #312, I keep getting the following error in production at Clever Cloud:

2024-07-25T15:27:57.778501Z ERROR tokio-runtime-worker ThreadId(04) consume: dispatcher::svc::dispatch::shared: 112: Could not acknowledge enhanced access logs record produce to reconcile topic error="failed to unacknowledged message on topic 'persistent://orga_xxxxx/pulsar_xxx/enhanced-access-logs-partition-9' with sequence_id '126066' from producer 'c93a304b-a512-4629-ab23-43b4d73e1ce8-producer-2' using consumer '8d7b91e1-ea6a-4fc7-97d1-36b07121d22e-consumer-1', cannot send message to the consumer engine: the channel is closed" topic="persistent://orga_xxxxxx/pulsar_xxxxxx/enhanced-access-logs" owner_id="user_xxxx" service_id="app_xxxx" consumer="8d7b91e1-ea6a-4fc7-97d1-36b07121d22e-consumer-1"

This error blocks consumption; the workaround was to fork and revert the following commit: 918c25a

As I do not have the context of this feature, could you enlighten me?

@BewareMyPower

Could this issue be resolved by setting a greater value for the with_outbound_channel_size config?

@FlorentinDUBOIS
Collaborator Author

FlorentinDUBOIS commented Jul 26, 2024

I saw that the default is 100. What value do I need to set? I am thinking of 2 * batch size, but that is a magic computation to define a value.

I will give it a try.

@FlorentinDUBOIS
Collaborator Author

FlorentinDUBOIS commented Jul 26, 2024

So, I tried to set the value and I have a few questions:

  • why do you return a Result when the operation is infallible?
  • why do you return a std::io::Error when there is a pulsar::Error?

To me, this breaks the developer experience and adds unnecessary complexity.
This pattern forces me to write this:

#[tracing::instrument]
pub async fn create_client(
    opts: &Opts,
    batch_size: Option<usize>,
) -> Result<Pulsar<TokioExecutor>, pulsar::Error> {
    Pulsar::builder(&opts.endpoint, TokioExecutor)
        .with_allow_insecure_connection(true)
        .with_auth(Authentication {
            name: "token".to_string(),
            data: opts.token.to_owned().into_bytes(),
        })
        .with_outbound_channel_size(
            batch_size
                .map(|batch_size| batch_size * 2)
                .unwrap_or(10_000),
        )
        .map_err(|err| pulsar::Error::Custom(err.to_string()))?
        .with_connection_retry_options(ConnectionRetryOptions {
            connection_timeout: Duration::from_millis(opts.retry.timeout),
            keep_alive: Duration::from_millis(opts.retry.keep_alive),
            min_backoff: Duration::from_millis(opts.retry.min_backoff),
            max_backoff: Duration::from_millis(opts.retry.min_backoff),
            max_retries: opts.retry.max_retries,
        })
        .build()
        .instrument(info_span!("Pulsar::builder.build"))
        .await
}

instead of

#[tracing::instrument]
pub async fn create_client(
    opts: &Opts,
    batch_size: Option<usize>,
) -> Result<Pulsar<TokioExecutor>, pulsar::Error> {
    Pulsar::builder(&opts.endpoint, TokioExecutor)
        .with_allow_insecure_connection(true)
        .with_auth(Authentication {
            name: "token".to_string(),
            data: opts.token.to_owned().into_bytes(),
        })
        .with_outbound_channel_size(
            batch_size
                .map(|batch_size| batch_size * 2)
                .unwrap_or(10_000),
        )
        .with_connection_retry_options(ConnectionRetryOptions {
            connection_timeout: Duration::from_millis(opts.retry.timeout),
            keep_alive: Duration::from_millis(opts.retry.keep_alive),
            min_backoff: Duration::from_millis(opts.retry.min_backoff),
            max_backoff: Duration::from_millis(opts.retry.min_backoff),
            max_retries: opts.retry.max_retries,
        })
        .build()
        .instrument(info_span!("Pulsar::builder.build"))
        .await
}
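
As a side note on the design choice: one common way to keep the builder chain infallible is to store the raw value as-is and validate it only in build(). Below is a minimal sketch of that pattern with hypothetical types; it is not pulsar-rs's actual builder, just an illustration of the alternative.

// Hypothetical builder that stores the requested size unchecked and
// validates it only once, inside `build()`, so every setter stays infallible.
#[derive(Default)]
struct ClientBuilder {
    outbound_channel_size: Option<usize>,
}

#[derive(Debug)]
enum BuildError {
    InvalidOutboundChannelSize(usize),
}

struct Client {
    outbound_channel_size: usize,
}

impl ClientBuilder {
    fn with_outbound_channel_size(mut self, size: usize) -> Self {
        self.outbound_channel_size = Some(size);
        self
    }

    fn build(self) -> Result<Client, BuildError> {
        let size = self.outbound_channel_size.unwrap_or(100);
        if size == 0 {
            return Err(BuildError::InvalidOutboundChannelSize(size));
        }
        Ok(Client { outbound_channel_size: size })
    }
}

fn main() -> Result<(), BuildError> {
    let client = ClientBuilder::default()
        .with_outbound_channel_size(10_000)
        .build()?;
    println!("outbound channel size: {}", client.outbound_channel_size);
    Ok(())
}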

@FlorentinDUBOIS
Collaborator Author

FlorentinDUBOIS commented Jul 26, 2024

So far, computing the outbound channel size as twice the batch size seems to be quite OK.
I have not seen the above error anymore.
However, I am not really sure of what I am doing, because I do not understand the need for a bounded channel.
I have the feeling that we will hit this spot again when the number of messages per second increases.


@FlorentinDUBOIS
Collaborator Author

Besides, I wonder why I am impacted by the change to the outbound (bounded) channel, as we only use the non-blocking method to send messages 🤔
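
For context, here is a minimal sketch of why a non-blocking send is still affected by a bounded channel, using a plain tokio mpsc channel as a stand-in for the connection's outbound queue (illustrative only, not pulsar-rs's actual internals): try_send never waits, so once the buffer is full it fails immediately instead of applying back pressure.

use tokio::sync::mpsc::{self, error::TrySendError};

#[tokio::main]
async fn main() {
    // A small bounded channel standing in for the connection's outbound queue.
    let (tx, mut rx) = mpsc::channel::<u64>(100);

    for i in 0..1_000u64 {
        match tx.try_send(i) {
            Ok(()) => {}
            // A non-blocking send does not wait for capacity: once the buffer
            // is full it returns an error right away, which is the situation
            // the SlowDown / "channel is closed" code paths have to handle.
            Err(TrySendError::Full(msg)) => {
                eprintln!("outbound channel full, could not enqueue {msg}");
                break;
            }
            Err(TrySendError::Closed(_)) => {
                eprintln!("outbound channel closed");
                break;
            }
        }
    }

    // Drain whatever was enqueued before the buffer filled up.
    while let Ok(msg) = rx.try_recv() {
        let _ = msg;
    }
}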

@FlorentinDUBOIS
Collaborator Author

Hello @BewareMyPower,

Please correct me if I am wrong. This is my understanding of the bounded channel introduced with #312 and why it works well with twice the batch size.

First, the introduction of the bounded channel affects only producers. To me, this behavior is here to implement back pressure on production when the Pulsar broker is under heavy load and responds slowly. Is that correct? If so, I have no idea for now of how to implement the following pattern.

I am suggesting that we should move away from sending a SlowDown error and instead use the Rust asynchronous environment to do so. The idea is that, when the broker is under heavy load, the future that wants to produce a message returns Poll::Pending (saying "I am not ready yet, because the broker is not able to process the message"), which notifies the Tokio and/or async-std scheduler to retry later. This is how back pressure is implemented in Rust asynchronous environments, according to https://tokio.rs/blog/2021-05-14-inventing-the-service-trait#backpressure. What do you think about this?
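
To make that concrete, here is a rough sketch of what awaiting-based back pressure looks like in practice, using a plain tokio bounded channel as a stand-in for the producer's outbound buffer (illustrative only, not pulsar-rs's internals nor the Service trait from the blog post): Sender::send suspends, i.e. yields Poll::Pending, until capacity is available instead of surfacing an error.

use tokio::sync::mpsc;

// `Sender::send` awaits free capacity: while the buffer is full the future
// yields Poll::Pending, so the producing task is parked by the runtime
// instead of receiving a SlowDown error.
async fn produce_with_backpressure(
    outbound: &mpsc::Sender<Vec<u8>>,
    payload: Vec<u8>,
) -> Result<(), &'static str> {
    outbound
        .send(payload)
        .await
        .map_err(|_| "connection engine dropped the receiver")
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(100);

    // Stand-in for the connection engine writing frames to the broker.
    tokio::spawn(async move {
        while let Some(_frame) = rx.recv().await {
            // Pretend to write the frame to the socket.
        }
    });

    for i in 0..1_000u32 {
        // The producing task automatically slows down when the engine lags.
        produce_with_backpressure(&tx, i.to_be_bytes().to_vec())
            .await
            .expect("engine stopped");
    }
}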

As for why twice the batch size works well: it is due to this line 918c25a#diff-ec6805b5ebfa2ad15066ddd189a37d48cc5abd2516ec350d3d4a2ea1f2c11d19R165

@BewareMyPower

I am suggesting that we should move away from sending a SlowDown error

Honestly, I didn't look into the details of the error code that pulsar-rs uses to deal with back pressure, but your point makes sense to me. The current approach looks very hacky.

@cirias
Contributor

cirias commented Jul 30, 2024

I have a theory of what's happening here, but I couldn't find the exact cause.

The message to ACK/NACK a consumed message is also pushed into the bounded channel. When messages arrive at a high rate, the client emits ACK messages at a high rate as well. This triggers the SlowDown error, and the code doesn't handle that error properly, which then crashes the engine.

@FlorentinDUBOIS Could you enable debug-level logging for pulsar-rs and see if this line is triggered? https://github.com/streamnative/pulsar-rs/blob/master/src/consumer/topic.rs#L126
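
If that theory is right, the failure mode could be sketched like this (hypothetical code, not the actual pulsar-rs consumer nor the pushed fix): treating a transient Full error on the shared bounded channel as fatal tears the engine down, whereas backing off and retrying keeps the consumer alive.

use std::time::Duration;
use tokio::sync::mpsc::{self, error::TrySendError};

#[derive(Debug)]
enum EngineMessage {
    Ack { message_id: u64 },
}

// Hypothetical ACK path: retry when the outbound channel is merely full,
// and only treat a genuinely closed channel as fatal.
async fn send_ack(
    outbound: &mpsc::Sender<EngineMessage>,
    message_id: u64,
) -> Result<(), &'static str> {
    loop {
        match outbound.try_send(EngineMessage::Ack { message_id }) {
            Ok(()) => return Ok(()),
            Err(TrySendError::Full(_)) => {
                // The engine is just behind; back off instead of bubbling
                // up an error that tears the consumer down.
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
            Err(TrySendError::Closed(_)) => return Err("consumer engine is gone"),
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<EngineMessage>(100);

    // Stand-in for the engine forwarding ACKs to the broker.
    tokio::spawn(async move {
        while let Some(_msg) = rx.recv().await {}
    });

    for id in 0..10_000u64 {
        send_ack(&tx, id).await.expect("consumer engine stopped");
    }
}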

@cirias
Contributor

cirias commented Jul 30, 2024

I think it's here. But I'm not sure. @FlorentinDUBOIS A minimal example to reproduce the error would be very helpful.

@cirias
Contributor

cirias commented Jul 30, 2024

Just pushed a potential fix above. I couldn't test it since I can't reproduce the issue. @FlorentinDUBOIS Can you help test the fix?

@FlorentinDUBOIS
Collaborator Author

Hello @cirias 👋,

Thank you for the fix, I will try it this morning with debug-level logging enabled.
To reproduce the issue, try producing 10k msg/s.
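
For reference, a load generator along the lines of the pulsar crate's README producer example should be enough to approach that rate; the endpoint, topic, TestData type, and serde/serde_json usage below are placeholders, and depending on the crate version the non-blocking send variant may be preferable.

use pulsar::{producer, Error as PulsarError, Pulsar, SerializeMessage, TokioExecutor};
use serde::Serialize;

#[derive(Serialize)]
struct TestData {
    data: String,
}

impl SerializeMessage for TestData {
    fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> {
        let payload = serde_json::to_vec(&input).map_err(|e| PulsarError::Custom(e.to_string()))?;
        Ok(producer::Message {
            payload,
            ..Default::default()
        })
    }
}

#[tokio::main]
async fn main() -> Result<(), PulsarError> {
    // Placeholder endpoint and topic.
    let pulsar: Pulsar<_> = Pulsar::builder("pulsar://127.0.0.1:6650", TokioExecutor)
        .build()
        .await?;
    let mut producer = pulsar
        .producer()
        .with_topic("persistent://public/default/load-test")
        .with_name("load-generator")
        .build()
        .await?;

    // Tight send loop to generate a high message rate.
    loop {
        producer
            .send(TestData {
                data: "payload".to_string(),
            })
            .await?;
    }
}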

@FlorentinDUBOIS
Collaborator Author

@cirias I have tried your fix and it works pretty well: I was able to read and produce up to 18k msg/s. I couldn't go higher as logging is slow 😅.


I will approve your fix, merge it, and close this issue. Thanks a lot for the debugging.

@FlorentinDUBOIS
Collaborator Author

@cirias Without debug-level logging, I was able to read and produce up to 27k msg/s, which is pretty good. As I have merged your fix, I am closing this issue.

