Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Messaging] Cumulative ack may lose message. #10431

Closed
congbobo184 opened this issue Apr 29, 2021 · 2 comments
Closed

[Messaging] Cumulative ack may lose message. #10431

congbobo184 opened this issue Apr 29, 2021 · 2 comments
Assignees
Labels
lifecycle/stale type/bug The PR fixed a bug or issue reported a bug

Comments

@congbobo184
Copy link
Contributor

congbobo184 commented Apr 29, 2021

example:
Transaction abort message1, message2 and then clear incomingMessages.
after incomingMessage cleared the message3 can add in incomingMessages. This time we cumulativeAck message3 with txn will lose message1 and message2.

    @Test
    public void transactionCumulativeAck() throws Exception {
        String topic = NAMESPACE1 + "/txn-timeout";

        @Cleanup
        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("test")
                .subscribe();

        @Cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .enableBatching(false)
                .producerName("txn-timeout")
                .create();

        producer.send("Hello Pulsar one!");
        producer.send("Hello Pulsar two!");

        Transaction transaction = pulsarClient
                .newTransaction()
                .withTransactionTimeout(3, TimeUnit.SECONDS)
                .build().get();

        Message<String> message = consumer.receive();

        System.out.println(message.getValue());

        consumer.acknowledgeCumulativeAsync(message.getMessageId(), transaction).get();

        transaction.abort();

        transaction = pulsarClient
                .newTransaction()
                .withTransactionTimeout(3, TimeUnit.SECONDS)
                .build().get();
        // it may receive Hello Pulsar two!
        Message<String> reReceiveMessage = consumer.receive(2, TimeUnit.SECONDS);
        System.out.println(reReceiveMessage.getValue());

    }

It not a transaction bug, now normal cumulative ack will produce race condition.

consumer pull request and clear incomming queue is not synchronized

image

when clear completed, we don't send redeliverCommond to server, this time consumer receive MessageCommand the incomming queue will add new message, the messageId may more than the old messageId whtich you don't cumulative ack. then you cumulative ack this new messageId, the old messageID will lost.

may we should add an epoch for every messageCommand, when invoke redeliverUnacknowledgedMessages, we can add the epoch and refuse the messageCommand which is smaller than current consumer epoch.

@congbobo184 congbobo184 added the type/bug The PR fixed a bug or issue reported a bug label Apr 29, 2021
@congbobo184 congbobo184 changed the title [Transaction bug] Transaction cumulativeAck bug. [Messaging] cumulative ack Apr 29, 2021
@congbobo184 congbobo184 changed the title [Messaging] cumulative ack [Messaging] Cumulative ack may lose message. Apr 29, 2021
@congbobo184 congbobo184 self-assigned this Nov 3, 2021
@codelipenghui
Copy link
Contributor

The issue had no activity for 30 days, mark with Stale label.

@congbobo184
Copy link
Contributor Author

congbobo184 commented Jul 15, 2022

#10478 fixed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
lifecycle/stale type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

No branches or pull requests

2 participants