Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [ml] fix wrong msg backlog of non-durable cursor after trim ledgers #21250

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2850,15 +2850,14 @@ void advanceCursorsIfNecessary(List<LedgerInfo> ledgersToDelete) throws LedgerNo
return;
}

// need to move mark delete for non-durable cursors to the first ledger NOT marked for deletion
// calling getNumberOfEntries latter for a ledger that is already deleted will be problematic and return
// incorrect results
Long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
if (firstNonDeletedLedger == null) {
throw new LedgerNotExistException("First non deleted Ledger is not found");
// Just ack messages like a consumer. Normally, consumers will not confirm a position that does not exist, so
// find the latest existing position to ack.
PositionImpl highestPositionToDelete = calculateLastEntryInLedgerList(ledgersToDelete);
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
if (highestPositionToDelete == null) {
log.warn("[{}] The ledgers to be trim are all empty, skip to advance non-durable cursors: {}",
name, ledgersToDelete);
return;
}
PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);

cursors.forEach(cursor -> {
// move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed
// to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be
Expand All @@ -2882,6 +2881,19 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
});
}

/**
* @return null if all ledgers is empty.
*/
private PositionImpl calculateLastEntryInLedgerList(List<LedgerInfo> ledgersToDelete) {
for (int i = ledgersToDelete.size() - 1; i >= 0; i--) {
LedgerInfo ledgerInfo = ledgersToDelete.get(i);
if (ledgerInfo != null && ledgerInfo.hasEntries() && ledgerInfo.getEntries() > 0) {
return PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
}
}
return null;
}

/**
* Delete this ManagedLedger completely from the system.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
Expand All @@ -55,7 +56,7 @@

@Test(groups = "broker-api")
@Slf4j
public class NonDurableSubscriptionTest extends ProducerConsumerBase {
public class NonDurableSubscriptionTest extends ProducerConsumerBase {

private final AtomicInteger numFlow = new AtomicInteger(0);

Expand Down Expand Up @@ -316,7 +317,7 @@ private void switchLedgerManually(final String tpName) throws Exception {
}

@Test
public void testTrimLedgerIfNoDurableCursor() throws Exception {
public void testHasMessageAvailableIfIncomingQueueNotEmpty() throws Exception {
final String nonDurableCursor = "non-durable-cursor";
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1)
Expand Down Expand Up @@ -557,4 +558,114 @@ public void testReaderInitAtDeletedPosition() throws Exception {
producer.close();
admin.topics().delete(topicName, false);
}

@Test
public void testTrimLedgerIfNoDurableCursor() throws Exception {
final String nonDurableCursor = "non-durable-cursor";
final String durableCursor = "durable-cursor";
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createNonPartitionedTopic(topicName);
Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1)
.subscriptionName(nonDurableCursor).startMessageId(MessageIdImpl.earliest).create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).receiverQueueSize(1)
.subscriptionName(durableCursor).subscribe();
consumer.close();

Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
producer.send("1");
producer.send("2");
producer.send("3");
producer.send("4");
MessageIdImpl msgIdInDeletedLedger5 = (MessageIdImpl) producer.send("5");

Message<String> msg1 = reader.readNext(2, TimeUnit.SECONDS);
assertEquals(msg1.getValue(), "1");
Message<String> msg2 = reader.readNext(2, TimeUnit.SECONDS);
assertEquals(msg2.getValue(), "2");
Message<String> msg3 = reader.readNext(2, TimeUnit.SECONDS);
assertEquals(msg3.getValue(), "3");

// Unsubscribe durable cursor.
// Trigger a trim ledgers task, and verify trim ledgers successful.
admin.topics().unload(topicName);
Thread.sleep(3 * 1000);
admin.topics().deleteSubscription(topicName, durableCursor);
// Trim ledgers after release durable cursor.
trimLedgers(topicName);
List<ManagedLedgerInternalStats.LedgerInfo> ledgers = admin.topics().getInternalStats(topicName).ledgers;
assertEquals(ledgers.size(), 1);
assertNotEquals(ledgers.get(0).ledgerId, msgIdInDeletedLedger5.getLedgerId());

// Verify backlog and markDeletePosition is correct.
Awaitility.await().untilAsserted(() -> {
SubscriptionStats subscriptionStats = admin.topics().getStats(topicName, true, true, true)
.getSubscriptions().get(nonDurableCursor);
log.info("backlog size: {}", subscriptionStats.getMsgBacklog());
assertEquals(subscriptionStats.getMsgBacklog(), 0);
ManagedLedgerInternalStats.CursorStats cursorStats =
admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor);
String[] ledgerIdAndEntryId = cursorStats.markDeletePosition.split(":");
PositionImpl actMarkDeletedPos =
PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1]));
PositionImpl expectedMarkDeletedPos =
PositionImpl.get(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId());
log.info("Expected mark deleted position: {}", expectedMarkDeletedPos);
log.info("Actual mark deleted position: {}", cursorStats.markDeletePosition);
Assert.assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0);
});

// Clear the incoming queue of the reader for next test.
while (true) {
Message<String> msg = reader.readNext(2, TimeUnit.SECONDS);
if (msg == null) {
break;
}
log.info("clear msg: {}", msg.getValue());
}

// The following tests are designed to verify the api "getNumberOfEntries" and "consumedEntries" still work
// after changes.See the code-description added with the PR https://github.com/apache/pulsar/pull/10667.
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(nonDurableCursor);

// Verify "getNumberOfEntries" if there is no entries to consume.
assertEquals(0, cursor.getNumberOfEntries());
assertEquals(0, ml.getNumberOfEntries());

// Verify "getNumberOfEntries" if there is 1 entry to consume.
producer.send("6");
producer.send("7");
Awaitility.await().untilAsserted(() -> {
assertEquals(2, ml.getNumberOfEntries());
// Since there is one message has been pulled into the incoming queue of reader. There is only one messages
// waiting to cursor read.
assertEquals(1, cursor.getNumberOfEntries());
});

// Verify "consumedEntries" is correct.
ManagedLedgerInternalStats.CursorStats cursorStats =
admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor);
// "messagesConsumedCounter" should be 0 after unload the topic.
// Note: "topic_internal_stat.cursor.messagesConsumedCounter" means how many messages were acked on this
// cursor. The similar one "topic_stats.lastConsumedTimestamp" means the last time of sending messages to
// the consumer.
assertEquals(0, cursorStats.messagesConsumedCounter);
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
Message<String> msg6 = reader.readNext(2, TimeUnit.SECONDS);
assertEquals(msg6.getValue(), "6");
Message<String> msg7 = reader.readNext(2, TimeUnit.SECONDS);
assertEquals(msg7.getValue(), "7");
Awaitility.await().untilAsserted(() -> {
// "messagesConsumedCounter" should be 2 after consumed 2 message.
ManagedLedgerInternalStats.CursorStats cStat =
admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor);
assertEquals(2, cStat.messagesConsumedCounter);
});

// cleanup.
reader.close();
producer.close();
admin.topics().delete(topicName, false);
}
}