From 8402e079c8f2a037e2b86cdab5ecdfd2faa000dc Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 25 Aug 2022 10:37:32 +0300 Subject: [PATCH] [fix][broker] Keep sorted list of cursors ordered by read position of active cursors when cacheEvictionByMarkDeletedPosition=false Fixes #16054 - calculate the sorted list of when a read position gets updated - this resolves #9958 in a proper way - #12045 broke the caching solution as explained in #16054 - remove invalid tests - fix tests - add more tests to handle corner cases --- .../mledger/impl/ManagedCursorContainer.java | 95 ++--- .../mledger/impl/ManagedCursorImpl.java | 7 + .../mledger/impl/ManagedLedgerImpl.java | 66 +-- .../mledger/impl/EntryCacheManagerTest.java | 2 +- .../impl/ManagedCursorContainerTest.java | 15 +- .../mledger/impl/ManagedLedgerTest.java | 402 +++++++----------- .../test/MockedBookKeeperTestCase.java | 4 + 7 files changed, 238 insertions(+), 353 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java index 5a96ee08de947..5ceadc4934085 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.locks.StampedLock; +import java.util.function.Function; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.tuple.Pair; @@ -50,28 +51,25 @@ private static class Item { PositionImpl position; int idx; - Item(ManagedCursor cursor, int idx) { + Item(ManagedCursor cursor, PositionImpl position, int idx) { this.cursor = cursor; - this.position = (PositionImpl) cursor.getMarkDeletedPosition(); + this.position = position; this.idx = idx; } } - public enum CursorType { - DurableCursor, - NonDurableCursor, - ALL + public ManagedCursorContainer() { + this(cursor -> (PositionImpl) cursor.getMarkDeletedPosition()); } - public ManagedCursorContainer() { - cursorType = CursorType.DurableCursor; + private ManagedCursorContainer(Function positionFunction) { + this.positionFunction = positionFunction; } - public ManagedCursorContainer(CursorType cursorType) { - this.cursorType = cursorType; + public static ManagedCursorContainer createWithReadPositionOrdering() { + return new ManagedCursorContainer(cursor -> (PositionImpl) cursor.getReadPosition()); } - private final CursorType cursorType; // Used to keep track of slowest cursor. Contains all of all active cursors. private final ArrayList heap = new ArrayList(); @@ -81,46 +79,26 @@ public ManagedCursorContainer(CursorType cursorType) { private final StampedLock rwLock = new StampedLock(); + private int durableCursorCount; + + private final Function positionFunction; + public void add(ManagedCursor cursor) { long stamp = rwLock.writeLock(); try { // Append a new entry at the end of the list - Item item = new Item(cursor, heap.size()); + Item item = new Item(cursor, positionFunction.apply(cursor), heap.size()); cursors.put(cursor.getName(), item); - - if (shouldTrackInHeap(cursor)) { - heap.add(item); - siftUp(item); + heap.add(item); + siftUp(item); + if (cursor.isDurable()) { + durableCursorCount++; } } finally { rwLock.unlockWrite(stamp); } } - private boolean shouldTrackInHeap(ManagedCursor cursor) { - return CursorType.ALL.equals(cursorType) - || (cursor.isDurable() && CursorType.DurableCursor.equals(cursorType)) - || (!cursor.isDurable() && CursorType.NonDurableCursor.equals(cursorType)); - } - - public PositionImpl getSlowestReadPositionForActiveCursors() { - long stamp = rwLock.readLock(); - try { - return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition(); - } finally { - rwLock.unlockRead(stamp); - } - } - - public PositionImpl getSlowestMarkDeletedPositionForActiveCursors() { - long stamp = rwLock.readLock(); - try { - return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getMarkDeletedPosition(); - } finally { - rwLock.unlockRead(stamp); - } - } - public ManagedCursor get(String name) { long stamp = rwLock.readLock(); try { @@ -135,13 +113,16 @@ public void removeCursor(String name) { long stamp = rwLock.writeLock(); try { Item item = cursors.remove(name); - if (item != null && shouldTrackInHeap(item.cursor)) { + if (item != null) { // Move the item to the right end of the heap to be removed Item lastItem = heap.get(heap.size() - 1); swap(item, lastItem); heap.remove(item.idx); // Update the heap siftDown(lastItem); + if (item.cursor.isDurable()) { + durableCursorCount--; + } } } finally { rwLock.unlockWrite(stamp); @@ -165,24 +146,20 @@ public Pair cursorUpdated(ManagedCursor cursor, Posi return null; } + PositionImpl previousSlowestConsumer = heap.get(0).position; - if (shouldTrackInHeap(item.cursor)) { - PositionImpl previousSlowestConsumer = heap.get(0).position; + // When the cursor moves forward, we need to push it toward the + // bottom of the tree and push it up if a reset was done - // When the cursor moves forward, we need to push it toward the - // bottom of the tree and push it up if a reset was done - - item.position = (PositionImpl) newPosition; - if (item.idx == 0 || getParent(item).position.compareTo(item.position) <= 0) { - siftDown(item); - } else { - siftUp(item); - } - - PositionImpl newSlowestConsumer = heap.get(0).position; - return Pair.of(previousSlowestConsumer, newSlowestConsumer); + item.position = (PositionImpl) newPosition; + if (item.idx == 0 || getParent(item).position.compareTo(item.position) <= 0) { + siftDown(item); + } else { + siftUp(item); } - return null; + + PositionImpl newSlowestConsumer = heap.get(0).position; + return Pair.of(previousSlowestConsumer, newSlowestConsumer); } finally { rwLock.unlockWrite(stamp); } @@ -237,18 +214,18 @@ public boolean isEmpty() { */ public boolean hasDurableCursors() { long stamp = rwLock.tryOptimisticRead(); - boolean isEmpty = heap.isEmpty(); + int count = durableCursorCount; if (!rwLock.validate(stamp)) { // Fallback to read lock stamp = rwLock.readLock(); try { - isEmpty = heap.isEmpty(); + count = durableCursorCount; } finally { rwLock.unlockRead(stamp); } } - return !isEmpty; + return count > 0; } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index de85ac92ee80c..9dd4364744e17 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -635,6 +635,7 @@ private void recoveredCursor(PositionImpl position, Map properties persistentMarkDeletePosition = position; inProgressMarkDeletePersistPosition = null; readPosition = ledger.getNextValidPosition(position); + ledger.updateReadPosition(this, readPosition); lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, properties, null, null); // assign cursor-ledger so, it can be deleted when new ledger will be switched this.cursorLedger = recoveredFromCursorLedger; @@ -1215,6 +1216,7 @@ public void operationComplete() { ledger.getName(), newPosition, oldReadPosition, name); } readPosition = newPosition; + ledger.updateReadPosition(ManagedCursorImpl.this, newPosition); } finally { lock.writeLock().unlock(); } @@ -1701,6 +1703,7 @@ boolean hasMoreEntries(PositionImpl position) { void initializeCursorPosition(Pair lastPositionCounter) { readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft()); + ledger.updateReadPosition(this, readPosition); markDeletePosition = lastPositionCounter.getLeft(); lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null); persistentMarkDeletePosition = null; @@ -1775,6 +1778,7 @@ PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) { log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}", ledger.getName(), currentReadPosition, newReadPosition, markDeletePosition); } + ledger.updateReadPosition(ManagedCursorImpl.this, newReadPosition); return newReadPosition; } else { return currentReadPosition; @@ -2356,6 +2360,7 @@ public void rewind() { log.info("[{}-{}] Rewind from {} to {}", ledger.getName(), name, oldReadPosition, newReadPosition); readPosition = newReadPosition; + ledger.updateReadPosition(ManagedCursorImpl.this, newReadPosition); } finally { lock.writeLock().unlock(); } @@ -2373,6 +2378,7 @@ public void seek(Position newReadPositionInt, boolean force) { newReadPosition = ledger.getNextValidPosition(markDeletePosition); } readPosition = newReadPosition; + ledger.updateReadPosition(ManagedCursorImpl.this, newReadPosition); } finally { lock.writeLock().unlock(); } @@ -2573,6 +2579,7 @@ void setReadPosition(Position newReadPositionInt) { if (this.markDeletePosition == null || ((PositionImpl) newReadPositionInt).compareTo(this.markDeletePosition) > 0) { this.readPosition = (PositionImpl) newReadPositionInt; + ledger.updateReadPosition(this, newReadPositionInt); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index aa7c19b32bd02..ff5fc8ab2c010 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -166,8 +166,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final ManagedCursorContainer cursors = new ManagedCursorContainer(); private final ManagedCursorContainer activeCursors = new ManagedCursorContainer(); - private final ManagedCursorContainer nonDurableActiveCursors = - new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor); + + private final ManagedCursorContainer activeCursorsSortedByReadPosition = + ManagedCursorContainer.createWithReadPositionOrdering(); // Ever increasing counter of entries added @VisibleForTesting @@ -2182,21 +2183,12 @@ public boolean hasMoreEntries(PositionImpl position) { return result; } - void discardEntriesFromCache(ManagedCursorImpl cursor, PositionImpl newPosition) { - Pair pair = activeCursors.cursorUpdated(cursor, newPosition); - if (pair != null) { - entryCache.invalidateEntries(pair.getRight()); - } - } - public PositionImpl getEvictionPosition(){ PositionImpl evictionPos; if (config.isCacheEvictionByMarkDeletedPosition()) { - PositionImpl earlierMarkDeletedPosition = getEarlierMarkDeletedPositionForActiveCursors(); - evictionPos = earlierMarkDeletedPosition != null ? earlierMarkDeletedPosition.getNext() : null; + evictionPos = activeCursors.getSlowestReaderPosition(); } else { - // Always remove all entries already read by active cursors - evictionPos = getEarlierReadPositionForActiveCursors(); + evictionPos = activeCursorsSortedByReadPosition.getSlowestReaderPosition(); } return evictionPos; } @@ -2213,32 +2205,9 @@ void doCacheEviction(long maxTimestamp) { entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp); } - private PositionImpl getEarlierReadPositionForActiveCursors() { - PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestReadPositionForActiveCursors(); - PositionImpl durablePosition = activeCursors.getSlowestReadPositionForActiveCursors(); - if (nonDurablePosition == null) { - return durablePosition; - } - if (durablePosition == null) { - return nonDurablePosition; - } - return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition; - } - - private PositionImpl getEarlierMarkDeletedPositionForActiveCursors() { - PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestMarkDeletedPositionForActiveCursors(); - PositionImpl durablePosition = activeCursors.getSlowestMarkDeletedPositionForActiveCursors(); - if (nonDurablePosition == null) { - return durablePosition; - } - if (durablePosition == null) { - return nonDurablePosition; - } - return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition; - } - void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) { Pair pair = cursors.cursorUpdated(cursor, newPosition); + activeCursors.cursorUpdated(cursor, newPosition); if (pair == null) { // Cursor has been removed in the meantime trimConsumedLedgersInBackground(); @@ -2259,6 +2228,12 @@ void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) { } } + public void updateReadPosition(ManagedCursorImpl cursor, Position newReadPosition) { + if (!config.isCacheEvictionByMarkDeletedPosition()) { + activeCursorsSortedByReadPosition.cursorUpdated(cursor, newReadPosition); + } + } + PositionImpl startReadOperationOnLedger(PositionImpl position) { Long ledgerId = ledgers.ceilingKey(position.getLedgerId()); if (ledgerId != null && ledgerId != position.getLedgerId()) { @@ -3512,9 +3487,9 @@ Pair getFirstPositionAndCounter() { public void activateCursor(ManagedCursor cursor) { if (activeCursors.get(cursor.getName()) == null) { activeCursors.add(cursor); - } - if (!cursor.isDurable() && nonDurableActiveCursors.get(cursor.getName()) == null) { - nonDurableActiveCursors.add(cursor); + if (!config.isCacheEvictionByMarkDeletedPosition()) { + activeCursorsSortedByReadPosition.add(cursor); + } } } @@ -3522,19 +3497,14 @@ public void deactivateCursor(ManagedCursor cursor) { synchronized (activeCursors) { if (activeCursors.get(cursor.getName()) != null) { activeCursors.removeCursor(cursor.getName()); + if (!config.isCacheEvictionByMarkDeletedPosition()) { + activeCursorsSortedByReadPosition.removeCursor(cursor.getName()); + } if (!activeCursors.hasDurableCursors()) { // cleanup cache if there is no active subscription entryCache.clear(); - } else { - // if removed subscription was the slowest subscription : update cursor and let it clear cache: - // till new slowest-cursor's read-position - discardEntriesFromCache((ManagedCursorImpl) activeCursors.getSlowestReader(), - getPreviousPosition((PositionImpl) activeCursors.getSlowestReader().getReadPosition())); } } - if (!cursor.isDurable()) { - nonDurableActiveCursors.removeCursor(cursor.getName()); - } } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java index 5b34dc3eb5918..8ac1ad4d857b9 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java @@ -332,7 +332,7 @@ public void verifyHitsMisses() throws Exception { PositionImpl pos = (PositionImpl) entries.get(entries.size() - 1).getPosition(); c2.setReadPosition(pos); - ledger.discardEntriesFromCache(c2, pos); + ledger.doCacheEviction(0); entries.forEach(Entry::release); factory2.getMbean().refreshStats(1, TimeUnit.SECONDS); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 1d9315ee2967d..d63238ea8cb55 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -402,9 +402,8 @@ public boolean isClosed() { @Test public void testSlowestReadPositionForActiveCursors() throws Exception { - ManagedCursorContainer container = - new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor); - assertNull(container.getSlowestReadPositionForActiveCursors()); + ManagedCursorContainer container = ManagedCursorContainer.createWithReadPositionOrdering(); + assertNull(container.getSlowestReaderPosition()); // Add no durable cursor PositionImpl position = PositionImpl.get(5,5); @@ -412,7 +411,7 @@ public void testSlowestReadPositionForActiveCursors() throws Exception { doReturn(false).when(cursor1).isDurable(); doReturn(position).when(cursor1).getReadPosition(); container.add(cursor1); - assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5)); + assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5)); // Add no durable cursor position = PositionImpl.get(1,1); @@ -420,23 +419,23 @@ public void testSlowestReadPositionForActiveCursors() throws Exception { doReturn(false).when(cursor2).isDurable(); doReturn(position).when(cursor2).getReadPosition(); container.add(cursor2); - assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(1, 1)); + assertEquals(container.getSlowestReaderPosition(), new PositionImpl(1, 1)); // Move forward cursor, cursor1 = 5:5, cursor2 = 5:6, slowest is 5:5 position = PositionImpl.get(5,6); container.cursorUpdated(cursor2, position); doReturn(position).when(cursor2).getReadPosition(); - assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5)); + assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5)); // Move forward cursor, cursor1 = 5:8, cursor2 = 5:6, slowest is 5:6 position = PositionImpl.get(5,8); doReturn(position).when(cursor1).getReadPosition(); container.cursorUpdated(cursor1, position); - assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 6)); + assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 6)); // Remove cursor, only cursor1 left, cursor1 = 5:8 container.removeCursor(cursor2.getName()); - assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 8)); + assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 8)); } @Test diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 37fa56cd989bc..08bbe85756cff 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -106,6 +106,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; @@ -296,265 +297,120 @@ public void acknowledge1() throws Exception { } @Test - public void testDoCacheEviction() throws Throwable { - CompletableFuture result = new CompletableFuture<>(); - ManagedLedgerConfig config = new ManagedLedgerConfig(); - config.setCacheEvictionByMarkDeletedPosition(true); - factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS - .toNanos(30000)); - factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() { - @Override - public void openLedgerComplete(ManagedLedger ledger, Object ctx) { - ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() { - @Override - public void openCursorComplete(ManagedCursor cursor, Object ctx) { - ManagedLedger ledger = (ManagedLedger) ctx; - String message1 = "test"; - ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() { - @Override - public void addComplete(Position position, ByteBuf entryData, Object ctx) { - try { - @SuppressWarnings("unchecked") - Pair pair = (Pair) ctx; - ManagedLedger ledger = pair.getLeft(); - ManagedCursor cursor = pair.getRight(); - if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) { - result.complete(false); - return; - } - - ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger; - ledgerImpl.getActiveCursors().removeCursor(cursor.getName()); - assertNull(ledgerImpl.getEvictionPosition()); - assertTrue(ledgerImpl.getCacheSize() == message1.getBytes(Encoding).length); - ledgerImpl.doCacheEviction(System.nanoTime()); - assertTrue(ledgerImpl.getCacheSize() <= 0); - result.complete(true); - } catch (Throwable e) { - result.completeExceptionally(e); - } - } - - @Override - public void addFailed(ManagedLedgerException exception, Object ctx) { - result.completeExceptionally(exception); - } - }, Pair.of(ledger, cursor)); - } - - @Override - public void openCursorFailed(ManagedLedgerException exception, Object ctx) { - result.completeExceptionally(exception); - } - }, ledger); - } + public void shouldKeepEntriesInCacheByEarliestReadPosition() throws ManagedLedgerException, InterruptedException { + // This test case reproduces issue #16054 - @Override - public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { - result.completeExceptionally(exception); - } - }, null, null); - assertTrue(result.get()); - - log.info("Test completed"); - } - - @Test - public void testCacheEvictionByMarkDeletedPosition() throws Throwable { - CompletableFuture result = new CompletableFuture<>(); ManagedLedgerConfig config = new ManagedLedgerConfig(); - config.setCacheEvictionByMarkDeletedPosition(true); factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS .toNanos(30000)); - factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() { - @Override - public void openLedgerComplete(ManagedLedger ledger, Object ctx) { - ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() { - @Override - public void openCursorComplete(ManagedCursor cursor, Object ctx) { - ManagedLedger ledger = (ManagedLedger) ctx; - String message1 = "test"; - ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() { - @Override - public void addComplete(Position position, ByteBuf entryData, Object ctx) { - @SuppressWarnings("unchecked") - Pair pair = (Pair) ctx; - ManagedLedger ledger = pair.getLeft(); - ManagedCursor cursor = pair.getRight(); - if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) { - result.complete(false); - return; - } - cursor.asyncReadEntries(1, new ReadEntriesCallback() { - @Override - public void readEntriesComplete(List entries, Object ctx) { - ManagedCursor cursor = (ManagedCursor) ctx; - assertEquals(entries.size(), 1); - Entry entry = entries.get(0); - final Position position = entry.getPosition(); - if (!message1.equals(new String(entry.getDataAndRelease(), Encoding))) { - result.complete(false); - return; - } - ((ManagedLedgerImpl) ledger).doCacheEviction( - System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000)); - if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) { - result.complete(false); - return; - } - log.debug("Mark-Deleting to position {}", position); - cursor.asyncMarkDelete(position, new MarkDeleteCallback() { - @Override - public void markDeleteComplete(Object ctx) { - log.debug("Mark delete complete"); - ManagedCursor cursor = (ManagedCursor) ctx; - if (cursor.hasMoreEntries()) { - result.complete(false); - return; - } - ((ManagedLedgerImpl) ledger).doCacheEviction( - System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000)); - result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0); - } + // GIVEN an opened ledger with 10 opened cursors - @Override - public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { - result.completeExceptionally(exception); - } + ManagedLedger ledger = factory.open("test_ledger_for_shouldKeepEntriesInCacheByEarliestReadPosition", + config); + List cursors = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + ManagedCursor cursor = ledger.openCursor("c" + i); + cursors.add(cursor); + } - }, cursor); - } + ManagedLedgerFactoryMXBean cacheStats = factory.getCacheStats(); + int insertedEntriesCountBefore = (int) cacheStats.getCacheInsertedEntriesCount(); - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - result.completeExceptionally(exception); - } - }, cursor, PositionImpl.LATEST); - } + // AND 100 added entries - @Override - public void addFailed(ManagedLedgerException exception, Object ctx) { - result.completeExceptionally(exception); - } - }, Pair.of(ledger, cursor)); - } + for (int i = 0; i < 100; i++) { + String content = "entry-" + i; + ledger.addEntry(content.getBytes()); + } - @Override - public void openCursorFailed(ManagedLedgerException exception, Object ctx) { - result.completeExceptionally(exception); - } + int insertedEntriesCount = + (int) cacheStats.getCacheInsertedEntriesCount() - insertedEntriesCountBefore; + // EXPECT that 100 entries should have been inserted to the cache + assertEquals(insertedEntriesCount, 100); - }, ledger); - } + int evictedEntriesCountBefore = (int) cacheStats.getCacheEvictedEntriesCount(); - @Override - public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { - result.completeExceptionally(exception); - } - }, null, null); + // WHEN entries are read for the cursors so that the farthest cursor has most entries read + for (int i = 0; i < 10; i++) { + ManagedCursor cursor = cursors.get(i); + // read entries farther of the earliest cursor + List entries = cursor.readEntries(20 - i); + // mark delete the least for the earliest cursor + cursor.markDelete(entries.get(i).getPosition()); + entries.forEach(Entry::release); + } - assertTrue(result.get()); + // THEN it is expected that the cache evicts entries to the earliest read position + Thread.sleep(2 * factory.getConfig().getCacheEvictionIntervalMs()); + int evictedEntriesCount = + (int) cacheStats.getCacheEvictedEntriesCount() - evictedEntriesCountBefore; + assertEquals(evictedEntriesCount, 11, + "It is expected that the cache evicts entries to the earliest read position"); - log.info("Test completed"); + ledger.close(); } @Test - public void testCacheEvictionByReadPosition() throws Throwable { - CompletableFuture result = new CompletableFuture<>(); + public void shouldKeepEntriesInCacheByEarliestMarkDeletePosition() throws ManagedLedgerException, InterruptedException { + // This test case reproduces issue #16054 + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setCacheEvictionByMarkDeletedPosition(true); factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS .toNanos(30000)); - factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() { - @Override - public void openLedgerComplete(ManagedLedger ledger, Object ctx) { - ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() { - @Override - public void openCursorComplete(ManagedCursor cursor, Object ctx) { - ManagedLedger ledger = (ManagedLedger) ctx; - String message1 = "test"; - ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() { - @Override - public void addComplete(Position position, ByteBuf entryData, Object ctx) { - @SuppressWarnings("unchecked") - Pair pair = (Pair) ctx; - ManagedLedger ledger = pair.getLeft(); - ManagedCursor cursor = pair.getRight(); - if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) { - result.complete(false); - return; - } - cursor.asyncReadEntries(1, new ReadEntriesCallback() { - @Override - public void readEntriesComplete(List entries, Object ctx) { - ManagedCursor cursor = (ManagedCursor) ctx; - assertEquals(entries.size(), 1); - Entry entry = entries.get(0); - final Position position = entry.getPosition(); - if (!message1.equals(new String(entry.getDataAndRelease(), Encoding))) { - result.complete(false); - return; - } - ((ManagedLedgerImpl) ledger).doCacheEviction( - System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000)); - if (((ManagedLedgerImpl) ledger).getCacheSize() != 0) { - result.complete(false); - return; - } + // GIVEN an opened ledger with 10 opened cursors - log.debug("Mark-Deleting to position {}", position); - cursor.asyncMarkDelete(position, new MarkDeleteCallback() { - @Override - public void markDeleteComplete(Object ctx) { - log.debug("Mark delete complete"); - ManagedCursor cursor = (ManagedCursor) ctx; - if (cursor.hasMoreEntries()) { - result.complete(false); - return; - } - result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0); - } + ManagedLedger ledger = factory.open("test_ledger_for_shouldKeepEntriesInCacheByEarliestMarkDeletePosition", + config); + List cursors = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + ManagedCursor cursor = ledger.openCursor("c" + i); + cursors.add(cursor); + } - @Override - public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { - result.completeExceptionally(exception); - } + ManagedLedgerFactoryMXBean cacheStats = factory.getCacheStats(); + int insertedEntriesCountBefore = (int) cacheStats.getCacheInsertedEntriesCount(); - }, cursor); - } + // AND 100 added entries - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - result.completeExceptionally(exception); - } - }, cursor, PositionImpl.LATEST); - } + for (int i = 0; i < 100; i++) { + String content = "entry-" + i; + ledger.addEntry(content.getBytes()); + } - @Override - public void addFailed(ManagedLedgerException exception, Object ctx) { - result.completeExceptionally(exception); - } - }, Pair.of(ledger, cursor)); - } + int insertedEntriesCount = + (int) cacheStats.getCacheInsertedEntriesCount() - insertedEntriesCountBefore; + // EXPECT that 100 entries should have been inserted to the cache + assertEquals(insertedEntriesCount, 100); - @Override - public void openCursorFailed(ManagedLedgerException exception, Object ctx) { - result.completeExceptionally(exception); - } + int evictedEntriesCountBefore = (int) cacheStats.getCacheEvictedEntriesCount(); - }, ledger); - } + // WHEN entries are read for the cursors so that the farthest cursor has most entries read + Position lastMarkDeletePos = null; + for (int i = 0; i < 10; i++) { + ManagedCursor cursor = cursors.get(i); + // read 50 (+ index) entries for each cursor + List entries = cursor.readEntries(50 + (5 * i)); + // mark delete the most for the earliest cursor + lastMarkDeletePos = entries.get(20 - i).getPosition(); + cursor.markDelete(lastMarkDeletePos); + entries.forEach(Entry::release); + } - @Override - public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { - result.completeExceptionally(exception); - } - }, null, null); + Thread.sleep(1000 + 2 * factory.getConfig().getCacheEvictionIntervalMs()); - assertTrue(result.get()); + ManagedCursorContainer activeCursors = (ManagedCursorContainer) ledger.getActiveCursors(); + assertEquals(activeCursors.getSlowestReaderPosition(), lastMarkDeletePos); - log.info("Test completed"); + // THEN it is expected that the cache evicts entries to the earliest read position + int evictedEntriesCount = + (int) cacheStats.getCacheEvictedEntriesCount() - evictedEntriesCountBefore; + assertEquals(evictedEntriesCount, 11, + "It is expected that the cache evicts entries to the earliest read position"); + + ledger.close(); } @Test(timeOut = 20000) @@ -1783,10 +1639,14 @@ public void testOpenRaceCondition() throws Exception { @Test public void invalidateConsumedEntriesFromCache() throws Exception { - ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger"); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("my_test_ledger_for_invalidateConsumedEntriesFromCache", + config); EntryCacheManager cacheManager = factory.getEntryCacheManager(); EntryCache entryCache = ledger.entryCache; + entryCache.clear(); ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1"); ManagedCursorImpl c2 = (ManagedCursorImpl) ledger.openCursor("c2"); @@ -1798,33 +1658,101 @@ public void invalidateConsumedEntriesFromCache() throws Exception { assertEquals(entryCache.getSize(), 7 * 4); assertEquals(cacheManager.getSize(), entryCache.getSize()); + ledger.doCacheEviction(0); + assertEquals(entryCache.getSize(), 7 * 4); + assertEquals(cacheManager.getSize(), entryCache.getSize()); + c2.setReadPosition(p3); - ledger.discardEntriesFromCache(c2, p2); + ledger.doCacheEviction(0); assertEquals(entryCache.getSize(), 7 * 4); assertEquals(cacheManager.getSize(), entryCache.getSize()); c1.setReadPosition(p2); - ledger.discardEntriesFromCache(c1, p2); + ledger.doCacheEviction(0); assertEquals(entryCache.getSize(), 7 * 3); assertEquals(cacheManager.getSize(), entryCache.getSize()); c1.setReadPosition(p3); - ledger.discardEntriesFromCache(c1, p3); - assertEquals(entryCache.getSize(), 7 * 3); + ledger.doCacheEviction(0); + assertEquals(entryCache.getSize(), 7 * 2); assertEquals(cacheManager.getSize(), entryCache.getSize()); ledger.deactivateCursor(c1); - assertEquals(entryCache.getSize(), 7 * 3); // as c2.readPosition=p3 => Cache contains p3,p4 + ledger.doCacheEviction(0); + assertEquals(entryCache.getSize(), 7 * 2); // as c2.readPosition=p3 => Cache contains p3,p4 + assertEquals(cacheManager.getSize(), entryCache.getSize()); + + c2.setReadPosition(p4); + ledger.doCacheEviction(0); + assertEquals(entryCache.getSize(), 7); + assertEquals(cacheManager.getSize(), entryCache.getSize()); + + ledger.deactivateCursor(c2); + ledger.doCacheEviction(0); + assertEquals(entryCache.getSize(), 0); + assertEquals(cacheManager.getSize(), entryCache.getSize()); + } + + @Test + public void invalidateEntriesFromCacheByMarkDeletePosition() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setCacheEvictionByMarkDeletedPosition(true); + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("my_test_ledger_for_invalidateEntriesFromCacheByMarkDeletePosition", + config); + + EntryCacheManager cacheManager = factory.getEntryCacheManager(); + EntryCache entryCache = ledger.entryCache; + entryCache.clear(); + + ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1"); + ManagedCursorImpl c2 = (ManagedCursorImpl) ledger.openCursor("c2"); + + PositionImpl p1 = (PositionImpl) ledger.addEntry("entry-1".getBytes()); + PositionImpl p2 = (PositionImpl) ledger.addEntry("entry-2".getBytes()); + PositionImpl p3 = (PositionImpl) ledger.addEntry("entry-3".getBytes()); + PositionImpl p4 = (PositionImpl) ledger.addEntry("entry-4".getBytes()); + + assertEquals(entryCache.getSize(), 7 * 4); + assertEquals(cacheManager.getSize(), entryCache.getSize()); + ledger.doCacheEviction(0); + assertEquals(entryCache.getSize(), 7 * 4); assertEquals(cacheManager.getSize(), entryCache.getSize()); + c2.setReadPosition(p4); - ledger.discardEntriesFromCache(c2, p4); + c2.markDelete(p3); + ledger.doCacheEviction(0); + + assertEquals(entryCache.getSize(), 7 * 4); + assertEquals(cacheManager.getSize(), entryCache.getSize()); + + c1.setReadPosition(p3); + c1.markDelete(p2); + ledger.doCacheEviction(0); + assertEquals(entryCache.getSize(), 7 * 3); + assertEquals(cacheManager.getSize(), entryCache.getSize()); + + c1.setReadPosition(p4); + c1.markDelete(p3); + ledger.doCacheEviction(0); + assertEquals(entryCache.getSize(), 7 * 2); + assertEquals(cacheManager.getSize(), entryCache.getSize()); + + ledger.deactivateCursor(c1); + ledger.doCacheEviction(0); + assertEquals(entryCache.getSize(), 7 * 2); + assertEquals(cacheManager.getSize(), entryCache.getSize()); + + c2.markDelete(p4); + ledger.doCacheEviction(0); assertEquals(entryCache.getSize(), 7); assertEquals(cacheManager.getSize(), entryCache.getSize()); ledger.deactivateCursor(c2); + ledger.doCacheEviction(0); assertEquals(entryCache.getSize(), 0); assertEquals(cacheManager.getSize(), entryCache.getSize()); } @@ -2104,10 +2032,10 @@ public void testMaximumRolloverTime() throws Exception { ledger.addEntry("data".getBytes()); ledger.addEntry("data".getBytes()); - + Awaitility.await().untilAsserted(() -> { assertEquals(ledger.getLedgersInfoAsList().size(), 2); - }); + }); } @Test diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java index 0fd8902f8253e..1e960c32bf678 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java @@ -26,6 +26,7 @@ import lombok.SneakyThrows; import org.apache.bookkeeper.client.PulsarMockBookKeeper; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -79,6 +80,9 @@ public final void setUp(Method method) throws Exception { throw e; } + ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig(); + // increase default cache eviction interval so that caching could be tested with less flakyness + managedLedgerFactoryConfig.setCacheEvictionIntervalMs(200); factory = new ManagedLedgerFactoryImpl(metadataStore, bkc); setUpTestCase();