Skip to content

Commit

Permalink
[fix][broker] PIP-399: Fix Metric Name for Delayed Queue (#23712)
Browse files Browse the repository at this point in the history
  • Loading branch information
thetumbled authored Jan 14, 2025
1 parent c92930f commit 5be922b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ public static void printTopicStats(PrometheusMetricStreams stream, TopicStats st
writeSubscriptionMetric(stream, "pulsar_subscription_filter_rescheduled_msg_count",
subsStats.filterRescheduledMsgCount, cluster, namespace, topic, sub,
splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_delayed_message_index_size_bytes",
writeSubscriptionMetric(stream, "pulsar_subscription_delayed_message_index_size_bytes",
subsStats.delayedMessageIndexSizeInBytes, cluster, namespace, topic, sub,
splitTopicAndPartitionIndexLabel);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,10 +424,11 @@ public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean ex

@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
String subName = "test_sub";
@Cleanup
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("test_sub")
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.messageListener((MessageListener<String>) (consumer1, msg) -> {
try {
Expand All @@ -453,7 +454,13 @@ public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean ex

Multimap<String, Metric> metricsMap = parseMetrics(metricsStr);
Collection<Metric> metrics = metricsMap.get("pulsar_delayed_message_index_size_bytes");
Assert.assertTrue(metrics.size() > 0);
Collection<Metric> subMetrics = metricsMap.get("pulsar_subscription_delayed_message_index_size_bytes");
assertFalse(metrics.isEmpty());
if (exposeTopicLevelMetrics) {
assertFalse(subMetrics.isEmpty());
} else {
assertTrue(subMetrics.isEmpty());
}

int topicLevelNum = 0;
int namespaceLevelNum = 0;
Expand All @@ -462,14 +469,20 @@ public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean ex
if (exposeTopicLevelMetrics && metric.tags.get("topic").equals(topic)) {
Assert.assertTrue(metric.value > 0);
topicLevelNum++;
if ("test_sub".equals(metric.tags.get("subscription"))) {
subscriptionLevelNum++;
}
} else if (!exposeTopicLevelMetrics && metric.tags.get("namespace").equals(namespace)) {
Assert.assertTrue(metric.value > 0);
namespaceLevelNum++;
}
}
if (exposeTopicLevelMetrics) {
for (Metric metric : subMetrics) {
if (metric.tags.get("topic").equals(topic) &&
subName.equals(metric.tags.get("subscription"))) {
Assert.assertTrue(metric.value > 0);
subscriptionLevelNum++;
}
}
}

if (exposeTopicLevelMetrics) {
Assert.assertTrue(topicLevelNum > 0);
Expand Down

0 comments on commit 5be922b

Please sign in to comment.