Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [broker] Part 2 of PIP-370: add metrics "pulsar_replication_disconnected_count" #23213

Merged
merged 4 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions pip/pip-370.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,9 @@ For each metric provide:
* Attributes (labels)
* Unit
-->
| Name | Description | Attributes | Units|
| --- | --- | --- | --- |
| `pulsar_broker_replication_count` | Counter. The number of topics enabled replication. | cluster | - |
| `pulsar_broker_replication_disconnected_count` | Counter. The number of topics that enabled replication and its replicator failed to connect | cluster | - |
| Name | Description | Attributes | Units|
| --- |---------------------------------------------------------------------------------------------|---------------------------| --- |
| `pulsar_replication_disconnected_count` | Counter. The number of replicators. | cluster, namespace, topic | - |


# Monitoring
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ void updateStats(TopicStats stats) {
replStats.replicationBacklog += as.replicationBacklog;
replStats.msgRateExpired += as.msgRateExpired;
replStats.connectedCount += as.connectedCount;
replStats.disconnectedCount += as.disconnectedCount;
replStats.replicationDelayInSeconds += as.replicationDelayInSeconds;
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public class AggregatedReplicationStats {
/** The count of replication-subscriber up and running to replicate to remote cluster. */
public long connectedCount;

/** The count of replication-subscriber that failed to start to replicate to remote cluster. */
public long disconnectedCount;

/** Time in seconds from the time a message was produced to the time when it is about to be replicated. */
public long replicationDelayInSeconds;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,11 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
aggReplStats.msgThroughputOut += replStats.msgThroughputOut;
aggReplStats.replicationBacklog += replStats.replicationBacklog;
aggReplStats.msgRateExpired += replStats.msgRateExpired;
aggReplStats.connectedCount += replStats.connected ? 1 : 0;
if (replStats.connected) {
aggReplStats.connectedCount += 1;
} else {
aggReplStats.disconnectedCount += 1;
}
aggReplStats.replicationDelayInSeconds += replStats.replicationDelayInSeconds;
});

Expand Down Expand Up @@ -510,6 +514,8 @@ private static void printNamespaceStats(PrometheusMetricStreams stream, Aggregat
replStats -> replStats.replicationBacklog, cluster, namespace);
writeReplicationStat(stream, "pulsar_replication_connected_count", stats,
replStats -> replStats.connectedCount, cluster, namespace);
writeReplicationStat(stream, "pulsar_replication_disconnected_count", stats,
replStats -> replStats.disconnectedCount, cluster, namespace);
writeReplicationStat(stream, "pulsar_replication_rate_expired", stats,
replStats -> replStats.msgRateExpired, cluster, namespace);
writeReplicationStat(stream, "pulsar_replication_delay_in_seconds", stats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,8 @@ public static void printTopicStats(PrometheusMetricStreams stream, TopicStats st
cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_connected_count", replStats.connectedCount,
cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_disconnected_count", replStats.disconnectedCount,
cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_rate_expired", replStats.msgRateExpired,
cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_delay_in_seconds", replStats.replicationDelayInSeconds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -67,6 +68,7 @@
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand All @@ -91,6 +93,8 @@
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.glassfish.jersey.client.JerseyClient;
import org.glassfish.jersey.client.JerseyClientBuilder;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -1160,4 +1164,121 @@
admin1.namespaces().deleteNamespace(ns);
admin2.namespaces().deleteNamespace(ns);
}

@Test
public void testReplicationCountMetrics() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
// 1.Create topic, does not enable replication now.
admin1.topics().createNonPartitionedTopic(topicName);
PersistentTopic persistentTopic =
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();

// We inject an error to make the internal producer fail to connect.
final AtomicInteger createProducerCounter = new AtomicInteger();
final AtomicBoolean failedCreateProducer = new AtomicBoolean(true);
Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
if (topicName.equals(producerCnf.getTopicName())) {
// There is a switch to determine create producer successfully or not.
if (failedCreateProducer.get()) {
log.info("Retry create replicator.producer count: {}", createProducerCounter);
// Release producer and fail callback.
originalProducer.closeAsync();
throw new RuntimeException("mock error");
}
return originalProducer;
}
return originalProducer;
});

// 2.Enable replication.
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));

// Verify: metrics.
// Cluster level:
// - pulsar_replication_connected_count
// - pulsar_replication_disconnected_count
// Namespace level:
// - pulsar_replication_connected_count
// - pulsar_replication_disconnected_count
// Topic level:
// - pulsar_replication_connected_count
// - pulsar_replication_disconnected_count
JerseyClient httpClient = JerseyClientBuilder.createClient();
Awaitility.await().untilAsserted(() -> {
int topicConnected = 0;
int topicDisconnected = 0;

String response = httpClient.target(pulsar1.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
Multimap<String, PrometheusMetricsClient.Metric> metricMap = PrometheusMetricsClient.parseMetrics(response);
if (!metricMap.containsKey("pulsar_replication_disconnected_count")) {
fail("Expected 1 disconnected replicator.");
}
for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_replication_connected_count")) {
if (cluster1.equals(metric.tags.get("cluster"))
&& nonReplicatedNamespace.equals(metric.tags.get("namespace"))
&& topicName.equals(metric.tags.get("topic"))) {
topicConnected += metric.value;
Fixed Show fixed Hide fixed
}
}
for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_replication_disconnected_count")) {
if (cluster1.equals(metric.tags.get("cluster"))
&& nonReplicatedNamespace.equals(metric.tags.get("namespace"))
&& topicName.equals(metric.tags.get("topic"))) {
topicDisconnected += metric.value;
Fixed Show fixed Hide fixed
}
}
log.info("{}, {},", topicConnected, topicDisconnected);
assertEquals(topicConnected, 0);
assertEquals(topicDisconnected, 1);
});

// Let replicator connect successfully.
failedCreateProducer.set(false);
// Verify: metrics.
// Cluster level:
// - pulsar_replication_connected_count
// - pulsar_replication_disconnected_count
// Namespace level:
// - pulsar_replication_connected_count
// - pulsar_replication_disconnected_count
// Topic level:
// - pulsar_replication_connected_count
// - pulsar_replication_disconnected_count
Awaitility.await().atMost(Duration.ofSeconds(130)).untilAsserted(() -> {
int topicConnected = 0;
int topicDisconnected = 0;

String response = httpClient.target(pulsar1.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
Multimap<String, PrometheusMetricsClient.Metric> metricMap = PrometheusMetricsClient.parseMetrics(response);
if (!metricMap.containsKey("pulsar_replication_disconnected_count")) {
fail("Expected 1 disconnected replicator.");
}
for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_replication_connected_count")) {
if (cluster1.equals(metric.tags.get("cluster"))
&& nonReplicatedNamespace.equals(metric.tags.get("namespace"))
&& topicName.equals(metric.tags.get("topic"))) {
topicConnected += metric.value;
Fixed Show fixed Hide fixed
}
}
for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_replication_disconnected_count")) {
if (cluster1.equals(metric.tags.get("cluster"))
&& nonReplicatedNamespace.equals(metric.tags.get("namespace"))
&& topicName.equals(metric.tags.get("topic"))) {
topicDisconnected += metric.value;
Fixed Show fixed Hide fixed
}
}
log.info("{}, {}", topicConnected, topicDisconnected);
assertEquals(topicConnected, 1);
assertEquals(topicDisconnected, 0);
});

// cleanup.
taskToClearInjection.run();
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
waitReplicatorStopped(topicName);
admin1.topics().delete(topicName, false);
admin2.topics().delete(topicName, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,10 @@ public void testConfigReplicationStartAt() throws Exception {
public void testDifferentTopicCreationRule(ReplicationMode replicationMode) throws Exception {
super.testDifferentTopicCreationRule(replicationMode);
}

@Test(enabled = false)
@Override
public void testReplicationCountMetrics() throws Exception {
super.testReplicationCountMetrics();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public void testSimpleAggregation() {
replStats2.msgThroughputOut = 1536.0;
replStats2.replicationBacklog = 99;
replStats2.connectedCount = 1;
replStats2.disconnectedCount = 2;
replStats2.msgRateExpired = 3.0;
replStats2.replicationDelayInSeconds = 20;
topicStats2.replicationStats.put(namespace, replStats2);
Expand Down Expand Up @@ -148,6 +149,7 @@ public void testSimpleAggregation() {
assertEquals(nsReplStats.msgThroughputOut, 1792.0);
assertEquals(nsReplStats.replicationBacklog, 100);
assertEquals(nsReplStats.connectedCount, 1);
assertEquals(nsReplStats.disconnectedCount, 2);
assertEquals(nsReplStats.msgRateExpired, 6.0);
assertEquals(nsReplStats.replicationDelayInSeconds, 40);

Expand Down
Loading