diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorMXBean.java index ffc0af28ec6a9f..5cbd013d5236b0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorMXBean.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorMXBean.java @@ -70,4 +70,33 @@ public interface ManagedCursorMXBean { */ long getPersistZookeeperErrors(); + /** + * Add write data to a ledger of a cursor (in bytes). + * + * @param size Size of data written to cursor (in bytes) + */ + void addWriteCursorLedgerSize(long size); + + /** + * Add read data from a ledger of a cursor (in bytes). + * + * @param size Size of data read from cursor (in bytes) + */ + void addReadCursorLedgerSize(long size); + + /** + * @return the size of data written to cursor (in bytes) + */ + long getWriteCursorLedgerSize(); + + /** + * @return the size of data written to cursor without replicas (in bytes) + */ + long getWriteCursorLedgerLogicalSize(); + + /** + * @return the size of data read from cursor (in bytes) + */ + long getReadCursorLedgerSize(); + } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index da93df28fd65ea..82b4cfcc9b68ba 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -421,6 +421,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac } LedgerEntry entry = seq.nextElement(); + mbean.addReadCursorLedgerSize(entry.getLength()); PositionInfo positionInfo; try { positionInfo = PositionInfo.parseFrom(entry.getEntry()); @@ -2599,7 +2600,8 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin } checkNotNull(lh); - lh.asyncAddEntry(pi.toByteArray(), (rc, lh1, entryId, ctx) -> { + byte[] data = pi.toByteArray(); + lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> { if (rc == BKException.Code.OK) { if (log.isDebugEnabled()) { log.debug("[{}] Updated cursor {} position {} in meta-ledger {}", ledger.getName(), name, position, @@ -2614,6 +2616,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin } mbean.persistToLedger(true); + mbean.addWriteCursorLedgerSize(data.length); callback.operationComplete(); } else { log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name, diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java index 56a23003b03982..fd9dcc1e17f1c4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java @@ -30,6 +30,10 @@ public class ManagedCursorMXBeanImpl implements ManagedCursorMXBean { private final LongAdder persistZookeeperSucceed = new LongAdder(); private final LongAdder persistZookeeperFailed = new LongAdder(); + private final LongAdder writeCursorLedgerSize = new LongAdder(); + private final LongAdder writeCursorLedgerLogicalSize = new LongAdder(); + private final LongAdder readCursorLedgerSize = new LongAdder(); + private final ManagedCursor managedCursor; public ManagedCursorMXBeanImpl(ManagedCursor managedCursor) { @@ -83,4 +87,30 @@ public long getPersistZookeeperSucceed() { public long getPersistZookeeperErrors() { return persistZookeeperFailed.longValue(); } + + @Override + public void addWriteCursorLedgerSize(final long size) { + writeCursorLedgerSize.add(size * ((ManagedCursorImpl) managedCursor).config.getWriteQuorumSize()); + writeCursorLedgerLogicalSize.add(size); + } + + @Override + public void addReadCursorLedgerSize(final long size) { + readCursorLedgerSize.add(size); + } + + @Override + public long getWriteCursorLedgerSize() { + return writeCursorLedgerSize.longValue(); + } + + @Override + public long getWriteCursorLedgerLogicalSize() { + return writeCursorLedgerLogicalSize.longValue(); + } + + @Override + public long getReadCursorLedgerSize() { + return readCursorLedgerSize.longValue(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedCursorMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedCursorMetrics.java index 7000aae72b4671..cda8e6d25f5894 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedCursorMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedCursorMetrics.java @@ -62,6 +62,14 @@ private List aggregate() { ManagedCursorContainer cursorContainer = ledger.getCursors(); Iterator cursorIterator = cursorContainer.iterator(); + long nsTotalNonContiguousDeletedMessagesRange = 0; + long nsPersistLedgerSucceed = 0; + long nsPersistLedgerErrors = 0; + long nsPersistZookeeperSucceed = 0; + long nsPersistZookeeperErrors = 0; + long nsWriteCursorLedgerSize = 0; + long nsWriteCursorLedgerLogicalSize = 0; + long nsReadCursorLedgerSize = 0; while (cursorIterator.hasNext()) { ManagedCursorImpl cursor = (ManagedCursorImpl) cursorIterator.next(); @@ -77,8 +85,32 @@ private List aggregate() { metrics.put("brk_ml_cursor_persistLedgerErrors", cStats.getPersistLedgerErrors()); metrics.put("brk_ml_cursor_persistZookeeperSucceed", cStats.getPersistZookeeperSucceed()); metrics.put("brk_ml_cursor_persistZookeeperErrors", cStats.getPersistZookeeperErrors()); + metrics.put("brk_ml_cursor_writeLedgerSize", cStats.getWriteCursorLedgerSize()); + metrics.put("brk_ml_cursor_writeLedgerLogicalSize", cStats.getWriteCursorLedgerLogicalSize()); + metrics.put("brk_ml_cursor_readLedgerSize", cStats.getReadCursorLedgerSize()); metricsCollection.add(metrics); + + nsTotalNonContiguousDeletedMessagesRange += cursor.getTotalNonContiguousDeletedMessagesRange(); + nsPersistLedgerSucceed += cStats.getPersistLedgerSucceed(); + nsPersistLedgerErrors += cStats.getPersistLedgerErrors(); + nsPersistZookeeperSucceed += cStats.getPersistZookeeperSucceed(); + nsPersistZookeeperErrors += cStats.getPersistZookeeperErrors(); + nsWriteCursorLedgerSize += cStats.getWriteCursorLedgerSize(); + nsWriteCursorLedgerLogicalSize += cStats.getWriteCursorLedgerLogicalSize(); + nsReadCursorLedgerSize += cStats.getReadCursorLedgerSize(); } + + Metrics metrics = createMetricsByDimension(namespace); + metrics.put("brk_ml_cursor_nonContiguousDeletedMessagesRange", + nsTotalNonContiguousDeletedMessagesRange); + metrics.put("brk_ml_cursor_persistLedgerSucceed", nsPersistLedgerSucceed); + metrics.put("brk_ml_cursor_persistLedgerErrors", nsPersistLedgerErrors); + metrics.put("brk_ml_cursor_persistZookeeperSucceed", nsPersistZookeeperSucceed); + metrics.put("brk_ml_cursor_persistZookeeperErrors", nsPersistZookeeperErrors); + metrics.put("brk_ml_cursor_writeLedgerSize", nsWriteCursorLedgerSize); + metrics.put("brk_ml_cursor_writeLedgerLogicalSize", nsWriteCursorLedgerLogicalSize); + metrics.put("brk_ml_cursor_readLedgerSize", nsReadCursorLedgerSize); + metricsCollection.add(metrics); } return metricsCollection; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java index ff35a4a99c52de..75b9f9bbb0e060 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.stats; +import lombok.Cleanup; import org.apache.bookkeeper.client.PulsarMockLedgerHandle; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics; @@ -90,4 +91,66 @@ public void testManagedCursorMetrics() throws Exception { Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L); } + @Test + public void testCursorReadWriteMetrics() throws Exception { + final String subName = "read-write"; + final String topicName = "persistent://my-namespace/use/my-ns/read-write"; + final int messageSize = 10; + + ManagedCursorMetrics metrics = new ManagedCursorMetrics(pulsar); + + List metricsList = metrics.generate(); + Assert.assertTrue(metricsList.isEmpty()); + + metricsList = metrics.generate(); + Assert.assertTrue(metricsList.isEmpty()); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .subscriptionName(subName) + .subscribe(); + + @Cleanup + Consumer consumer2 = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .subscriptionName(subName + "-2") + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + + for (PulsarMockLedgerHandle ledgerHandle : mockBookKeeper.getLedgerMap().values()) { + ledgerHandle.close(); + } + + for (int i = 0; i < messageSize; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + if (i % 2 == 0) { + consumer.acknowledge(consumer.receive().getMessageId()); + } else { + consumer2.acknowledge(consumer.receive().getMessageId()); + } + } + metricsList = metrics.generate(); + Assert.assertEquals(metricsList.size(), 3); + Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L); + Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L); + Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L); + + Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L); + Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L); + Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L); + + Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 52L); + Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 26L); + Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 73879156fdaaab..bc6e672e6e019d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -1036,7 +1036,7 @@ public void testManagedCursorPersistStats() throws Exception { Multimap metrics = parseMetrics(metricsStr); List cm = (List) metrics.get("pulsar_ml_cursor_persistLedgerSucceed"); - assertEquals(cm.size(), 1); + assertEquals(cm.size(), 2); assertEquals(cm.get(0).tags.get("cluster"), "test"); assertEquals(cm.get(0).tags.get("cursor_name"), subName); diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md index aa69b790537166..839029b91a8423 100644 --- a/site2/docs/reference-metrics.md +++ b/site2/docs/reference-metrics.md @@ -286,6 +286,9 @@ brk_ml_cursor_persistLedgerErrors(namespace="", ledger_name="", cursor_name:"")| brk_ml_cursor_persistZookeeperSucceed(namespace="", ledger_name="", cursor_name:"")|Gauge|The number of acknowledgment states that is persistent to ZooKeeper. brk_ml_cursor_persistZookeeperErrors(namespace="", ledger_name="", cursor_name:"")|Gauge|The number of ledger errors occurred when acknowledgment states fail to be persistent to ZooKeeper. brk_ml_cursor_nonContiguousDeletedMessagesRange(namespace="", ledger_name="", cursor_name:"")|Gauge|The number of non-contiguous deleted messages ranges. +brk_ml_cursor_writeLedgerSize(namespace="", ledger_name="", cursor_name:"")|Gauge|The size of write to ledger. +brk_ml_cursor_writeLedgerLogicalSize(namespace="", ledger_name="", cursor_name:"")|Gauge|The size of write to ledger (accounting for without replicas). +brk_ml_cursor_readLedgerSize(namespace="", ledger_name="", cursor_name:"")|Gauge|The size of read from ledger. ### LoadBalancing metrics All the loadbalancing metrics are labelled with the following labels: