Skip to content

Commit

Permalink
[fix][broker] Avoid orphan ledgers in BucketDelayedDeliveryTracker (#…
Browse files Browse the repository at this point in the history
…22802)

(cherry picked from commit 8b6b337)
  • Loading branch information
dao-jun authored and lhotari committed Oct 10, 2024
1 parent 0457bde commit f3403de
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,9 @@ public CompletableFuture<Void> cleanResidualSnapshots(ManagedCursor cursor) {
FutureUtil.Sequencer<Void> sequencer = FutureUtil.Sequencer.create();
cursorProperties.forEach((k, v) -> {
if (k != null && v != null && k.startsWith(BucketDelayedDeliveryTracker.DELAYED_BUCKET_KEY_PREFIX)) {
CompletableFuture<Void> future = sequencer.sequential(() -> {
return cursor.removeCursorProperty(k)
.thenCompose(__ -> bucketSnapshotStorage.deleteBucketSnapshot(Long.parseLong(v)));
});
CompletableFuture<Void> future = sequencer.sequential(() ->
bucketSnapshotStorage.deleteBucketSnapshot(Long.parseLong(v))
.thenCompose(__ -> cursor.removeCursorProperty(k)));
futures.add(future);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,10 @@ private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
LedgerPassword,
(rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
if (rc == BKException.Code.NoSuchLedgerExistsException) {
// If the ledger does not exist, throw BucketNotExistException
future.completeExceptionally(noSuchLedgerException("Open ledger", ledgerId));
} else if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Open ledger", rc, ledgerId));
} else {
future.complete(handle);
Expand Down Expand Up @@ -265,10 +268,11 @@ CompletableFuture<Enumeration<LedgerEntry>> getLedgerEntry(LedgerHandle ledger,
private CompletableFuture<Void> deleteLedger(long ledgerId) {
CompletableFuture<Void> future = new CompletableFuture<>();
bookKeeper.asyncDeleteLedger(ledgerId, (int rc, Object cnx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Delete ledger", rc, ledgerId));
} else {
if (rc == BKException.Code.NoSuchLedgerExistsException || rc == BKException.Code.OK) {
// If the ledger does not exist or has been deleted, we can treat it as success
future.complete(null);
} else {
future.completeExceptionally(bkException("Delete ledger", rc, ledgerId));
}
}, null);
return future;
Expand All @@ -279,4 +283,10 @@ private static BucketSnapshotPersistenceException bkException(String operation,
+ " - ledger=" + ledgerId + " - operation=" + operation;
return new BucketSnapshotPersistenceException(message);
}

private static BucketNotExistException noSuchLedgerException(String operation, long ledgerId) {
String message = BKException.getMessage(BKException.Code.NoSuchLedgerExistsException)
+ " - ledger=" + ledgerId + " - operation=" + operation;
return new BucketNotExistException(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryT
futures = new HashMap<>(immutableBucketMap.size());
for (Map.Entry<Range<Long>, ImmutableBucket> entry : immutableBucketMap.entrySet()) {
Range<Long> key = entry.getKey();
ImmutableBucket immutableBucket = entry.getValue();
futures.put(key, immutableBucket.asyncRecoverBucketSnapshotEntry(this::getCutoffTime));
futures.put(key, handleRecoverBucketSnapshotEntry(entry.getValue()));
}

try {
Expand Down Expand Up @@ -231,6 +230,33 @@ private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryT
return numberDelayedMessages.getValue();
}

/**
* Handle the BucketNotExistException when recover bucket snapshot entry.
* The non exist bucket will be added to `toBeDeletedBucketMap` and deleted from `immutableBuckets`
* in the next step.
*
* @param bucket
* @return
*/
private CompletableFuture<List<DelayedIndex>> handleRecoverBucketSnapshotEntry(ImmutableBucket bucket) {
CompletableFuture<List<DelayedIndex>> f = new CompletableFuture<>();
bucket.asyncRecoverBucketSnapshotEntry(this::getCutoffTime)
.whenComplete((v, e) -> {
if (e == null) {
f.complete(v);
} else {
if (e instanceof BucketNotExistException) {
// If the bucket does not exist, return an empty list,
// the bucket will be deleted from `immutableBuckets` in the next step.
f.complete(Collections.emptyList());
} else {
f.completeExceptionally(e);
}
}
});
return f;
}

private synchronized void putAndCleanOverlapRange(Range<Long> range, ImmutableBucket immutableBucket,
Map<Range<Long>, ImmutableBucket> toBeDeletedBucketMap) {
RangeMap<Long, ImmutableBucket> subRangeMap = immutableBuckets.subRangeMap(range);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed.bucket;

import org.apache.pulsar.broker.service.BrokerServiceException;

public class BucketNotExistException extends BrokerServiceException.PersistenceException {

public BucketNotExistException(Throwable t) {
super(t);
}

public BucketNotExistException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,10 @@ CompletableFuture<Void> asyncDeleteBucketSnapshot(BucketDelayedMessageIndexStats
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.delete);
String bucketKey = bucketKey();
long bucketId = getAndUpdateBucketId();
return removeBucketCursorProperty(bucketKey).thenCompose(__ ->
executeWithRetry(() -> bucketSnapshotStorage.deleteBucketSnapshot(bucketId),
BucketSnapshotPersistenceException.class, MaxRetryTimes)).whenComplete((__, ex) -> {

return executeWithRetry(() -> bucketSnapshotStorage.deleteBucketSnapshot(bucketId),
BucketSnapshotPersistenceException.class, MaxRetryTimes)
.whenComplete((__, ex) -> {
if (ex != null) {
log.error("[{}] Failed to delete bucket snapshot, bucketId: {}, bucketKey: {}",
dispatcherName, bucketId, bucketKey, ex);
Expand All @@ -208,7 +209,8 @@ CompletableFuture<Void> asyncDeleteBucketSnapshot(BucketDelayedMessageIndexStats
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.delete,
System.currentTimeMillis() - deleteStartTime);
}
});
})
.thenCompose(__ -> removeBucketCursorProperty(bucketKey));
}

CompletableFuture<Void> clear(BucketDelayedMessageIndexStats stats) {
Expand Down

0 comments on commit f3403de

Please sign in to comment.