From 463bdb301f4a7160ba44a31c5c3ba3cf83becb87 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 9 Sep 2024 23:32:30 +0800 Subject: [PATCH] [fix][broker] fix pulsar-admin topics stats-internal caused a BK client thread a deadlock (#23258) (cherry picked from commit 0aaa906cd8c68a212992166221123fd83172ce31) (cherry picked from commit f5737e616962442d3d0a06dfc53ef68e44d0055d) --- .../service/persistent/PersistentTopic.java | 26 +++++++++++++------ .../pulsar/compaction/CompactedTopicImpl.java | 8 ++++-- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 5d0d837bf9ac4..a394897e1773b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -47,6 +47,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -2657,13 +2658,13 @@ public CompletableFuture getInternalStats(boolean info.entries = -1; info.size = -1; - Optional compactedTopicContext = getCompactedTopicContext(); - if (compactedTopicContext.isPresent()) { - CompactedTopicContext ledgerContext = compactedTopicContext.get(); - info.ledgerId = ledgerContext.getLedger().getId(); - info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1; - info.size = ledgerContext.getLedger().getLength(); - } + futures.add(getCompactedTopicContextAsync().thenAccept(v -> { + if (v != null) { + info.ledgerId = v.getLedger().getId(); + info.entries = v.getLedger().getLastAddConfirmed() + 1; + info.size = v.getLedger().getLength(); + } + })); stats.compactedLedger = info; @@ -2782,12 +2783,21 @@ public Optional getCompactedTopicContext() { if (topicCompactionService instanceof PulsarTopicCompactionService pulsarCompactedService) { return pulsarCompactedService.getCompactedTopic().getCompactedTopicContext(); } - } catch (ExecutionException | InterruptedException e) { + } catch (ExecutionException | InterruptedException | TimeoutException e) { log.warn("[{}]Fail to get ledger information for compacted topic.", topic); } return Optional.empty(); } + public CompletableFuture getCompactedTopicContextAsync() { + CompletableFuture res = + ((CompactedTopicImpl) topicCompactionService).getCompactedTopicContextFuture(); + if (res == null) { + return CompletableFuture.completedFuture(null); + } + return res; + } + public long getBacklogSize() { return ledger.getEstimatedBacklogSize(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index 0352808110190..1850f62028649 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -32,6 +32,8 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Predicate; import javax.annotation.Nullable; import org.apache.bookkeeper.client.BKException; @@ -304,8 +306,10 @@ static CompletableFuture> readEntries(LedgerHandle lh, long from, lo * Getter for CompactedTopicContext. * @return CompactedTopicContext */ - public Optional getCompactedTopicContext() throws ExecutionException, InterruptedException { - return compactedTopicContext == null ? Optional.empty() : Optional.of(compactedTopicContext.get()); + public Optional getCompactedTopicContext() throws ExecutionException, InterruptedException, + TimeoutException { + return compactedTopicContext == null ? Optional.empty() : + Optional.of(compactedTopicContext.get(30, TimeUnit.SECONDS)); } @Override