From dd69e0077fbfb2539541fe0c22ea7cf83c516ce7 Mon Sep 17 00:00:00 2001 From: LinChen <1572139390@qq.com> Date: Wed, 6 Apr 2022 15:53:58 +0800 Subject: [PATCH] =?UTF-8?q?Add=20a=20cache=20eviction=20policy=EF=BC=9AEvi?= =?UTF-8?q?cting=20cache=20data=20by=20the=20slowest=20markDeletedPosition?= =?UTF-8?q?=20(#14985)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mledger/ManagedLedgerConfig.java | 5 + .../mledger/impl/ManagedCursorContainer.java | 4 + .../mledger/impl/ManagedLedgerImpl.java | 25 ++- .../mledger/impl/ManagedLedgerTest.java | 198 ++++++++++++++++++ .../pulsar/broker/ServiceConfiguration.java | 7 + .../pulsar/broker/service/BrokerService.java | 2 + 6 files changed, 237 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 1d30e169bfe205..8b0375d23f3f02 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -24,6 +24,8 @@ import java.util.Arrays; import java.util.Map; import java.util.concurrent.TimeUnit; +import lombok.Getter; +import lombok.Setter; import org.apache.bookkeeper.client.EnsemblePlacementPolicy; import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.common.annotation.InterfaceAudience; @@ -75,6 +77,9 @@ public class ManagedLedgerConfig { private ManagedLedgerInterceptor managedLedgerInterceptor; private Map properties; private int inactiveLedgerRollOverTimeMs = 0; + @Getter + @Setter + private boolean cacheEvictionByMarkDeletedPosition = false; public boolean isCreateIfMissing() { return createIfMissing; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java index 65d254112d1571..ef9a546a507814 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java @@ -108,6 +108,10 @@ public PositionImpl getSlowestReadPositionForActiveCursors() { return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition(); } + public PositionImpl getSlowestMarkDeletedPositionForActiveCursors() { + return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getMarkDeletedPosition(); + } + public ManagedCursor get(String name) { long stamp = rwLock.readLock(); try { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 37858afe3d8a71..28d6b63a99f439 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2166,10 +2166,15 @@ void doCacheEviction(long maxTimestamp) { if (entryCache.getSize() <= 0) { return; } - // Always remove all entries already read by active cursors - PositionImpl slowestReaderPos = getEarlierReadPositionForActiveCursors(); - if (slowestReaderPos != null) { - entryCache.invalidateEntries(slowestReaderPos); + PositionImpl evictionPos; + if (config.isCacheEvictionByMarkDeletedPosition()) { + evictionPos = getEarlierMarkDeletedPositionForActiveCursors().getNext(); + } else { + // Always remove all entries already read by active cursors + evictionPos = getEarlierReadPositionForActiveCursors(); + } + if (evictionPos != null) { + entryCache.invalidateEntries(evictionPos); } // Remove entries older than the cutoff threshold @@ -2188,6 +2193,18 @@ private PositionImpl getEarlierReadPositionForActiveCursors() { return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition; } + private PositionImpl getEarlierMarkDeletedPositionForActiveCursors() { + PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestMarkDeletedPositionForActiveCursors(); + PositionImpl durablePosition = activeCursors.getSlowestMarkDeletedPositionForActiveCursors(); + if (nonDurablePosition == null) { + return durablePosition; + } + if (durablePosition == null) { + return nonDurablePosition; + } + return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition; + } + void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) { Pair pair = cursors.cursorUpdated(cursor, newPosition); if (pair == null) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 601913c7dfdaf7..2cb14c6c37f4f2 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -291,6 +291,204 @@ public void acknowledge1() throws Exception { ledger.close(); } + @Test + public void testCacheEvictionByMarkDeletedPosition() throws Throwable { + CompletableFuture result = new CompletableFuture<>(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setCacheEvictionByMarkDeletedPosition(true); + factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS + .toNanos(30000)); + factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() { + @Override + public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() { + @Override + public void openCursorComplete(ManagedCursor cursor, Object ctx) { + ManagedLedger ledger = (ManagedLedger) ctx; + String message1 = "test"; + ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() { + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + @SuppressWarnings("unchecked") + Pair pair = (Pair) ctx; + ManagedLedger ledger = pair.getLeft(); + ManagedCursor cursor = pair.getRight(); + if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) { + result.complete(false); + return; + } + cursor.asyncReadEntries(1, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + ManagedCursor cursor = (ManagedCursor) ctx; + assertEquals(entries.size(), 1); + Entry entry = entries.get(0); + final Position position = entry.getPosition(); + if (!message1.equals(new String(entry.getDataAndRelease(), Encoding))) { + result.complete(false); + return; + } + ((ManagedLedgerImpl) ledger).doCacheEviction( + System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000)); + if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) { + result.complete(false); + return; + } + + log.debug("Mark-Deleting to position {}", position); + cursor.asyncMarkDelete(position, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + log.debug("Mark delete complete"); + ManagedCursor cursor = (ManagedCursor) ctx; + if (cursor.hasMoreEntries()) { + result.complete(false); + return; + } + ((ManagedLedgerImpl) ledger).doCacheEviction( + System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000)); + result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + result.completeExceptionally(exception); + } + + }, cursor); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + result.completeExceptionally(exception); + } + }, cursor, PositionImpl.LATEST); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + result.completeExceptionally(exception); + } + }, Pair.of(ledger, cursor)); + } + + @Override + public void openCursorFailed(ManagedLedgerException exception, Object ctx) { + result.completeExceptionally(exception); + } + + }, ledger); + } + + @Override + public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { + result.completeExceptionally(exception); + } + }, null, null); + + assertTrue(result.get()); + + log.info("Test completed"); + } + + @Test + public void testCacheEvictionByReadPosition() throws Throwable { + CompletableFuture result = new CompletableFuture<>(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS + .toNanos(30000)); + factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() { + @Override + public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() { + @Override + public void openCursorComplete(ManagedCursor cursor, Object ctx) { + ManagedLedger ledger = (ManagedLedger) ctx; + String message1 = "test"; + ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() { + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + @SuppressWarnings("unchecked") + Pair pair = (Pair) ctx; + ManagedLedger ledger = pair.getLeft(); + ManagedCursor cursor = pair.getRight(); + if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) { + result.complete(false); + return; + } + + cursor.asyncReadEntries(1, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + ManagedCursor cursor = (ManagedCursor) ctx; + assertEquals(entries.size(), 1); + Entry entry = entries.get(0); + final Position position = entry.getPosition(); + if (!message1.equals(new String(entry.getDataAndRelease(), Encoding))) { + result.complete(false); + return; + } + ((ManagedLedgerImpl) ledger).doCacheEviction( + System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000)); + if (((ManagedLedgerImpl) ledger).getCacheSize() != 0) { + result.complete(false); + return; + } + + log.debug("Mark-Deleting to position {}", position); + cursor.asyncMarkDelete(position, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + log.debug("Mark delete complete"); + ManagedCursor cursor = (ManagedCursor) ctx; + if (cursor.hasMoreEntries()) { + result.complete(false); + return; + } + result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + result.completeExceptionally(exception); + } + + }, cursor); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + result.completeExceptionally(exception); + } + }, cursor, PositionImpl.LATEST); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + result.completeExceptionally(exception); + } + }, Pair.of(ledger, cursor)); + } + + @Override + public void openCursorFailed(ManagedLedgerException exception, Object ctx) { + result.completeExceptionally(exception); + } + + }, ledger); + } + + @Override + public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { + result.completeExceptionally(exception); + } + }, null, null); + + assertTrue(result.get()); + + log.info("Test completed"); + } + @Test(timeOut = 20000) public void asyncAPI() throws Throwable { final CountDownLatch counter = new CountDownLatch(1); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 355e252db19ea3..48ab4c1f982f88 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2436,6 +2436,13 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int managedLedgerInactiveLedgerRolloverTimeSeconds = 0; + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Evicting cache data by the slowest markDeletedPosition or readPosition. " + + "The default is to evict through readPosition." + ) + private boolean cacheEvictionByMarkDeletedPosition = false; + /**** --- Transaction config variables. --- ****/ @FieldContext( category = CATEGORY_TRANSACTION, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8829841db6c49e..6f1ccc3b1ace19 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1599,6 +1599,8 @@ public CompletableFuture getManagedLedgerConfig(TopicName t managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery()); managedLedgerConfig.setInactiveLedgerRollOverTime( serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS); + managedLedgerConfig.setCacheEvictionByMarkDeletedPosition( + serviceConfig.isCacheEvictionByMarkDeletedPosition()); OffloadPoliciesImpl nsLevelOffloadPolicies = (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null);