From d6d08bed0e0a21910133e0b79a2bb69a981ba9db Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 26 Aug 2024 13:09:27 +0300 Subject: [PATCH] [improve][broker] Add msgInReplay subscription stat and metric to improve Key_Shared observability --- .../service/persistent/MessageRedeliveryController.java | 9 +++++++++ .../PersistentDispatcherMultipleConsumers.java | 5 ++++- .../service/persistent/PersistentSubscription.java | 1 + .../stats/prometheus/AggregatedNamespaceStats.java | 4 ++++ .../stats/prometheus/AggregatedSubscriptionStats.java | 2 ++ .../stats/prometheus/NamespaceStatsAggregator.java | 3 +++ .../pulsar/broker/stats/prometheus/TopicStats.java | 2 ++ .../pulsar/common/policies/data/SubscriptionStats.java | 3 +++ .../policies/data/stats/SubscriptionStatsImpl.java | 6 ++++++ 9 files changed, 34 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java index 526874a7ae34b..9d29b93ca450d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java @@ -149,4 +149,13 @@ public boolean containsStickyKeyHashes(Set stickyKeyHashes) { public NavigableSet getMessagesToReplayNow(int maxMessagesToRead) { return messagesToRedeliver.items(maxMessagesToRead, PositionFactory::create); } + + /** + * Get the number of messages registered for replay in the redelivery controller. + * + * @return number of messages + */ + public int size() { + return messagesToRedeliver.size(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 274bdd9947a07..20dbc4925d152 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -135,7 +135,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul protected final ExecutorService dispatchMessagesThread; private final SharedConsumerAssignor assignor; - protected enum ReadType { Normal, Replay } @@ -1352,5 +1351,9 @@ public Subscription getSubscription() { return subscription; } + public long getNumberOfMessagesInReplay() { + return redeliveryMessages.size(); + } + private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index f59ea18ce8ea7..ea1b7d7602be7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1276,6 +1276,7 @@ public CompletableFuture getStatsAsync(GetStatsOptions ge subStats.unackedMessages = d.getTotalUnackedMessages(); subStats.blockedSubscriptionOnUnackedMsgs = d.isBlockedDispatcherOnUnackedMsgs(); subStats.msgDelayed = d.getNumberOfDelayedMessages(); + subStats.msgInReplay = d.getNumberOfMessagesInReplay(); } } subStats.msgBacklog = getNumberOfEntriesInBacklog(getStatsOptions.isGetPreciseBacklog()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java index 85ff15c915aa7..aaaea7b493e45 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java @@ -43,6 +43,7 @@ public class AggregatedNamespaceStats { public ManagedLedgerStats managedLedgerStats = new ManagedLedgerStats(); public long msgBacklog; public long msgDelayed; + public long msgInReplay; public long ongoingTxnCount; public long abortedTxnCount; @@ -141,10 +142,12 @@ void updateStats(TopicStats stats) { AggregatedSubscriptionStats subsStats = subscriptionStats.computeIfAbsent(n, k -> new AggregatedSubscriptionStats()); msgDelayed += as.msgDelayed; + msgInReplay += as.msgInReplay; subsStats.blockedSubscriptionOnUnackedMsgs = as.blockedSubscriptionOnUnackedMsgs; subsStats.msgBacklog += as.msgBacklog; subsStats.msgBacklogNoDelayed += as.msgBacklogNoDelayed; subsStats.msgDelayed += as.msgDelayed; + subsStats.msgInReplay += as.msgInReplay; subsStats.msgRateRedeliver += as.msgRateRedeliver; subsStats.unackedMessages += as.unackedMessages; subsStats.filterProcessedMsgCount += as.filterProcessedMsgCount; @@ -200,6 +203,7 @@ public void reset() { msgBacklog = 0; msgDelayed = 0; + msgInReplay = 0; ongoingTxnCount = 0; abortedTxnCount = 0; committedTxnCount = 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java index da0324c55655c..b713146f58bac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java @@ -43,6 +43,8 @@ public class AggregatedSubscriptionStats { public long msgDelayed; + public long msgInReplay; + long msgOutCounter; long bytesOutCounter; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index f0d11167e65fe..25c875778c05c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -134,6 +134,7 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl subsStats.msgOutCounter = subscriptionStats.msgOutCounter; subsStats.msgBacklog = subscriptionStats.msgBacklog; subsStats.msgDelayed = subscriptionStats.msgDelayed; + subsStats.msgInReplay = subscriptionStats.msgInReplay; subsStats.msgRateExpired = subscriptionStats.msgRateExpired; subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired; subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed; @@ -424,6 +425,8 @@ private static void printNamespaceStats(PrometheusMetricStreams stream, Aggregat writeMetric(stream, "pulsar_subscription_delayed", stats.msgDelayed, cluster, namespace); + writeMetric(stream, "pulsar_subscription_in_replay", stats.msgInReplay, cluster, namespace); + writeMetric(stream, "pulsar_delayed_message_index_size_bytes", stats.delayedMessageIndexSizeInBytes, cluster, namespace); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 013b528731060..e54a3710e1294 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -310,6 +310,8 @@ public static void printTopicStats(PrometheusMetricStreams stream, TopicStats st subsStats.msgBacklogNoDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); writeSubscriptionMetric(stream, "pulsar_subscription_delayed", subsStats.msgDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); + writeSubscriptionMetric(stream, "pulsar_subscription_in_replay", + subsStats.msgInReplay, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_redeliver", subsStats.msgRateRedeliver, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); writeSubscriptionMetric(stream, "pulsar_subscription_unacked_messages", diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index cabef1ca9602d..e307e41862e74 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -66,6 +66,9 @@ public interface SubscriptionStats { /** Number of delayed messages currently being tracked. */ long getMsgDelayed(); + /** Number of messages registered for replay. */ + long getMsgInReplay(); + /** * Number of unacknowledged messages for the subscription, where an unacknowledged message is one that has been * sent to a consumer but not yet acknowledged. Calculated by summing all {@link ConsumerStats#getUnackedMessages()} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java index ab4d07c7ae486..977ed28e86814 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java @@ -74,6 +74,9 @@ public class SubscriptionStatsImpl implements SubscriptionStats { /** Number of delayed messages currently being tracked. */ public long msgDelayed; + /** Number of messages registered for replay. */ + public long msgInReplay; + /** * Number of unacknowledged messages for the subscription, where an unacknowledged message is one that has been * sent to a consumer but not yet acknowledged. Calculated by summing all {@link ConsumerStatsImpl#unackedMessages} @@ -173,6 +176,8 @@ public void reset() { msgBacklog = 0; backlogSize = 0; msgBacklogNoDelayed = 0; + msgDelayed = 0; + msgInReplay = 0; unackedMessages = 0; type = null; msgRateExpired = 0; @@ -208,6 +213,7 @@ public SubscriptionStatsImpl add(SubscriptionStatsImpl stats) { this.backlogSize += stats.backlogSize; this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed; this.msgDelayed += stats.msgDelayed; + this.msgInReplay += stats.msgInReplay; this.unackedMessages += stats.unackedMessages; this.type = stats.type; this.msgRateExpired += stats.msgRateExpired;