Skip to content

Commit

Permalink
[improve] [broker] PIP-355: Enhancing Broker-Level Metrics for Pulsar (
Browse files Browse the repository at this point in the history
  • Loading branch information
hangc0276 committed Jun 12, 2024
1 parent 1770cbc commit c724f02
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import com.google.common.base.MoreObjects;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -32,6 +33,7 @@
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -128,6 +130,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP

private LongAdder bytesInCounter = new LongAdder();
private LongAdder msgInCounter = new LongAdder();
private LongAdder systemTopicBytesInCounter = new LongAdder();
private final LongAdder filteredEntriesCounter = new LongAdder();

private static final AtomicLongFieldUpdater<AbstractTopic> RATE_LIMITED_UPDATER =
Expand Down Expand Up @@ -157,10 +160,13 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP

protected final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
protected final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
protected final LongAdder bytesOutFromRemovedSystemSubscriptions = new LongAdder();
protected volatile Pair<String, List<EntryFilter>> entryFilters;
protected volatile boolean transferring = false;
private volatile List<PublishRateLimiter> activeRateLimiters;

protected Set<String> additionalSystemCursorNames = new TreeSet<>();

public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
Expand All @@ -176,6 +182,8 @@ public AbstractTopic(String topic, BrokerService brokerService) {
this.preciseTopicPublishRateLimitingEnable = config.isPreciseTopicPublishRateLimiterEnable();
topicPublishRateLimiter = new PublishRateLimiterImpl(brokerService.getPulsar().getMonotonicSnapshotClock());
updateActiveRateLimiters();

additionalSystemCursorNames = brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames();
}

public SubscribeRate getSubscribeRate() {
Expand Down Expand Up @@ -921,6 +929,10 @@ public void incrementPublishCount(Producer producer, int numOfMessages, long msg
// increase counters
bytesInCounter.add(msgSizeInBytes);
msgInCounter.add(numOfMessages);

if (isSystemTopic()) {
systemTopicBytesInCounter.add(msgSizeInBytes);
}
}

private void handlePublishThrottling(Producer producer, int numOfMessages, long msgSizeInBytes) {
Expand Down Expand Up @@ -1184,6 +1196,10 @@ public long getMsgOutCounter() {
+ sumSubscriptions(AbstractSubscription::getMsgOutCounter);
}

public long getSystemTopicBytesInCounter() {
return systemTopicBytesInCounter.longValue();
}

public long getBytesOutCounter() {
return bytesOutFromRemovedSubscriptions.longValue()
+ sumSubscriptions(AbstractSubscription::getBytesOutCounter);
Expand Down Expand Up @@ -1369,4 +1385,9 @@ public static Optional<ClusterUrl> getMigratedClusterUrl(PulsarService pulsar, S
}
return Optional.empty();
}

public boolean isSystemCursor(String sub) {
return COMPACTION_SUBSCRIPTION.equals(sub)
|| (additionalSystemCursorNames != null && additionalSystemCursorNames.contains(sub));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -940,9 +940,11 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn);
stats.msgInCounter = getMsgInCounter();
stats.bytesInCounter = getBytesInCounter();
stats.systemTopicBytesInCounter = getSystemTopicBytesInCounter();
stats.waitingPublishers = getWaitingProducersCount();
stats.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue();
stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue();
stats.bytesOutInternalCounter = bytesOutFromRemovedSystemSubscriptions.longValue();

subscriptions.forEach((name, subscription) -> {
NonPersistentSubscriptionStatsImpl subStats = subscription.getStats(getStatsOptions);
Expand All @@ -952,6 +954,10 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
stats.bytesOutCounter += subStats.bytesOutCounter;
stats.msgOutCounter += subStats.msgOutCounter;
stats.getSubscriptions().put(name, subStats);

if (isSystemCursor(name)) {
stats.bytesOutInternalCounter += subStats.bytesOutCounter;
}
});

replicators.forEach((cluster, replicator) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -287,7 +286,6 @@ protected TopicStatsHelper initialValue() {
private final ExecutorService orderedExecutor;

private volatile CloseFutures closeFutures;
private Set<String> additionalSystemCursorNames = new TreeSet<>();

@Getter
private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics();
Expand Down Expand Up @@ -431,7 +429,6 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
} else {
shadowSourceTopic = null;
}
additionalSystemCursorNames = brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames();
}

@Override
Expand Down Expand Up @@ -1401,6 +1398,10 @@ void removeSubscription(String subscriptionName) {
SubscriptionStatsImpl stats = sub.getStats(new GetStatsOptions(false, false, false, false, false));
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);

if (isSystemCursor(subscriptionName)) {
bytesOutFromRemovedSystemSubscriptions.add(stats.bytesOutCounter);
}
}
}

Expand Down Expand Up @@ -2566,10 +2567,12 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn);
stats.msgInCounter = getMsgInCounter();
stats.bytesInCounter = getBytesInCounter();
stats.systemTopicBytesInCounter = getSystemTopicBytesInCounter();
stats.msgChunkPublished = this.msgChunkPublished;
stats.waitingPublishers = getWaitingProducersCount();
stats.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue();
stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue();
stats.bytesOutInternalCounter = bytesOutFromRemovedSystemSubscriptions.longValue();
stats.publishRateLimitedTimes = publishRateLimitedTimes;
TransactionBuffer txnBuffer = getTransactionBuffer();
stats.ongoingTxnCount = txnBuffer.getOngoingTxnCount();
Expand All @@ -2596,6 +2599,10 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
topicMetricBean.labelsAndValues = v.labelsAndValues;
topicMetricBean.value += v.value;
});

if (isSystemCursor(name)) {
stats.bytesOutInternalCounter += subStats.bytesOutCounter;
}
});

replicators.forEach((cluster, replicator) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public class AggregatedBrokerStats {
public long msgBacklog;
public long sizeBasedBacklogQuotaExceededEvictionCount;
public long timeBasedBacklogQuotaExceededEvictionCount;
public long bytesInCounter;
public long bytesOutCounter;
public long systemTopicBytesInCounter;
public long bytesOutInternalCounter;

@SuppressWarnings("DuplicatedCode")
void updateStats(TopicStats stats) {
Expand All @@ -54,6 +58,10 @@ void updateStats(TopicStats stats) {
msgBacklog += stats.msgBacklog;
timeBasedBacklogQuotaExceededEvictionCount += stats.timeBasedBacklogQuotaExceededEvictionCount;
sizeBasedBacklogQuotaExceededEvictionCount += stats.sizeBasedBacklogQuotaExceededEvictionCount;
bytesInCounter += stats.bytesInCounter;
bytesOutCounter += stats.bytesOutCounter;
systemTopicBytesInCounter += stats.systemTopicBytesInCounter;
bytesOutInternalCounter += stats.bytesOutInternalCounter;
}

@SuppressWarnings("DuplicatedCode")
Expand All @@ -74,5 +82,9 @@ public void reset() {
msgBacklog = 0;
sizeBasedBacklogQuotaExceededEvictionCount = 0;
timeBasedBacklogQuotaExceededEvictionCount = 0;
bytesInCounter = 0;
bytesOutCounter = 0;
systemTopicBytesInCounter = 0;
bytesOutInternalCounter = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.msgInCounter = tStatus.msgInCounter;
stats.bytesInCounter = tStatus.bytesInCounter;
stats.msgOutCounter = tStatus.msgOutCounter;
stats.systemTopicBytesInCounter = tStatus.systemTopicBytesInCounter;
stats.bytesOutInternalCounter = tStatus.getBytesOutInternalCounter();
stats.bytesOutCounter = tStatus.bytesOutCounter;
stats.averageMsgSize = tStatus.averageMsgSize;
stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes;
Expand Down Expand Up @@ -358,6 +360,16 @@ private static void printBrokerStats(PrometheusMetricStreams stream, String clus
brokerStats.timeBasedBacklogQuotaExceededEvictionCount, cluster, BacklogQuotaType.message_age);

writeMetric(stream, "pulsar_broker_msg_backlog", brokerStats.msgBacklog, cluster);
long userOutBytes = brokerStats.bytesOutCounter - brokerStats.bytesOutInternalCounter;
writeMetric(stream, "pulsar_broker_out_bytes_total",
userOutBytes, cluster, "system_subscription", "false");
writeMetric(stream, "pulsar_broker_out_bytes_total",
brokerStats.bytesOutInternalCounter, cluster, "system_subscription", "true");
long userTopicInBytes = brokerStats.bytesInCounter - brokerStats.systemTopicBytesInCounter;
writeMetric(stream, "pulsar_broker_in_bytes_total",
userTopicInBytes, cluster, "system_topic", "false");
writeMetric(stream, "pulsar_broker_in_bytes_total",
brokerStats.systemTopicBytesInCounter, cluster, "system_topic", "true");
}

private static void printTopicsCountStats(PrometheusMetricStreams stream, Map<String, Long> namespaceTopicsCount,
Expand Down Expand Up @@ -412,7 +424,8 @@ private static void printNamespaceStats(PrometheusMetricStreams stream, Aggregat
namespace);

stats.bucketDelayedIndexStats.forEach((k, metric) -> {
writeMetric(stream, metric.name, metric.value, cluster, namespace, metric.labelsAndValues);
String[] labels = ArrayUtils.addAll(new String[]{"namespace", namespace}, metric.labelsAndValues);
writeMetric(stream, metric.name, metric.value, cluster, labels);
});

writePulsarMsgBacklog(stream, stats.msgBacklog, cluster, namespace);
Expand Down Expand Up @@ -534,13 +547,21 @@ private static void writeMetric(PrometheusMetricStreams stream, String metricNam
stream.writeSample(metricName, value, "cluster", cluster);
}

private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value,
String cluster, String... extraLabelsAndValues) {
String[] labels = ArrayUtils.addAll(new String[]{"cluster", cluster}, extraLabelsAndValues);
stream.writeSample(metricName, value, labels);
}


private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
String namespace, String... extraLabelsAndValues) {
String[] labelsAndValues = new String[]{"cluster", cluster, "namespace", namespace};
String[] labels = ArrayUtils.addAll(labelsAndValues, extraLabelsAndValues);
String namespace) {
String[] labels = new String[]{"cluster", cluster, "namespace", namespace};
stream.writeSample(metricName, value, labels);
}



private static void writeReplicationStat(PrometheusMetricStreams stream, String metricName,
AggregatedNamespaceStats namespaceStats,
Function<AggregatedReplicationStats, Number> sampleValueFunction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class TopicStats {
long msgOutCounter;
@PulsarDeprecatedMetric(newMetricName = OpenTelemetryTopicStats.BYTES_OUT_COUNTER)
long bytesOutCounter;
long systemTopicBytesInCounter;
long bytesOutInternalCounter;
@PulsarDeprecatedMetric // Can be derived from MESSAGE_IN_COUNTER and BYTES_IN_COUNTER
double averageMsgSize;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,94 @@ public void testPublishRateLimitedTimes() throws Exception {
producer3.close();
}

@Test
public void testBrokerMetrics() throws Exception {
cleanup();
conf.setAdditionalSystemCursorNames(Set.of("test-cursor"));
setup();

Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
// system topic
Producer<byte[]> p3 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/__change_events").create();

Consumer<byte[]> c1 = pulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("test")
.subscribe();

// additional system cursor
Consumer<byte[]> c2 = pulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/my-topic2")
.subscriptionName("test-cursor")
.subscribe();

Consumer<byte[]> c3 = pulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/__change_events")
.subscriptionName("test-v1")
.subscribe();

final int messages = 10;
for (int i = 0; i < messages; i++) {
String message = "my-message-" + i;
p1.send(message.getBytes());
p2.send(message.getBytes());
p3.send(message.getBytes());
}

for (int i = 0; i < messages; i++) {
c1.acknowledge(c1.receive());
c2.acknowledge(c2.receive());
c3.acknowledge(c3.receive());
}

// unsubscribe to test remove cursor impact on metric
c1.unsubscribe();
c2.unsubscribe();

//admin.topics().unload("persistent://my-property/use/my-ns/my-topic1");

ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut);
String metricsStr = statsOut.toString();
Multimap<String, Metric> metrics = parseMetrics(metricsStr);

metrics.entries().forEach(e -> {
System.out.println(e.getKey() + ": " + e.getValue());
});

List<Metric> bytesOutTotal = (List<Metric>) metrics.get("pulsar_broker_out_bytes_total");
List<Metric> bytesInTotal = (List<Metric>) metrics.get("pulsar_broker_in_bytes_total");
assertEquals(bytesOutTotal.size(), 2);
assertEquals(bytesInTotal.size(), 2);

double systemOutBytes = 0.0;
double userOutBytes = 0.0;
switch (bytesOutTotal.get(0).tags.get("system_subscription").toString()) {
case "true":
systemOutBytes = bytesOutTotal.get(0).value;
userOutBytes = bytesOutTotal.get(1).value;
case "false":
systemOutBytes = bytesOutTotal.get(1).value;
userOutBytes = bytesOutTotal.get(0).value;
}

double systemInBytes = 0.0;
double userInBytes = 0.0;
switch (bytesInTotal.get(0).tags.get("system_topic").toString()) {
case "true":
systemInBytes = bytesInTotal.get(0).value;
userInBytes = bytesInTotal.get(1).value;
case "false":
systemInBytes = bytesInTotal.get(1).value;
userInBytes = bytesInTotal.get(0).value;
}

assertEquals(userOutBytes / 2, systemOutBytes);
assertEquals(userInBytes / 2, systemInBytes);
assertEquals(userOutBytes + systemOutBytes, userInBytes + systemInBytes);
}

@Test
public void testMetricsTopicCount() throws Exception {
String ns1 = "prop/ns-abc1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,18 @@ public class TopicStatsImpl implements TopicStats {
/** Total messages published to the topic (msg). */
public long msgInCounter;

/** Total bytes published to the system topic (bytes). */
public long systemTopicBytesInCounter;

/** Total bytes delivered to consumer (bytes). */
public long bytesOutCounter;

/** Total messages delivered to consumer (msg). */
public long msgOutCounter;

/** Total bytes delivered to internal cursors. */
public long bytesOutInternalCounter;

/** Average size of published messages (bytes). */
public double averageMsgSize;

Expand Down

0 comments on commit c724f02

Please sign in to comment.