[fix][broker] Key-shared subscription must follow consumer redelivery as per shared sub semantic #21657

Open. Wants to merge 5 commits into base: master.
Conversation

@rdhabalia (Contributor) commented Dec 2, 2023

Fixes #21656

Motivation

A SHARED or KEY_SHARED subscription must dispatch redelivered messages in every scenario: any shared subscription should dispatch already-delivered unacked messages. The broker can follow strict ordering for new messages it reads for the first time by advancing the cursor's readPosition, but it can still dispatch already-delivered unacked messages whenever required, without restricting any scenario.

However, the key-shared subscription handles redelivered messages incorrectly: it keeps reading redelivered messages, discards them, and does not dispatch a single message to the consumer, which incorrectly changes the semantics of consumer delivery ordering. The broker does not dispatch a redelivered message if that message-id is smaller than the offset-message-id assigned to the consumer when it joined. The broker assigns the cursor's current read position as the consumer's minimum message-id offset to manage ordering, but a previously delivered message-id can be smaller than that position, and redelivery should not be restricted by ordering, per the shared-subscription semantics discussed above. Because the broker handles this incorrectly for key-shared, topics with key-shared subscriptions whose connected consumers have positive permits cannot receive any messages and dispatching is stuck. The broker also keeps performing the same cold reads across those stuck topics, wasting storage and CPU resources by discarding the messages it reads. This impacts the application, the broker, and the bookies, and such handling is both semantically and practically invalid.

Right now, multiple topics with key-shared subscriptions and redelivered messages can significantly impact brokers and bookies by repeatedly reading large numbers of messages without dispatching them, while client applications cannot consume any messages, which impacts those applications significantly as well.
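As a hedged illustration of the gating described above, here is a simplified, hypothetical model of the "recently joined consumers" rule (names are illustrative, not Pulsar's actual API): a consumer that joins while older messages are still unacked records the cursor's read position at join time, and entries at or beyond that position are withheld from it until everything before that position is acknowledged, which is how replayed entries can end up read and discarded in a loop.

```java
/**
 * Hypothetical, simplified model of the "recently joined consumers" gate in a
 * key-shared dispatcher. Names are illustrative; this is not Pulsar's code.
 */
public class RecentlyJoinedGateModel {

    /**
     * Returns true if a message at messagePosition may be dispatched to a
     * consumer whose recorded join position is joinedPosition (null means the
     * consumer is not gated, i.e. it was not "recently joined").
     */
    static boolean canDispatch(Long joinedPosition, long messagePosition) {
        return joinedPosition == null || messagePosition < joinedPosition;
    }

    public static void main(String[] args) {
        // An ungated consumer can take any entry.
        assert canDispatch(null, 12L);
        // A gated consumer (joined at position 10) may receive earlier entries...
        assert canDispatch(10L, 5L);
        // ...but entries at or after its join position are withheld. When the
        // replay set contains only such entries for its keys, the dispatcher
        // reads and discards the same entries repeatedly: the stuck state
        // this PR describes.
        assert !canDispatch(10L, 12L);
        System.out.println("gate model ok");
    }
}
```

The sketch only models the filter decision; the real dispatcher combines it with key-to-consumer assignment and the replay queue.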

Modifications

Allow dispatching of redelivered messages, avoid reading and discarding duplicate messages, and fix the dispatcher getting stuck on unacked messages.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick one of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@rdhabalia rdhabalia added area/broker doc-not-needed Your PR changes do not impact docs ready-to-test labels Dec 2, 2023
@rdhabalia rdhabalia self-assigned this Dec 2, 2023
@github-actions github-actions bot added doc-label-missing and removed doc-not-needed Your PR changes do not impact docs labels Dec 2, 2023

github-actions bot commented Dec 2, 2023

@rdhabalia Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Dec 2, 2023

@joeCarf joeCarf left a comment


Hi, it seems there are test failures.

Producer<Integer> producer = createProducer(topic, enableBatch);
int count = 0;
for (int i = 0; i < 10; i++) {
// Send the same key twice so that we'll have a batch message
Contributor:

I see that enableBatch is false; did you want to also add that case?

Contributor Author:

removed those comments.

}

@Cleanup
Consumer<Integer> consumer2 = createConsumer(topic);
Contributor:

consumer2 will be closed by Lombok after its last usage.

The point in time at which you close the consumer may alter the execution of the test. What about closing the consumers explicitly instead of using Lombok?

Contributor Author:

consumer2 needs to stay open for the test to consume all messages, and it can then be cleaned up by Lombok; we don't have to close it explicitly, and it won't impact the test.

@@ -1630,4 +1630,63 @@ public void testContinueDispatchMessagesWhenMessageDelayed() throws Exception {
log.info("Got {} other messages...", sum);
Assert.assertEquals(sum, delayedMessages + messages);
}

@Test
public void test()
Contributor:

Can we add a more meaningful test?

Contributor Author:

Oops, sorry. I just added this test while creating the issue; let me fix the tests and naming.

Contributor Author:

done.

@codelipenghui codelipenghui added this to the 3.2.0 milestone Dec 5, 2023
// message [2,3] is lower than the recentJoinedPosition 4,
// so the message [2,3] will be dispatched to consumer2,
// but the message [2,3] should not be dispatched to consumer2.

if (readType == ReadType.Replay) {
@poorbarcode (Contributor) commented Dec 5, 2023:

However, the key-shared subscription is incorrectly handling redelivered messages by reading redelivered messages, discarding them, and not dispatching any single messages to the consumer by incorrectly changing the semantics of the consumer delivery order. Broker doesn't dispatch redelivery message if that message-id is smaller than the consumer's assigned offset-message-id when joined.

This code guarantees message ordering in the scenario below:

Initial state: Consumer 1 handles k1,k2 (recent-join: null); Consumer 2 handles k3,k4 (recent-join: null).

1. Consumer 1 receives M1(k1), M2(k2); Consumer 2 receives 1000 messages (M3(k3)...M1002(k3)).
2. Consumer 3 is added and is assigned k2 (taken from Consumer 1).
   State: Consumer 1 handles k1; Consumer 2 handles k3,k4; Consumer 3 handles k2 (recent-join: M1002).
3. Consumer 2 is closed; k3 is reassigned to Consumer 1 and k4 to Consumer 3.
   State: Consumer 1 handles k1,k3 (recent-join: null); Consumer 3 handles k2,k4 (recent-join: M1002).
4. Consumer 1 receives M3(k3)...M1000(k3); its incoming queue is now full.
5. Consumer 4 is added and is assigned k3 (taken from Consumer 1).
   State: Consumer 1 handles k1 (recent-join: null); Consumer 3 handles k2,k4 (recent-join: M1002); Consumer 4 handles k3 (recent-join: M1002).
6. Consumer 4 receives M1001(k3)...M1002(k3).

I think we should solve the issue above first, then try to improve here.

Related to #20776, please take a look

Contributor Author:

@poorbarcode can you share the URL where we have defined the contract of the key-shared sub?

Contributor:

@poorbarcode can you share the URL where we have defined the contract of the key-shared sub?

I don't know which URL you wanted; do you mean the Key_Shared Subscription doc?

Contributor Author:

@poorbarcode I mean, do we have any document where we state what kind of ordering guarantee we provide to users? Because, as I said in the issue, once a consumer is closed, the broker can redeliver that consumer's unacked messages without considering ordering, instead of blocking forever.
So, I just want to see if we have any doc that defines what a user can expect in terms of ordering for the key-shared sub. I checked the URL you shared earlier, but it doesn't talk about ordering or redelivery ordering.
With the latest commit, this PR maintains the ordering guarantee, but it also handles redelivery of a closed consumer's unacked messages without blocking the dispatcher forever.

So, if we have a contract defined, we can check whether this PR violates the user contract for the key-shared subscription, because right now the key-shared sub is not usable and it is wasting a lot of broker/bookie resources.


@rdhabalia (Contributor, author) commented Dec 6, 2023:

Thanks @Technoboy- for sharing the link; I was searching for this documentation about the message-ordering guarantee for key-shared.

The broker will start delivering messages to the new consumer only when all messages up to the read position have been acknowledged. This will guarantee that a certain key is processed by a single consumer at any given time. The trade-off is that if one of the existing consumers is stuck and no time-out was defined (acknowledging for you), the new consumer won't receive any messages until the stuck consumer resumes or gets disconnected.

As documented, the new consumer won't receive any messages until the stuck consumer resumes or gets disconnected. So it must receive messages once the other consumer gets disconnected.
However, right now dispatching gets stuck when a consumer gets disconnected, and this PR has a test to reproduce that. This PR fixes exactly that issue: it unblocks dispatching when a consumer disconnects and redelivers that consumer's unacked messages.

So, this PR should fix that fundamental issue and unblock consumers that should not be stuck.

Member:

So, this PR should fix that fundamental issue and unblock consumers that should not be stuck.

Great points @rdhabalia. We can continue to resolve this issue as part of PIP-379; #23309 is the PIP document.

Contributor Author:

@lhotari you updated a contract similar to this PR and adopted the same behavior in #23309. Then why was this PR blocked for 5 months before you closed it?
I really don't have words for what's going on. Does it look good for the people who blocked this PR to come up with a similar approach and not let other people's work move forward? You also know that the same thing keeps happening in other PRs, and you have witnessed this kind of thing in Pulsar very recently.
@lhotari I don't want to target anyone here, but I want to ask a simple question: does it look good to take such actions? Does it make any difference to their lives? Because I really don't understand what's been going on in this project recently.

@codecov-commenter commented Dec 6, 2023

Codecov Report

Attention: Patch coverage is 75.00000% with 3 lines in your changes missing coverage. Please review.

Project coverage is 73.35%. Comparing base (2bf1354) to head (f419701).
Report is 793 commits behind head on master.

Files with missing lines Patch % Lines
...ersistentStickyKeyDispatcherMultipleConsumers.java 72.72% 1 Missing and 2 partials ⚠️
Additional details and impacted files


@@             Coverage Diff              @@
##             master   #21657      +/-   ##
============================================
+ Coverage     73.24%   73.35%   +0.11%     
- Complexity    32752    32759       +7     
============================================
  Files          1893     1893              
  Lines        140730   140760      +30     
  Branches      15500    15504       +4     
============================================
+ Hits         103071   103260     +189     
+ Misses        29563    29394     -169     
- Partials       8096     8106      +10     
Flag Coverage Δ
inttests 24.11% <0.00%> (?)
systests 24.70% <0.00%> (+0.03%) ⬆️
unittests 72.66% <75.00%> (-0.02%) ⬇️

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
...a/org/apache/bookkeeper/mledger/ManagedCursor.java 42.85% <ø> (ø)
...che/bookkeeper/mledger/impl/ManagedCursorImpl.java 78.95% <ø> (-0.18%) ⬇️
...ava/org/apache/pulsar/broker/service/Consumer.java 86.30% <100.00%> (+0.02%) ⬆️
...ersistentStickyKeyDispatcherMultipleConsumers.java 81.73% <72.72%> (-2.93%) ⬇️

... and 103 files with indirect coverage changes

@codelipenghui (Contributor) commented Dec 7, 2023

Hi @rdhabalia,

I can confirm this is an issue that we need to fix.
I tried on my laptop to find a simpler solution for it.

The main idea of this solution is to remove the consumer from the recently joined consumer map if there are redelivered messages (from the redeliver method or from a disconnect) greater than the joined position of the consumer. PTAL.

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 8f05530f58..b1ffe596b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -138,6 +139,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
     @Override
     public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
+        consumer.getPendingAcks().keys().stream().max(ConcurrentLongLongPairHashMap.LongPair::compareTo).ifPresent(longPair -> {
+            removeConsumerFromRecentlyJoinedConsumersByPosition(PositionImpl.get(longPair.first, longPair.second));
+        });
         // The consumer must be removed from the selector before calling the superclass removeConsumer method.
         // In the superclass removeConsumer method, the pending acks that the consumer has are added to
         // redeliveryMessages. If the consumer has not been removed from the selector at this point,
@@ -327,7 +331,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             isDispatcherStuckOnReplays = true;
             return true;
         }  else if (currentThreadKeyNumber == 0) {
-            return true;
+            return totalBytesSent != 0;
         }
         return false;
     }
@@ -404,26 +408,37 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         });
     }
 
-    private boolean removeConsumersFromRecentJoinedConsumers() {
-        Iterator<Map.Entry<Consumer, PositionImpl>> itr = recentlyJoinedConsumers.entrySet().iterator();
+    @Override
+    public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
+        positions.stream().max(PositionImpl::compareTo).ifPresent(this::removeConsumerFromRecentlyJoinedConsumersByPosition);
+        super.redeliverUnacknowledgedMessages(consumer, positions);
+    }
+
+    private boolean removeConsumerFromRecentlyJoinedConsumersByPosition(PositionImpl position) {
         boolean hasConsumerRemovedFromTheRecentJoinedConsumers = false;
-        PositionImpl mdp = (PositionImpl) cursor.getMarkDeletedPosition();
-        if (mdp != null) {
-            PositionImpl nextPositionOfTheMarkDeletePosition =
-                    ((ManagedLedgerImpl) cursor.getManagedLedger()).getNextValidPosition(mdp);
-            while (itr.hasNext()) {
-                Map.Entry<Consumer, PositionImpl> entry = itr.next();
-                if (entry.getValue().compareTo(nextPositionOfTheMarkDeletePosition) <= 0) {
-                    itr.remove();
-                    hasConsumerRemovedFromTheRecentJoinedConsumers = true;
-                } else {
-                    break;
-                }
+        PositionImpl positionToRemove =
+                ((ManagedLedgerImpl) cursor.getManagedLedger()).getNextValidPosition(position);
+        Iterator<Map.Entry<Consumer, PositionImpl>> itr = recentlyJoinedConsumers.entrySet().iterator();
+        while (itr.hasNext()) {
+            Map.Entry<Consumer, PositionImpl> entry = itr.next();
+            if (entry.getValue().compareTo(positionToRemove) <= 0) {
+                itr.remove();
+                hasConsumerRemovedFromTheRecentJoinedConsumers = true;
+            } else {
+                break;
             }
         }
         return hasConsumerRemovedFromTheRecentJoinedConsumers;
     }
 
+    private boolean removeConsumersFromRecentJoinedConsumers() {
+        PositionImpl mdp = (PositionImpl) cursor.getMarkDeletedPosition();
+        if (mdp != null) {
+            return removeConsumerFromRecentlyJoinedConsumersByPosition(mdp);
+        }
+        return false;
+    }
+
     @Override
     protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
         if (isDispatcherStuckOnReplays) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 18fb141be3..1d462ee884 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -48,6 +48,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+
+import io.swagger.util.Json;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.Topic;
@@ -1630,4 +1632,67 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         log.info("Got {} other messages...", sum);
         Assert.assertEquals(sum, delayedMessages + messages);
     }
+
+    @Test(invocationCount = 10)
+    public void test()
+            throws Exception {
+        String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
+        boolean enableBatch = false;
+        Set<Integer> values = new HashSet<>();
+
+        @Cleanup
+        Consumer<Integer> consumer1 = createConsumer(topic);
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic, enableBatch);
+        int count = 0;
+        for (int i = 0; i < 10; i++) {
+            // Send the same key twice so that we'll have a batch message
+            String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
+            producer.newMessage().key(key).value(count++).send();
+        }
+
+        @Cleanup
+        Consumer<Integer> consumer2 = createConsumer(topic);
+
+        for (int i = 0; i < 10; i++) {
+            // Send the same key twice so that we'll have a batch message
+            String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
+            producer.newMessage().key(key).value(count++).send();
+        }
+
+        @Cleanup
+        Consumer<Integer> consumer3 = createConsumer(topic);
+
+        consumer2.redeliverUnacknowledgedMessages();
+
+        for (int i = 0; i < 10; i++) {
+            // Send the same key twice so that we'll have a batch message
+            String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
+            producer.newMessage().key(key).value(count++).send();
+        }
+        consumer1.close();
+
+        for(int i = 0; i < count; i++) {
+            Message<Integer> msg = consumer2.receive(10, TimeUnit.SECONDS);
+            if (msg!=null) {
+                values.add(msg.getValue());
+            } else {
+                break;
+            }
+        }
+        for(int i = 0; i < count; i++) {
+            Message<Integer> msg = consumer3.receive(10, TimeUnit.SECONDS);
+            if (msg!=null) {
+                values.add(msg.getValue());
+            } else {
+                break;
+            }
+        }
+        System.out.println(Json.pretty(admin.topics().getStats(topic)));
+        System.out.println(Json.pretty(admin.topics().getInternalStats(topic)));
+
+        assertEquals(values.size(), count);
+
+    }
 }

All the Key_Shared subscription tests pass.


cc @poorbarcode @Technoboy-

@rdhabalia (Contributor, author) commented:

@codelipenghui
Simply adding a removed consumer's unacked messages into the redelivery/replay list will not work, because there are scenarios where the key-shared dispatcher adds additional filtered messages (due to the max-message limit or ordering) into the replay list that are associated with already-connected consumers. We have to differentiate those messages from actually unacked messages, and this PR addresses that differentiation.

@rdhabalia (Contributor, author) commented Dec 7, 2023:

@poorbarcode
Can we remove the blocker if there are no concerns about the PR, and can we merge it? It's really needed for various production systems.

@codelipenghui (Contributor) left a comment:

Simply adding a removed consumer's unacked messages into the redelivery/replay list will not work, because there are scenarios where the key-shared dispatcher adds additional filtered messages (due to the max-message limit or ordering) into the replay list that are associated with already-connected consumers. We have to differentiate those messages from actually unacked messages, and this PR addresses that differentiation.

@rdhabalia Could you please update the test to reflect your concern? Also, the solution I provided is not "adding removed consumer's unacked messages into redelivery/replay list", and I think my solution is essentially the same idea as yours: if a consumer calling redeliver, or a connection close, triggers redelivery of a message greater than the position at which the consumer joined, that is what differentiates the "additionally filtered messages" from the "messages sent to consumers before".

And could you please revert all the test-related changes except for your newly added tests, or provide an explanation for why the tests should be changed? I'm afraid the change will introduce new regressions even if it fixes some issues.

I will leave a change request here to make sure everything is clear on this PR before merging it.

Comment on lines +379 to +388
private boolean isEntryPendingAck(long ledgerId, long entryId) {
int size = consumerList.size();
for (int i = 0; i < size; i++) {
Consumer consumer = consumerList.get(i);
if (consumer != null && consumer.isPendingAck(ledgerId, entryId)) {
return true;
}
}
return false;
}
Contributor:

It's super expensive if you have high traffic and many consumers, no?

Contributor Author:

Checking pending-ack is not expensive, as it's a lookup in a map. Also, it's not executed for every single message: it is checked only when the broker sees filtered messages, and only if the message is not deleted. So it won't be expensive or frequently executed.

Regarding the test cases, I had to change them because most of them assumed consumers were stuck on a closed consumer's unacked messages, and some of them are flaky, as this test class is part of the flaky-test pipeline.

I'm afraid the change will introduce new regressions even if it can fix some issues. I will leave change request here to make sure everything is clear on this PR before merge it.

Sure, let's make sure there are no regression issues and that the change doesn't violate the user contract. This seems like a fundamental fix that avoids all the hacks and stuck issues, but you can verify and merge it if you don't see any issue.
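To make the cost argument above concrete, here is a hedged sketch (illustrative class names, not the broker's real ones) of why the pending-ack check is a constant-time hash lookup per consumer, so a check like isEntryPendingAck scales with the number of consumers rather than the number of messages:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of the pending-ack check discussed above. Each consumer
 * keeps its unacked entries in a hash map keyed by (ledgerId, entryId), so one
 * membership test is an O(1) lookup, and scanning all consumers is
 * O(number of consumers). Names are illustrative; this is not Pulsar's code.
 */
public class PendingAckSketch {

    /** Composite key for a ledger entry. */
    record EntryKey(long ledgerId, long entryId) {}

    /** Minimal stand-in for a consumer with a pending-ack map. */
    static class ConsumerModel {
        final Map<EntryKey, Boolean> pendingAcks = new HashMap<>();

        boolean isPendingAck(long ledgerId, long entryId) {
            return pendingAcks.containsKey(new EntryKey(ledgerId, entryId));
        }
    }

    /** True if any connected consumer still has this entry pending. */
    static boolean isEntryPendingAck(List<ConsumerModel> consumers, long ledgerId, long entryId) {
        for (ConsumerModel consumer : consumers) {
            if (consumer.isPendingAck(ledgerId, entryId)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        ConsumerModel c1 = new ConsumerModel();
        c1.pendingAcks.put(new EntryKey(3L, 7L), Boolean.TRUE); // entry 3:7 delivered, unacked
        List<ConsumerModel> consumers = List.of(c1, new ConsumerModel());
        assert isEntryPendingAck(consumers, 3L, 7L);   // still pending on c1
        assert !isEntryPendingAck(consumers, 3L, 8L);  // not pending anywhere
        System.out.println("pending-ack sketch ok");
    }
}
```

The per-entry cost is one hash lookup per connected consumer, which matches the argument that the check is cheap as long as it is only run for filtered, not-yet-deleted entries.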

Contributor:

Simply adding a removed consumer's unacked messages into the redelivery/replay list will not work because there are scenarios where the key-shared dispatcher adds additional filtered messages (due to the max-message limit or ordering) into the replay list

@rdhabalia How about this one? I do not fully understand the exact issue that my approach can't resolve. It changes less code, doesn't need to add APIs to the managed ledger, and is good for performance because we don't check any message-level status. It also doesn't require changing any existing tests. IMO, it's safer for us, even if it cannot resolve all the potential issues; we can fix those case by case instead of fixing potential issues with a more complex solution that carries potential risk (tests are changed). WDYT?

Contributor:

If you agree with my approach to fix only #21656, I can also create a PR, and we can keep this PR open to find a solution for the potential issues that you pointed out (simply adding a removed consumer's unacked messages into the redelivery/replay list will not work because there are scenarios where the key-shared dispatcher adds additional filtered messages, due to the max-message limit or ordering, into the replay list).

Contributor Author:

Sure, let's move forward with the best approach, and let's make sure to fix the stuck dispatching, as it's impacting many production systems right now and not giving a positive experience of Pulsar.

* @throws Exception
*/
@Test
public void testKeySharedMessageRedeliveryWithoutStuck()
Contributor:

Can we remove the blocker if there are no concerns about the PR, and can we merge it? It's really needed for various production systems.

I created a channel to trace the context more easily.


Menu:

  • 1. The concerns of the test testKeySharedMessageRedeliveryWithoutStuck
  • 2. Clear the problem the PR is supposed to solve
  • 3. This PR will bring issues

1. The concerns of the test testKeySharedMessageRedeliveryWithoutStuck

The test works like this:

  • create consumer1
  • send 10 msgs
  • create consumer2
  • send 10 msgs
  • create consumer3
  • redeliver all messages of consumer2
  • send 10 msgs
  • close consumer1
  • receive all messages for consumer2
  • receive all messages for consumer3

Concerns

  • The step "redeliver all messages of consumer2" is not meaningful; it does nothing.
  • We should run "receive all messages for consumer2" and "receive all messages for consumer3" in different threads so that they can receive all messages (acknowledging received messages in time).

What the test (without my suggestion) proves:

  • It proves that one stuck consumer can get another one stuck.
  • It does not prove that, ultimately, some messages will never be consumed (see Concern 2 above).

2. Clear the problem the PR is supposed to solve

  • You just want to improve performance and prevent idle reading, right? (The test cannot prove the subscription would be stuck; see Concern 2 above.)

3. This PR will bring issues

Initial state: Consumer 1 handles k1,k2 (recent-join: null); Consumer 2 handles k3,k4 (recent-join: null).

1. Consumer 1 receives M1(k1), M2(k2).
2. Consumer 3 is added and is assigned k2 (taken from Consumer 1).
   State: Consumer 1 handles k1; Consumer 2 handles k3,k4; Consumer 3 handles k2 (recent-join: M2).
3. Consumer 2 receives 1000 messages (M3(k3)...M1002(k3)).
4. Consumer 2 is closed; k3 is reassigned to Consumer 1 and k4 to Consumer 3.
   State: Consumer 1 handles k1,k3 (recent-join: null); Consumer 3 handles k2,k4 (recent-join: M2).
5. Consumer 1 receives M3(k3)...M1000(k3); its incoming queue is now full.
6. Consumer 4 is added and is assigned k3 (taken from Consumer 1).
   State: Consumer 1 handles k1 (recent-join: null); Consumer 3 handles k2,k4 (recent-join: M2); Consumer 4 handles k3 (recent-join: M1002).
7. Since message M2 has not been acked, M1001(k3)...M1002(k3) will be filtered out.

In Step 7, the code you're removing alleviates some of the message-ordering problems, even if it doesn't solve all cases. #20776 is trying to solve all the cases of this problem. But if you remove this mechanism before #20776 is complete, the out-of-order problem will get worse.

@eolivelli (Contributor) left a comment:

LGTM

Comment on lines +1676 to +1691
for(int i = 0; i < count; i++) {
Message<Integer> msg = consumer2.receive(100, TimeUnit.MILLISECONDS);
if (msg!=null) {
values.add(msg.getValue());
} else {
break;
}
}
for(int i = 0; i < count; i++) {
Message<Integer> msg = consumer3.receive(1, TimeUnit.MILLISECONDS);
if (msg!=null) {
values.add(msg.getValue());
} else {
break;
}
}
Member:

This test is currently invalid: messages would need to be acknowledged and consumed concurrently. The test passes at least on branch-3.3 when making those changes.

@lhotari (Member) commented Sep 16, 2024

The problem that this PR is addressing will be covered in PIP-379: Key_Shared Draining Hashes for Improved Message Ordering.
To address this issue, I'll update PIP-379 so that the updated contract will cover how negative acknowledgements and explicit redeliveries such as redeliverUnacknowledgedMessages are handled in Key_Shared in the updated design.
Currently, it's not properly documented what happens when an application uses those methods with a Key_Shared subscription.
If an application uses "nacks", it should either fail with an exception or there should be a clearly defined behavior. Since failing with an exception on the client side isn't a real option, the remaining option is to clearly define the behavior and make it consistent.

@lhotari (Member) commented Sep 16, 2024

I'm closing this PR since the problem will be covered by PIP-379 and its implementation. Please see the previous comment.

@lhotari lhotari closed this Sep 16, 2024
@rdhabalia rdhabalia reopened this Oct 7, 2024
@lhotari lhotari modified the milestones: 4.0.0, 4.1.0 Oct 11, 2024
9 participants