Skip to content

Commit

Permalink
Add metrics for writing or reading size of cursor (apache#11500)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed Aug 6, 2021
1 parent 09944d9 commit d68e3b5
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,33 @@ public interface ManagedCursorMXBean {
*/
long getPersistZookeeperErrors();

/**
* Add write data to a ledger of a cursor (in bytes).
*
* @param size Size of data written to cursor (in bytes)
*/
void addWriteCursorLedgerSize(long size);

/**
* Add read data from a ledger of a cursor (in bytes).
*
* @param size Size of data read from cursor (in bytes)
*/
void addReadCursorLedgerSize(long size);

/**
* @return the size of data written to cursor (in bytes)
*/
long getWriteCursorLedgerSize();

/**
* @return the size of data written to cursor without replicas (in bytes)
*/
long getWriteCursorLedgerLogicalSize();

/**
* @return the size of data read from cursor (in bytes)
*/
long getReadCursorLedgerSize();

}
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
}

LedgerEntry entry = seq.nextElement();
mbean.addReadCursorLedgerSize(entry.getLength());
PositionInfo positionInfo;
try {
positionInfo = PositionInfo.parseFrom(entry.getEntry());
Expand Down Expand Up @@ -2599,7 +2600,8 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
}

checkNotNull(lh);
lh.asyncAddEntry(pi.toByteArray(), (rc, lh1, entryId, ctx) -> {
byte[] data = pi.toByteArray();
lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> {
if (rc == BKException.Code.OK) {
if (log.isDebugEnabled()) {
log.debug("[{}] Updated cursor {} position {} in meta-ledger {}", ledger.getName(), name, position,
Expand All @@ -2614,6 +2616,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
}

mbean.persistToLedger(true);
mbean.addWriteCursorLedgerSize(data.length);
callback.operationComplete();
} else {
log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public class ManagedCursorMXBeanImpl implements ManagedCursorMXBean {
private final LongAdder persistZookeeperSucceed = new LongAdder();
private final LongAdder persistZookeeperFailed = new LongAdder();

private final LongAdder writeCursorLedgerSize = new LongAdder();
private final LongAdder writeCursorLedgerLogicalSize = new LongAdder();
private final LongAdder readCursorLedgerSize = new LongAdder();

private final ManagedCursor managedCursor;

public ManagedCursorMXBeanImpl(ManagedCursor managedCursor) {
Expand Down Expand Up @@ -83,4 +87,30 @@ public long getPersistZookeeperSucceed() {
public long getPersistZookeeperErrors() {
return persistZookeeperFailed.longValue();
}

@Override
public void addWriteCursorLedgerSize(final long size) {
writeCursorLedgerSize.add(size * ((ManagedCursorImpl) managedCursor).config.getWriteQuorumSize());
writeCursorLedgerLogicalSize.add(size);
}

@Override
public void addReadCursorLedgerSize(final long size) {
readCursorLedgerSize.add(size);
}

@Override
public long getWriteCursorLedgerSize() {
return writeCursorLedgerSize.longValue();
}

@Override
public long getWriteCursorLedgerLogicalSize() {
return writeCursorLedgerLogicalSize.longValue();
}

@Override
public long getReadCursorLedgerSize() {
return readCursorLedgerSize.longValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ private List<Metrics> aggregate() {

ManagedCursorContainer cursorContainer = ledger.getCursors();
Iterator<ManagedCursor> cursorIterator = cursorContainer.iterator();
long nsTotalNonContiguousDeletedMessagesRange = 0;
long nsPersistLedgerSucceed = 0;
long nsPersistLedgerErrors = 0;
long nsPersistZookeeperSucceed = 0;
long nsPersistZookeeperErrors = 0;
long nsWriteCursorLedgerSize = 0;
long nsWriteCursorLedgerLogicalSize = 0;
long nsReadCursorLedgerSize = 0;

while (cursorIterator.hasNext()) {
ManagedCursorImpl cursor = (ManagedCursorImpl) cursorIterator.next();
Expand All @@ -77,8 +85,32 @@ private List<Metrics> aggregate() {
metrics.put("brk_ml_cursor_persistLedgerErrors", cStats.getPersistLedgerErrors());
metrics.put("brk_ml_cursor_persistZookeeperSucceed", cStats.getPersistZookeeperSucceed());
metrics.put("brk_ml_cursor_persistZookeeperErrors", cStats.getPersistZookeeperErrors());
metrics.put("brk_ml_cursor_writeLedgerSize", cStats.getWriteCursorLedgerSize());
metrics.put("brk_ml_cursor_writeLedgerLogicalSize", cStats.getWriteCursorLedgerLogicalSize());
metrics.put("brk_ml_cursor_readLedgerSize", cStats.getReadCursorLedgerSize());
metricsCollection.add(metrics);

nsTotalNonContiguousDeletedMessagesRange += cursor.getTotalNonContiguousDeletedMessagesRange();
nsPersistLedgerSucceed += cStats.getPersistLedgerSucceed();
nsPersistLedgerErrors += cStats.getPersistLedgerErrors();
nsPersistZookeeperSucceed += cStats.getPersistZookeeperSucceed();
nsPersistZookeeperErrors += cStats.getPersistZookeeperErrors();
nsWriteCursorLedgerSize += cStats.getWriteCursorLedgerSize();
nsWriteCursorLedgerLogicalSize += cStats.getWriteCursorLedgerLogicalSize();
nsReadCursorLedgerSize += cStats.getReadCursorLedgerSize();
}

Metrics metrics = createMetricsByDimension(namespace);
metrics.put("brk_ml_cursor_nonContiguousDeletedMessagesRange",
nsTotalNonContiguousDeletedMessagesRange);
metrics.put("brk_ml_cursor_persistLedgerSucceed", nsPersistLedgerSucceed);
metrics.put("brk_ml_cursor_persistLedgerErrors", nsPersistLedgerErrors);
metrics.put("brk_ml_cursor_persistZookeeperSucceed", nsPersistZookeeperSucceed);
metrics.put("brk_ml_cursor_persistZookeeperErrors", nsPersistZookeeperErrors);
metrics.put("brk_ml_cursor_writeLedgerSize", nsWriteCursorLedgerSize);
metrics.put("brk_ml_cursor_writeLedgerLogicalSize", nsWriteCursorLedgerLogicalSize);
metrics.put("brk_ml_cursor_readLedgerSize", nsReadCursorLedgerSize);
metricsCollection.add(metrics);
}
return metricsCollection;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats;

import lombok.Cleanup;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
Expand Down Expand Up @@ -90,4 +91,66 @@ public void testManagedCursorMetrics() throws Exception {
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);
}

@Test
public void testCursorReadWriteMetrics() throws Exception {
final String subName = "read-write";
final String topicName = "persistent://my-namespace/use/my-ns/read-write";
final int messageSize = 10;

ManagedCursorMetrics metrics = new ManagedCursorMetrics(pulsar);

List<Metrics> metricsList = metrics.generate();
Assert.assertTrue(metricsList.isEmpty());

metricsList = metrics.generate();
Assert.assertTrue(metricsList.isEmpty());

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName)
.subscribe();

@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName + "-2")
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();

for (PulsarMockLedgerHandle ledgerHandle : mockBookKeeper.getLedgerMap().values()) {
ledgerHandle.close();
}

for (int i = 0; i < messageSize; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
if (i % 2 == 0) {
consumer.acknowledge(consumer.receive().getMessageId());
} else {
consumer2.acknowledge(consumer.receive().getMessageId());
}
}
metricsList = metrics.generate();
Assert.assertEquals(metricsList.size(), 3);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);

Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L);
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);

Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 52L);
Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 26L);
Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,7 @@ public void testManagedCursorPersistStats() throws Exception {
Multimap<String, Metric> metrics = parseMetrics(metricsStr);

List<Metric> cm = (List<Metric>) metrics.get("pulsar_ml_cursor_persistLedgerSucceed");
assertEquals(cm.size(), 1);
assertEquals(cm.size(), 2);
assertEquals(cm.get(0).tags.get("cluster"), "test");
assertEquals(cm.get(0).tags.get("cursor_name"), subName);

Expand Down
3 changes: 3 additions & 0 deletions site2/docs/reference-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ brk_ml_cursor_persistLedgerErrors(namespace="", ledger_name="", cursor_name:"")|
brk_ml_cursor_persistZookeeperSucceed(namespace="", ledger_name="", cursor_name:"")|Gauge|The number of acknowledgment states that is persistent to ZooKeeper.
brk_ml_cursor_persistZookeeperErrors(namespace="", ledger_name="", cursor_name:"")|Gauge|The number of ledger errors occurred when acknowledgment states fail to be persistent to ZooKeeper.
brk_ml_cursor_nonContiguousDeletedMessagesRange(namespace="", ledger_name="", cursor_name:"")|Gauge|The number of non-contiguous deleted messages ranges.
brk_ml_cursor_writeLedgerSize(namespace="", ledger_name="", cursor_name:"")|Gauge|The size of write to ledger.
brk_ml_cursor_writeLedgerLogicalSize(namespace="", ledger_name="", cursor_name:"")|Gauge|The size of write to ledger (accounting for without replicas).
brk_ml_cursor_readLedgerSize(namespace="", ledger_name="", cursor_name:"")|Gauge|The size of read from ledger.
### LoadBalancing metrics
All the loadbalancing metrics are labelled with the following labels:
Expand Down

0 comments on commit d68e3b5

Please sign in to comment.