Skip to content

Commit

Permalink
Add metrics for cursor of how many bytes are written and read per cur…
Browse files Browse the repository at this point in the history
…sor/namespace. (#)
  • Loading branch information
Technoboy- committed Jul 29, 2021
1 parent 2cefbcc commit 8c3253e
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,33 @@ public interface ManagedCursorMXBean {
*/
long getPersistZookeeperErrors();

/**
* Update write cursor size.
*
* @param size the size of write to cursor
*/
void updateWriteLedgerSize(long size);

/**
* Update read cursor size.
*
* @param size the size of read from cursor
*/
void updateReadLedgerSize(long size);

/**
* @return the size of write to ledger size
*/
long getWriteLedgerSize();

/**
* @return the size of write to ledger size (accounting for without replicas)
*/
long getWriteLedgerLogicalSize();

/**
* @return the size of read from ledger size
*/
long getReadLedgerSize();

}
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
}

LedgerEntry entry = seq.nextElement();
mbean.updateReadLedgerSize(entry.getLength());
PositionInfo positionInfo;
try {
positionInfo = PositionInfo.parseFrom(entry.getEntry());
Expand Down Expand Up @@ -2599,7 +2600,8 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
}

checkNotNull(lh);
lh.asyncAddEntry(pi.toByteArray(), (rc, lh1, entryId, ctx) -> {
byte[] data = pi.toByteArray();
lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> {
if (rc == BKException.Code.OK) {
if (log.isDebugEnabled()) {
log.debug("[{}] Updated cursor {} position {} in meta-ledger {}", ledger.getName(), name, position,
Expand All @@ -2614,6 +2616,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
}

mbean.persistToLedger(true);
mbean.updateWriteLedgerSize(data.length);
callback.operationComplete();
} else {
log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public class ManagedCursorMXBeanImpl implements ManagedCursorMXBean {
private final LongAdder persistZookeeperSucceed = new LongAdder();
private final LongAdder persistZookeeperFailed = new LongAdder();

private final LongAdder writeLedgerSize = new LongAdder();
private final LongAdder writeLedgerLogicalSize = new LongAdder();
private final LongAdder readLedgerSize = new LongAdder();

private final ManagedCursor managedCursor;

public ManagedCursorMXBeanImpl(ManagedCursor managedCursor) {
Expand Down Expand Up @@ -83,4 +87,30 @@ public long getPersistZookeeperSucceed() {
public long getPersistZookeeperErrors() {
return persistZookeeperFailed.longValue();
}

@Override
public void updateWriteLedgerSize(long size) {
writeLedgerSize.add(size * ((ManagedCursorImpl) managedCursor).config.getWriteQuorumSize());
writeLedgerLogicalSize.add(size);
}

@Override
public void updateReadLedgerSize(long size) {
readLedgerSize.add(size);
}

@Override
public long getWriteLedgerSize() {
return writeLedgerSize.longValue();
}

@Override
public long getWriteLedgerLogicalSize() {
return writeLedgerLogicalSize.longValue();
}

@Override
public long getReadLedgerSize() {
return readLedgerSize.longValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ private List<Metrics> aggregate() {

ManagedCursorContainer cursorContainer = ledger.getCursors();
Iterator<ManagedCursor> cursorIterator = cursorContainer.iterator();
long nsTotalNonContiguousDeletedMessagesRange = 0;
long nsPersistLedgerSucceed = 0;
long nsPersistLedgerErrors = 0;
long nsPersistZookeeperSucceed = 0;
long nsPersistZookeeperErrors = 0;
long nsWriteLedgerSize = 0;
long nsWriteLedgerLogicalSize = 0;
long nsReadLedgerSize = 0;

while (cursorIterator.hasNext()) {
ManagedCursorImpl cursor = (ManagedCursorImpl) cursorIterator.next();
Expand All @@ -77,8 +85,32 @@ private List<Metrics> aggregate() {
metrics.put("brk_ml_cursor_persistLedgerErrors", cStats.getPersistLedgerErrors());
metrics.put("brk_ml_cursor_persistZookeeperSucceed", cStats.getPersistZookeeperSucceed());
metrics.put("brk_ml_cursor_persistZookeeperErrors", cStats.getPersistZookeeperErrors());
metrics.put("brk_ml_cursor_writeLedgerSize", cStats.getWriteLedgerSize());
metrics.put("brk_ml_cursor_writeLedgerLogicalSize", cStats.getWriteLedgerLogicalSize());
metrics.put("brk_ml_cursor_readLedgerSize", cStats.getReadLedgerSize());
metricsCollection.add(metrics);

nsTotalNonContiguousDeletedMessagesRange += cursor.getTotalNonContiguousDeletedMessagesRange();
nsPersistLedgerSucceed += cStats.getPersistLedgerSucceed();
nsPersistLedgerErrors += cStats.getPersistLedgerErrors();
nsPersistZookeeperSucceed += cStats.getPersistZookeeperSucceed();
nsPersistZookeeperErrors += cStats.getPersistZookeeperErrors();
nsWriteLedgerSize += cStats.getWriteLedgerSize();
nsWriteLedgerLogicalSize += cStats.getWriteLedgerLogicalSize();
nsReadLedgerSize += cStats.getReadLedgerSize();
}

Metrics metrics = createMetricsByDimension(namespace);
metrics.put("brk_ml_cursor_nonContiguousDeletedMessagesRange",
nsTotalNonContiguousDeletedMessagesRange);
metrics.put("brk_ml_cursor_persistLedgerSucceed", nsPersistLedgerSucceed);
metrics.put("brk_ml_cursor_persistLedgerErrors", nsPersistLedgerErrors);
metrics.put("brk_ml_cursor_persistZookeeperSucceed", nsPersistZookeeperSucceed);
metrics.put("brk_ml_cursor_persistZookeeperErrors", nsPersistZookeeperErrors);
metrics.put("brk_ml_cursor_writeLedgerSize", nsWriteLedgerSize);
metrics.put("brk_ml_cursor_writeLedgerLogicalSize", nsWriteLedgerLogicalSize);
metrics.put("brk_ml_cursor_readLedgerSize", nsReadLedgerSize);
metricsCollection.add(metrics);
}
return metricsCollection;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats;

import lombok.Cleanup;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
Expand Down Expand Up @@ -90,4 +91,66 @@ public void testManagedCursorMetrics() throws Exception {
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);
}

@Test
public void testCursorReadWriteMetrics() throws Exception {
final String subName = "read-write";
final String topicName = "persistent://my-namespace/use/my-ns/read-write";
final int messageSize = 10;

ManagedCursorMetrics metrics = new ManagedCursorMetrics(pulsar);

List<Metrics> metricsList = metrics.generate();
Assert.assertTrue(metricsList.isEmpty());

metricsList = metrics.generate();
Assert.assertTrue(metricsList.isEmpty());

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName)
.subscribe();

@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName + "-2")
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();

for (PulsarMockLedgerHandle ledgerHandle : mockBookKeeper.getLedgerMap().values()) {
ledgerHandle.close();
}

for (int i = 0; i < messageSize; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
if (i % 2 == 0) {
consumer.acknowledge(consumer.receive().getMessageId());
} else {
consumer2.acknowledge(consumer.receive().getMessageId());
}
}
metricsList = metrics.generate();
Assert.assertEquals(metricsList.size(), 3);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);

Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L);
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);

Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 52L);
Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 26L);
Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);
}
}
3 changes: 3 additions & 0 deletions site2/docs/reference-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ brk_ml_cursor_persistLedgerErrors(namespace="", ledger_name="", cursor_name:"")|
brk_ml_cursor_persistZookeeperSucceed(namespace="", ledger_name="", cursor_name:"")|Gauge|The number of acknowledgment states that is persistent to ZooKeeper.
brk_ml_cursor_persistZookeeperErrors(namespace="", ledger_name="", cursor_name:"")|Gauge|The number of ledger errors occurred when acknowledgment states fail to be persistent to ZooKeeper.
brk_ml_cursor_nonContiguousDeletedMessagesRange(namespace="", ledger_name="", cursor_name:"")|Gauge|The number of non-contiguous deleted messages ranges.
brk_ml_cursor_writeLedgerSize(namespace="", ledger_name="", cursor_name:"")|Gauge|The size of write to ledger.
brk_ml_cursor_writeLedgerLogicalSize(namespace="", ledger_name="", cursor_name:"")|Gauge|The size of write to ledger (accounting for without replicas).
brk_ml_cursor_readLedgerSize(namespace="", ledger_name="", cursor_name:"")|Gauge|The size of read from ledger.
### LoadBalancing metrics
All the loadbalancing metrics are labelled with the following labels:
Expand Down

0 comments on commit 8c3253e

Please sign in to comment.