Improve documentation for subscribing with exactly-once delivery #236

Open
julianbraha wants to merge 1 commit into main

Conversation

julianbraha

I just spent a week troubleshooting, because I was using subscribe() to receive messages from my Pub/Sub subscription, and strangely, most acked messages were getting redelivered.

At first I thought that my Pub/Sub monitoring was broken. I was calling message.ack(), but the metrics in Cloud Console showed that the number of unacked messages wasn't decreasing.

The problem was that calling subscribe() on a subscription with exactly-once delivery only allows the first received message to be acked. The rest of the messages were delivered, but calling message.ack() returned an error status, and then the subscription would redeliver them.

Once I started using pull() instead, everything worked as expected.

This PR adds a note to the documentation recommending pull() over subscribe() for subscriptions with exactly-once delivery, and hopefully spares someone else a long week of debugging.
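
For context, here is a minimal sketch of the pull()-based flow described above. It is not part of this PR; it assumes Subscription::pull(max_messages, retry) returns the batch of received messages, and "eod-test" is a placeholder name for a subscription with exactly-once delivery enabled.

    use google_cloud_pubsub::client::{Client, ClientConfig};

    async fn pull_loop() {
        let client = Client::new(ClientConfig::default().with_auth().await.unwrap())
            .await
            .unwrap();
        // Placeholder subscription with exactly-once delivery enabled.
        let subscription = client.subscription("eod-test");

        loop {
            // Pull a batch of messages instead of holding a streaming pull open.
            let messages = subscription.pull(50, None).await.unwrap();
            for message in messages {
                // With exactly-once delivery, check the ack result instead of unwrapping it.
                if let Err(status) = message.ack().await {
                    tracing::warn!("ack failed, message will be redelivered: {}", status);
                }
            }
        }
    }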

@yoshidan
Owner

message.ack() returned an error status

What error occurred?

I tried subscribe() with an exactly-once delivery subscription, but I did not get any errors from message.ack() and could not reproduce redelivery of messages acked within the acknowledgment deadline.

    #[tokio::test]
    async fn test_subscribe_exactly_once_delivery() {
        let client = Client::new(ClientConfig::default().with_auth().await.unwrap())
            .await
            .unwrap();

        // Check if the subscription is exactly_once_delivery
        let subscription = client.subscription("eod-test");
        let config = subscription.config(None).await.unwrap().1;
        assert!(config.enable_exactly_once_delivery);

        // publish message
        let ctx = CancellationToken::new();
        let ctx_pub = ctx.clone();
        let publisher = client.topic("eod-test").new_publisher(None);
        let pub_task = tokio::spawn(async move {
            tracing::info!("start publisher");
            loop {
                if ctx_pub.is_cancelled() {
                    tracing::info!("finish publisher");
                    return;
                }
                publisher.publish_blocking(PubsubMessage {
                    data: "msg".into(),
                    ..Default::default()
                }).get().await.unwrap();
            }
        });

        // subscribe message
        let ctx_sub = ctx.clone();
        let sub_task = tokio::spawn(async move {
            tracing::info!("start subscriber");
            let mut stream = subscription.subscribe(None).await.unwrap();
            let mut msgs = HashMap::new();
            while let Some(message) = stream.next().await {
                if ctx_sub.is_cancelled() {
                    break;
                }
                let msg_id = &message.message.message_id;
                *msgs.entry(msg_id.clone()).or_insert(0) += 1;
                message.ack().await.unwrap();
            }
            tracing::info!("finish subscriber");
            return msgs;
        });

        tokio::time::sleep(Duration::from_secs(30)).await;

        // check redelivered messages
        ctx.cancel();
        let _ = pub_task.await.unwrap();
        let received_msgs = sub_task.await.unwrap();
        assert!(received_msgs.len() > 0);

        tracing::info!("Number of received messages = {}", received_msgs.len());
        for (msg_id, count) in received_msgs {
            assert_eq!(count, 1, "msg_id = {msg_id}, count = {count}");
        }
    }
2024-02-26T09:08:32.073704Z  INFO google_cloud_pubsub::client::tests_in_gcp: start publisher
2024-02-26T09:08:32.073859Z  INFO google_cloud_pubsub::client::tests_in_gcp: start subscriber
2024-02-26T09:09:02.239786Z  INFO google_cloud_pubsub::client::tests_in_gcp: finish publisher
2024-02-26T09:09:02.960599Z  INFO google_cloud_pubsub::client::tests_in_gcp: finish subscriber
2024-02-26T09:09:02.961968Z  INFO google_cloud_pubsub::client::tests_in_gcp: Number of received messages = 83

Process finished with exit code 0

@rmagatti

rmagatti commented Mar 1, 2024

I'm having a similar issue here. I'm getting errors on message.ack().

The error:

 Error: Status { code: InvalidArgument, message: "Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed."

Note that I'm acking only 1 or 2 seconds after receiving the message, so ack expiry shouldn't be coming into play; there's probably something about how acking happens that isn't working every time.
Also note that I'm able to ack the first message that comes through, but not subsequent messages for the same ordering key.

PS: I'm subscribing through the "alternative way" mentioned in the readme.

@yoshidan
Owner

yoshidan commented Mar 4, 2024

Thanks. We have reproduced the same error with the default settings in a load test.

This can be suppressed by adjusting stream_ack_deadline_seconds and max_outstanding_messages.

Because stream_ack_deadline_seconds defaults to 60 seconds and max_outstanding_messages to 1000, message.ack() must be executed within 60 seconds for each of the up to 1000 messages received at a time.

If stream_ack_deadline_seconds is set to 600 seconds and max_outstanding_messages to 5, you get a 600-second grace period for each of the at most 5 messages outstanding at a time.

    let config = SubscribeConfig::default().with_subscriber_config(SubscriberConfig {
        stream_ack_deadline_seconds: 600,
        max_outstanding_messages: 5,
        ..Default::default()
    });
    let mut stream = subscription.subscribe(Some(config)).await.unwrap();

While google-cloud-go adjusts the ack deadline dynamically based on observed ack latencies, this Rust library does not adjust it that finely, so stream_ack_deadline_seconds has to be set statically.
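
For reference, here is a minimal sketch that combines the tuned SubscriberConfig above with a subscribe() loop and checks the result of message.ack() instead of unwrapping it, so an expired acknowledgement id shows up in the logs rather than as a silent redelivery. The import paths and the "eod-test" subscription name are assumptions and may differ by crate version.

    use futures_util::StreamExt;
    use google_cloud_pubsub::client::{Client, ClientConfig};
    use google_cloud_pubsub::subscriber::SubscriberConfig;
    use google_cloud_pubsub::subscription::SubscribeConfig;

    async fn subscribe_with_longer_deadline() {
        let client = Client::new(ClientConfig::default().with_auth().await.unwrap())
            .await
            .unwrap();
        // Placeholder subscription with exactly-once delivery enabled.
        let subscription = client.subscription("eod-test");

        // Longer deadline and fewer outstanding messages, as suggested above.
        let config = SubscribeConfig::default().with_subscriber_config(SubscriberConfig {
            stream_ack_deadline_seconds: 600,
            max_outstanding_messages: 5,
            ..Default::default()
        });

        let mut stream = subscription.subscribe(Some(config)).await.unwrap();
        while let Some(message) = stream.next().await {
            // ... process message.message.data here ...
            if let Err(status) = message.ack().await {
                // With exactly-once delivery, an ack past the deadline fails and
                // the message will be redelivered.
                tracing::warn!("ack failed: {}", status);
            }
        }
    }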
