From 2b9653af0def1af0be588955b00dff4d8aebfc89 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 9 Sep 2024 23:32:30 +0800 Subject: [PATCH] [fix][broker] fix pulsar-admin topics stats-internal caused a BK client thread a deadlock (#23258) (cherry picked from commit 0aaa906cd8c68a212992166221123fd83172ce31) --- .../service/persistent/PersistentTopic.java | 29 ++++++++++++++----- .../pulsar/compaction/CompactedTopicImpl.java | 8 +++-- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 9e0c6c180aad2..2eb84a52c10c0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -48,6 +48,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -2809,13 +2810,13 @@ public CompletableFuture getInternalStats(boolean info.entries = -1; info.size = -1; - Optional compactedTopicContext = getCompactedTopicContext(); - if (compactedTopicContext.isPresent()) { - CompactedTopicContext ledgerContext = compactedTopicContext.get(); - info.ledgerId = ledgerContext.getLedger().getId(); - info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1; - info.size = ledgerContext.getLedger().getLength(); - } + futures.add(getCompactedTopicContextAsync().thenAccept(v -> { + if (v != null) { + info.ledgerId = v.getLedger().getId(); + info.entries = v.getLedger().getLastAddConfirmed() + 1; + info.size = v.getLedger().getLength(); + } + })); stats.compactedLedger = info; @@ -2936,12 +2937,24 @@ public Optional getCompactedTopicContext() { if (topicCompactionService instanceof PulsarTopicCompactionService pulsarCompactedService) { return pulsarCompactedService.getCompactedTopic().getCompactedTopicContext(); } - } catch (ExecutionException | InterruptedException e) { + } catch (ExecutionException | InterruptedException | TimeoutException e) { log.warn("[{}]Fail to get ledger information for compacted topic.", topic); } return Optional.empty(); } + public CompletableFuture getCompactedTopicContextAsync() { + if (topicCompactionService instanceof PulsarTopicCompactionService pulsarCompactedService) { + CompletableFuture res = + pulsarCompactedService.getCompactedTopic().getCompactedTopicContextFuture(); + if (res == null) { + return CompletableFuture.completedFuture(null); + } + return res; + } + return CompletableFuture.completedFuture(null); + } + public long getBacklogSize() { return ledger.getEstimatedBacklogSize(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index f67f28733bddb..09dfcb651927f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -32,6 +32,8 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Predicate; import javax.annotation.Nullable; import org.apache.bookkeeper.client.BKException; @@ -304,8 +306,10 @@ static CompletableFuture> readEntries(LedgerHandle lh, long from, lo * Getter for CompactedTopicContext. * @return CompactedTopicContext */ - public Optional getCompactedTopicContext() throws ExecutionException, InterruptedException { - return compactedTopicContext == null ? Optional.empty() : Optional.of(compactedTopicContext.get()); + public Optional getCompactedTopicContext() throws ExecutionException, InterruptedException, + TimeoutException { + return compactedTopicContext == null ? Optional.empty() : + Optional.of(compactedTopicContext.get(30, TimeUnit.SECONDS)); } @Override