diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index 347a380d7eb5c..0dc820ec46d72 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -80,6 +80,10 @@ public ManagedLedgerFencedException() { super(new Exception("Attempted to use a fenced managed ledger")); } + public ManagedLedgerFencedException(String message) { + super(message); + } + public ManagedLedgerFencedException(Exception e) { super(e); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 3b416997a6ead..71bc8cad6f0b3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -431,6 +431,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { @Override public void operationFailed(MetaStoreException e) { + handleBadVersion(e); if (e instanceof MetadataNotFoundException) { callback.initializeFailed(new ManagedLedgerNotFoundException(e)); } else { @@ -481,6 +482,7 @@ public void operationComplete(Void v, Stat stat) { @Override public void operationFailed(MetaStoreException e) { + handleBadVersion(e); callback.initializeFailed(new ManagedLedgerException(e)); } }; @@ -1022,6 +1024,7 @@ public void operationComplete(Void result, Stat stat) { @Override public void operationFailed(MetaStoreException e) { + handleBadVersion(e); callback.deleteCursorFailed(e, ctx); } @@ -1312,6 +1315,7 @@ public void operationComplete(Void result, Stat stat) { @Override public void operationFailed(MetaStoreException e) { log.error("[{}] Failed to terminate 
managed ledger: {}", name, e.getMessage()); + handleBadVersion(e); callback.terminateFailed(new ManagedLedgerException(e), ctx); } }); @@ -1396,6 +1400,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { public synchronized void asyncClose(final CloseCallback callback, final Object ctx) { State state = STATE_UPDATER.get(this); if (state == State.Fenced) { + cancelScheduledTasks(); factory.close(this); callback.closeFailed(new ManagedLedgerFencedException(), ctx); return; @@ -1519,6 +1524,7 @@ public void operationComplete(Void v, Stat stat) { @Override public void operationFailed(MetaStoreException e) { log.warn("[{}] Error updating meta data with the new list of ledgers: {}", name, e.getMessage()); + handleBadVersion(e); mbean.startDataLedgerDeleteOp(); bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> { mbean.endDataLedgerDeleteOp(); @@ -1527,14 +1533,12 @@ public void operationFailed(MetaStoreException e) { BKException.getMessage(rc1)); } }, null); - if (e instanceof BadVersionException) { synchronized (ManagedLedgerImpl.this) { log.error( "[{}] Failed to update ledger list. z-node version mismatch. 
Closing managed ledger", name); lastLedgerCreationFailureTimestamp = clock.millis(); - STATE_UPDATER.set(ManagedLedgerImpl.this, State.Fenced); // Return ManagedLedgerFencedException to addFailed callback // to indicate that the ledger is now fenced and topic needs to be closed clearPendingAddEntries(new ManagedLedgerFencedException(e)); @@ -1557,6 +1561,12 @@ public void operationFailed(MetaStoreException e) { updateLedgersListAfterRollover(cb, newLedger); } } + + private void handleBadVersion(Throwable e) { + if (e instanceof BadVersionException) { + setFenced(); + } + } private void updateLedgersListAfterRollover(MetaStoreCallback callback, LedgerInfo newLedger) { if (!metadataMutex.tryLock()) { // Defer update for later @@ -2463,12 +2473,19 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(), TOTAL_SIZE_UPDATER.get(this)); } - if (STATE_UPDATER.get(this) == State.Closed) { + State currentState = STATE_UPDATER.get(this); + if (currentState == State.Closed) { log.debug("[{}] Ignoring trimming request since the managed ledger was already closed", name); trimmerMutex.unlock(); promise.completeExceptionally(new ManagedLedgerAlreadyClosedException("Can't trim closed ledger")); return; } + if (currentState == State.Fenced) { + log.debug("[{}] Ignoring trimming request since the managed ledger was already fenced", name); + trimmerMutex.unlock(); + promise.completeExceptionally(new ManagedLedgerFencedException("Can't trim fenced ledger")); + return; + } long slowestReaderLedgerId = -1; if (!cursors.hasDurableCursors()) { @@ -2557,7 +2574,7 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { return; } - if (STATE_UPDATER.get(this) == State.CreatingLedger // Give up now and schedule a new trimming + if (currentState == State.CreatingLedger // Give up now and schedule a new trimming || !metadataMutex.tryLock()) { // Avoid deadlocks 
with other operations updating the ledgers list scheduleDeferredTrimming(isTruncate, promise); trimmerMutex.unlock(); @@ -2624,6 +2641,7 @@ public void operationFailed(MetaStoreException e) { log.warn("[{}] Failed to update the list of ledgers after trimming", name, e); metadataMutex.unlock(); trimmerMutex.unlock(); + handleBadVersion(e); promise.completeExceptionally(e); } @@ -2708,7 +2726,7 @@ public void deleteLedgerFailed(ManagedLedgerException e, Object ctx) { public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) { // Delete the managed ledger without closing, since we are not interested in gracefully closing cursors and // ledgers - STATE_UPDATER.set(this, State.Fenced); + setFenced(); cancelScheduledTasks(); List cursors = Lists.newArrayList(this.cursors); @@ -2957,7 +2975,7 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct promise.whenComplete((result, exception) -> { offloadMutex.unlock(); if (exception != null) { - callback.offloadFailed(new ManagedLedgerException(exception), ctx); + callback.offloadFailed(ManagedLedgerException.getManagedLedgerException(exception), ctx); } else { callback.offloadComplete(result, ctx); } @@ -2971,11 +2989,17 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct private void offloadLoop(CompletableFuture promise, Queue ledgersToOffload, PositionImpl firstUnoffloaded, Optional firstError) { - if (getState() == State.Closed) { + State currentState = getState(); + if (currentState == State.Closed) { promise.completeExceptionally(new ManagedLedgerAlreadyClosedException( String.format("managed ledger [%s] has already closed", name))); return; } + if (currentState == State.Fenced) { + promise.completeExceptionally(new ManagedLedgerFencedException( + String.format("managed ledger [%s] is fenced", name))); + return; + } LedgerInfo info = ledgersToOffload.poll(); if (info == null) { if (firstError.isPresent()) { @@ -3117,6 +3141,7 @@ public 
void operationComplete(Void result, Stat stat) { @Override public void operationFailed(MetaStoreException e) { + handleBadVersion(e); unlockingPromise.completeExceptionally(e); } }); @@ -3639,6 +3664,7 @@ private void checkManagedLedgerIsOpen() throws ManagedLedgerException { } synchronized void setFenced() { + log.info("[{}] Moving to Fenced state", name); STATE_UPDATER.set(this, State.Fenced); } @@ -3842,12 +3868,21 @@ private void scheduleTimeoutTask() { ? Math.max(config.getAddEntryTimeoutSeconds(), config.getReadEntryTimeoutSeconds()) : timeoutSec; this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> { - checkAddTimeout(); - checkReadTimeout(); + checkTimeouts(); }), timeoutSec, timeoutSec, TimeUnit.SECONDS); } } + private void checkTimeouts() { + final State state = STATE_UPDATER.get(this); + if (state == State.Closed + || state == State.Fenced) { + return; + } + checkAddTimeout(); + checkReadTimeout(); + } + private void checkAddTimeout() { long timeoutSec = config.getAddEntryTimeoutSeconds(); if (timeoutSec < 1) { @@ -4004,6 +4039,7 @@ public void operationComplete(Void result, Stat version) { @Override public void operationFailed(MetaStoreException e) { log.error("[{}] Update managedLedger's properties failed", name, e); + handleBadVersion(e); callback.updatePropertiesFailed(e, ctx); metadataMutex.unlock(); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java index 8fd2fd0003d35..5a52c29d544a6 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java @@ -18,15 +18,19 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; import static 
org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; import static org.testng.Assert.fail; import io.netty.buffer.ByteBuf; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import org.apache.bookkeeper.client.BKException; @@ -387,6 +391,72 @@ public void recoverAfterZnodeVersionError() throws Exception { } } + @Test + public void recoverAfterZnodeVersionErrorWhileTrimming() throws Exception { + ManagedLedger ledger = factory.open("my_test_ledger_trim", + new ManagedLedgerConfig() + .setMaxEntriesPerLedger(2)); + ledger.addEntry("test".getBytes()); + ledger.addEntry("test".getBytes()); + ledger.addEntry("test".getBytes()); + + metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) -> + path.equals("/managed-ledgers/my_test_ledger_trim") + && op == FaultInjectionMetadataStore.OperationType.PUT + ); + + CompletableFuture handle = new CompletableFuture<>(); + ledger.trimConsumedLedgersInBackground(handle); + assertThat(expectThrows(ExecutionException.class, () -> handle.get()).getCause(), + instanceOf(ManagedLedgerException.BadVersionException.class)); + + assertEquals(ManagedLedgerImpl.State.Fenced, ((ManagedLedgerImpl) ledger).getState()); + + // if the task started after the ML moved to Fenced state, it must fail + CompletableFuture handleAlreadyFenced = new CompletableFuture<>(); + ledger.trimConsumedLedgersInBackground(handleAlreadyFenced); + assertThat(expectThrows(ExecutionException.class, () -> handleAlreadyFenced.get()).getCause(), + instanceOf(ManagedLedgerException.ManagedLedgerFencedException.class)); + + try { + ledger.addEntry("entry".getBytes()); + fail("should fail"); + } 
catch (ManagedLedgerFencedException e) { + assertEquals("Attempted to use a fenced managed ledger", e.getCause().getMessage()); + } + + assertFalse(factory.ledgers.isEmpty()); + try { + ledger.close(); + } catch (ManagedLedgerFencedException e) { + assertEquals("Attempted to use a fenced managed ledger", e.getCause().getMessage()); + } + + // verify that the ManagedLedger has been unregistered even if it was fenced + assertTrue(factory.ledgers.isEmpty()); + } + + @Test + public void badVersionErrorDuringTruncateLedger() throws Exception { + ManagedLedger ledger = factory.open("my_test_ledger_trim", + new ManagedLedgerConfig() + .setMaxEntriesPerLedger(2)); + ledger.addEntry("test".getBytes()); + ledger.addEntry("test".getBytes()); + ledger.addEntry("test".getBytes()); + + metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) -> + path.equals("/managed-ledgers/my_test_ledger_trim") + && op == FaultInjectionMetadataStore.OperationType.PUT + ); + + CompletableFuture handle = ledger.asyncTruncate(); + assertThat(expectThrows(ExecutionException.class, () -> handle.get()).getCause(), + instanceOf(ManagedLedgerException.BadVersionException.class)); + + assertEquals(ManagedLedgerImpl.State.Fenced, ((ManagedLedgerImpl) ledger).getState()); + } + @Test public void recoverAfterWriteError() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger"); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index 3f1bfcca9400e..ae0e53456e2d7 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -21,6 +21,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; +import 
static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.lang.reflect.Field; @@ -48,6 +49,8 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.Test; @@ -125,6 +128,51 @@ public void testOffload() throws Exception { .filter(e -> e.getOffloadContext().getComplete()) .map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers()); + + // ledgers should be marked as offloaded + ledger.getLedgersInfoAsList().stream().allMatch(l -> l.hasOffloadContext()); + } + + @Test + public void testOffloadFenced() throws Exception { + MockLedgerOffloader offloader = new MockLedgerOffloader(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setMinimumRolloverTime(0, TimeUnit.SECONDS); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setRetentionSizeInMB(10); + config.setLedgerOffloader(offloader); + ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config); + + int i = 0; + for (; i < 25; i++) { + String content = "entry-" + i; + ledger.addEntry(content.getBytes()); + } + assertEquals(ledger.getLedgersInfoAsList().size(), 3); + + metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) -> + path.equals("/managed-ledgers/my_test_ledger") + && op == FaultInjectionMetadataStore.OperationType.PUT + ); + + assertThrows(ManagedLedgerException.ManagedLedgerFencedException.class, () -> + ledger.offloadPrefix(ledger.getLastConfirmedEntry())); + + assertEquals(ledger.getLedgersInfoAsList().size(), 3); + + // the offloader actually 
wrote the data on the storage + assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.getOffloadContext().getComplete()) + .map(e -> e.getLedgerId()).collect(Collectors.toSet()), + offloader.offloadedLedgers()); + + // but the ledgers should not be marked as offloaded in local memory, as the write to metadata failed + ledger.getLedgersInfoAsList().stream().allMatch(l -> !l.hasOffloadContext()); + + // check that the ledger is fenced + assertEquals(ManagedLedgerImpl.State.Fenced, ledger.getState()); + } @Test diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 30150341b3618..49f2ac2f548cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1321,31 +1321,13 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect @Override public void closeComplete(Object ctx) { // Everything is now closed, remove the topic from map - brokerService.removeTopicFromCache(PersistentTopic.this) - .thenRun(() -> { - replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close); - - dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); - - subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); - - unregisterTopicPolicyListener(); - log.info("[{}] Topic closed", topic); - cancelFencedTopicMonitoringTask(); - closeFuture.complete(null); - }) - .exceptionally(ex -> { - closeFuture.completeExceptionally(ex); - return null; - }); + disposeTopic(closeFuture); } @Override public void closeFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception); - brokerService.removeTopicFromCache(PersistentTopic.this); - 
unregisterTopicPolicyListener(); - closeFuture.complete(null); + disposeTopic(closeFuture); } }, null); }).exceptionally(exception -> { @@ -1358,6 +1340,26 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { return closeFuture; } + private void disposeTopic(CompletableFuture closeFuture) { + brokerService.removeTopicFromCache(topic) + .thenRun(() -> { + replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close); + + dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); + + subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); + + unregisterTopicPolicyListener(); + log.info("[{}] Topic closed", topic); + cancelFencedTopicMonitoringTask(); + closeFuture.complete(null); + }) + .exceptionally(ex -> { + closeFuture.completeExceptionally(ex); + return null; + }); + } + @VisibleForTesting CompletableFuture checkReplicationAndRetryOnFailure() { CompletableFuture result = new CompletableFuture(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 612d9368b8c70..8b0dbb76eecb7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -370,8 +370,6 @@ public void testTruncateCorruptDataLedger() throws Exception { producer.send(message.getBytes()); } - ml.delete(); - // Admin should be able to truncate the topic admin.topics().truncate(topic1);