-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] Add limits for Key_Shared Subscription look ahead in dispatching #23231
[improve][broker] Add limits for Key_Shared Subscription look ahead in dispatching #23231
Conversation
This PR is ready for initial review feedback. I'm looking into ways of adding tests for the key_shared look ahead limits. |
.../apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
Outdated
Show resolved
Hide resolved
- take more messages until maximum amount of messages has been picked
- discarding entries at this point is a waste - it's unnecessary to be accurate with permits. The message sending will handle subtracting permits.
62a9fbf
to
a263ad4
Compare
.../apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
Show resolved
Hide resolved
Between |
I'm sorry @poorbarcode. Please let me explain the misunderstanding.
For me, the "request for changes" comment was "Please do not merge this PR fast, I will finish my review tomorrow" (#23231 (review)). That's why I dismissed it with "Outdated comment". I had already addressed also the other review comments you had made.
I'm sorry for that. I did request a review, but you didn't respond. You can check the timeline. The PR was in draft mode for some time, but the reason for that was that the PR was already approved and I didn't want anyone to merge it. I didn't realize that you wouldn't be reviewing it because of the draft status. The changes in this PR aren't final, if you have review feedback, it's possible also address that after the PR has been merged. Please add comments to this PR of any concerns you may have. |
Sure, thanks for replying this comment. ❤️ |
…n dispatching (apache#23231) Co-authored-by: Matteo Merli <mmerli@apache.org> Co-authored-by: Yuri Mizushima <equanz324@gmail.com>
Fixes #23200
Motivation
This PR introduces enhancements to the Key_Shared subscription dispatching logic in the Pulsar broker. The changes aim to optimize message dispatching, improve the handling of slow consumers, and address several identified issues related to message replay and message "look ahead" when consumers are blocked in delivery due to key ordering restrictions.
This PR adds limits to the Key_Shared Subscription look-ahead feature, which was introduced in PR #7105. Additionally, there are several improvements to the previous logic.
Since there wasn't any named concept for the feature added in PR #7105, it has been named "key shared look ahead" in this PR. This references the feature added in PR #7105, where the dispatcher will skip replaying messages and read more messages from the backlog. The changes in PR #7105 didn't add a limit for reading more messages. The comments in the PR mention that it will be limited by the unacked message limits. However, this isn't the case since the read messages aren't considered unacked messages until they have been dispatched to consumers.
Reading ahead would be triggered in multiple cases:
Since consumers were blocked unnecessarily, reading ahead would often just continue reading the complete backlog into the replay queue. The impact is described in #23200
In addition to fix the infinite key shared look ahead issue, this PR seems to fix a problem in the existing implementation where this code blocks progress of a consumer:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
Lines 453 to 458 in d7e8ea1
What this means is that when the dispatcher has read a batch, it will discard all messages in the read messages if any hash is present in the replay queue (redelivery messages). This is an unoptimal solution for ensuring that ordering is preserved. This PR addresses the issue while attempting to keep backwards compatibility from a single consumer's perspective.
The "look ahead" limit introduced in this PR will add backpressure to how the dispatcher works. Eventually consumers will be slower than the dispatcher sending messages to consumers and that's why it's necessary to have the limit, also in cases where the hashes wouldn't be blocked or "allowOutOfOrderDelivery" mode is used.
There's also another related problem that this change addresses. The head of the replay queue might get blocked in the release Pulsar versions.
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
Lines 348 to 350 in ed14f21
In this PR, the solution to this is to take as many entries that match the criteria from the replay queue. This required changing the getMessagesToReplayNow to accept a Predicate for filtering:
Modifications
Configuration Properties:
keySharedLookAheadMsgInReplayThresholdPerConsumer
keySharedLookAheadMsgInReplayThresholdPerSubscription
keySharedLookAheadEnabledWhenRecentlyJoinedConsumersExist
Broker Configuration:
broker.conf
to include descriptions and default values for the new configuration properties.Key_Shared Subscription Logic:
filterOutEntriesWillBeDiscarded
while preserving the previous logic of filtering.localGroupedEntries
and the unusedlocalGroupedPositions
.getMessagesToReplayNow(1)
withgetFirstPositionInReplay()
.canReplayMessages
to make the look-ahead logic more explicit. The previous solution of returning an empty set from thegetMessagesToReplayNow
method made it hard to understand how the look-ahead behavior is implemented.isDispatcherStuckOnReplays
field toskipNextReplayToTriggerLookAhead
to make the meaning of the field explicit and self-descriptive.!havePendingReplayRead
status wasn't checked before starting a new read.!havePendingPendingRead
, but for Key_Shared it isn't.keyNumbers
variable toremainingConsumersToFinishSending
so that the meaning of the variable is explicit and self-descriptive.groupedEntries
variable toentriesByConsumerForDispatching
so that the meaning of the variable is explicit and self-descriptive.entriesWithSameKey
variable toentriesForConsumer
so that the variable is self-descriptive.maxUnackedMessages
andunackedMessages
. The unit is in individual messages and permits are in entries. These 2 cannot be mixed.Documentation
doc
doc-required
doc-not-needed
doc-complete