Skip to content

Commit

Permalink
[fix] [ml] fix wrong msg backlog of non-durable cursor after trim led…
Browse files Browse the repository at this point in the history
…gers (apache#21250)

### Background
- But after trimming ledgers, `ml.lastConfirmedPosition` relies on a deleted ledger when the current ledger of ML is empty.
- Cursor prevents setting `markDeletedPosition` to a value larger than `ml.lastConfirmedPosition`, but there are no entries to read<sup>[1]</sup>.
- The code description in the method `advanceCursors` said: do not make `cursor.markDeletedPosition` larger than `ml.lastConfirmedPosition`<sup>[2]</sup>

### Issue
If there is no durable cursor, the `markDeletedPosition` might be set to `{current_ledger, -1}`, and `async mark delete` will be prevented by the `rule-2` above. So he `backlog`, `readPosition`, and `markDeletedPosition` of the cursor will be in an incorrect position after trimming the ledger. You can reproduce it by the test `testTrimLedgerIfNoDurableCursor`

### Modifications
Do not make `cursor.markDeletedPosition` larger than `ml.lastConfirmedPosition` when advancing non-durable cursors.

(cherry picked from commit ca77982)
(cherry picked from commit 6895919)
  • Loading branch information
poorbarcode authored and mukesh-ctds committed Apr 17, 2024
1 parent 89dcb8c commit feb163b
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2895,15 +2895,14 @@ void advanceCursorsIfNecessary(List<LedgerInfo> ledgersToDelete) throws LedgerNo
return;
}

// need to move mark delete for non-durable cursors to the first ledger NOT marked for deletion
// calling getNumberOfEntries latter for a ledger that is already deleted will be problematic and return
// incorrect results
Long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
if (firstNonDeletedLedger == null) {
throw new LedgerNotExistException("First non deleted Ledger is not found");
// Just ack messages like a consumer. Normally, consumers will not confirm a position that does not exist, so
// find the latest existing position to ack.
PositionImpl highestPositionToDelete = calculateLastEntryInLedgerList(ledgersToDelete);
if (highestPositionToDelete == null) {
log.warn("[{}] The ledgers to be trim are all empty, skip to advance non-durable cursors: {}",
name, ledgersToDelete);
return;
}
PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);

cursors.forEach(cursor -> {
// move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed
// to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be
Expand All @@ -2927,6 +2926,19 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
});
}

/**
* @return null if all ledgers is empty.
*/
private PositionImpl calculateLastEntryInLedgerList(List<LedgerInfo> ledgersToDelete) {
for (int i = ledgersToDelete.size() - 1; i >= 0; i--) {
LedgerInfo ledgerInfo = ledgersToDelete.get(i);
if (ledgerInfo != null && ledgerInfo.hasEntries() && ledgerInfo.getEntries() > 0) {
return PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
}
}
return null;
}

/**
* Delete this ManagedLedger completely from the system.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
Expand Down Expand Up @@ -311,7 +312,7 @@ private void switchLedgerManually(final String tpName) throws Exception {
}

@Test
public void testTrimLedgerIfNoDurableCursor() throws Exception {
public void testHasMessageAvailableIfIncomingQueueNotEmpty() throws Exception {
final String nonDurableCursor = "non-durable-cursor";
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1)
Expand Down

0 comments on commit feb163b

Please sign in to comment.