diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java index c2d002ad19cb0..93eb3ebbc77d5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java @@ -119,10 +119,9 @@ public CompletableFuture cleanResidualSnapshots(ManagedCursor cursor) { FutureUtil.Sequencer sequencer = FutureUtil.Sequencer.create(); cursorProperties.forEach((k, v) -> { if (k != null && v != null && k.startsWith(BucketDelayedDeliveryTracker.DELAYED_BUCKET_KEY_PREFIX)) { - CompletableFuture future = sequencer.sequential(() -> { - return cursor.removeCursorProperty(k) - .thenCompose(__ -> bucketSnapshotStorage.deleteBucketSnapshot(Long.parseLong(v))); - }); + CompletableFuture future = sequencer.sequential(() -> + bucketSnapshotStorage.deleteBucketSnapshot(Long.parseLong(v)) + .thenCompose(__ -> cursor.removeCursorProperty(k))); futures.add(future); } }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java index 8dcfe8d39a8b4..0d90e5e1d980f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java @@ -205,7 +205,10 @@ private CompletableFuture openLedger(Long ledgerId) { BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()), LedgerPassword, (rc, handle, ctx) -> { - if (rc != BKException.Code.OK) { + if (rc == BKException.Code.NoSuchLedgerExistsException) { + // If the ledger does not exist, throw BucketNotExistException + future.completeExceptionally(noSuchLedgerException("Open ledger", ledgerId)); + } else if (rc != BKException.Code.OK) { future.completeExceptionally(bkException("Open ledger", rc, ledgerId)); } else { future.complete(handle); @@ -265,10 +268,11 @@ CompletableFuture> getLedgerEntry(LedgerHandle ledger, private CompletableFuture deleteLedger(long ledgerId) { CompletableFuture future = new CompletableFuture<>(); bookKeeper.asyncDeleteLedger(ledgerId, (int rc, Object cnx) -> { - if (rc != BKException.Code.OK) { - future.completeExceptionally(bkException("Delete ledger", rc, ledgerId)); - } else { + if (rc == BKException.Code.NoSuchLedgerExistsException || rc == BKException.Code.OK) { + // If the ledger does not exist or has been deleted, we can treat it as success future.complete(null); + } else { + future.completeExceptionally(bkException("Delete ledger", rc, ledgerId)); } }, null); return future; @@ -279,4 +283,10 @@ private static BucketSnapshotPersistenceException bkException(String operation, + " - ledger=" + ledgerId + " - operation=" + operation; return new BucketSnapshotPersistenceException(message); } + + private static BucketNotExistException noSuchLedgerException(String operation, long ledgerId) { + String message = BKException.getMessage(BKException.Code.NoSuchLedgerExistsException) + + " - ledger=" + ledgerId + " - operation=" + operation; + return new BucketNotExistException(message); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 0091bf5b9bd30..08f3ae1fa6e8a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -180,8 +180,7 @@ private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryT futures = new HashMap<>(immutableBucketMap.size()); for (Map.Entry, ImmutableBucket> entry : immutableBucketMap.entrySet()) { Range key = entry.getKey(); - ImmutableBucket immutableBucket = entry.getValue(); - futures.put(key, immutableBucket.asyncRecoverBucketSnapshotEntry(this::getCutoffTime)); + futures.put(key, handleRecoverBucketSnapshotEntry(entry.getValue())); } try { @@ -232,6 +231,33 @@ private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryT return numberDelayedMessages.getValue(); } + /** + * Handle the BucketNotExistException when recover bucket snapshot entry. + * The non exist bucket will be added to `toBeDeletedBucketMap` and deleted from `immutableBuckets` + * in the next step. + * + * @param bucket + * @return + */ + private CompletableFuture> handleRecoverBucketSnapshotEntry(ImmutableBucket bucket) { + CompletableFuture> f = new CompletableFuture<>(); + bucket.asyncRecoverBucketSnapshotEntry(this::getCutoffTime) + .whenComplete((v, e) -> { + if (e == null) { + f.complete(v); + } else { + if (e instanceof BucketNotExistException) { + // If the bucket does not exist, return an empty list, + // the bucket will be deleted from `immutableBuckets` in the next step. + f.complete(Collections.emptyList()); + } else { + f.completeExceptionally(e); + } + } + }); + return f; + } + private synchronized void putAndCleanOverlapRange(Range range, ImmutableBucket immutableBucket, Map, ImmutableBucket> toBeDeletedBucketMap) { RangeMap subRangeMap = immutableBuckets.subRangeMap(range); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketNotExistException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketNotExistException.java new file mode 100644 index 0000000000000..f6c16a1595f54 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketNotExistException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed.bucket; + +import org.apache.pulsar.broker.service.BrokerServiceException; + +public class BucketNotExistException extends BrokerServiceException.PersistenceException { + + public BucketNotExistException(Throwable t) { + super(t); + } + + public BucketNotExistException(String msg) { + super(msg); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java index 0932f51f350ce..a1944a21ea794 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java @@ -193,9 +193,10 @@ CompletableFuture asyncDeleteBucketSnapshot(BucketDelayedMessageIndexStats stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.delete); String bucketKey = bucketKey(); long bucketId = getAndUpdateBucketId(); - return removeBucketCursorProperty(bucketKey).thenCompose(__ -> - executeWithRetry(() -> bucketSnapshotStorage.deleteBucketSnapshot(bucketId), - BucketSnapshotPersistenceException.class, MaxRetryTimes)).whenComplete((__, ex) -> { + + return executeWithRetry(() -> bucketSnapshotStorage.deleteBucketSnapshot(bucketId), + BucketSnapshotPersistenceException.class, MaxRetryTimes) + .whenComplete((__, ex) -> { if (ex != null) { log.error("[{}] Failed to delete bucket snapshot, bucketId: {}, bucketKey: {}", dispatcherName, bucketId, bucketKey, ex); @@ -208,7 +209,8 @@ CompletableFuture asyncDeleteBucketSnapshot(BucketDelayedMessageIndexStats stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.delete, System.currentTimeMillis() - deleteStartTime); } - }); + }) + .thenCompose(__ -> removeBucketCursorProperty(bucketKey)); } CompletableFuture clear(BucketDelayedMessageIndexStats stats) {