Skip to content

Commit

Permalink
Add metrics for writing or reading size of cursor (#11500)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed Jul 29, 2021
1 parent 2cefbcc commit 445f8f8
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,33 @@ public interface ManagedCursorMXBean {
*/
long getPersistZookeeperErrors();

/**
* Update write cursor size.
*
* @param size the size of write to cursor
*/
void updateWriteLedgerSize(long size);

/**
* Update read cursor size.
*
* @param size the size of read from cursor
*/
void updateReadLedgerSize(long size);

/**
* @return the size of write to ledger size
*/
long getWriteLedgerSize();

/**
* @return the size of write to ledger size (accounting for without replicas)
*/
long getWriteLedgerLogicalSize();

/**
* @return the size of read from ledger size
*/
long getReadLedgerSize();

}
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
}

LedgerEntry entry = seq.nextElement();
mbean.updateReadLedgerSize(entry.getLength());
PositionInfo positionInfo;
try {
positionInfo = PositionInfo.parseFrom(entry.getEntry());
Expand Down Expand Up @@ -2599,7 +2600,8 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
}

checkNotNull(lh);
lh.asyncAddEntry(pi.toByteArray(), (rc, lh1, entryId, ctx) -> {
byte[] data = pi.toByteArray();
lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> {
if (rc == BKException.Code.OK) {
if (log.isDebugEnabled()) {
log.debug("[{}] Updated cursor {} position {} in meta-ledger {}", ledger.getName(), name, position,
Expand All @@ -2614,6 +2616,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
}

mbean.persistToLedger(true);
mbean.updateWriteLedgerSize(data.length);
callback.operationComplete();
} else {
log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public class ManagedCursorMXBeanImpl implements ManagedCursorMXBean {
private final LongAdder persistZookeeperSucceed = new LongAdder();
private final LongAdder persistZookeeperFailed = new LongAdder();

private final LongAdder writeLedgerSize = new LongAdder();
private final LongAdder writeLedgerLogicalSize = new LongAdder();
private final LongAdder readLedgerSize = new LongAdder();

private final ManagedCursor managedCursor;

public ManagedCursorMXBeanImpl(ManagedCursor managedCursor) {
Expand Down Expand Up @@ -83,4 +87,30 @@ public long getPersistZookeeperSucceed() {
public long getPersistZookeeperErrors() {
return persistZookeeperFailed.longValue();
}

@Override
public void updateWriteLedgerSize(long size) {
writeLedgerSize.add(size * ((ManagedCursorImpl) managedCursor).config.getWriteQuorumSize());
writeLedgerLogicalSize.add(size);
}

@Override
public void updateReadLedgerSize(long size) {
readLedgerSize.add(size);
}

@Override
public long getWriteLedgerSize() {
return writeLedgerSize.longValue();
}

@Override
public long getWriteLedgerLogicalSize() {
return writeLedgerLogicalSize.longValue();
}

@Override
public long getReadLedgerSize() {
return readLedgerSize.longValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ private List<Metrics> aggregate() {

ManagedCursorContainer cursorContainer = ledger.getCursors();
Iterator<ManagedCursor> cursorIterator = cursorContainer.iterator();
long nsTotalNonContiguousDeletedMessagesRange = 0;
long nsPersistLedgerSucceed = 0;
long nsPersistLedgerErrors = 0;
long nsPersistZookeeperSucceed = 0;
long nsPersistZookeeperErrors = 0;
long nsWriteLedgerSize = 0;
long nsWriteLedgerLogicalSize = 0;
long nsReadLedgerSize = 0;

while (cursorIterator.hasNext()) {
ManagedCursorImpl cursor = (ManagedCursorImpl) cursorIterator.next();
Expand All @@ -77,8 +85,32 @@ private List<Metrics> aggregate() {
metrics.put("brk_ml_cursor_persistLedgerErrors", cStats.getPersistLedgerErrors());
metrics.put("brk_ml_cursor_persistZookeeperSucceed", cStats.getPersistZookeeperSucceed());
metrics.put("brk_ml_cursor_persistZookeeperErrors", cStats.getPersistZookeeperErrors());
metrics.put("brk_ml_cursor_writeLedgerSize", cStats.getWriteLedgerSize());
metrics.put("brk_ml_cursor_writeLedgerLogicalSize", cStats.getWriteLedgerLogicalSize());
metrics.put("brk_ml_cursor_readLedgerSize", cStats.getReadLedgerSize());
metricsCollection.add(metrics);

nsTotalNonContiguousDeletedMessagesRange += cursor.getTotalNonContiguousDeletedMessagesRange();
nsPersistLedgerSucceed += cStats.getPersistLedgerSucceed();
nsPersistLedgerErrors += cStats.getPersistLedgerErrors();
nsPersistZookeeperSucceed += cStats.getPersistZookeeperSucceed();
nsPersistZookeeperErrors += cStats.getPersistZookeeperErrors();
nsWriteLedgerSize += cStats.getWriteLedgerSize();
nsWriteLedgerLogicalSize += cStats.getWriteLedgerLogicalSize();
nsReadLedgerSize += cStats.getReadLedgerSize();
}

Metrics metrics = createMetricsByDimension(namespace);
metrics.put("brk_ml_cursor_nonContiguousDeletedMessagesRange",
nsTotalNonContiguousDeletedMessagesRange);
metrics.put("brk_ml_cursor_persistLedgerSucceed", nsPersistLedgerSucceed);
metrics.put("brk_ml_cursor_persistLedgerErrors", nsPersistLedgerErrors);
metrics.put("brk_ml_cursor_persistZookeeperSucceed", nsPersistZookeeperSucceed);
metrics.put("brk_ml_cursor_persistZookeeperErrors", nsPersistZookeeperErrors);
metrics.put("brk_ml_cursor_writeLedgerSize", nsWriteLedgerSize);
metrics.put("brk_ml_cursor_writeLedgerLogicalSize", nsWriteLedgerLogicalSize);
metrics.put("brk_ml_cursor_readLedgerSize", nsReadLedgerSize);
metricsCollection.add(metrics);
}
return metricsCollection;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats;

import lombok.Cleanup;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
Expand Down Expand Up @@ -90,4 +91,66 @@ public void testManagedCursorMetrics() throws Exception {
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);
}

@Test
public void testCursorReadWriteMetrics() throws Exception {
final String subName = "read-write";
final String topicName = "persistent://my-namespace/use/my-ns/read-write";
final int messageSize = 10;

ManagedCursorMetrics metrics = new ManagedCursorMetrics(pulsar);

List<Metrics> metricsList = metrics.generate();
Assert.assertTrue(metricsList.isEmpty());

metricsList = metrics.generate();
Assert.assertTrue(metricsList.isEmpty());

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName)
.subscribe();

@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName + "-2")
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();

for (PulsarMockLedgerHandle ledgerHandle : mockBookKeeper.getLedgerMap().values()) {
ledgerHandle.close();
}

for (int i = 0; i < messageSize; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
if (i % 2 == 0) {
consumer.acknowledge(consumer.receive().getMessageId());
} else {
consumer2.acknowledge(consumer.receive().getMessageId());
}
}
metricsList = metrics.generate();
Assert.assertEquals(metricsList.size(), 3);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);

Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L);
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);

Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 52L);
Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 26L);
Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);
}
}
3 changes: 3 additions & 0 deletions site2/docs/reference-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ brk_ml_cursor_persistLedgerErrors(namespace="", ledger_name="", cursor_name:"")|
brk_ml_cursor_persistZookeeperSucceed(namespace="", ledger_name="", cursor_name:"")|Gauge|The number of acknowledgment states that is persistent to ZooKeeper.
brk_ml_cursor_persistZookeeperErrors(namespace="", ledger_name="", cursor_name:"")|Gauge|The number of ledger errors occurred when acknowledgment states fail to be persistent to ZooKeeper.
brk_ml_cursor_nonContiguousDeletedMessagesRange(namespace="", ledger_name="", cursor_name:"")|Gauge|The number of non-contiguous deleted messages ranges.
brk_ml_cursor_writeLedgerSize(namespace="", ledger_name="", cursor_name:"")|Gauge|The size of write to ledger.
brk_ml_cursor_writeLedgerLogicalSize(namespace="", ledger_name="", cursor_name:"")|Gauge|The size of write to ledger (accounting for without replicas).
brk_ml_cursor_readLedgerSize(namespace="", ledger_name="", cursor_name:"")|Gauge|The size of read from ledger.
### LoadBalancing metrics
All the loadbalancing metrics are labelled with the following labels:
Expand Down

0 comments on commit 445f8f8

Please sign in to comment.