
[Bug] Key_Shared or Shared subscription doesn't always deliver messages from the replay queue after a consumer disconnects and leaves a backlog unless new messages are produced #23845

Open
lhotari opened this issue Jan 13, 2025 · 6 comments
Labels: type/bug

Comments


lhotari commented Jan 13, 2025

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

Pulsar 4.0.1

Minimal reproduce step

Exact steps to reproduce aren't yet confirmed.

This problem was encountered in a test with a large number of consumers that were scaled up and down, so that consumers were repeatedly added and removed. The problem was noticed at the end of the test case: not all messages were delivered to consumers, and the rest remained in the backlog.

In the topic stats for the subscription, msgInReplay showed a positive value, and in the internal stats for the subscription, subscriptionHavePendingRead was true. Looking at the code, this appears to be a case that isn't handled in PersistentDispatcherMultipleConsumers/PersistentStickyKeyDispatcherMultipleConsumers.
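
For reference, both values can be checked with pulsar-admin (the topic name below is a placeholder; the field locations follow the description above):

```shell
# subscription stats: look for a positive msgInReplay value under the subscription
bin/pulsar-admin topics stats persistent://public/default/my-topic

# internal stats: look for subscriptionHavePendingRead under the subscription
bin/pulsar-admin topics stats-internal persistent://public/default/my-topic
```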

What did you expect to see?

The cursor shouldn't go completely into the "waiting" state while there are messages in the replay queue.

What did you see instead?

Messages in the replay queue don't get dispatched to consumers.

Anything else?

A possible workaround is to set dispatcherDispatchMessagesInSubscriptionThread=false in broker.conf, which prevents the race condition that causes this issue.
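
For reference, the workaround as a broker.conf entry (the asynchronous dispatch path described below is the default behavior):

```properties
# Workaround: dispatch messages on the caller thread instead of a separate
# subscription thread, avoiding the race condition described in this issue.
dispatcherDispatchMessagesInSubscriptionThread=false
```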

Are you willing to submit a PR?

  • I'm willing to submit a PR!

lhotari commented Jan 13, 2025

There's already a solution to cancel a pending read when a hash gets unblocked:

```java
private void stickyKeyHashUnblocked(int stickyKeyHash) {
    if (log.isDebugEnabled()) {
        if (stickyKeyHash > -1) {
            log.debug("[{}] Sticky key hash {} is unblocked", getName(), stickyKeyHash);
        } else {
            log.debug("[{}] Some sticky key hashes are unblocked", getName());
        }
    }
    reScheduleReadWithKeySharedUnblockingInterval();
}

private void reScheduleReadWithKeySharedUnblockingInterval() {
    rescheduleReadHandler.rescheduleRead();
}
```

There seems to be a bug in this behavior, since it didn't catch the case that was encountered. One possible reason is that the consumer didn't have permits when the unblocking happened; there would need to be additional logic to handle that case.
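
A minimal sketch of the kind of handling that might be needed, assuming a hypothetical hook in the dispatcher's flow-control path (the method body and the exact condition are illustrative, not the actual fix):

```java
// Hypothetical sketch, not the actual Pulsar code: when a consumer grants
// new permits, re-trigger a read if messages are still waiting in the
// replay queue, so an earlier "hash unblocked, but no permits" event
// isn't lost.
public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
    totalAvailablePermits += additionalNumberOfMessages;
    if (!redeliveryMessages.isEmpty()) {
        readMoreEntriesAsync();
    }
}
```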

@walkinggo

It looks like this piece of code triggers a re-read here. I tried the related test cases, but no errors occurred. Could you provide more details about what you meant by "permit"? @lhotari


lhotari commented Jan 14, 2025

> It looks like this piece of code triggers a re-read here. I tried the related test cases, but no errors occurred. Could you provide more details about what you meant by "permit"? @lhotari

@walkinggo You can read more about permits here: https://pulsar.apache.org/docs/4.0.x/developing-binary-protocol/#flow-control. Just to be clear, I'm not looking for contributions to address this particular issue; I've assigned it to myself and am currently working on it.
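
For background, permits come from the consumer's receiver queue: the client grants the broker permission to push up to receiverQueueSize messages, and with zero permits the broker cannot dispatch anything to that consumer, including replay-queue messages. A minimal client-side example (topic and subscription names are placeholders):

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class PermitsExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        // receiverQueueSize controls how many permits this consumer grants
        // to the broker via flow commands
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Key_Shared)
                .receiverQueueSize(1000)
                .subscribe();
        consumer.receive(); // receiving replenishes permits over time
        client.close();
    }
}
```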

@walkinggo

OK, I got it.


lhotari commented Jan 18, 2025

I think I finally found a potential race condition by analysing the code.
It should be possible to verify that the race condition can occur by first modifying the code to introduce delays.
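
For example (illustrative only), injecting a delay into the asynchronous dispatch path quoted further below should widen the race window enough to reproduce the problem in a test:

```java
// Illustrative sketch for reproducing the race, not production code:
// delay the dispatch task so that a concurrent readMoreEntriesAsync()
// call lands while sendInProgress is set and gets dropped.
acquireSendInProgress();
dispatchMessagesThread.execute(() -> {
    try {
        Thread.sleep(100); // injected delay, for reproduction only
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    handleSendingMessagesAndReadingMore(readType, entries, false, totalBytesSize);
});
```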

When a consumer is removed, its pending messages get added to the replay queue and a new read gets triggered:

```java
MutableBoolean notifyAddedToReplay = new MutableBoolean(false);
consumer.getPendingAcks().forEachAndClose((ledgerId, entryId, batchSize, stickyKeyHash) -> {
    boolean addedToReplay = addMessageToReplay(ledgerId, entryId, stickyKeyHash);
    if (addedToReplay) {
        notifyAddedToReplay.setTrue();
    }
});
totalAvailablePermits -= consumer.getAvailablePermits();
if (log.isDebugEnabled()) {
    log.debug("[{}] Decreased totalAvailablePermits by {} in PersistentDispatcherMultipleConsumers. "
                    + "New dispatcher permit count is {}", name, consumer.getAvailablePermits(),
            totalAvailablePermits);
}
if (notifyAddedToReplay.booleanValue()) {
    notifyRedeliveryMessageAdded();
}
```

This calls readMoreEntriesAsync:

```java
/**
 * Notify the dispatcher that a message has been added to the redelivery list.
 */
private void notifyRedeliveryMessageAdded() {
    readMoreEntriesAsync();
}
```

In the "classic" implementation, there's a direct readMoreEntries call here:

```java
consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> {
    addMessageToReplay(ledgerId, entryId, stickyKeyHash);
});
totalAvailablePermits -= consumer.getAvailablePermits();
if (log.isDebugEnabled()) {
    log.debug("[{}] Decreased totalAvailablePermits by {} in PersistentDispatcherMultipleConsumers. "
                    + "New dispatcher permit count is {}", name, consumer.getAvailablePermits(),
            totalAvailablePermits);
}
readMoreEntries();
```

This also supports the theory that the problem appears in 4.0 but not with the 3.x Key_Shared implementation.

The reason why readMoreEntriesAsync is a problem can be explained as follows.
When readMoreEntries gets called, it can return early here:

```java
if (isSendInProgress()) {
    // we cannot read more entries while sending the previous batch
    // otherwise we could re-read the same entries and send duplicates
    if (log.isDebugEnabled()) {
        log.debug("[{}] [{}] Skipping read for the topic, Due to sending in-progress.",
                topic.getName(), getSubscriptionName());
    }
    return;
}
```

This is the sendInProgress-related code:

```java
protected synchronized void acquireSendInProgress() {
    sendInProgress = true;
}

protected synchronized void releaseSendInProgress() {
    sendInProgress = false;
}

protected synchronized boolean isSendInProgress() {
    return sendInProgress;
}

protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List<Entry> entries,
                                                             boolean needAcquireSendInProgress) {
    if (needAcquireSendInProgress) {
        acquireSendInProgress();
    }
    try {
        return trySendMessagesToConsumers(readType, entries);
    } finally {
        releaseSendInProgress();
    }
}
```

By default, the sendMessagesToConsumers method gets called asynchronously:

```java
// dispatch messages to a separate thread, but still in order for this subscription
// sendMessagesToConsumers is responsible for running broker-side filters
// that may be quite expensive
if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
    // setting sendInProgress here, because sendMessagesToConsumers will be executed
    // in a separate thread, and we want to prevent more reads
    acquireSendInProgress();
    dispatchMessagesThread.execute(() -> {
        handleSendingMessagesAndReadingMore(readType, entries, false, totalBytesSize);
    });
```

It first sets the sendInProgress flag and then schedules the call. Any readMoreEntries call that happens before handleSendingMessagesAndReadingMore runs will be dropped. If handleSendingMessagesAndReadingMore then doesn't itself trigger a new readMoreEntries call (which it can validly skip), the dropped call is never compensated for, and the problem described in this issue can occur.
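
The pattern can be illustrated with a minimal, self-contained sketch (simplified names, not the actual broker classes):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Lost-wakeup illustration: a read requested while sendInProgress is set is
// silently dropped, and nothing retriggers it after the send completes.
public class LostWakeupDemo {
    private final ExecutorService dispatchThread = Executors.newSingleThreadExecutor();
    private volatile boolean sendInProgress;
    private volatile int reads;

    synchronized void readMoreEntries() {
        if (sendInProgress) {
            return; // dropped, like the early return in readMoreEntries()
        }
        reads++;
        sendInProgress = true;
        dispatchThread.execute(() -> {
            // simulate a slow send; a readMoreEntries() call landing here is lost
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            sendInProgress = false;
            // no compensating re-read here: replay entries added in the
            // meantime stay stuck until a new message triggers another read
        });
    }

    public static void main(String[] args) throws Exception {
        LostWakeupDemo d = new LostWakeupDemo();
        d.readMoreEntries();  // starts the asynchronous "send"
        Thread.sleep(10);
        d.readMoreEntries();  // e.g. triggered by consumer removal: dropped
        d.dispatchThread.shutdown();
        d.dispatchThread.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("reads performed: " + d.reads); // prints 1
    }
}
```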

A similar race condition could also happen with the Shared subscription type; this is not specific to Key_Shared.

@lhotari lhotari changed the title [Bug] Key_Shared subscription doesn't deliver messages in the replay queue when no new messages are produced [Bug] Key_Shared or Shared subscription doesn't always deliver messages from the replay queue and leaves a backlog unless new messages are produced Jan 18, 2025
@lhotari lhotari changed the title [Bug] Key_Shared or Shared subscription doesn't always deliver messages from the replay queue and leaves a backlog unless new messages are produced [Bug] Key_Shared or Shared subscription doesn't always deliver messages from the replay queue after a consumer disconnects and leaves a backlog unless new messages are produced Jan 18, 2025

lhotari commented Jan 18, 2025

I also noticed another subtle problem regarding the Key_Shared subscription.
If the cursor is already in the waiting state when messages are added to the replay queue, it would be necessary to call cancelPendingRead so that new messages produced to the topic don't trigger a normal read while the replay queue messages are being read. Otherwise, the newly read entries get discarded and added to the replay queue here:

```java
// A corner case that we have to retry a readMoreEntries in order to preserver order delivery.
// This may happen when consumer closed. See issue #12885 for details.
Optional<Position> firstReplayPosition = getFirstPositionInReplay();
if (firstReplayPosition.isPresent()) {
    Position replayPosition = firstReplayPosition.get();
    if (this.minReplayedPosition != null) {
        // If relayPosition is a new entry wither smaller position is inserted for redelivery during this
        // async read, it is possible that this relayPosition should dispatch to consumer first. So in
        // order to preserver order delivery, we need to discard this read result, and try to trigger a
        // replay read, that containing "relayPosition", by calling readMoreEntries.
        if (replayPosition.compareTo(minReplayedPosition) < 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, "
                                + "discard this read and retry with readMoreEntries.",
                        name, replayPosition, minReplayedPosition, readType);
            }
            if (readType == ReadType.Normal) {
                entries.forEach(this::addEntryToReplay);
            } else if (readType == ReadType.Replay) {
                entries.forEach(Entry::release);
            }
            skipNextBackoff = true;
            return true;
        }
    }
}
```

For the Shared subscription type, it's not necessary to cancel the pending read, since ordering doesn't matter. The same applies when a Key_Shared subscription is running in allowOutOfOrderDelivery mode.
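
A sketch of that idea, with hypothetical helper names (hasPendingRead and isAllowOutOfOrderDelivery are illustrative, not the actual API):

```java
// Illustrative sketch, not the actual fix: when replay entries are added
// while the cursor has a pending "waiting" read, cancel that read when
// ordering matters, so a newly produced message can't be read ahead of the
// replay-queue entries and then bounced back into the replay queue.
private void notifyRedeliveryMessageAdded() {
    if (hasPendingRead() && !isAllowOutOfOrderDelivery()) {
        cancelPendingRead();
    }
    readMoreEntriesAsync();
}
```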
