Skip to content

Commit

Permalink
Optimize the memory usage of Cache Eviction (#12045)
Browse files Browse the repository at this point in the history
  • Loading branch information
315157973 authored Sep 30, 2021
1 parent c823c08 commit 0c22e0f
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,23 @@ private static class Item {
}
}

// Used to keep track of slowest cursor. Contains all of all the cursors except for non-durable cursors
// Since we do need to keep track of non-durable cursors.
public enum CursorType {
DurableCursor,
NonDurableCursor,
ALL
}

public ManagedCursorContainer() {
cursorType = CursorType.DurableCursor;
}

public ManagedCursorContainer(CursorType cursorType) {
this.cursorType = cursorType;
}

private final CursorType cursorType;

// Used to keep track of slowest cursor. Contains all of all active cursors.
private final ArrayList<Item> heap = Lists.newArrayList();

// Maps a cursor to its position in the heap
Expand All @@ -76,8 +91,7 @@ public void add(ManagedCursor cursor) {
Item item = new Item(cursor, heap.size());
cursors.put(cursor.getName(), item);

// don't need to add non-durable cursors
if (cursor.isDurable()) {
if (shouldTrackInHeap(cursor)) {
heap.add(item);
siftUp(item);
}
Expand All @@ -86,6 +100,16 @@ public void add(ManagedCursor cursor) {
}
}

private boolean shouldTrackInHeap(ManagedCursor cursor) {
return CursorType.ALL.equals(cursorType)
|| (cursor.isDurable() && CursorType.DurableCursor.equals(cursorType))
|| (!cursor.isDurable() && CursorType.NonDurableCursor.equals(cursorType));
}

public PositionImpl getSlowestReadPositionForActiveCursors() {
return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition();
}

public ManagedCursor get(String name) {
long stamp = rwLock.readLock();
try {
Expand All @@ -101,7 +125,7 @@ public void removeCursor(String name) {
try {
Item item = cursors.remove(name);

if (item.cursor.isDurable()) {
if (shouldTrackInHeap(item.cursor)) {
// Move the item to the right end of the heap to be removed
Item lastItem = heap.get(heap.size() - 1);
swap(item, lastItem);
Expand Down Expand Up @@ -132,7 +156,7 @@ public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Posi
}


if (item.cursor.isDurable()) {
if (shouldTrackInHeap(item.cursor)) {
PositionImpl previousSlowestConsumer = heap.get(0).position;

// When the cursor moves forward, we need to push it toward the
Expand All @@ -146,7 +170,7 @@ public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Posi
}

PositionImpl newSlowestConsumer = heap.get(0).position;
return Pair.of(previousSlowestConsumer, newSlowestConsumer);
return Pair.of(previousSlowestConsumer, newSlowestConsumer);
}
return null;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -160,6 +159,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {

private final ManagedCursorContainer cursors = new ManagedCursorContainer();
private final ManagedCursorContainer activeCursors = new ManagedCursorContainer();
private final ManagedCursorContainer nonDurableActiveCursors =
new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor);

// Ever increasing counter of entries added
@VisibleForTesting
Expand Down Expand Up @@ -2102,6 +2103,9 @@ void discardEntriesFromCache(ManagedCursorImpl cursor, PositionImpl newPosition)
}

void doCacheEviction(long maxTimestamp) {
if (entryCache.getSize() <= 0) {
return;
}
// Always remove all entries already read by active cursors
PositionImpl slowestReaderPos = getEarlierReadPositionForActiveCursors();
if (slowestReaderPos != null) {
Expand All @@ -2113,17 +2117,15 @@ void doCacheEviction(long maxTimestamp) {
}

private PositionImpl getEarlierReadPositionForActiveCursors() {
PositionImpl smallest = null;
for (ManagedCursor cursor : activeCursors) {
PositionImpl p = (PositionImpl) cursor.getReadPosition();
if (smallest == null) {
smallest = p;
} else if (p.compareTo(smallest) < 0) {
smallest = p;
}
PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestReadPositionForActiveCursors();
PositionImpl durablePosition = activeCursors.getSlowestReadPositionForActiveCursors();
if (nonDurablePosition == null) {
return durablePosition;
}

return smallest;
if (durablePosition == null) {
return nonDurablePosition;
}
return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
}

void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
Expand Down Expand Up @@ -3375,6 +3377,9 @@ public void activateCursor(ManagedCursor cursor) {
if (activeCursors.get(cursor.getName()) == null) {
activeCursors.add(cursor);
}
if (!cursor.isDurable() && nonDurableActiveCursors.get(cursor.getName()) == null) {
nonDurableActiveCursors.add(cursor);
}
}

public void deactivateCursor(ManagedCursor cursor) {
Expand All @@ -3391,6 +3396,9 @@ public void deactivateCursor(ManagedCursor cursor) {
getPreviousPosition((PositionImpl) activeCursors.getSlowestReader().getReadPosition()));
}
}
if (!cursor.isDurable()) {
nonDurableActiveCursors.removeCursor(cursor.getName());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
Expand Down Expand Up @@ -386,6 +387,45 @@ public boolean checkAndUpdateReadPositionChanged() {
}
}

@Test
public void testSlowestReadPositionForActiveCursors() throws Exception {
ManagedCursorContainer container =
new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor);
assertNull(container.getSlowestReadPositionForActiveCursors());

// Add no durable cursor
PositionImpl position = PositionImpl.get(5,5);
ManagedCursor cursor1 = spy(new MockManagedCursor(container, "test1", position));
doReturn(false).when(cursor1).isDurable();
doReturn(position).when(cursor1).getReadPosition();
container.add(cursor1);
assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5));

// Add no durable cursor
position = PositionImpl.get(1,1);
ManagedCursor cursor2 = spy(new MockManagedCursor(container, "test2", position));
doReturn(false).when(cursor2).isDurable();
doReturn(position).when(cursor2).getReadPosition();
container.add(cursor2);
assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(1, 1));

// Move forward cursor, cursor1 = 5:5 , cursor2 = 5:6, slowest is 5:5
position = PositionImpl.get(5,6);
container.cursorUpdated(cursor2, position);
doReturn(position).when(cursor2).getReadPosition();
assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5));

// Move forward cursor, cursor1 = 5:8 , cursor2 = 5:6, slowest is 5:6
position = PositionImpl.get(5,8);
doReturn(position).when(cursor1).getReadPosition();
container.cursorUpdated(cursor1, position);
assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 6));

// Remove cursor, only cursor1 left, cursor1 = 5:8
container.removeCursor(cursor2.getName());
assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 8));
}

@Test
public void simple() throws Exception {
ManagedCursorContainer container = new ManagedCursorContainer();
Expand Down

0 comments on commit 0c22e0f

Please sign in to comment.