
[fix][broker] Key-shared subscription must follow consumer redelivery as per shared sub semantic #21657

Open · wants to merge 5 commits into base: master

Changes from 1 commit
@@ -355,25 +355,8 @@ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> en
return maxMessages;
}

// If the read type is Replay, we should avoid sending messages that are held by other
// consumers to newly joined consumers.
// For example, we have 10 messages [0,1,2,3,4,5,6,7,8,9].
// Consumer0 gets messages 0 and 1 but does not ack message 0; then consumer1 joins.
// When consumer1 gets messages 2 and 3, the broker will not dispatch them to consumer1
// because the mark-delete position has not moved forward,
// so messages 2 and 3 are stored in the redelivery tracker.
// Now consumer2 joins; it will read new messages from the cursor,
// so the recentJoinedPosition is 4 for consumer2.
// Because there are messages that need redelivery, the broker reads the redelivery messages [2,3] first.
// Messages [2,3] are lower than the recentJoinedPosition 4,
// so they would be dispatched to consumer2,
// but messages [2,3] should not be dispatched to consumer2.
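The replay-case rule described in the comment above can be sketched as follows. This is a hypothetical simplification, not the broker's actual code: plain `long` values stand in for Pulsar's `PositionImpl`, and `filterForRecentlyJoined` is an illustrative helper, not a real Pulsar method.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ReplayFilterSketch {
    /**
     * Keep only replayed entries that a recently joined consumer may receive.
     * Entries below the join position are still held by earlier consumers,
     * so they are withheld and remain in the redelivery tracker.
     */
    static List<Long> filterForRecentlyJoined(List<Long> replayEntries, long recentJoinedPosition) {
        return replayEntries.stream()
                .filter(pos -> pos >= recentJoinedPosition)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The comment's example: consumer2 joined at position 4,
        // messages 2 and 3 are awaiting redelivery.
        List<Long> dispatchable = filterForRecentlyJoined(List.of(2L, 3L), 4L);
        System.out.println(dispatchable); // prints [] : nothing may go to consumer2
    }
}
```

With the comment's example, the replayed messages [2,3] sit below consumer2's recentJoinedPosition of 4, so the filter withholds both.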

if (readType == ReadType.Replay) {
@poorbarcode (Contributor) commented on Dec 5, 2023:

However, the key-shared subscription currently mishandles redelivered messages: it reads them, discards them, and dispatches nothing to the consumer, which incorrectly changes the semantics of the consumer delivery order. The broker does not dispatch a redelivered message if its message-id is smaller than the offset message-id assigned to the consumer when it joined.

This code guarantees message ordering in the scenario below:

| No. | Consumer 1 | Consumer 2 | Consumer 3 | Consumer 4 |
| --- | --- | --- | --- | --- |
| state | handling k1, k2; recent-join: null | handling k3, k4; recent-join: null | | |
| 1 | received M1(k1), M2(k2) | received 1000 messages (M3(k3)...M1002(k3)) | | |
| 2 | | | added | |
| description | | | assigned k2 (from Consumer 1) | |
| state | handling k1 | handling k3, k4 | handling k2; recent-join: M1002 | |
| 3 | | closed | | |
| description | assigned k3 (from Consumer 2) | | assigned k4 (from Consumer 2) | |
| state | handling k1, k3; recent-join: null | | handling k2, k4; recent-join: M1002 | |
| 4 | received M3(k3)...M1000(k3); the incoming queue is full now | | | |
| 5 | | | | added |
| description | | | | assigned k3 (from Consumer 1) |
| state | handling k1; recent-join: null | | handling k2, k4; recent-join: M1002 | handling k3; recent-join: M1002 |
| 6 | | | | received M1001(k3)...M1002(k3) |

I think we should solve the issue above first, then try to improve here.

Related to #20776, please take a look

Contributor (author) replied:
@poorbarcode can you share the URL where we have defined the contract of the key-shared sub?

Contributor replied:
> @poorbarcode can you share the URL where we have defined the contract of the key-shared sub?

I do not know which URL you want. Do you mean the Key_Shared subscription doc?

Contributor (author) replied:

@poorbarcode I mean: do we have any document that defines what kind of ordering guarantee we provide to users? As I said in the issue, once a consumer is closed, the broker can redeliver that consumer's unacked messages without considering ordering, instead of blocking forever.
So, I just want to see whether we have any doc that defines what users can expect in terms of ordering for the key-shared sub. I checked the URL you shared earlier, but it doesn't talk about ordering or redelivery ordering.
With the latest commit, this PR maintains the ordering guarantee while also handling redelivery of a closed consumer's unacked messages without blocking the dispatcher forever.

So, if we have a defined contract, we can check whether this PR violates the user contract for the key-shared subscription, because right now the key-shared sub is not usable and it is wasting a lot of broker/bookie resources.

A contributor replied with a link to the documentation.
@rdhabalia (Contributor, author) commented on Dec 6, 2023:

Thanks @Technoboy- for sharing the link; I was searching for this documentation, where it talks about the message ordering guarantee for key-shared.

> The broker will start delivering messages to the new consumer only when all messages up to the read position have been acknowledged. This will guarantee that a certain key is processed by a single consumer at any given time. The trade-off is that if one of the existing consumers is stuck and no time-out was defined (acknowledging for you), the new consumer won't receive any messages until the stuck consumer resumes or gets disconnected.

As documented, the new consumer won't receive any messages until the stuck consumer resumes or gets disconnected. So, it must receive messages once the other consumer gets disconnected.
However, right now dispatching gets stuck when a consumer gets disconnected, and this PR has a test to reproduce it.
This PR fixes exactly that issue: it unblocks dispatching when a consumer disconnects and redelivers that consumer's unacked messages.

So, this PR should fix that fundamental issue and unblock stuck consumers when they should not be stuck.
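As a rough illustration of the mechanism under discussion, here is a hypothetical sketch (not Pulsar code; `RecentlyJoinedCapSketch`, its method names, and the plain `long` positions are all invented for illustration) of how the oldest recently joined consumer caps the dispatcher's max read position, and how removing that consumer on disconnect lifts the cap so dispatching can resume:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RecentlyJoinedCapSketch {
    // consumer name -> position of the first message published after it joined;
    // insertion order matches join order, so the first entry is the oldest joiner.
    private final Map<String, Long> recentlyJoinedConsumers = new LinkedHashMap<>();

    void consumerJoined(String name, long joinPosition) {
        recentlyJoinedConsumers.put(name, joinPosition);
    }

    void consumerDisconnected(String name) {
        // Removing the consumer lifts its cap; in the broker, its unacked
        // messages would go back to the redelivery tracker.
        recentlyJoinedConsumers.remove(name);
    }

    long capMaxReadPosition(long maxReadPosition) {
        if (recentlyJoinedConsumers.isEmpty()) {
            return maxReadPosition;
        }
        long min = recentlyJoinedConsumers.values().iterator().next();
        return Math.min(maxReadPosition, min);
    }

    public static void main(String[] args) {
        RecentlyJoinedCapSketch d = new RecentlyJoinedCapSketch();
        d.consumerJoined("c3", 1002L);
        System.out.println(d.capMaxReadPosition(5000L)); // prints 1002: reads are capped
        d.consumerDisconnected("c3");
        System.out.println(d.capMaxReadPosition(5000L)); // prints 5000: cap is lifted
    }
}
```

The cap mirrors the `minReadPositionForRecentJoinedConsumer` comparison in the diff below; whether the cap is actually lifted on disconnect is precisely what this PR argues the broker should do.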

Member replied:

> So, this PR should fix that fundamental issue to unblock stuck consumers when they should not be stuck.

Great points @rdhabalia. We can continue to resolve this issue as part of PIP-379; #23309 is the PIP document.

Contributor (author) replied:

@lhotari you updated a contract similar to this PR and tried the same behavior in #23309. Then why was this PR blocked for 5 months before you closed it?
I really don't have words for what's going on. Does it really look good for the people who blocked this PR to come up with a similar approach and not let other people's work move forward? You also know the same thing keeps happening again and again in other PRs, and you have witnessed this kind of thing in Pulsar very recently.
@lhotari I don't want to target anyone here, but I want to ask a simple question: does it look good to take such actions? Does it make any difference to their lives? Because I really don't understand what's going on in this project recently.

PositionImpl minReadPositionForRecentJoinedConsumer = recentlyJoinedConsumers.values().iterator().next();
if (minReadPositionForRecentJoinedConsumer != null
&& minReadPositionForRecentJoinedConsumer.compareTo(maxReadPosition) < 0) {
maxReadPosition = minReadPositionForRecentJoinedConsumer;
}
return maxMessages;
}
// Here, the consumer is one that has recently joined, so we can only send messages that were
// published before it joined.
@@ -602,7 +602,7 @@ pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java

for (int i = 0; i < 20; i++) {
Message<Integer> msg = c2.receive();
assertEquals(msg.getValue().intValue(), i);

Check failure on line 605 in pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java (GitHub Actions / Flaky tests suite, KeySharedSubscriptionTest.testOrderingWhenAddingConsumers): expected [0] but found [10]

c2.acknowledge(msg);
}
@@ -706,7 +706,7 @@
}

// C2 will not be able to receive any messages until C1 is done processing whatever he got prefetched
assertNull(c2.receive(100, TimeUnit.MILLISECONDS));

Check failure on line 709 in pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java (GitHub Actions / Flaky tests suite, KeySharedSubscriptionTest.testRemoveFirstConsumer): expected [null] but found [org.apache.pulsar.client.impl.MessageImpl@100e2c1d]

c1.close();

@@ -881,7 +881,7 @@
received = consumer2.receive(1, TimeUnit.SECONDS);
} catch (PulsarClientException ignore) {
}
Assert.assertNull(received);

Check failure on line 884 in pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java (GitHub Actions / Flaky tests suite, KeySharedSubscriptionTest.testContinueDispatchMessagesWhenMessageTTL): expected [null] but found [org.apache.pulsar.client.impl.MessageImpl@3750f88b]

@Cleanup
Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
@@ -1630,4 +1630,63 @@
log.info("Got {} other messages...", sum);
Assert.assertEquals(sum, delayedMessages + messages);
}

@Test
public void test()
Review comment (Contributor): can we add a more meaningful test?

Reply (author): oops, sorry; I just added this test to create an issue. Let me fix the tests and naming.

Reply (author): done.

throws Exception {
String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
boolean enableBatch = false;
Set<Integer> values = new HashSet<>();

@Cleanup
Consumer<Integer> consumer1 = createConsumer(topic);

@Cleanup
Producer<Integer> producer = createProducer(topic, enableBatch);
int count = 0;
for (int i = 0; i < 10; i++) {
// Send the same key twice so that we'll have a batch message
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that enableBatch is false, did you want to also add that case ?

Reply (author): removed those comments.

String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
producer.newMessage().key(key).value(count++).send();
}

@Cleanup
Consumer<Integer> consumer2 = createConsumer(topic);
Review comment (Contributor): consumer2 will be closed by Lombok after its last usage. The point in time at which you close the consumer may alter the execution of the test; what about closing the consumers explicitly instead of using Lombok?

Reply (author): consumer2 needs to be open for the test to consume all messages; then it can be cleaned up by Lombok. We don't have to close it explicitly, and it won't impact the test.
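For context on the `@Cleanup` discussion: Lombok's `@Cleanup` generates a try/finally that calls `close()` at the end of the enclosing scope. A minimal sketch of the approximate expansion, using an invented `Resource` class in place of a Pulsar consumer:

```java
public class CleanupSketch {
    static final StringBuilder log = new StringBuilder();

    static class Resource implements AutoCloseable {
        @Override
        public void close() { log.append("closed;"); }
    }

    public static void main(String[] args) throws Exception {
        // "@Cleanup Resource consumer = new Resource();" roughly expands to:
        Resource consumer = new Resource();
        try {
            log.append("used;"); // the rest of the enclosing scope runs here
        } finally {
            consumer.close();    // close() is invoked at the end of the scope
        }
        System.out.println(log); // prints used;closed;
    }
}
```

So closing happens at scope exit rather than at an explicit point chosen by the test, which is what the reviewer is cautioning about.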


for (int i = 0; i < 10; i++) {
// Send messages with random keys
String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
producer.newMessage().key(key).value(count++).send();
}

@Cleanup
Consumer<Integer> consumer3 = createConsumer(topic);

consumer2.redeliverUnacknowledgedMessages();

for (int i = 0; i < 10; i++) {
// Send messages with random keys
String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
producer.newMessage().key(key).value(count++).send();
}
consumer1.close();

for (int i = 0; i < count; i++) {
Message<Integer> msg = consumer2.receive(100, TimeUnit.MILLISECONDS);
if (msg != null) {
values.add(msg.getValue());
} else {
break;
}
}
for (int i = 0; i < count; i++) {
Message<Integer> msg = consumer3.receive(1, TimeUnit.MILLISECONDS);
if (msg != null) {
values.add(msg.getValue());
} else {
break;
}
}
Review comment (Member) on lines +1676 to +1691: this test is currently invalid; messages would need to be acknowledged and consumed concurrently. This test passes, at least in branch-3.3, when making the changes.
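A hedged sketch of the reviewer's point, that the two consumers would need to consume and acknowledge concurrently rather than in two sequential loops. Plain `BlockingQueue`s stand in for Pulsar consumers here, and recording a value stands in for `acknowledge`; none of this is Pulsar API.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ConcurrentDrainSketch {
    // Drain one "consumer" on its own thread, recording ("acking") each value.
    static Future<?> drain(ExecutorService pool, BlockingQueue<Integer> consumer, Set<Integer> values) {
        return pool.submit(() -> {
            try {
                Integer msg;
                while ((msg = consumer.poll(100, TimeUnit.MILLISECONDS)) != null) {
                    values.add(msg); // stands in for consumer.acknowledge(msg)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    public static void main(String[] args) throws Exception {
        Set<Integer> values = ConcurrentHashMap.newKeySet();
        BlockingQueue<Integer> c2 = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> c3 = new LinkedBlockingQueue<>();
        for (int i = 0; i < 10; i++) { c2.add(i); c3.add(10 + i); }

        // Both consumers drain concurrently instead of one sequential loop each,
        // so neither blocks waiting for the other's acks to move the cursor.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Future<?> f2 = drain(pool, c2, values);
        Future<?> f3 = drain(pool, c3, values);
        f2.get();
        f3.get();
        pool.shutdown();
        System.out.println(values.size()); // prints 20
    }
}
```

In the real test this would mean running the `consumer2` and `consumer3` receive/acknowledge loops on separate threads so acknowledgments from one consumer can unblock dispatch to the other.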

assertEquals(values.size(), count);
}
}