diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java index 4c3d413437944..22fef9621b4be 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java @@ -19,7 +19,6 @@ package org.apache.bookkeeper.mledger.impl; import static com.google.common.base.Preconditions.checkNotNull; -import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Iterator; import java.util.Map; @@ -31,17 +30,16 @@ import org.apache.commons.lang3.tuple.Pair; /** - * Contains all the cursors for a ManagedLedger. + * Contains cursors for a ManagedLedger. * *

The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep. * - *

This data structure maintains a list and a map of cursors. The map is used to relate a cursor name with an entry - * in the linked-list. The list is a sorted double linked-list of cursors. + *

This data structure maintains a heap and a map of cursors. The map is used to relate a cursor name with + * an entry index in the heap. The heap data structure sorts cursors in a binary tree which is represented + * in a single array. More details about heap implementations: + * https://en.wikipedia.org/wiki/Heap_(data_structure)#Implementation * - *

When a cursor is markDeleted, this list is updated and the cursor is moved in its new position. - * - *

To minimize the moving around, the order is maintained using the ledgerId, but not the entryId, since we only - * care about ledgers to be deleted. + *

The heap is updated and kept sorted when a cursor is updated. * */ public class ManagedCursorContainer implements Iterable { @@ -51,67 +49,53 @@ private static class Item { PositionImpl position; int idx; - Item(ManagedCursor cursor, int idx) { + Item(ManagedCursor cursor, PositionImpl position, int idx) { this.cursor = cursor; - this.position = (PositionImpl) cursor.getMarkDeletedPosition(); + this.position = position; this.idx = idx; } } - public enum CursorType { - DurableCursor, - NonDurableCursor, - ALL - } - public ManagedCursorContainer() { - cursorType = CursorType.DurableCursor; - } - public ManagedCursorContainer(CursorType cursorType) { - this.cursorType = cursorType; } - - private final CursorType cursorType; - - // Used to keep track of slowest cursor. Contains all of all active cursors. - private final ArrayList heap = Lists.newArrayList(); + // Used to keep track of slowest cursor. + private final ArrayList heap = new ArrayList<>(); // Maps a cursor to its position in the heap private final ConcurrentMap cursors = new ConcurrentSkipListMap<>(); private final StampedLock rwLock = new StampedLock(); - public void add(ManagedCursor cursor) { + private int durableCursorCount; + + + /** + * Add a cursor to the container. The cursor will be optionally tracked for the slowest reader when + * a position is passed as the second argument. It is expected that the position is updated with + * {@link #cursorUpdated(ManagedCursor, Position)} method when the position changes. + * + * @param cursor cursor to add + * @param position position of the cursor to use for ordering, pass null if the cursor's position shouldn't be + * tracked for the slowest reader. + */ + public void add(ManagedCursor cursor, Position position) { long stamp = rwLock.writeLock(); try { - // Append a new entry at the end of the list - Item item = new Item(cursor, heap.size()); + Item item = new Item(cursor, (PositionImpl) position, position != null ? heap.size() : -1); cursors.put(cursor.getName(), item); - - if (shouldTrackInHeap(cursor)) { + if (position != null) { heap.add(item); siftUp(item); } + if (cursor.isDurable()) { + durableCursorCount++; + } } finally { rwLock.unlockWrite(stamp); } } - private boolean shouldTrackInHeap(ManagedCursor cursor) { - return CursorType.ALL.equals(cursorType) - || (cursor.isDurable() && CursorType.DurableCursor.equals(cursorType)) - || (!cursor.isDurable() && CursorType.NonDurableCursor.equals(cursorType)); - } - - public PositionImpl getSlowestReadPositionForActiveCursors() { - return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition(); - } - - public PositionImpl getSlowestMarkDeletedPositionForActiveCursors() { - return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getMarkDeletedPosition(); - } - public ManagedCursor get(String name) { long stamp = rwLock.readLock(); try { @@ -122,17 +106,25 @@ public ManagedCursor get(String name) { } } - public void removeCursor(String name) { + public boolean removeCursor(String name) { long stamp = rwLock.writeLock(); try { Item item = cursors.remove(name); - if (item != null && shouldTrackInHeap(item.cursor)) { - // Move the item to the right end of the heap to be removed - Item lastItem = heap.get(heap.size() - 1); - swap(item, lastItem); - heap.remove(item.idx); - // Update the heap - siftDown(lastItem); + if (item != null) { + if (item.idx >= 0) { + // Move the item to the right end of the heap to be removed + Item lastItem = heap.get(heap.size() - 1); + swap(item, lastItem); + heap.remove(item.idx); + // Update the heap + siftDown(lastItem); + } + if (item.cursor.isDurable()) { + durableCursorCount--; + } + return true; + } else { + return false; } } finally { rwLock.unlockWrite(stamp); @@ -140,10 +132,15 @@ public void removeCursor(String name) { } /** - * Signal that a cursor position has been updated and that the container must re-order the cursor list. + * Signal that a cursor position has been updated and that the container must re-order the cursor heap + * tracking the slowest reader. + * Only those cursors are tracked and can be updated which were added to the container with the + * {@link #add(ManagedCursor, Position)} method that specified the initial position in the position + * parameter. * - * @param cursor - * @return a pair of positions, representing the previous slowest consumer and the new slowest consumer (after the + * @param cursor the cursor to update the position for + * @param newPosition the updated position for the cursor + * @return a pair of positions, representing the previous slowest reader and the new slowest reader (after the * update). */ public Pair cursorUpdated(ManagedCursor cursor, Position newPosition) { @@ -152,35 +149,33 @@ public Pair cursorUpdated(ManagedCursor cursor, Posi long stamp = rwLock.writeLock(); try { Item item = cursors.get(cursor.getName()); - if (item == null) { + if (item == null || item.idx == -1) { return null; } + PositionImpl previousSlowestConsumer = heap.get(0).position; - if (shouldTrackInHeap(item.cursor)) { - PositionImpl previousSlowestConsumer = heap.get(0).position; - + item.position = (PositionImpl) newPosition; + if (heap.size() > 1) { // When the cursor moves forward, we need to push it toward the // bottom of the tree and push it up if a reset was done - item.position = (PositionImpl) newPosition; if (item.idx == 0 || getParent(item).position.compareTo(item.position) <= 0) { siftDown(item); } else { siftUp(item); } - - PositionImpl newSlowestConsumer = heap.get(0).position; - return Pair.of(previousSlowestConsumer, newSlowestConsumer); } - return null; + + PositionImpl newSlowestConsumer = heap.get(0).position; + return Pair.of(previousSlowestConsumer, newSlowestConsumer); } finally { rwLock.unlockWrite(stamp); } } /** - * Get the slowest reader position, meaning older acknowledged position between all the cursors. + * Get the slowest reader position for the cursors that are ordered. * * @return the slowest reader position */ @@ -228,18 +223,18 @@ public boolean isEmpty() { */ public boolean hasDurableCursors() { long stamp = rwLock.tryOptimisticRead(); - boolean isEmpty = heap.isEmpty(); + int count = durableCursorCount; if (!rwLock.validate(stamp)) { // Fallback to read lock stamp = rwLock.readLock(); try { - isEmpty = heap.isEmpty(); + count = durableCursorCount; } finally { rwLock.unlockRead(stamp); } } - return !isEmpty; + return count > 0; } @Override @@ -282,7 +277,7 @@ public ManagedCursor next() { @Override public void remove() { - throw new IllegalArgumentException("Cannot remove ManagedCursor form container"); + throw new IllegalArgumentException("Cannot remove ManagedCursor from container"); } }; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index ab97c10b68791..442bece636cb6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -572,6 +572,7 @@ private void recoveredCursor(PositionImpl position, Map properties persistentMarkDeletePosition = position; inProgressMarkDeletePersistPosition = null; readPosition = ledger.getNextValidPosition(position); + ledger.onCursorReadPositionUpdated(this, readPosition); lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, properties, null, null); // assign cursor-ledger so, it can be deleted when new ledger will be switched this.cursorLedger = recoveredFromCursorLedger; @@ -1129,6 +1130,7 @@ public void operationComplete() { ledger.getName(), newPosition, oldReadPosition, name); } readPosition = newPosition; + ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newPosition); } finally { lock.writeLock().unlock(); } @@ -1613,6 +1615,7 @@ boolean hasMoreEntries(PositionImpl position) { void initializeCursorPosition(Pair lastPositionCounter) { readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft()); + ledger.onCursorReadPositionUpdated(this, readPosition); markDeletePosition = lastPositionCounter.getLeft(); lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null); persistentMarkDeletePosition = null; @@ -1687,6 +1690,7 @@ PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) { log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}", ledger.getName(), currentReadPosition, newReadPosition, markDeletePosition); } + ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition); return newReadPosition; } else { return currentReadPosition; @@ -1905,7 +1909,7 @@ public void operationComplete() { lock.writeLock().unlock(); } - ledger.updateCursor(ManagedCursorImpl.this, mdEntry.newPosition); + ledger.onCursorMarkDeletePositionUpdated(ManagedCursorImpl.this, mdEntry.newPosition); decrementPendingMarkDeleteCount(); @@ -2269,6 +2273,7 @@ public void rewind() { log.info("[{}-{}] Rewind from {} to {}", ledger.getName(), name, oldReadPosition, newReadPosition); readPosition = newReadPosition; + ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition); } finally { lock.writeLock().unlock(); } @@ -2286,6 +2291,7 @@ public void seek(Position newReadPositionInt, boolean force) { newReadPosition = ledger.getNextValidPosition(markDeletePosition); } readPosition = newReadPosition; + ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition); } finally { lock.writeLock().unlock(); } @@ -2485,6 +2491,7 @@ void setReadPosition(Position newReadPositionInt) { if (this.markDeletePosition == null || ((PositionImpl) newReadPositionInt).compareTo(this.markDeletePosition) > 0) { this.readPosition = (PositionImpl) newReadPositionInt; + ledger.onCursorReadPositionUpdated(this, newReadPositionInt); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f26698856d266..9c5c21d5f150d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -162,10 +162,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { protected final NavigableMap ledgers = new ConcurrentSkipListMap<>(); private volatile Stat ledgersStat; + // contains all cursors, where durable cursors are ordered by mark delete position private final ManagedCursorContainer cursors = new ManagedCursorContainer(); + // contains active cursors eligible for caching, + // ordered by read position (when cacheEvictionByMarkDeletedPosition=false) or by mark delete position + // (when cacheEvictionByMarkDeletedPosition=true) private final ManagedCursorContainer activeCursors = new ManagedCursorContainer(); - private final ManagedCursorContainer nonDurableActiveCursors = - new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor); + // Ever increasing counter of entries added @VisibleForTesting @@ -540,7 +543,7 @@ public void operationComplete() { log.info("[{}] Recovery for cursor {} completed. pos={} -- todo={}", name, cursorName, cursor.getMarkDeletedPosition(), cursorCount.get() - 1); cursor.setActive(); - cursors.add(cursor); + addCursor(cursor); if (cursorCount.decrementAndGet() == 0) { // The initialization is now completed, register the jmx mbean @@ -574,7 +577,7 @@ public void operationComplete() { cursor.getMarkDeletedPosition(), cursorCount.get() - 1); cursor.setActive(); synchronized (ManagedLedgerImpl.this) { - cursors.add(cursor); + addCursor(cursor); uninitializedCursors.remove(cursor.getName()).complete(cursor); } } @@ -601,6 +604,17 @@ public void operationFailed(MetaStoreException e) { }); } + private void addCursor(ManagedCursorImpl cursor) { + Position positionForOrdering = null; + if (cursor.isDurable()) { + positionForOrdering = cursor.getMarkDeletedPosition(); + if (positionForOrdering == null) { + positionForOrdering = PositionImpl.earliest; + } + } + cursors.add(cursor, positionForOrdering); + } + @Override public String getName() { return name; @@ -935,7 +949,7 @@ public void operationComplete() { : getFirstPositionAndCounter()); synchronized (ManagedLedgerImpl.this) { - cursors.add(cursor); + addCursor(cursor); uninitializedCursors.remove(cursorName).complete(cursor); } callback.openCursorComplete(cursor, ctx); @@ -963,6 +977,7 @@ public synchronized void asyncDeleteCursor(final String consumerName, final Dele return; } else if (!cursor.isDurable()) { cursors.removeCursor(consumerName); + deactivateCursorByName(consumerName); callback.deleteCursorComplete(ctx); return; } @@ -974,17 +989,7 @@ public synchronized void asyncDeleteCursor(final String consumerName, final Dele public void operationComplete(Void result, Stat stat) { cursor.asyncDeleteCursorLedger(); cursors.removeCursor(consumerName); - - // Redo invalidation of entries in cache - PositionImpl slowestConsumerPosition = cursors.getSlowestReaderPosition(); - if (slowestConsumerPosition != null) { - if (log.isDebugEnabled()) { - log.debug("Doing cache invalidation up to {}", slowestConsumerPosition); - } - entryCache.invalidateEntries(slowestConsumerPosition); - } else { - entryCache.clear(); - } + deactivateCursorByName(consumerName); trimConsumedLedgersInBackground(); @@ -1067,7 +1072,7 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cu log.info("[{}] Opened new cursor: {}", name, cursor); synchronized (this) { - cursors.add(cursor); + addCursor(cursor); } return cursor; @@ -2123,57 +2128,36 @@ public boolean hasMoreEntries(PositionImpl position) { return result; } - void discardEntriesFromCache(ManagedCursorImpl cursor, PositionImpl newPosition) { - Pair pair = activeCursors.cursorUpdated(cursor, newPosition); - if (pair != null) { - entryCache.invalidateEntries(pair.getRight()); + void doCacheEviction(long maxTimestamp) { + if (entryCache.getSize() > 0) { + entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp); } } - void doCacheEviction(long maxTimestamp) { + // slowest reader position is earliest mark delete position when cacheEvictionByMarkDeletedPosition=true + // it is the earliest read position when cacheEvictionByMarkDeletedPosition=false + private void invalidateEntriesUpToSlowestReaderPosition() { if (entryCache.getSize() <= 0) { return; } - PositionImpl evictionPos; - if (config.isCacheEvictionByMarkDeletedPosition()) { - evictionPos = getEarlierMarkDeletedPositionForActiveCursors().getNext(); + if (!activeCursors.isEmpty()) { + PositionImpl evictionPos = activeCursors.getSlowestReaderPosition(); + if (evictionPos != null) { + entryCache.invalidateEntries(evictionPos); + } } else { - // Always remove all entries already read by active cursors - evictionPos = getEarlierReadPositionForActiveCursors(); - } - if (evictionPos != null) { - entryCache.invalidateEntries(evictionPos); - } - - // Remove entries older than the cutoff threshold - entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp); - } - - private PositionImpl getEarlierReadPositionForActiveCursors() { - PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestReadPositionForActiveCursors(); - PositionImpl durablePosition = activeCursors.getSlowestReadPositionForActiveCursors(); - if (nonDurablePosition == null) { - return durablePosition; - } - if (durablePosition == null) { - return nonDurablePosition; + entryCache.clear(); } - return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition; } - private PositionImpl getEarlierMarkDeletedPositionForActiveCursors() { - PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestMarkDeletedPositionForActiveCursors(); - PositionImpl durablePosition = activeCursors.getSlowestMarkDeletedPositionForActiveCursors(); - if (nonDurablePosition == null) { - return durablePosition; + void onCursorMarkDeletePositionUpdated(ManagedCursorImpl cursor, PositionImpl newPosition) { + if (config.isCacheEvictionByMarkDeletedPosition()) { + updateActiveCursor(cursor, newPosition); } - if (durablePosition == null) { - return nonDurablePosition; + if (!cursor.isDurable()) { + // non-durable cursors aren't tracked for trimming + return; } - return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition; - } - - void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) { Pair pair = cursors.cursorUpdated(cursor, newPosition); if (pair == null) { // Cursor has been removed in the meantime @@ -2195,6 +2179,20 @@ void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) { } } + private void updateActiveCursor(ManagedCursorImpl cursor, Position newPosition) { + Pair slowestPositions = activeCursors.cursorUpdated(cursor, newPosition); + if (slowestPositions != null + && !slowestPositions.getLeft().equals(slowestPositions.getRight())) { + invalidateEntriesUpToSlowestReaderPosition(); + } + } + + public void onCursorReadPositionUpdated(ManagedCursorImpl cursor, Position newReadPosition) { + if (!config.isCacheEvictionByMarkDeletedPosition()) { + updateActiveCursor(cursor, newReadPosition); + } + } + PositionImpl startReadOperationOnLedger(PositionImpl position) { Long ledgerId = ledgers.ceilingKey(position.getLedgerId()); if (ledgerId != null && ledgerId != position.getLedgerId()) { @@ -2255,7 +2253,7 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() { if (!lastAckedPosition.equals((PositionImpl) cursor.getMarkDeletedPosition())) { try { log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition); - updateCursor((ManagedCursorImpl) cursor, lastAckedPosition); + onCursorMarkDeletePositionUpdated((ManagedCursorImpl) cursor, lastAckedPosition); } catch (Exception e) { log.warn("Failed to reset cursor: {} from {} to {}. Trimming thread will retry next time.", cursor, cursor.getMarkDeletedPosition(), lastAckedPosition); @@ -3474,34 +3472,32 @@ Pair getFirstPositionAndCounter() { } public void activateCursor(ManagedCursor cursor) { - if (activeCursors.get(cursor.getName()) == null) { - activeCursors.add(cursor); - } - if (!cursor.isDurable() && nonDurableActiveCursors.get(cursor.getName()) == null) { - nonDurableActiveCursors.add(cursor); + synchronized (activeCursors) { + if (activeCursors.get(cursor.getName()) == null) { + Position positionForOrdering = config.isCacheEvictionByMarkDeletedPosition() + ? cursor.getMarkDeletedPosition() + : cursor.getReadPosition(); + if (positionForOrdering == null) { + positionForOrdering = PositionImpl.earliest; + } + activeCursors.add(cursor, positionForOrdering); + } } } public void deactivateCursor(ManagedCursor cursor) { + deactivateCursorByName(cursor.getName()); + } + + private void deactivateCursorByName(String cursorName) { synchronized (activeCursors) { - if (activeCursors.get(cursor.getName()) != null) { - activeCursors.removeCursor(cursor.getName()); - if (!activeCursors.hasDurableCursors()) { - // cleanup cache if there is no active subscription - entryCache.clear(); - } else { - // if removed subscription was the slowest subscription : update cursor and let it clear cache: - // till new slowest-cursor's read-position - discardEntriesFromCache((ManagedCursorImpl) activeCursors.getSlowestReader(), - getPreviousPosition((PositionImpl) activeCursors.getSlowestReader().getReadPosition())); - } - } - if (!cursor.isDurable()) { - nonDurableActiveCursors.removeCursor(cursor.getName()); + if (activeCursors.removeCursor(cursorName)) { + invalidateEntriesUpToSlowestReaderPosition(); } } } + public void removeWaitingCursor(ManagedCursor cursor) { this.waitingCursors.remove(cursor); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java index 4625f5b58006a..c74a7299eea51 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java @@ -103,7 +103,7 @@ protected void internalAsyncMarkDelete(final PositionImpl newPosition, Map e.release()); + + entries.forEach(Entry::release); factory2.getMbean().refreshStats(1, TimeUnit.SECONDS); - assertEquals(factory2.getMbean().getCacheUsedSize(), 7); + assertEquals(factory2.getMbean().getCacheUsedSize(), 0); assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0); assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0); assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 56de80308e5d1..8a4d00a722248 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -395,41 +395,40 @@ public boolean isClosed() { @Test public void testSlowestReadPositionForActiveCursors() throws Exception { - ManagedCursorContainer container = - new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor); - assertNull(container.getSlowestReadPositionForActiveCursors()); + ManagedCursorContainer container = new ManagedCursorContainer(); + assertNull(container.getSlowestReaderPosition()); // Add no durable cursor PositionImpl position = PositionImpl.get(5,5); ManagedCursor cursor1 = spy(new MockManagedCursor(container, "test1", position)); doReturn(false).when(cursor1).isDurable(); doReturn(position).when(cursor1).getReadPosition(); - container.add(cursor1); - assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5)); + container.add(cursor1, position); + assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5)); // Add no durable cursor position = PositionImpl.get(1,1); ManagedCursor cursor2 = spy(new MockManagedCursor(container, "test2", position)); doReturn(false).when(cursor2).isDurable(); doReturn(position).when(cursor2).getReadPosition(); - container.add(cursor2); - assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(1, 1)); + container.add(cursor2, position); + assertEquals(container.getSlowestReaderPosition(), new PositionImpl(1, 1)); // Move forward cursor, cursor1 = 5:5 , cursor2 = 5:6, slowest is 5:5 position = PositionImpl.get(5,6); container.cursorUpdated(cursor2, position); doReturn(position).when(cursor2).getReadPosition(); - assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5)); + assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5)); // Move forward cursor, cursor1 = 5:8 , cursor2 = 5:6, slowest is 5:6 position = PositionImpl.get(5,8); doReturn(position).when(cursor1).getReadPosition(); container.cursorUpdated(cursor1, position); - assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 6)); + assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 6)); // Remove cursor, only cursor1 left, cursor1 = 5:8 container.removeCursor(cursor2.getName()); - assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 8)); + assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 8)); } @Test @@ -438,25 +437,25 @@ public void simple() throws Exception { assertNull(container.getSlowestReaderPosition()); ManagedCursor cursor1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5)); - container.add(cursor1); + container.add(cursor1, cursor1.getMarkDeletedPosition()); assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5)); ManagedCursor cursor2 = new MockManagedCursor(container, "test2", new PositionImpl(2, 2)); - container.add(cursor2); + container.add(cursor2, cursor2.getMarkDeletedPosition()); assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 2)); ManagedCursor cursor3 = new MockManagedCursor(container, "test3", new PositionImpl(2, 0)); - container.add(cursor3); + container.add(cursor3, cursor3.getMarkDeletedPosition()); assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 0)); assertEquals(container.toString(), "[test1=5:5, test2=2:2, test3=2:0]"); ManagedCursor cursor4 = new MockManagedCursor(container, "test4", new PositionImpl(4, 0)); - container.add(cursor4); + container.add(cursor4, cursor4.getMarkDeletedPosition()); assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 0)); ManagedCursor cursor5 = new MockManagedCursor(container, "test5", new PositionImpl(3, 5)); - container.add(cursor5); + container.add(cursor5, cursor5.getMarkDeletedPosition()); assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 0)); cursor3.markDelete(new PositionImpl(3, 0)); @@ -481,7 +480,7 @@ public void simple() throws Exception { assertFalse(container.hasDurableCursors()); ManagedCursor cursor6 = new MockManagedCursor(container, "test6", new PositionImpl(6, 5)); - container.add(cursor6); + container.add(cursor6, cursor6.getMarkDeletedPosition()); assertEquals(container.getSlowestReaderPosition(), new PositionImpl(6, 5)); assertEquals(container.toString(), "[test6=6:5]"); @@ -492,11 +491,11 @@ public void updatingCursorOutsideContainer() throws Exception { ManagedCursorContainer container = new ManagedCursorContainer(); ManagedCursor cursor1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5)); - container.add(cursor1); + container.add(cursor1, cursor1.getMarkDeletedPosition()); assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5)); MockManagedCursor cursor2 = new MockManagedCursor(container, "test2", new PositionImpl(2, 2)); - container.add(cursor2); + container.add(cursor2, cursor2.getMarkDeletedPosition()); assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 2)); cursor2.position = new PositionImpl(8, 8); @@ -514,17 +513,17 @@ public void removingCursor() throws Exception { ManagedCursorContainer container = new ManagedCursorContainer(); ManagedCursor cursor1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5)); - container.add(cursor1); + container.add(cursor1, cursor1.getMarkDeletedPosition()); assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5)); assertEquals(container.get("test1"), cursor1); MockManagedCursor cursor2 = new MockManagedCursor(container, "test2", new PositionImpl(2, 2)); - container.add(cursor2); + container.add(cursor2, cursor2.getMarkDeletedPosition()); assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 2)); assertEquals(container.get("test2"), cursor2); MockManagedCursor cursor3 = new MockManagedCursor(container, "test3", new PositionImpl(1, 1)); - container.add(cursor3); + container.add(cursor3, cursor3.getMarkDeletedPosition()); assertEquals(container.getSlowestReaderPosition(), new PositionImpl(1, 1)); assertEquals(container.get("test3"), cursor3); @@ -556,11 +555,11 @@ public void ordering() throws Exception { ManagedCursor cursor4 = new MockManagedCursor(container, "test4", new PositionImpl(6, 4)); ManagedCursor cursor5 = new MockManagedCursor(container, "test5", new PositionImpl(7, 0)); - container.add(cursor1); - container.add(cursor2); - container.add(cursor3); - container.add(cursor4); - container.add(cursor5); + container.add(cursor1, cursor1.getMarkDeletedPosition()); + container.add(cursor2, cursor2.getMarkDeletedPosition()); + container.add(cursor3, cursor3.getMarkDeletedPosition()); + container.add(cursor4, cursor4.getMarkDeletedPosition()); + container.add(cursor5, cursor5.getMarkDeletedPosition()); assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 1)); container.removeCursor("test2"); @@ -590,11 +589,11 @@ public void orderingWithUpdates() throws Exception { MockManagedCursor c4 = new MockManagedCursor(container, "test4", new PositionImpl(6, 4)); MockManagedCursor c5 = new MockManagedCursor(container, "test5", new PositionImpl(7, 0)); - container.add(c1); - container.add(c2); - container.add(c3); - container.add(c4); - container.add(c5); + container.add(c1, c1.getMarkDeletedPosition()); + container.add(c2, c2.getMarkDeletedPosition()); + container.add(c3, c3.getMarkDeletedPosition()); + container.add(c4, c4.getMarkDeletedPosition()); + container.add(c5, c5.getMarkDeletedPosition()); assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 1)); @@ -655,11 +654,11 @@ public void orderingWithUpdatesAndReset() throws Exception { MockManagedCursor c4 = new MockManagedCursor(container, "test4", new PositionImpl(6, 4)); MockManagedCursor c5 = new MockManagedCursor(container, "test5", new PositionImpl(7, 0)); - container.add(c1); - container.add(c2); - container.add(c3); - container.add(c4); - container.add(c5); + container.add(c1, c1.getMarkDeletedPosition()); + container.add(c2, c2.getMarkDeletedPosition()); + container.add(c3, c3.getMarkDeletedPosition()); + container.add(c4, c4.getMarkDeletedPosition()); + container.add(c5, c5.getMarkDeletedPosition()); assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 1)); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index ec73bc23807f4..4fe53b5f9ea97 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -107,6 +107,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; @@ -297,202 +298,120 @@ public void acknowledge1() throws Exception { } @Test - public void testCacheEvictionByMarkDeletedPosition() throws Throwable { - CompletableFuture result = new CompletableFuture<>(); + public void shouldKeepEntriesInCacheByEarliestReadPosition() throws ManagedLedgerException, InterruptedException { + // This test case reproduces issue #16054 + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS + .toNanos(30000)); + + // GIVEN an opened ledger with 10 opened cursors + + ManagedLedger ledger = factory.open("test_ledger_for_shouldKeepEntriesInCacheByEarliestReadPosition", + config); + List cursors = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + ManagedCursor cursor = ledger.openCursor("c" + i); + cursors.add(cursor); + } + + ManagedLedgerFactoryMXBean cacheStats = factory.getCacheStats(); + int insertedEntriesCountBefore = (int) cacheStats.getCacheInsertedEntriesCount(); + // AND 100 added entries + + for (int i = 0; i < 100; i++) { + String content = "entry-" + i; + ledger.addEntry(content.getBytes()); + } + + int insertedEntriesCount = + (int) cacheStats.getCacheInsertedEntriesCount() - insertedEntriesCountBefore; + // EXPECT that 100 entries should have been inserted to the cache + assertEquals(insertedEntriesCount, 100); + + int evictedEntriesCountBefore = (int) cacheStats.getCacheEvictedEntriesCount(); + + // WHEN entries are read for the cursors so that the farthest cursor has most entries read + for (int i = 0; i < 10; i++) { + ManagedCursor cursor = cursors.get(i); + // read entries farther of the earliest cursor + List entries = cursor.readEntries(20 - i); + // mark delete the least for the earliest cursor + cursor.markDelete(entries.get(i).getPosition()); + entries.forEach(Entry::release); + } + + // THEN it is expected that the cache evicts entries to the earliest read position + Thread.sleep(2 * factory.getConfig().getCacheEvictionIntervalMs()); + int evictedEntriesCount = + (int) cacheStats.getCacheEvictedEntriesCount() - evictedEntriesCountBefore; + assertEquals(evictedEntriesCount, 11, + "It is expected that the cache evicts entries to the earliest read position"); + + ledger.close(); + } + + @Test + public void shouldKeepEntriesInCacheByEarliestMarkDeletePosition() throws ManagedLedgerException, InterruptedException { + // This test case reproduces issue #16054 + ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setCacheEvictionByMarkDeletedPosition(true); factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS .toNanos(30000)); - factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() { - @Override - public void openLedgerComplete(ManagedLedger ledger, Object ctx) { - ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() { - @Override - public void openCursorComplete(ManagedCursor cursor, Object ctx) { - ManagedLedger ledger = (ManagedLedger) ctx; - String message1 = "test"; - ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() { - @Override - public void addComplete(Position position, ByteBuf entryData, Object ctx) { - @SuppressWarnings("unchecked") - Pair pair = (Pair) ctx; - ManagedLedger ledger = pair.getLeft(); - ManagedCursor cursor = pair.getRight(); - if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) { - result.complete(false); - return; - } - cursor.asyncReadEntries(1, new ReadEntriesCallback() { - @Override - public void readEntriesComplete(List entries, Object ctx) { - ManagedCursor cursor = (ManagedCursor) ctx; - assertEquals(entries.size(), 1); - Entry entry = entries.get(0); - final Position position = entry.getPosition(); - if (!message1.equals(new String(entry.getDataAndRelease(), Encoding))) { - result.complete(false); - return; - } - ((ManagedLedgerImpl) ledger).doCacheEviction( - System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000)); - if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) { - result.complete(false); - return; - } - log.debug("Mark-Deleting to position {}", position); - cursor.asyncMarkDelete(position, new MarkDeleteCallback() { - @Override - public void markDeleteComplete(Object ctx) { - log.debug("Mark delete complete"); - ManagedCursor cursor = (ManagedCursor) ctx; - if (cursor.hasMoreEntries()) { - result.complete(false); - return; - } - ((ManagedLedgerImpl) ledger).doCacheEviction( - System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000)); - result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0); - } + // GIVEN an opened ledger with 10 opened cursors - @Override - public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { - result.completeExceptionally(exception); - } + ManagedLedger ledger = factory.open("test_ledger_for_shouldKeepEntriesInCacheByEarliestMarkDeletePosition", + config); + List cursors = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + ManagedCursor cursor = ledger.openCursor("c" + i); + cursors.add(cursor); + } - }, cursor); - } + ManagedLedgerFactoryMXBean cacheStats = factory.getCacheStats(); + int insertedEntriesCountBefore = (int) cacheStats.getCacheInsertedEntriesCount(); - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - result.completeExceptionally(exception); - } - }, cursor, PositionImpl.latest); - } + // AND 100 added entries - @Override - public void addFailed(ManagedLedgerException exception, Object ctx) { - result.completeExceptionally(exception); - } - }, Pair.of(ledger, cursor)); - } + for (int i = 0; i < 100; i++) { + String content = "entry-" + i; + ledger.addEntry(content.getBytes()); + } - @Override - public void openCursorFailed(ManagedLedgerException exception, Object ctx) { - result.completeExceptionally(exception); - } + int insertedEntriesCount = + (int) cacheStats.getCacheInsertedEntriesCount() - insertedEntriesCountBefore; + // EXPECT that 100 entries should have been inserted to the cache + assertEquals(insertedEntriesCount, 100); - }, ledger); - } + int evictedEntriesCountBefore = (int) cacheStats.getCacheEvictedEntriesCount(); - @Override - public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { - result.completeExceptionally(exception); - } - }, null, null); + // WHEN entries are read for the cursors so that the farthest cursor has most entries read + Position lastMarkDeletePos = null; + for (int i = 0; i < 10; i++) { + ManagedCursor cursor = cursors.get(i); + // read 50 (+ index) entries for each cursor + List entries = cursor.readEntries(50 + (5 * i)); + // mark delete the most for the earliest cursor + lastMarkDeletePos = entries.get(20 - i).getPosition(); + cursor.markDelete(lastMarkDeletePos); + entries.forEach(Entry::release); + } - assertTrue(result.get()); + Thread.sleep(1000 + 2 * factory.getConfig().getCacheEvictionIntervalMs()); - log.info("Test completed"); - } + ManagedCursorContainer activeCursors = (ManagedCursorContainer) ledger.getActiveCursors(); + assertEquals(activeCursors.getSlowestReaderPosition(), lastMarkDeletePos); -// @Test -// public void testCacheEvictionByReadPosition() throws Throwable { -// CompletableFuture result = new CompletableFuture<>(); -// ManagedLedgerConfig config = new ManagedLedgerConfig(); -// factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS -// .toNanos(30000)); -// factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() { -// @Override -// public void openLedgerComplete(ManagedLedger ledger, Object ctx) { -// ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() { -// @Override -// public void openCursorComplete(ManagedCursor cursor, Object ctx) { -// ManagedLedger ledger = (ManagedLedger) ctx; -// String message1 = "test"; -// ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() { -// @Override -// public void addComplete(Position position, ByteBuf entryData, Object ctx) { -// @SuppressWarnings("unchecked") -// Pair pair = (Pair) ctx; -// ManagedLedger ledger = pair.getLeft(); -// ManagedCursor cursor = pair.getRight(); -// if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) { -// result.complete(false); -// return; -// } -// -// cursor.asyncReadEntries(1, new ReadEntriesCallback() { -// @Override -// public void readEntriesComplete(List entries, Object ctx) { -// ManagedCursor cursor = (ManagedCursor) ctx; -// assertEquals(entries.size(), 1); -// Entry entry = entries.get(0); -// final Position position = entry.getPosition(); -// if (!message1.equals(new String(entry.getDataAndRelease(), Encoding))) { -// result.complete(false); -// return; -// } -// ((ManagedLedgerImpl) ledger).doCacheEviction( -// System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000)); -// if (((ManagedLedgerImpl) ledger).getCacheSize() != 0) { -// result.complete(false); -// return; -// } -// -// log.debug("Mark-Deleting to position {}", position); -// cursor.asyncMarkDelete(position, new MarkDeleteCallback() { -// @Override -// public void markDeleteComplete(Object ctx) { -// log.debug("Mark delete complete"); -// ManagedCursor cursor = (ManagedCursor) ctx; -// if (cursor.hasMoreEntries()) { -// result.complete(false); -// return; -// } -// result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0); -// } -// -// @Override -// public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { -// result.completeExceptionally(exception); -// } -// -// }, cursor); -// } -// -// @Override -// public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { -// result.completeExceptionally(exception); -// } -// }, cursor, PositionImpl.earliest); -// } -// -// @Override -// public void addFailed(ManagedLedgerException exception, Object ctx) { -// result.completeExceptionally(exception); -// } -// }, Pair.of(ledger, cursor)); -// } -// -// @Override -// public void openCursorFailed(ManagedLedgerException exception, Object ctx) { -// result.completeExceptionally(exception); -// } -// -// }, ledger); -// } -// -// @Override -// public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { -// result.completeExceptionally(exception); -// } -// }, null, null); -// -// assertTrue(result.get()); -// -// log.info("Test completed"); -// } + // THEN it is expected that the cache evicts entries to the earliest read position + int evictedEntriesCount = + (int) cacheStats.getCacheEvictedEntriesCount() - evictedEntriesCountBefore; + assertEquals(evictedEntriesCount, 11, + "It is expected that the cache evicts entries to the earliest read position"); + + ledger.close(); + } @Test(timeOut = 20000) public void asyncAPI() throws Throwable { @@ -1720,10 +1639,14 @@ public void testOpenRaceCondition() throws Exception { @Test public void invalidateConsumedEntriesFromCache() throws Exception { - ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger"); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("my_test_ledger_for_invalidateConsumedEntriesFromCache", + config); EntryCacheManager cacheManager = factory.getEntryCacheManager(); EntryCache entryCache = ledger.entryCache; + entryCache.clear(); ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1"); ManagedCursorImpl c2 = (ManagedCursorImpl) ledger.openCursor("c2"); @@ -1736,28 +1659,78 @@ public void invalidateConsumedEntriesFromCache() throws Exception { assertEquals(entryCache.getSize(), 7 * 4); assertEquals(cacheManager.getSize(), entryCache.getSize()); + c2.setReadPosition(p3); - ledger.discardEntriesFromCache(c2, p2); assertEquals(entryCache.getSize(), 7 * 4); assertEquals(cacheManager.getSize(), entryCache.getSize()); c1.setReadPosition(p2); - ledger.discardEntriesFromCache(c1, p2); assertEquals(entryCache.getSize(), 7 * 3); assertEquals(cacheManager.getSize(), entryCache.getSize()); c1.setReadPosition(p3); - ledger.discardEntriesFromCache(c1, p3); - assertEquals(entryCache.getSize(), 7 * 3); + assertEquals(entryCache.getSize(), 7 * 2); assertEquals(cacheManager.getSize(), entryCache.getSize()); ledger.deactivateCursor(c1); - assertEquals(entryCache.getSize(), 7 * 3); // as c2.readPosition=p3 => Cache contains p3,p4 + assertEquals(entryCache.getSize(), 7 * 2); // as c2.readPosition=p3 => Cache contains p3,p4 assertEquals(cacheManager.getSize(), entryCache.getSize()); c2.setReadPosition(p4); - ledger.discardEntriesFromCache(c2, p4); + assertEquals(entryCache.getSize(), 7); + assertEquals(cacheManager.getSize(), entryCache.getSize()); + + ledger.deactivateCursor(c2); + assertEquals(entryCache.getSize(), 0); + assertEquals(cacheManager.getSize(), entryCache.getSize()); + } + + @Test + public void invalidateEntriesFromCacheByMarkDeletePosition() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setCacheEvictionByMarkDeletedPosition(true); + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("my_test_ledger_for_invalidateEntriesFromCacheByMarkDeletePosition", + config); + + EntryCacheManager cacheManager = factory.getEntryCacheManager(); + EntryCache entryCache = ledger.entryCache; + entryCache.clear(); + + ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1"); + ManagedCursorImpl c2 = (ManagedCursorImpl) ledger.openCursor("c2"); + + PositionImpl p1 = (PositionImpl) ledger.addEntry("entry-1".getBytes()); + PositionImpl p2 = (PositionImpl) ledger.addEntry("entry-2".getBytes()); + PositionImpl p3 = (PositionImpl) ledger.addEntry("entry-3".getBytes()); + PositionImpl p4 = (PositionImpl) ledger.addEntry("entry-4".getBytes()); + + assertEquals(entryCache.getSize(), 7 * 4); + assertEquals(cacheManager.getSize(), entryCache.getSize()); + + + c2.setReadPosition(p4); + c2.markDelete(p3); + + assertEquals(entryCache.getSize(), 7 * 4); + assertEquals(cacheManager.getSize(), entryCache.getSize()); + + c1.setReadPosition(p3); + c1.markDelete(p2); + assertEquals(entryCache.getSize(), 7 * 3); + assertEquals(cacheManager.getSize(), entryCache.getSize()); + + c1.setReadPosition(p4); + c1.markDelete(p3); + assertEquals(entryCache.getSize(), 7 * 2); + assertEquals(cacheManager.getSize(), entryCache.getSize()); + + ledger.deactivateCursor(c1); + assertEquals(entryCache.getSize(), 7 * 2); + assertEquals(cacheManager.getSize(), entryCache.getSize()); + + c2.markDelete(p4); assertEquals(entryCache.getSize(), 7); assertEquals(cacheManager.getSize(), entryCache.getSize()); @@ -2009,11 +1982,13 @@ public void testMaximumRolloverTime() throws Exception { assertEquals(ledger.getLedgersInfoAsList().size(), 1); - Thread.sleep(2000); ledger.addEntry("data".getBytes()); ledger.addEntry("data".getBytes()); - assertEquals(ledger.getLedgersInfoAsList().size(), 2); + + Awaitility.await().untilAsserted(() -> { + assertEquals(ledger.getLedgersInfoAsList().size(), 2); + }); } @Test @@ -2635,8 +2610,8 @@ public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Excepti } // (3) Validate: cache should remove all entries read by both active cursors - log.info("expected, found : {}, {}", (5 * (totalInsertedEntries)), entryCache.getSize()); - assertEquals((5 * totalInsertedEntries), entryCache.getSize()); + log.info("expected, found : {}, {}", 5 * (totalInsertedEntries - readEntries), entryCache.getSize()); + assertEquals(entryCache.getSize(), 5 * (totalInsertedEntries - readEntries)); final int remainingEntries = totalInsertedEntries - readEntries; entries1 = cursor1.readEntries(remainingEntries); @@ -2650,7 +2625,7 @@ public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Excepti // (4) Validate: cursor2 is active cursor and has not read these entries yet: so, cache should not remove these // entries - assertEquals((5 * totalInsertedEntries), entryCache.getSize()); + assertEquals(entryCache.getSize(), 5 * (totalInsertedEntries - readEntries)); ledger.deactivateCursor(cursor1); ledger.deactivateCursor(cursor2); @@ -2675,7 +2650,7 @@ public void testActiveDeactiveCursor() throws Exception { } // (1) Validate: cache not stores entries as no active cursor - assertEquals(0, entryCache.getSize()); + assertEquals(entryCache.getSize(), 0); // Open Cursor also adds cursor into activeCursor-container ManagedCursor cursor1 = ledger.openCursor("c1"); @@ -2688,7 +2663,7 @@ public void testActiveDeactiveCursor() throws Exception { } // (2) Validate: cache stores entries as active cursor has not read message - assertEquals((5 * totalInsertedEntries), entryCache.getSize()); + assertEquals(entryCache.getSize(), 5 * totalInsertedEntries); // read 20 entries List entries1 = cursor1.readEntries(totalInsertedEntries); @@ -2699,7 +2674,7 @@ public void testActiveDeactiveCursor() throws Exception { // (3) Validate: cache discards all entries after all cursors are deactivated ledger.deactivateCursor(cursor1); - assertEquals(0, entryCache.getSize()); + assertEquals(entryCache.getSize(), 0); ledger.close(); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java index 43241053d72f2..845acb305e124 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java @@ -25,6 +25,7 @@ import org.apache.bookkeeper.client.PulsarMockBookKeeper; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -79,6 +80,9 @@ public final void setUp(Method method) throws Exception { throw e; } + ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig(); + // increase default cache eviction interval so that caching could be tested with less flakyness + managedLedgerFactoryConfig.setCacheEvictionIntervalMs(200); factory = new ManagedLedgerFactoryImpl(metadataStore, bkc); setUpTestCase(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 95c007f0051b1..1e9ed80307dd9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -580,7 +580,7 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{ field.setAccessible(true); ManagedCursorContainer managedCursors = (ManagedCursorContainer) field.get(persistentTopic.getManagedLedger()); managedCursors.removeCursor("transaction-buffer-sub"); - managedCursors.add(managedCursor); + managedCursors.add(managedCursor, managedCursor.getMarkDeletedPosition()); TransactionBuffer buffer1 = new TopicTransactionBuffer(persistentTopic); Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> @@ -603,7 +603,7 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{ return null; }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any()); - managedCursors.add(managedCursor); + managedCursors.add(managedCursor, managedCursor.getMarkDeletedPosition()); TransactionBuffer buffer3 = new TopicTransactionBuffer(persistentTopic); Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(buffer3.getStats().state, "Ready"));