Skip to content

Commit

Permalink
[improve] [broker] Add subscription prefix for internal reader (#23044)
Browse files Browse the repository at this point in the history
### Motivation
We have many system topics, such as

__change_events
__transaction_buffer_snapshot
__transaction_buffer_snapshot_indexes
__transaction_buffer_snapshot_segments
transaction_coordinator_assign
_transaction_log
__transaction_pending_ack
In Pulsar Broker, we create an internal reader to fetch messages from those system topics. Due to we do not specify the subscription prefix, the reader will generate a random subscription name for each reader.

In PIP-355, we introduced a broker-level metric named pulsar_broker_out_bytes_total, which separate the system subscription traffic bytes and user subscription traffic bytes. Due to the internal readers don't have a subscription prefix, we group the internal reader's traffic bytes into user subscription traffic.

### Modifications
In this PR, we introduce a system subscription prefix named __system_reader and group the internal reader's traffic into system subscription traffic bytes in metric pulsar_broker_out_bytes_total.
  • Loading branch information
hangc0276 authored Jul 25, 2024
1 parent 55e468e commit c7310e3
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
Expand Down Expand Up @@ -1213,6 +1214,11 @@ public CompletableFuture<Void> unsubscribe(String subscriptionName) {
SubscriptionStatsImpl stats = sub.getStats(getStatsOptions);
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);

if (isSystemCursor(subscriptionName)
|| subscriptionName.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
bytesOutFromRemovedSystemSubscriptions.add(stats.bytesOutCounter);
}
}
}, brokerService.executor());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1410,7 +1410,8 @@ void removeSubscription(String subscriptionName) {
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);

if (isSystemCursor(subscriptionName)) {
if (isSystemCursor(subscriptionName)
|| subscriptionName.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
bytesOutFromRemovedSystemSubscriptions.add(stats.bytesOutCounter);
}
}
Expand Down Expand Up @@ -2637,7 +2638,7 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
topicMetricBean.value += v.value;
});

if (isSystemCursor(name)) {
if (isSystemCursor(name) || name.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
stats.bytesOutInternalCounter += subStats.bytesOutCounter;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -69,6 +70,7 @@ protected CompletableFuture<Writer<PulsarEvent>> newWriterAsyncInternal() {
protected CompletableFuture<Reader<PulsarEvent>> newReaderAsyncInternal() {
return client.newReader(avroSchema)
.topic(topicName.toString())
.subscriptionRolePrefix(SystemTopicNames.SYSTEM_READER_PREFIX)
.startMessageId(MessageId.earliest)
.readCompacted(true)
.createAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;

@Slf4j
Expand Down Expand Up @@ -201,6 +202,7 @@ protected CompletableFuture<Writer<T>> newWriterAsyncInternal() {
protected CompletableFuture<Reader<T>> newReaderAsyncInternal() {
return client.newReader(Schema.AVRO(schemaType))
.topic(topicName.toString())
.subscriptionRolePrefix(SystemTopicNames.SYSTEM_READER_PREFIX)
.startMessageId(MessageId.earliest)
.readCompacted(true)
.createAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.testng.Assert.fail;
import com.google.common.base.Splitter;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import io.jsonwebtoken.SignatureAlgorithm;
import io.prometheus.client.Collector;
import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -87,6 +88,9 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.apache.zookeeper.CreateMode;
Expand Down Expand Up @@ -209,26 +213,31 @@ public void testPublishRateLimitedTimes() throws Exception {
public void testBrokerMetrics() throws Exception {
cleanup();
conf.setAdditionalSystemCursorNames(Set.of("test-cursor"));
conf.setTopicLevelPoliciesEnabled(true);
conf.setSystemTopicEnabled(true);
setup();

Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
admin.tenants().createTenant("test-tenant",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
admin.namespaces().createNamespace("test-tenant/test-ns", 4);
Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://test-tenant/test-ns/my-topic1").create();
Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://test-tenant/test-ns/my-topic2").create();
// system topic
Producer<byte[]> p3 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/__change_events").create();
Producer<byte[]> p3 = pulsarClient.newProducer().topic("persistent://test-tenant/test-ns/__test-topic").create();

Consumer<byte[]> c1 = pulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/my-topic1")
.topic("persistent://test-tenant/test-ns/my-topic1")
.subscriptionName("test")
.subscribe();

// additional system cursor
Consumer<byte[]> c2 = pulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/my-topic2")
.topic("persistent://test-tenant/test-ns/my-topic2")
.subscriptionName("test-cursor")
.subscribe();

Consumer<byte[]> c3 = pulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/__change_events")
.topic("persistent://test-tenant/test-ns/__test-topic")
.subscriptionName("test-v1")
.subscribe();

Expand All @@ -250,7 +259,8 @@ public void testBrokerMetrics() throws Exception {
c1.unsubscribe();
c2.unsubscribe();

//admin.topics().unload("persistent://my-property/use/my-ns/my-topic1");
admin.topicPolicies().setRetention("persistent://test-tenant/test-ns/my-topic2",
new RetentionPolicies(60, 1024));

ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut);
Expand All @@ -263,33 +273,43 @@ public void testBrokerMetrics() throws Exception {

List<Metric> bytesOutTotal = (List<Metric>) metrics.get("pulsar_broker_out_bytes_total");
List<Metric> bytesInTotal = (List<Metric>) metrics.get("pulsar_broker_in_bytes_total");
List<Metric> topicLevelBytesOutTotal = (List<Metric>) metrics.get("pulsar_out_bytes_total");

assertEquals(bytesOutTotal.size(), 2);
assertEquals(bytesInTotal.size(), 2);
assertEquals(topicLevelBytesOutTotal.size(), 3);

double systemOutBytes = 0.0;
double userOutBytes = 0.0;
switch (bytesOutTotal.get(0).tags.get("system_subscription").toString()) {
case "true":
systemOutBytes = bytesOutTotal.get(0).value;
userOutBytes = bytesOutTotal.get(1).value;
case "false":
systemOutBytes = bytesOutTotal.get(1).value;
userOutBytes = bytesOutTotal.get(0).value;
}

double systemInBytes = 0.0;
double userInBytes = 0.0;
switch (bytesInTotal.get(0).tags.get("system_topic").toString()) {
case "true":
systemInBytes = bytesInTotal.get(0).value;
userInBytes = bytesInTotal.get(1).value;
case "false":
systemInBytes = bytesInTotal.get(1).value;
userInBytes = bytesInTotal.get(0).value;

for (Metric metric : bytesOutTotal) {
if (metric.tags.get("system_subscription").equals("true")) {
systemOutBytes = metric.value;
} else {
userOutBytes = metric.value;
}
}

for (Metric metric : bytesInTotal) {
if (metric.tags.get("system_topic").equals("true")) {
systemInBytes = metric.value;
} else {
userInBytes = metric.value;
}
}

double systemCursorOutBytes = 0.0;
for (Metric metric : topicLevelBytesOutTotal) {
if (metric.tags.get("subscription").startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)
|| metric.tags.get("subscription").equals(Compactor.COMPACTION_SUBSCRIPTION)) {
systemCursorOutBytes = metric.value;
}
}

assertEquals(userOutBytes / 2, systemOutBytes);
assertEquals(userInBytes / 2, systemInBytes);
assertEquals(systemCursorOutBytes, systemInBytes);
assertEquals(userOutBytes / 2, systemOutBytes - systemCursorOutBytes);
assertEquals(userOutBytes + systemOutBytes, userInBytes + systemInBytes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public class SystemTopicNames {

public static final String PENDING_ACK_STORE_CURSOR_NAME = "__pending_ack_state";

/**
* Prefix for the system reader for all the system topics.
*/
public static final String SYSTEM_READER_PREFIX = "__system_reader";

/**
* The set of all local topic names declared above.
*/
Expand Down

0 comments on commit c7310e3

Please sign in to comment.