Skip to content

Commit

Permalink
Add a cache eviction policy:Evicting cache data by the slowest markDe…
Browse files Browse the repository at this point in the history
…letedPosition (apache#14985)
  • Loading branch information
lordcheng10 authored and nicklixinyang committed Apr 20, 2022
1 parent 04185f7 commit dd69e00
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
Expand Down Expand Up @@ -75,6 +77,9 @@ public class ManagedLedgerConfig {
private ManagedLedgerInterceptor managedLedgerInterceptor;
private Map<String, String> properties;
private int inactiveLedgerRollOverTimeMs = 0;
@Getter
@Setter
private boolean cacheEvictionByMarkDeletedPosition = false;

public boolean isCreateIfMissing() {
return createIfMissing;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ public PositionImpl getSlowestReadPositionForActiveCursors() {
return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition();
}

public PositionImpl getSlowestMarkDeletedPositionForActiveCursors() {
return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getMarkDeletedPosition();
}

public ManagedCursor get(String name) {
long stamp = rwLock.readLock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2166,10 +2166,15 @@ void doCacheEviction(long maxTimestamp) {
if (entryCache.getSize() <= 0) {
return;
}
// Always remove all entries already read by active cursors
PositionImpl slowestReaderPos = getEarlierReadPositionForActiveCursors();
if (slowestReaderPos != null) {
entryCache.invalidateEntries(slowestReaderPos);
PositionImpl evictionPos;
if (config.isCacheEvictionByMarkDeletedPosition()) {
evictionPos = getEarlierMarkDeletedPositionForActiveCursors().getNext();
} else {
// Always remove all entries already read by active cursors
evictionPos = getEarlierReadPositionForActiveCursors();
}
if (evictionPos != null) {
entryCache.invalidateEntries(evictionPos);
}

// Remove entries older than the cutoff threshold
Expand All @@ -2188,6 +2193,18 @@ private PositionImpl getEarlierReadPositionForActiveCursors() {
return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
}

private PositionImpl getEarlierMarkDeletedPositionForActiveCursors() {
PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestMarkDeletedPositionForActiveCursors();
PositionImpl durablePosition = activeCursors.getSlowestMarkDeletedPositionForActiveCursors();
if (nonDurablePosition == null) {
return durablePosition;
}
if (durablePosition == null) {
return nonDurablePosition;
}
return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
}

void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
Pair<PositionImpl, PositionImpl> pair = cursors.cursorUpdated(cursor, newPosition);
if (pair == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,204 @@ public void acknowledge1() throws Exception {
ledger.close();
}

@Test
public void testCacheEvictionByMarkDeletedPosition() throws Throwable {
CompletableFuture<Boolean> result = new CompletableFuture<>();
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setCacheEvictionByMarkDeletedPosition(true);
factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
.toNanos(30000));
factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
@Override
public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
ManagedLedger ledger = (ManagedLedger) ctx;
String message1 = "test";
ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() {
@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
@SuppressWarnings("unchecked")
Pair<ManagedLedger, ManagedCursor> pair = (Pair<ManagedLedger, ManagedCursor>) ctx;
ManagedLedger ledger = pair.getLeft();
ManagedCursor cursor = pair.getRight();
if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) {
result.complete(false);
return;
}
cursor.asyncReadEntries(1, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
ManagedCursor cursor = (ManagedCursor) ctx;
assertEquals(entries.size(), 1);
Entry entry = entries.get(0);
final Position position = entry.getPosition();
if (!message1.equals(new String(entry.getDataAndRelease(), Encoding))) {
result.complete(false);
return;
}
((ManagedLedgerImpl) ledger).doCacheEviction(
System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000));
if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) {
result.complete(false);
return;
}

log.debug("Mark-Deleting to position {}", position);
cursor.asyncMarkDelete(position, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
log.debug("Mark delete complete");
ManagedCursor cursor = (ManagedCursor) ctx;
if (cursor.hasMoreEntries()) {
result.complete(false);
return;
}
((ManagedLedgerImpl) ledger).doCacheEviction(
System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000));
result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0);
}

@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
result.completeExceptionally(exception);
}

}, cursor);
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
result.completeExceptionally(exception);
}
}, cursor, PositionImpl.LATEST);
}

@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
result.completeExceptionally(exception);
}
}, Pair.of(ledger, cursor));
}

@Override
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
result.completeExceptionally(exception);
}

}, ledger);
}

@Override
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
result.completeExceptionally(exception);
}
}, null, null);

assertTrue(result.get());

log.info("Test completed");
}

@Test
public void testCacheEvictionByReadPosition() throws Throwable {
CompletableFuture<Boolean> result = new CompletableFuture<>();
ManagedLedgerConfig config = new ManagedLedgerConfig();
factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
.toNanos(30000));
factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
@Override
public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
ManagedLedger ledger = (ManagedLedger) ctx;
String message1 = "test";
ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() {
@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
@SuppressWarnings("unchecked")
Pair<ManagedLedger, ManagedCursor> pair = (Pair<ManagedLedger, ManagedCursor>) ctx;
ManagedLedger ledger = pair.getLeft();
ManagedCursor cursor = pair.getRight();
if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) {
result.complete(false);
return;
}

cursor.asyncReadEntries(1, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
ManagedCursor cursor = (ManagedCursor) ctx;
assertEquals(entries.size(), 1);
Entry entry = entries.get(0);
final Position position = entry.getPosition();
if (!message1.equals(new String(entry.getDataAndRelease(), Encoding))) {
result.complete(false);
return;
}
((ManagedLedgerImpl) ledger).doCacheEviction(
System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000));
if (((ManagedLedgerImpl) ledger).getCacheSize() != 0) {
result.complete(false);
return;
}

log.debug("Mark-Deleting to position {}", position);
cursor.asyncMarkDelete(position, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
log.debug("Mark delete complete");
ManagedCursor cursor = (ManagedCursor) ctx;
if (cursor.hasMoreEntries()) {
result.complete(false);
return;
}
result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0);
}

@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
result.completeExceptionally(exception);
}

}, cursor);
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
result.completeExceptionally(exception);
}
}, cursor, PositionImpl.LATEST);
}

@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
result.completeExceptionally(exception);
}
}, Pair.of(ledger, cursor));
}

@Override
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
result.completeExceptionally(exception);
}

}, ledger);
}

@Override
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
result.completeExceptionally(exception);
}
}, null, null);

assertTrue(result.get());

log.info("Test completed");
}

@Test(timeOut = 20000)
public void asyncAPI() throws Throwable {
final CountDownLatch counter = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2436,6 +2436,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int managedLedgerInactiveLedgerRolloverTimeSeconds = 0;

@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Evicting cache data by the slowest markDeletedPosition or readPosition. "
+ "The default is to evict through readPosition."
)
private boolean cacheEvictionByMarkDeletedPosition = false;

/**** --- Transaction config variables. --- ****/
@FieldContext(
category = CATEGORY_TRANSACTION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1599,6 +1599,8 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
managedLedgerConfig.setInactiveLedgerRollOverTime(
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS);
managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
serviceConfig.isCacheEvictionByMarkDeletedPosition());

OffloadPoliciesImpl nsLevelOffloadPolicies =
(OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null);
Expand Down

0 comments on commit dd69e00

Please sign in to comment.