From 97c461c28e19b285039e8ede988b30053757119b Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Sep 2024 10:37:34 +0300 Subject: [PATCH 1/2] Add failing test case --- ...ckyKeyDispatcherMultipleConsumersTest.java | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index b78d1e554c32d..dcd852f409dbb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -46,6 +46,8 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; @@ -996,6 +998,86 @@ protected void reScheduleReadInMs(long readAfterMs) { ); } + + @Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched") + public void testNoBackoffDelayWhenDelayedMessages(boolean dispatchMessagesInSubscriptionThread, boolean isKeyShared) + throws Exception { + persistentDispatcher.close(); + + doReturn(dispatchMessagesInSubscriptionThread).when(configMock) + .isDispatcherDispatchMessagesInSubscriptionThread(); + + AtomicInteger readMoreEntriesCalled = new AtomicInteger(0); + AtomicInteger reScheduleReadInMsCalled = new AtomicInteger(0); + AtomicBoolean delayAllMessages = new AtomicBoolean(true); + + PersistentDispatcherMultipleConsumers dispatcher; + if (isKeyShared) { + dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( + topicMock, cursorMock, subscriptionMock, configMock, + new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) { + @Override + protected void reScheduleReadInMs(long readAfterMs) { + reScheduleReadInMsCalled.incrementAndGet(); + } + + @Override + public synchronized void readMoreEntries() { + readMoreEntriesCalled.incrementAndGet(); + } + + @Override + public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) { + if (delayAllMessages.get()) { + // simulate delayed message + return true; + } + return super.trackDelayedDelivery(ledgerId, entryId, msgMetadata); + } + }; + } else { + dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) { + @Override + protected void reScheduleReadInMs(long readAfterMs) { + reScheduleReadInMsCalled.incrementAndGet(); + } + + @Override + public synchronized void readMoreEntries() { + readMoreEntriesCalled.incrementAndGet(); + } + + @Override + public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) { + if (delayAllMessages.get()) { + // simulate delayed message + return true; + } + return super.trackDelayedDelivery(ledgerId, entryId, msgMetadata); + } + }; + } + + doAnswer(invocationOnMock -> { + GenericFutureListener> listener = invocationOnMock.getArgument(0); + Future future = mock(Future.class); + when(future.isDone()).thenReturn(true); + listener.operationComplete(future); + return channelMock; + }).when(channelMock).addListener(any()); + + // add a consumer with permits + consumerMockAvailablePermits.set(1000); + dispatcher.addConsumer(consumerMock); + + List entries = new ArrayList<>(List.of(EntryImpl.create(1, 1, createMessage("message1", 1)))); + dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + Awaitility.await().untilAsserted(() -> { + assertEquals(reScheduleReadInMsCalled.get(), 0, "reScheduleReadInMs should not be called"); + assertTrue(readMoreEntriesCalled.get() >= 1); + }); + } + private ByteBuf createMessage(String message, int sequenceId) { return createMessage(message, sequenceId, "testKey"); } From 957117de4855e2a52c9114be8019cd67566b88fc Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 23 Sep 2024 23:20:03 +0300 Subject: [PATCH 2/2] [fix][broker] Avoid introducing delay when building the delayed messages index --- ...PersistentDispatcherMultipleConsumers.java | 34 +++++++++++++------ ...tStickyKeyDispatcherMultipleConsumers.java | 11 +++--- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 8fdb65e7b3076..73ad2cf0a3dee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -134,7 +134,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false); protected final ExecutorService dispatchMessagesThread; private final SharedConsumerAssignor assignor; - protected int lastNumberOfEntriesDispatched; + // tracks how many entries were processed by consumers in the last trySendMessagesToConsumers call + // the number includes also delayed messages, marker messages, aborted txn messages and filtered messages + // When no messages were processed, the value is 0. This is also an indication that the dispatcher didn't + // make progress in the last trySendMessagesToConsumers call. + protected int lastNumberOfEntriesProcessed; protected boolean skipNextBackoff; private final Backoff retryBackoff; protected enum ReadType { @@ -727,19 +731,22 @@ private synchronized void handleSendingMessagesAndReadingMore(ReadType readType, boolean needAcquireSendInProgress, long totalBytesSize) { boolean triggerReadingMore = sendMessagesToConsumers(readType, entries, needAcquireSendInProgress); - int entriesDispatched = lastNumberOfEntriesDispatched; + int entriesProcessed = lastNumberOfEntriesProcessed; updatePendingBytesToDispatch(-totalBytesSize); - if (entriesDispatched > 0) { - // Reset the backoff when we successfully dispatched messages + boolean canReadMoreImmediately = false; + if (entriesProcessed > 0 || skipNextBackoff) { + // Reset the backoff when messages were processed retryBackoff.reset(); + // Reset the possible flag to skip the backoff delay + skipNextBackoff = false; + canReadMoreImmediately = true; } if (triggerReadingMore) { - if (entriesDispatched > 0 || skipNextBackoff) { - skipNextBackoff = false; + if (canReadMoreImmediately) { // Call readMoreEntries in the same thread to trigger the next read readMoreEntries(); - } else if (entriesDispatched == 0) { - // If no messages were dispatched, we need to reschedule a new read with an increasing backoff delay + } else { + // reschedule a new read with an increasing backoff delay reScheduleReadWithBackoff(); } } @@ -779,7 +786,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis if (needTrimAckedMessages()) { cursor.trimDeletedEntries(entries); } - lastNumberOfEntriesDispatched = 0; + lastNumberOfEntriesProcessed = 0; int entriesToDispatch = entries.size(); // Trigger read more messages @@ -809,6 +816,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis long totalMessagesSent = 0; long totalBytesSent = 0; long totalEntries = 0; + long totalEntriesProcessed = 0; int avgBatchSizePerMsg = remainingMessages > 0 ? Math.max(remainingMessages / entries.size(), 1) : 1; // If the dispatcher is closed, firstAvailableConsumerPermits will be 0, which skips dispatching the @@ -820,6 +828,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis log.info("[{}] rewind because no available consumer found from total {}", name, consumerList.size()); entries.subList(start, entries.size()).forEach(Entry::release); cursor.rewind(); + lastNumberOfEntriesProcessed = (int) totalEntriesProcessed; return false; } @@ -863,6 +872,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis totalEntries += filterEntriesForConsumer(metadataArray, start, entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay, c); + totalEntriesProcessed += entriesForThisConsumer.size(); c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker); @@ -882,7 +892,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis totalBytesSent += sendMessageInfo.getTotalBytes(); } - lastNumberOfEntriesDispatched = (int) totalEntries; + lastNumberOfEntriesProcessed = (int) totalEntriesProcessed; acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); if (entriesToDispatch > 0) { @@ -917,6 +927,7 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType, long totalMessagesSent = 0; long totalBytesSent = 0; long totalEntries = 0; + long totalEntriesProcessed = 0; final AtomicInteger numConsumers = new AtomicInteger(assignResult.size()); for (Map.Entry> current : assignResult.entrySet()) { final Consumer consumer = current.getKey(); @@ -947,6 +958,7 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType, totalEntries += filterEntriesForConsumer(entryAndMetadataList, batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay, consumer); + totalEntriesProcessed += entryAndMetadataList.size(); consumer.sendMessages(entryAndMetadataList, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker() @@ -962,7 +974,7 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType, totalBytesSent += sendMessageInfo.getTotalBytes(); } - lastNumberOfEntriesDispatched = (int) totalEntries; + lastNumberOfEntriesProcessed = (int) totalEntriesProcessed; acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); return numConsumers.get() == 0; // trigger a new readMoreEntries() call diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 26463ba902c58..ecd3f19a14028 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -190,10 +190,11 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE @Override protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List entries) { - lastNumberOfEntriesDispatched = 0; + lastNumberOfEntriesProcessed = 0; long totalMessagesSent = 0; long totalBytesSent = 0; long totalEntries = 0; + long totalEntriesProcessed = 0; int entriesCount = entries.size(); // Trigger read more messages @@ -233,6 +234,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis } else if (readType == ReadType.Replay) { entries.forEach(Entry::release); } + skipNextBackoff = true; return true; } } @@ -298,6 +300,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForConsumer.size()); totalEntries += filterEntriesForConsumer(entriesForConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay, consumer); + totalEntriesProcessed += entriesForConsumer.size(); consumer.sendMessages(entriesForConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), @@ -368,7 +371,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis } } - lastNumberOfEntriesDispatched = (int) totalEntries; + lastNumberOfEntriesProcessed = (int) totalEntriesProcessed; // acquire message-dispatch permits for already delivered messages acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); @@ -387,8 +390,8 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis return true; } - // if no messages were sent, we should retry after a backoff delay - if (entriesByConsumerForDispatching.size() == 0) { + // if no messages were sent to consumers, we should retry + if (totalEntries == 0) { return true; }