Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Avoid introducing delay when there are delayed messages or marker messages #23343

Merged
merged 2 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
protected final ExecutorService dispatchMessagesThread;
private final SharedConsumerAssignor assignor;
protected int lastNumberOfEntriesDispatched;
// Tracks how many entries were processed by consumers in the last trySendMessagesToConsumers call.
// The number also includes delayed messages, marker messages, aborted txn messages and filtered messages.
// When no messages were processed, the value is 0. This also indicates that the dispatcher didn't
// make progress in the last trySendMessagesToConsumers call.
protected int lastNumberOfEntriesProcessed;
protected boolean skipNextBackoff;
private final Backoff retryBackoff;
protected enum ReadType {
Expand Down Expand Up @@ -727,19 +731,22 @@ private synchronized void handleSendingMessagesAndReadingMore(ReadType readType,
boolean needAcquireSendInProgress,
long totalBytesSize) {
boolean triggerReadingMore = sendMessagesToConsumers(readType, entries, needAcquireSendInProgress);
int entriesDispatched = lastNumberOfEntriesDispatched;
int entriesProcessed = lastNumberOfEntriesProcessed;
updatePendingBytesToDispatch(-totalBytesSize);
if (entriesDispatched > 0) {
// Reset the backoff when we successfully dispatched messages
boolean canReadMoreImmediately = false;
if (entriesProcessed > 0 || skipNextBackoff) {
// Reset the backoff when messages were processed
retryBackoff.reset();
// Reset the possible flag to skip the backoff delay
skipNextBackoff = false;
canReadMoreImmediately = true;
}
if (triggerReadingMore) {
if (entriesDispatched > 0 || skipNextBackoff) {
skipNextBackoff = false;
if (canReadMoreImmediately) {
// Call readMoreEntries in the same thread to trigger the next read
readMoreEntries();
} else if (entriesDispatched == 0) {
// If no messages were dispatched, we need to reschedule a new read with an increasing backoff delay
} else {
// reschedule a new read with an increasing backoff delay
reScheduleReadWithBackoff();
}
}
Expand Down Expand Up @@ -779,7 +786,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
if (needTrimAckedMessages()) {
cursor.trimDeletedEntries(entries);
}
lastNumberOfEntriesDispatched = 0;
lastNumberOfEntriesProcessed = 0;

int entriesToDispatch = entries.size();
// Trigger read more messages
Expand Down Expand Up @@ -809,6 +816,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
long totalEntriesProcessed = 0;
int avgBatchSizePerMsg = remainingMessages > 0 ? Math.max(remainingMessages / entries.size(), 1) : 1;

// If the dispatcher is closed, firstAvailableConsumerPermits will be 0, which skips dispatching the
Expand All @@ -820,6 +828,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
log.info("[{}] rewind because no available consumer found from total {}", name, consumerList.size());
entries.subList(start, entries.size()).forEach(Entry::release);
cursor.rewind();
lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
return false;
}

Expand Down Expand Up @@ -863,6 +872,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
totalEntries += filterEntriesForConsumer(metadataArray, start,
entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
readType == ReadType.Replay, c);
totalEntriesProcessed += entriesForThisConsumer.size();

c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
Expand All @@ -882,7 +892,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
totalBytesSent += sendMessageInfo.getTotalBytes();
}

lastNumberOfEntriesDispatched = (int) totalEntries;
lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

if (entriesToDispatch > 0) {
Expand Down Expand Up @@ -917,6 +927,7 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType,
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
long totalEntriesProcessed = 0;
final AtomicInteger numConsumers = new AtomicInteger(assignResult.size());
for (Map.Entry<Consumer, List<EntryAndMetadata>> current : assignResult.entrySet()) {
final Consumer consumer = current.getKey();
Expand Down Expand Up @@ -947,6 +958,7 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType,

totalEntries += filterEntriesForConsumer(entryAndMetadataList, batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, readType == ReadType.Replay, consumer);
totalEntriesProcessed += entryAndMetadataList.size();
consumer.sendMessages(entryAndMetadataList, batchSizes, batchIndexesAcks,
sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(),
sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker()
Expand All @@ -962,7 +974,7 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType,
totalBytesSent += sendMessageInfo.getTotalBytes();
}

lastNumberOfEntriesDispatched = (int) totalEntries;
lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

return numConsumers.get() == 0; // trigger a new readMoreEntries() call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,11 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE

@Override
protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
lastNumberOfEntriesDispatched = 0;
lastNumberOfEntriesProcessed = 0;
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
long totalEntriesProcessed = 0;
int entriesCount = entries.size();

// Trigger read more messages
Expand Down Expand Up @@ -233,6 +234,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
} else if (readType == ReadType.Replay) {
entries.forEach(Entry::release);
}
skipNextBackoff = true;
return true;
}
}
Expand Down Expand Up @@ -298,6 +300,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForConsumer.size());
totalEntries += filterEntriesForConsumer(entriesForConsumer, batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, readType == ReadType.Replay, consumer);
totalEntriesProcessed += entriesForConsumer.size();
consumer.sendMessages(entriesForConsumer, batchSizes, batchIndexesAcks,
sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
Expand Down Expand Up @@ -368,7 +371,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
}
}

lastNumberOfEntriesDispatched = (int) totalEntries;
lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;

// acquire message-dispatch permits for already delivered messages
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
Expand All @@ -387,8 +390,8 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
return true;
}

// if no messages were sent, we should retry after a backoff delay
if (entriesByConsumerForDispatching.size() == 0) {
// if no messages were sent to consumers, we should retry
if (totalEntries == 0) {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -996,6 +998,86 @@ protected void reScheduleReadInMs(long readAfterMs) {
);
}


/**
 * Checks that a read whose only entry is reported as delayed (the overridden
 * {@code trackDelayedDelivery} returns {@code true}) leads to {@code readMoreEntries()} being
 * invoked at least once while {@code reScheduleReadInMs(long)} is never invoked.
 *
 * @param dispatchMessagesInSubscriptionThread value the config mock returns from
 *                                             {@code isDispatcherDispatchMessagesInSubscriptionThread()}
 * @param isKeyShared                          when {@code true} the sticky-key dispatcher subclass is
 *                                             exercised, otherwise the plain multi-consumer dispatcher
 * @throws Exception propagated from the dispatcher or mock set-up calls
 */
@Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched")
public void testNoBackoffDelayWhenDelayedMessages(
        boolean dispatchMessagesInSubscriptionThread,
        boolean isKeyShared) throws Exception {
    // Close the pre-existing dispatcher; the test exercises its own instrumented instance below.
    persistentDispatcher.close();

    doReturn(dispatchMessagesInSubscriptionThread).when(configMock)
            .isDispatcherDispatchMessagesInSubscriptionThread();

    // Switch read by the trackDelayedDelivery overrides, plus counters bumped by the other overrides.
    final AtomicBoolean simulateDelay = new AtomicBoolean(true);
    final AtomicInteger rescheduleCalls = new AtomicInteger(0);
    final AtomicInteger readMoreCalls = new AtomicInteger(0);

    final PersistentDispatcherMultipleConsumers dispatcher;
    if (!isKeyShared) {
        dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) {
            @Override
            public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
                // Report the message as delayed while the switch is on; otherwise use the real logic.
                return simulateDelay.get() || super.trackDelayedDelivery(ledgerId, entryId, msgMetadata);
            }

            @Override
            public synchronized void readMoreEntries() {
                readMoreCalls.incrementAndGet();
            }

            @Override
            protected void reScheduleReadInMs(long readAfterMs) {
                rescheduleCalls.incrementAndGet();
            }
        };
    } else {
        dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
                topicMock, cursorMock, subscriptionMock, configMock,
                new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
            @Override
            public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
                // Report the message as delayed while the switch is on; otherwise use the real logic.
                return simulateDelay.get() || super.trackDelayedDelivery(ledgerId, entryId, msgMetadata);
            }

            @Override
            public synchronized void readMoreEntries() {
                readMoreCalls.incrementAndGet();
            }

            @Override
            protected void reScheduleReadInMs(long readAfterMs) {
                rescheduleCalls.incrementAndGet();
            }
        };
    }

    // Have channelMock.addListener(...) call the listener right away with a Future mock
    // whose isDone() answers true.
    doAnswer(invocation -> {
        Future<Void> completedFuture = mock(Future.class);
        when(completedFuture.isDone()).thenReturn(true);
        GenericFutureListener<Future<Void>> callback = invocation.getArgument(0);
        callback.operationComplete(completedFuture);
        return channelMock;
    }).when(channelMock).addListener(any());

    // Register a consumer that has permits available.
    consumerMockAvailablePermits.set(1000);
    dispatcher.addConsumer(consumerMock);

    // Hand the dispatcher a single entry as the result of a normal read.
    List<Entry> entries = new ArrayList<>();
    entries.add(EntryImpl.create(1, 1, createMessage("message1", 1)));
    dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);

    Awaitility.await().untilAsserted(() -> {
        assertEquals(rescheduleCalls.get(), 0, "reScheduleReadInMs should not be called");
        assertTrue(readMoreCalls.get() >= 1);
    });
}

/**
 * Convenience overload that forwards to the three-argument {@code createMessage},
 * supplying the fixed string {@code "testKey"} as the third argument.
 *
 * @param message    passed through unchanged as the first argument
 * @param sequenceId passed through unchanged as the second argument
 * @return whatever the three-argument overload returns
 */
private ByteBuf createMessage(String message, int sequenceId) {
    final String defaultThirdArgument = "testKey";
    return createMessage(message, sequenceId, defaultThirdArgument);
}
Expand Down
Loading