[Bug] key-SHARED subscription is not performing redelivery and going in infinite while loop of reading messages from disk and not dispatching any messages #21656

Closed
2 tasks done
rdhabalia opened this issue Dec 2, 2023 · 4 comments · Fixed by #23352 · May be fixed by #21657
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

@rdhabalia
Contributor

Search before asking

  • I searched in the issues and found nothing similar.

Version

master

Minimal reproduce step

A Shared or Key_Shared subscription must dispatch redelivered messages in every scenario: any shared subscription should redeliver messages that were already delivered but not acknowledged. The broker can enforce strict ordering for new messages, which it reads for the first time by advancing the cursor's readPosition, but it should be able to dispatch already delivered, unacknowledged messages whenever required, without restriction.

However, the Key_Shared subscription handles redelivered messages incorrectly: the broker keeps reading them, discards them, and does not dispatch a single message to the consumer, which changes the semantics of consumer delivery ordering. The broker does not dispatch a redelivered message if its message ID is smaller than the offset message ID assigned to the consumer when it joined. The broker assigns the cursor's current read position as the consumer's minimum message ID offset to manage ordering, but the ID of an already delivered message can be smaller than that position, and redelivery should not be restricted by ordering, as already discussed for shared subscription semantics.

Because of this incorrect handling, Key_Shared topics whose connected consumers have positive permits cannot receive any messages, and dispatching is stuck. The broker also keeps performing the same cold reads across those stuck topics, wasting storage and CPU resources on messages that it reads and then discards. This impacts the application, the broker, and the bookies, and the handling is both semantically and practically invalid.

Right now, multiple such topics with Key_Shared subscriptions and pending redeliveries can significantly impact the broker and bookies, which keep reading large numbers of messages without dispatching them, while client applications cannot consume any messages.
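The gate described above can be illustrated with a toy model. This is a sketch of the behavior as this report describes it, not Pulsar's dispatcher code: the class name, the `Pending` type, and the use of a single `long` as a position are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the dispatch gate described in this report; NOT Pulsar's dispatcher code. */
class RedeliveryGateSketch {

    /** Hypothetical stand-in for a message read by the broker (position flattened to one number). */
    static final class Pending {
        final long position;
        final boolean redelivery;

        Pending(long position, boolean redelivery) {
            this.position = position;
            this.redelivery = redelivery;
        }
    }

    /**
     * Behavior as reported: every message below the position recorded when the
     * consumer joined is discarded, including messages that are being redelivered.
     */
    static List<Pending> reportedFilter(List<Pending> read, long joinPosition) {
        List<Pending> dispatchable = new ArrayList<>();
        for (Pending p : read) {
            if (p.position >= joinPosition) {
                dispatchable.add(p);
            }
        }
        return dispatchable;
    }

    /** Behavior the report asks for: the ordering gate applies only to first-time reads. */
    static List<Pending> expectedFilter(List<Pending> read, long joinPosition) {
        List<Pending> dispatchable = new ArrayList<>();
        for (Pending p : read) {
            if (p.redelivery || p.position >= joinPosition) {
                dispatchable.add(p);
            }
        }
        return dispatchable;
    }

    public static void main(String[] args) {
        long joinPosition = 10; // assumed read position when the consumer joined
        List<Pending> replay = List.of(new Pending(3, true), new Pending(7, true));
        System.out.println(reportedFilter(replay, joinPosition).size()); // prints 0
        System.out.println(expectedFilter(replay, joinPosition).size()); // prints 2
    }
}
```

In this model, a replay read that contains only positions below the join position yields nothing to dispatch, so the next replay read returns the same entries again. That is the read-and-discard loop the report describes.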

Test case


    @Test
    public void test() throws Exception {
        String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
        boolean enableBatch = false;
        // Distinct values received by consumer2 and consumer3
        Set<Integer> values = new HashSet<>();

        @Cleanup
        Consumer<Integer> consumer1 = createConsumer(topic);

        @Cleanup
        Producer<Integer> producer = createProducer(topic, enableBatch);
        int count = 0;
        // First 10 messages with random keys: only consumer1 is connected
        for (int i = 0; i < 10; i++) {
            String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
            producer.newMessage().key(key).value(count++).send();
        }

        @Cleanup
        Consumer<Integer> consumer2 = createConsumer(topic);

        // Next 10 messages: consumer1 and consumer2 are connected
        for (int i = 0; i < 10; i++) {
            String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
            producer.newMessage().key(key).value(count++).send();
        }

        @Cleanup
        Consumer<Integer> consumer3 = createConsumer(topic);

        // Ask the broker to redeliver everything consumer2 has not acknowledged
        consumer2.redeliverUnacknowledgedMessages();

        // Last 10 messages: all three consumers are connected
        for (int i = 0; i < 10; i++) {
            String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
            producer.newMessage().key(key).value(count++).send();
        }
        // Closing consumer1 leaves its unacknowledged messages to the remaining consumers
        consumer1.close();

        // Drain consumer2 until a receive times out
        for (int i = 0; i < count; i++) {
            Message<Integer> msg = consumer2.receive(100, TimeUnit.MILLISECONDS);
            if (msg != null) {
                values.add(msg.getValue());
            } else {
                break;
            }
        }
        // Drain consumer3 until a receive times out
        for (int i = 0; i < count; i++) {
            Message<Integer> msg = consumer3.receive(1, TimeUnit.MILLISECONDS);
            if (msg != null) {
                values.add(msg.getValue());
            } else {
                break;
            }
        }
        // Every produced value must have reached consumer2 or consumer3
        assertEquals(values.size(), count);
    }

What did you expect to see?

  • Consumers with available permits must be able to consume redelivered messages.
  • Serving the consumers of such a subscription should not be a catastrophic scenario for the broker.

What did you see instead?

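Dispatching gets stuck: the broker keeps reading messages from disk and discarding them, and the consumers receive nothing. The broker should not block delivery based on such incorrect assumptions.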

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@lhotari
Member

lhotari commented Sep 7, 2024

It seems that this issue might be addressed together with PIP-282 changes #21953 and other PRs #23226 (merged) and #23231 (in-progress).

@lhotari
Member

lhotari commented Sep 16, 2024

I've created PIP-379: Key_Shared Draining Hashes for Improved Message Ordering as a proposal to address such issues as this one.

@lhotari
Member

lhotari commented Sep 16, 2024

The test case in this issue does not reproduce the described issue: it passes if consumer2 and consumer3 are handled concurrently, for example by using the ackAllMessages method available in ProducerConsumerBase. However, I understand what problem this tries to reproduce.
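One way to read "handled concurrently" is to poll both consumers in the same loop instead of draining one until it times out before touching the other. The sketch below shows only that interleaving. It assumes a hypothetical `Source` interface in place of the Pulsar `Consumer` API, it is not the `ackAllMessages` implementation, and it does not acknowledge anything.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Interleaved polling of several sources; a stand-in sketch, not Pulsar test code. */
class ConcurrentDrainSketch {

    /** Hypothetical non-blocking receive: returns null when nothing is available. */
    interface Source<T> {
        T poll();
    }

    /** Polls every source in turn until one full pass over all of them yields nothing. */
    @SafeVarargs
    static <T> List<T> drainAll(Source<T>... sources) {
        List<T> received = new ArrayList<>();
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (Source<T> source : sources) {
                T value = source.poll();
                if (value != null) {
                    received.add(value);
                    progressed = true;
                }
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // In-memory queues stand in for the two consumers' pending messages
        Queue<Integer> queue2 = new ArrayDeque<>(List.of(0, 2, 4));
        Queue<Integer> queue3 = new ArrayDeque<>(List.of(1, 3));
        Source<Integer> source2 = queue2::poll;
        Source<Integer> source3 = queue3::poll;
        System.out.println(drainAll(source2, source3)); // prints [0, 1, 2, 3, 4]
    }
}
```

With the sequential loops in the test, consumer2's loop stops at its first timeout and never looks at consumer2 again, even if messages would become available to it later. The interleaved loop keeps visiting each source for as long as any of them makes progress.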

@lhotari
Member

lhotari commented Sep 16, 2024

To address this issue, I'll update PIP-379 so that the updated contract will cover how negative acknowledgements and explicit redeliveries such as redeliverUnacknowledgedMessages are handled in Key_Shared in the updated design.
Currently it's not properly documented what happens when an application uses those methods when a Key_Shared subscription is used.
