From 08a2f1a6fc68e138c8a51d2331d7e1827c4de602 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Tue, 11 Jun 2024 10:22:19 -0700 Subject: [PATCH 01/10] Add ML cache metrics draft --- .../mledger/ManagedLedgerFactoryMXBean.java | 25 +++ .../impl/ManagedLedgerFactoryMBeanImpl.java | 25 +++ .../cache/PooledByteBufAllocatorStats.java | 68 ++++++ .../OpenTelemetryManagedLedgerCacheStats.java | 207 ++++++++++++++++++ .../metrics/ManagedLedgerCacheMetrics.java | 43 +--- ...nTelemetryManagedLedgerCacheStatsTest.java | 128 +++++++++++ .../org/apache/pulsar/common/stats/Rate.java | 19 +- 7 files changed, 472 insertions(+), 43 deletions(-) create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PooledByteBufAllocatorStats.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStats.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java index 35c26c5dfdb89..43e8196daa9ae 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java @@ -47,26 +47,51 @@ public interface ManagedLedgerFactoryMXBean { */ double getCacheHitsRate(); + /** + * Cumulative number of cache hits. + */ + long getCacheHitsTotal(); + /** * Get the number of cache misses per second. */ double getCacheMissesRate(); + /** + * Cumulative number of cache misses. + */ + long getCacheMissesTotal(); + /** * Get the amount of data is retrieved from the cache in byte/s. */ double getCacheHitsThroughput(); + /** + * Cumulative amount of data retrieved from the cache in bytes. + */ + long getCacheHitsBytesTotal(); + /** * Get the amount of data is retrieved from the bookkeeper in byte/s. */ double getCacheMissesThroughput(); + /** + * Cumulative amount of data retrieved from the bookkeeper in bytes. + */ + long getCacheMissesBytesTotal(); + /** * Get the number of cache evictions during the last minute. */ long getNumberOfCacheEvictions(); + /** + * Cumulative number of cache evictions. + */ + long getNumberOfCacheEvictionsTotal(); + /** * Cumulative number of entries inserted into the cache. */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java index cf3d7142d617e..a3038a0e7ff76 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java @@ -99,26 +99,51 @@ public double getCacheHitsRate() { return cacheHits.getRate(); } + @Override + public long getCacheHitsTotal() { + return cacheHits.getTotalCount(); + } + @Override public double getCacheMissesRate() { return cacheMisses.getRate(); } + @Override + public long getCacheMissesTotal() { + return cacheMisses.getTotalCount(); + } + @Override public double getCacheHitsThroughput() { return cacheHits.getValueRate(); } + @Override + public long getCacheHitsBytesTotal() { + return cacheHits.getTotalValue(); + } + @Override public double getCacheMissesThroughput() { return cacheMisses.getValueRate(); } + @Override + public long getCacheMissesBytesTotal() { + return cacheMisses.getTotalValue(); + } + @Override public long getNumberOfCacheEvictions() { return cacheEvictions.getCount(); } + @Override + public long getNumberOfCacheEvictionsTotal() { + return cacheEvictions.getTotalCount(); + } + public long getCacheInsertedEntriesCount() { return insertedEntryCount.sum(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PooledByteBufAllocatorStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PooledByteBufAllocatorStats.java new file mode 100644 index 0000000000000..4f6a18cb5d934 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PooledByteBufAllocatorStats.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import io.netty.buffer.PooledByteBufAllocator; +import lombok.Value; + +@Value +public class PooledByteBufAllocatorStats { + + public long activeAllocations; + public long activeAllocationsSmall; + public long activeAllocationsNormal; + public long activeAllocationsHuge; + + public long totalAllocated; + public long totalUsed; + + public PooledByteBufAllocatorStats(PooledByteBufAllocator allocator) { + long activeAllocations = 0; + long activeAllocationsSmall = 0; + long activeAllocationsNormal = 0; + long activeAllocationsHuge = 0; + long totalAllocated = 0; + long totalUsed = 0; + + for (var arena : allocator.metric().directArenas()) { + activeAllocations += arena.numActiveAllocations(); + activeAllocationsSmall += arena.numActiveSmallAllocations(); + activeAllocationsNormal += arena.numActiveNormalAllocations(); + activeAllocationsHuge += arena.numActiveHugeAllocations(); + + for (var list : arena.chunkLists()) { + for (var chunk : list) { + int size = chunk.chunkSize(); + int used = size - chunk.freeBytes(); + + totalAllocated += size; + totalUsed += used; + } + } + } + + this.activeAllocations = activeAllocations; + this.activeAllocationsSmall = activeAllocationsSmall; + this.activeAllocationsNormal = activeAllocationsNormal; + this.activeAllocationsHuge = activeAllocationsHuge; + + this.totalAllocated = totalAllocated; + this.totalUsed = totalUsed; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStats.java new file mode 100644 index 0000000000000..04ad1d7ba7939 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStats.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.BatchCallback; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.cache.PooledByteBufAllocatorStats; +import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl; +import org.apache.pulsar.broker.PulsarService; + +public class OpenTelemetryManagedLedgerCacheStats implements AutoCloseable { + + public static final AttributeKey POOL_ARENA_TYPE = + AttributeKey.stringKey("pulsar.managed_ledger.pool.arena.type"); + public enum PoolArenaType { + SMALL, + NORMAL, + HUGE; + public final Attributes attributes = Attributes.of(POOL_ARENA_TYPE, name().toLowerCase()); + } + + public static final AttributeKey POOL_CHUNK_ALLOCATION_TYPE = + AttributeKey.stringKey("pulsar.managed_ledger.pool.chunk.allocation.type"); + public enum PoolChunkAllocationType { + ALLOCATED, + USED; + public final Attributes attributes = Attributes.of(POOL_CHUNK_ALLOCATION_TYPE, name().toLowerCase()); + } + + public static final AttributeKey CACHE_ENTRY_STATUS = + AttributeKey.stringKey("pulsar.managed_ledger.entry.status"); + public enum CacheEntryStatus { + ACTIVE, + EVICTED, + INSERTED; + public final Attributes attributes = Attributes.of(CACHE_ENTRY_STATUS, name().toLowerCase()); + } + + public static final AttributeKey CACHE_OPERATION_STATUS = + AttributeKey.stringKey("pulsar.managed_ledger.cache.operation.status"); + public enum CacheOperationStatus { + HIT, + MISS; + public final Attributes attributes = Attributes.of(CACHE_OPERATION_STATUS, name().toLowerCase()); + } + + // Replaces pulsar_ml_count + public static final String MANAGED_LEDGER_COUNTER = "pulsar.broker.managed_ledger.count"; + private final ObservableLongMeasurement managedLedgerCounter; + + // Replaces pulsar_ml_cache_evictions + public static final String CACHE_EVICTION_OPERATION_COUNTER = "pulsar.broker.managed_ledger.cache.eviction.count"; + private final ObservableLongMeasurement cacheEvictionOperationCounter; + + // Replaces 'pulsar_ml_cache_entries', + // 'pulsar_ml_cache_inserted_entries_total', + // 'pulsar_ml_cache_evicted_entries_total' + public static final String CACHE_ENTRY_COUNTER = "pulsar.broker.managed_ledger.cache.entry.count"; + private final ObservableLongMeasurement cacheEntryCounter; + + // Replaces pulsar_ml_cache_used_size + public static final String CACHE_SIZE_COUNTER = "pulsar.broker.managed_ledger.cache.entry.size"; + private final ObservableLongMeasurement cacheSizeCounter; + + // Replaces pulsar_ml_cache_hits_rate, pulsar_ml_cache_misses_rate + public static final String CACHE_OPERATION_COUNTER = "pulsar.broker.managed_ledger.cache.operation.count"; + private final ObservableLongMeasurement cacheOperationCounter; + + // Replaces pulsar_ml_cache_hits_throughput, pulsar_ml_cache_misses_throughput + public static final String CACHE_OPERATION_BYTES_COUNTER = "pulsar.broker.managed_ledger.cache.operation.size"; + private final ObservableLongMeasurement cacheOperationBytesCounter; + + // Replaces 'pulsar_ml_cache_pool_active_allocations', + // 'pulsar_ml_cache_pool_active_allocations_huge', + // 'pulsar_ml_cache_pool_active_allocations_normal', + // 'pulsar_ml_cache_pool_active_allocations_small' + public static final String CACHE_POOL_ACTIVE_ALLOCATION_COUNTER = + "pulsar.broker.managed_ledger.cache.pool.allocation.active.count"; + private final ObservableLongMeasurement cachePoolActiveAllocationCounter; + + // Replaces ['pulsar_ml_cache_pool_allocated', 'pulsar_ml_cache_pool_used'] + public static final String CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER = + "pulsar.broker.managed_ledger.cache.pool.allocation.size"; + private final ObservableLongMeasurement cachePoolActiveAllocationSizeCounter; + + private final BatchCallback batchCallback; + + public OpenTelemetryManagedLedgerCacheStats(PulsarService pulsar) { + var meter = pulsar.getOpenTelemetry().getMeter(); + + managedLedgerCounter = meter + .upDownCounterBuilder(MANAGED_LEDGER_COUNTER) + .setUnit("{managed_ledger}") + .setDescription("The total number of managed ledgers.") + .buildObserver(); + + cacheEvictionOperationCounter = meter + .counterBuilder(CACHE_EVICTION_OPERATION_COUNTER) + .setUnit("{eviction}") + .setDescription("The total number of cache eviction operations.") + .buildObserver(); + + cacheEntryCounter = meter + .upDownCounterBuilder(CACHE_ENTRY_COUNTER) + .setUnit("{entry}") + .setDescription("The number of entries in the entry cache.") + .buildObserver(); + + cacheSizeCounter = meter + .upDownCounterBuilder(CACHE_SIZE_COUNTER) + .setUnit("{By}") + .setDescription("The byte amount of entries stored in the entry cache.") + .buildObserver(); + + cacheOperationCounter = meter + .counterBuilder(CACHE_OPERATION_COUNTER) + .setUnit("{entry}") + .setDescription("The number of cache operations.") + .buildObserver(); + + cacheOperationBytesCounter = meter + .counterBuilder(CACHE_OPERATION_BYTES_COUNTER) + .setUnit("{By}") + .setDescription("The byte amount of data retrieved from cache operations.") + .buildObserver(); + + cachePoolActiveAllocationCounter = meter + .upDownCounterBuilder(CACHE_POOL_ACTIVE_ALLOCATION_COUNTER) + .setUnit("{allocation}") + .setDescription("The number of currently active allocations in the direct arena.") + .buildObserver(); + + cachePoolActiveAllocationSizeCounter = meter + .upDownCounterBuilder(CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER) + .setUnit("{By}") + .setDescription("The memory allocated in the direct arena.") + .buildObserver(); + + + batchCallback = meter.batchCallback(() -> { + if (pulsar.getManagedLedgerFactory() instanceof ManagedLedgerFactoryImpl managedLedgerFactoryImpl) { + recordMetrics(managedLedgerFactoryImpl); + } + }, + managedLedgerCounter, + cacheEvictionOperationCounter, + cacheEntryCounter, + cacheSizeCounter, + cacheOperationCounter, + cacheOperationBytesCounter, + cachePoolActiveAllocationCounter, + cachePoolActiveAllocationSizeCounter); + } + + @Override + public void close() { + batchCallback.close(); + } + + private void recordMetrics(ManagedLedgerFactoryImpl factory) { + var stats = factory.getCacheStats(); + + managedLedgerCounter.record(stats.getNumberOfManagedLedgers()); + cacheEvictionOperationCounter.record(stats.getNumberOfCacheEvictionsTotal()); + + var entriesOut = stats.getCacheEvictedEntriesCount(); + var entriesIn = stats.getCacheInsertedEntriesCount(); + var entriesActive = entriesIn - entriesOut; + cacheEntryCounter.record(entriesActive, CacheEntryStatus.ACTIVE.attributes); + cacheEntryCounter.record(entriesIn, CacheEntryStatus.INSERTED.attributes); + cacheEntryCounter.record(entriesOut, CacheEntryStatus.EVICTED.attributes); + cacheSizeCounter.record(stats.getCacheUsedSize()); + + cacheOperationCounter.record(stats.getCacheHitsTotal(), CacheOperationStatus.HIT.attributes); + cacheOperationBytesCounter.record(stats.getCacheHitsBytesTotal(), CacheOperationStatus.HIT.attributes); + cacheOperationCounter.record(stats.getCacheMissesTotal(), CacheOperationStatus.MISS.attributes); + cacheOperationBytesCounter.record(stats.getCacheMissesBytesTotal(), CacheOperationStatus.MISS.attributes); + + var allocatorStats = new PooledByteBufAllocatorStats(RangeEntryCacheImpl.ALLOCATOR); + cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsSmall, PoolArenaType.SMALL.attributes); + cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsNormal, + PoolArenaType.NORMAL.attributes); + cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsHuge, PoolArenaType.HUGE.attributes); + cachePoolActiveAllocationSizeCounter.record(allocatorStats.totalAllocated, + PoolChunkAllocationType.ALLOCATED.attributes); + cachePoolActiveAllocationSizeCounter.record(allocatorStats.totalUsed, PoolChunkAllocationType.USED.attributes); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java index 890a37aa2d877..9eb4beb72fbf2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java @@ -18,13 +18,10 @@ */ package org.apache.pulsar.broker.stats.metrics; -import io.netty.buffer.PoolArenaMetric; -import io.netty.buffer.PoolChunkListMetric; -import io.netty.buffer.PoolChunkMetric; -import io.netty.buffer.PooledByteBufAllocator; import java.util.ArrayList; import java.util.List; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean; +import org.apache.bookkeeper.mledger.impl.cache.PooledByteBufAllocatorStats; import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.common.stats.Metrics; @@ -57,37 +54,13 @@ public synchronized List generate() { m.put("brk_ml_cache_hits_throughput", mlCacheStats.getCacheHitsThroughput()); m.put("brk_ml_cache_misses_throughput", mlCacheStats.getCacheMissesThroughput()); - PooledByteBufAllocator allocator = RangeEntryCacheImpl.ALLOCATOR; - long activeAllocations = 0; - long activeAllocationsSmall = 0; - long activeAllocationsNormal = 0; - long activeAllocationsHuge = 0; - long totalAllocated = 0; - long totalUsed = 0; - - for (PoolArenaMetric arena : allocator.metric().directArenas()) { - activeAllocations += arena.numActiveAllocations(); - activeAllocationsSmall += arena.numActiveSmallAllocations(); - activeAllocationsNormal += arena.numActiveNormalAllocations(); - activeAllocationsHuge += arena.numActiveHugeAllocations(); - - for (PoolChunkListMetric list : arena.chunkLists()) { - for (PoolChunkMetric chunk : list) { - int size = chunk.chunkSize(); - int used = size - chunk.freeBytes(); - - totalAllocated += size; - totalUsed += used; - } - } - } - - m.put("brk_ml_cache_pool_allocated", totalAllocated); - m.put("brk_ml_cache_pool_used", totalUsed); - m.put("brk_ml_cache_pool_active_allocations", activeAllocations); - m.put("brk_ml_cache_pool_active_allocations_small", activeAllocationsSmall); - m.put("brk_ml_cache_pool_active_allocations_normal", activeAllocationsNormal); - m.put("brk_ml_cache_pool_active_allocations_huge", activeAllocationsHuge); + var allocatorStats = new PooledByteBufAllocatorStats(RangeEntryCacheImpl.ALLOCATOR); + m.put("brk_ml_cache_pool_allocated", allocatorStats.totalAllocated); + m.put("brk_ml_cache_pool_used", allocatorStats.totalUsed); + m.put("brk_ml_cache_pool_active_allocations", allocatorStats.activeAllocations); + m.put("brk_ml_cache_pool_active_allocations_small", allocatorStats.activeAllocationsSmall); + m.put("brk_ml_cache_pool_active_allocations_normal", allocatorStats.activeAllocationsNormal); + m.put("brk_ml_cache_pool_active_allocations_huge", allocatorStats.activeAllocationsHuge); metrics.clear(); metrics.add(m); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java new file mode 100644 index 0000000000000..83a891ce23e9b --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import io.opentelemetry.api.common.Attributes; +import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CacheEntryStatus; +import org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CacheOperationStatus; +import org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.PoolArenaType; +import org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.PoolChunkAllocationType; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; +import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_ENTRY_COUNTER; +import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_EVICTION_OPERATION_COUNTER; +import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_BYTES_COUNTER; +import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_COUNTER; +import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_COUNTER; +import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER; +import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_SIZE_COUNTER; +import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.MANAGED_LEDGER_COUNTER; +import static org.assertj.core.api.Assertions.assertThat; + +public class OpenTelemetryManagedLedgerCacheStatsTest extends BrokerTestBase { + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.baseSetup(); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) { + super.customizeMainPulsarTestContextBuilder(builder); + builder.enableOpenTelemetry(true); + } + + @Test + public void testManagedLedgerCacheStats() throws Exception { + var topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testManagedLedgerCacheStats"); + + @Cleanup + var producer = pulsarClient.newProducer().topic(topicName).create(); + + @Cleanup + var consumer1 = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName(BrokerTestUtil.newUniqueName("sub")) + .subscribe(); + + @Cleanup + var consumer2 = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName(BrokerTestUtil.newUniqueName("sub")) + .subscribe(); + + producer.send("test".getBytes()); + consumer1.receive(); + + Awaitility.await().untilAsserted(() -> { + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(metrics, CACHE_ENTRY_COUNTER, CacheEntryStatus.ACTIVE.attributes, + value -> assertThat(value).isNotNegative()); + assertMetricLongSumValue(metrics, CACHE_ENTRY_COUNTER, CacheEntryStatus.INSERTED.attributes, + value -> assertThat(value).isPositive()); + assertMetricLongSumValue(metrics, CACHE_ENTRY_COUNTER, CacheEntryStatus.EVICTED.attributes, + value -> assertThat(value).isPositive()); + assertMetricLongSumValue(metrics, CACHE_SIZE_COUNTER, Attributes.empty(), + value -> assertThat(value).isNotNegative()); + }); + + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + + assertMetricLongSumValue(metrics, MANAGED_LEDGER_COUNTER, Attributes.empty(), 2); + assertMetricLongSumValue(metrics, CACHE_EVICTION_OPERATION_COUNTER, Attributes.empty(), 0); + + assertMetricLongSumValue(metrics, CACHE_OPERATION_COUNTER, CacheOperationStatus.HIT.attributes, + value -> assertThat(value).isPositive()); + assertMetricLongSumValue(metrics, CACHE_OPERATION_BYTES_COUNTER, CacheOperationStatus.HIT.attributes, + value -> assertThat(value).isPositive()); + assertMetricLongSumValue(metrics, CACHE_OPERATION_COUNTER, CacheOperationStatus.MISS.attributes, + value -> assertThat(value).isNotNegative()); + assertMetricLongSumValue(metrics, CACHE_OPERATION_BYTES_COUNTER, CacheOperationStatus.MISS.attributes, + value -> assertThat(value).isNotNegative()); + + assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_COUNTER, PoolArenaType.SMALL.attributes, + value -> assertThat(value).isNotNegative()); + assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_COUNTER, PoolArenaType.NORMAL.attributes, + value -> assertThat(value).isNotNegative()); + assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_COUNTER, PoolArenaType.HUGE.attributes, + value -> assertThat(value).isNotNegative()); + assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER, + PoolChunkAllocationType.ALLOCATED.attributes, value -> assertThat(value).isNotNegative()); + assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER, + PoolChunkAllocationType.USED.attributes, value -> assertThat(value).isNotNegative()); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java index 886e31ab71216..41402985ceb37 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java @@ -28,6 +28,7 @@ public class Rate { private final LongAdder valueAdder = new LongAdder(); private final LongAdder countAdder = new LongAdder(); private final LongAdder totalCountAdder = new LongAdder(); + private final LongAdder totalValueAdder = new LongAdder(); // Computed stats private long count = 0L; @@ -37,20 +38,18 @@ public class Rate { private long lastCalculatedTime = System.nanoTime(); public void recordEvent() { - countAdder.increment(); - totalCountAdder.increment(); + recordMultipleEvents(1, 0); } public void recordEvent(long value) { - valueAdder.add(value); - countAdder.increment(); - totalCountAdder.increment(); + recordMultipleEvents(1, value); } - public void recordMultipleEvents(long events, long totalValue) { + public void recordMultipleEvents(long totalCount, long totalValue) { valueAdder.add(totalValue); - countAdder.add(events); - totalCountAdder.add(events); + totalValueAdder.add(totalValue); + countAdder.add(totalCount); + totalCountAdder.add(totalCount); } public void calculateRate() { @@ -88,4 +87,8 @@ public double getValueRate() { public long getTotalCount() { return this.totalCountAdder.longValue(); } + + public long getTotalValue() { + return totalValueAdder.sum(); + } } From 252ee096f2bf27934f489aa918ef69820793102e Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Tue, 11 Jun 2024 10:48:25 -0700 Subject: [PATCH 02/10] Pass OpenTelemetry instance to ManagedLedgerClientFactory --- managed-ledger/pom.xml | 12 ++++++++ .../impl/ManagedLedgerFactoryImpl.java | 14 ++++++---- .../broker/ManagedLedgerClientFactory.java | 8 ++++-- .../apache/pulsar/broker/PulsarService.java | 2 +- .../stats/PulsarBrokerOpenTelemetry.java | 5 +++- .../broker/storage/ManagedLedgerStorage.java | 9 ++++-- .../broker/testcontext/PulsarTestContext.java | 6 ++-- .../client/impl/SequenceIdWithErrorTest.java | 3 +- .../pulsar/opentelemetry/Constants.java | 28 +++++++++++++++++++ 9 files changed, 71 insertions(+), 16 deletions(-) create mode 100644 pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/Constants.java diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml index d8b31220d51be..60a4edab95b77 100644 --- a/managed-ledger/pom.xml +++ b/managed-ledger/pom.xml @@ -71,6 +71,12 @@ ${project.version} + + ${project.groupId} + pulsar-opentelemetry + ${project.version} + + com.google.guava guava @@ -120,6 +126,12 @@ test + + io.opentelemetry + opentelemetry-sdk-testing + test + + org.slf4j slf4j-api diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index ed803a81462e1..c8afd226156a0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -25,6 +25,7 @@ import com.google.common.base.Predicates; import com.google.common.collect.Maps; import io.netty.util.concurrent.DefaultThreadFactory; +import io.opentelemetry.api.OpenTelemetry; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -150,7 +151,7 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, ClientConfi ManagedLedgerFactoryConfig config) throws Exception { this(metadataStore, new DefaultBkFactory(bkClientConfiguration), - true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE); + true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop()); } public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper) @@ -169,21 +170,24 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, ManagedLedgerFactoryConfig config) throws Exception { this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */, - config, NullStatsLogger.INSTANCE); + config, NullStatsLogger.INSTANCE, OpenTelemetry.noop()); } public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory, - ManagedLedgerFactoryConfig config, StatsLogger statsLogger) + ManagedLedgerFactoryConfig config, StatsLogger statsLogger, + OpenTelemetry openTelemetry) throws Exception { this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */, - config, statsLogger); + config, statsLogger, openTelemetry); } private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory, boolean isBookkeeperManaged, - ManagedLedgerFactoryConfig config, StatsLogger statsLogger) throws Exception { + ManagedLedgerFactoryConfig config, + StatsLogger statsLogger, + OpenTelemetry openTelemetry) throws Exception { MetadataCompressionConfig compressionConfigForManagedLedgerInfo = config.getCompressionConfigForManagedLedgerInfo(); MetadataCompressionConfig compressionConfigForManagedCursorInfo = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index 6ed95f167a15a..9bbc2857863ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -22,6 +22,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; import io.netty.channel.EventLoopGroup; +import io.opentelemetry.api.OpenTelemetry; import java.io.IOException; import java.util.Map; import java.util.Optional; @@ -54,9 +55,11 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage { bkEnsemblePolicyToBkClientMap = Caffeine.newBuilder().buildAsync(); private StatsProvider statsProvider = new NullStatsProvider(); + @Override public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore, BookKeeperClientFactory bookkeeperProvider, - EventLoopGroup eventLoopGroup) throws Exception { + EventLoopGroup eventLoopGroup, + OpenTelemetry openTelemetry) throws Exception { ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig(); managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L); managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark()); @@ -109,7 +112,8 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata try { this.managedLedgerFactory = - new ManagedLedgerFactoryImpl(metadataStore, bkFactory, managedLedgerFactoryConfig, statsLogger); + new ManagedLedgerFactoryImpl(metadataStore, bkFactory, managedLedgerFactoryConfig, statsLogger, + openTelemetry); } catch (Exception e) { statsProvider.stop(); defaultBkClient.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 2e9f9dc6b0105..05104444dc769 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1050,7 +1050,7 @@ protected OrderedExecutor newOrderedExecutor() { protected ManagedLedgerStorage newManagedLedgerClientFactory() throws Exception { return ManagedLedgerStorage.create( config, localMetadataStore, - bkClientFactory, ioEventLoopGroup + bkClientFactory, ioEventLoopGroup, openTelemetry.getOpenTelemetryService().getOpenTelemetry() ); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java index 01ca65d2cc537..178da8b84983f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java @@ -26,11 +26,14 @@ import lombok.Getter; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.opentelemetry.Constants; import org.apache.pulsar.opentelemetry.OpenTelemetryService; public class PulsarBrokerOpenTelemetry implements Closeable { public static final String SERVICE_NAME = "pulsar-broker"; + + @Getter private final OpenTelemetryService openTelemetryService; @Getter @@ -44,7 +47,7 @@ public PulsarBrokerOpenTelemetry(ServiceConfiguration config, .serviceVersion(PulsarVersion.getVersion()) .builderCustomizer(builderCustomizer) .build(); - meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.broker"); + meter = openTelemetryService.getOpenTelemetry().getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java index 0b5a102eed1e0..944d2badf75f2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.storage; import io.netty.channel.EventLoopGroup; +import io.opentelemetry.api.OpenTelemetry; import java.io.IOException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; @@ -47,7 +48,8 @@ public interface ManagedLedgerStorage extends AutoCloseable { void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore, BookKeeperClientFactory bookkeeperProvider, - EventLoopGroup eventLoopGroup) throws Exception; + EventLoopGroup eventLoopGroup, + OpenTelemetry openTelemetry) throws Exception; /** * Return the factory to create {@link ManagedLedgerFactory}. @@ -87,11 +89,12 @@ void initialize(ServiceConfiguration conf, static ManagedLedgerStorage create(ServiceConfiguration conf, MetadataStoreExtended metadataStore, BookKeeperClientFactory bkProvider, - EventLoopGroup eventLoopGroup) throws Exception { + EventLoopGroup eventLoopGroup, + OpenTelemetry openTelemetry) throws Exception { ManagedLedgerStorage storage = Reflections.createInstance(conf.getManagedLedgerStorageClassName(), ManagedLedgerStorage.class, Thread.currentThread().getContextClassLoader()); - storage.initialize(conf, metadataStore, bkProvider, eventLoopGroup); + storage.initialize(conf, metadataStore, bkProvider, eventLoopGroup, openTelemetry); return storage; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java index 09cd4f7cb1a93..3d79a17a90f50 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.MoreExecutors; import io.netty.channel.EventLoopGroup; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import java.io.IOException; @@ -843,9 +844,8 @@ private static ManagedLedgerStorage createManagedLedgerClientFactory(BookKeeper @Override public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore, - BookKeeperClientFactory bookkeeperProvider, EventLoopGroup eventLoopGroup) - throws Exception { - + BookKeeperClientFactory bookkeeperProvider, EventLoopGroup eventLoopGroup, + OpenTelemetry openTelemetry) { } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java index 7d330bb82addd..1395424b14123 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java @@ -21,6 +21,7 @@ import static org.testng.Assert.assertEquals; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.opentelemetry.api.OpenTelemetry; import java.util.Collections; import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -60,7 +61,7 @@ public void testCheckSequenceId() throws Exception { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); ManagedLedgerClientFactory clientFactory = new ManagedLedgerClientFactory(); clientFactory.initialize(pulsar.getConfiguration(), pulsar.getLocalMetadataStore(), - pulsar.getBookKeeperClientFactory(), eventLoopGroup); + pulsar.getBookKeeperClientFactory(), eventLoopGroup, OpenTelemetry.noop()); ManagedLedgerFactory mlFactory = clientFactory.getManagedLedgerFactory(); ManagedLedger ml = mlFactory.open(TopicName.get(topicName).getPersistenceNamingEncoding()); ml.close(); diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/Constants.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/Constants.java new file mode 100644 index 0000000000000..6d61cafb5a01a --- /dev/null +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/Constants.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.opentelemetry; + +/** + * Common OpenTelemetry constants to be used by Pulsar components. + */ +public interface Constants { + + String BROKER_INSTRUMENTATION_SCOPE_NAME = "org.apache.pulsar.broker"; + +} From b0fb621646b87d51b1aa96a4ccb72cd5f7ae41ff Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Tue, 11 Jun 2024 11:03:23 -0700 Subject: [PATCH 03/10] Relocate stats class to ML package --- .../OpenTelemetryManagedLedgerCacheStats.java | 15 +++++------- .../impl/ManagedLedgerFactoryImpl.java | 6 +++++ ...nTelemetryManagedLedgerCacheStatsTest.java | 24 +++++++++---------- 3 files changed, 24 insertions(+), 21 deletions(-) rename {pulsar-broker/src/main/java/org/apache/pulsar/broker/stats => managed-ledger/src/main/java/org/apache/bookkeeper/mledger}/OpenTelemetryManagedLedgerCacheStats.java (95%) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java similarity index 95% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStats.java rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java index 04ad1d7ba7939..d936870507ead 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStats.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java @@ -16,8 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.stats; +package org.apache.bookkeeper.mledger; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.BatchCallback; @@ -25,7 +26,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.cache.PooledByteBufAllocatorStats; import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl; -import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.opentelemetry.Constants; public class OpenTelemetryManagedLedgerCacheStats implements AutoCloseable { @@ -104,8 +105,8 @@ public enum CacheOperationStatus { private final BatchCallback batchCallback; - public OpenTelemetryManagedLedgerCacheStats(PulsarService pulsar) { - var meter = pulsar.getOpenTelemetry().getMeter(); + public OpenTelemetryManagedLedgerCacheStats(OpenTelemetry openTelemetry, ManagedLedgerFactoryImpl factory) { + var meter = openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME); managedLedgerCounter = meter .upDownCounterBuilder(MANAGED_LEDGER_COUNTER) @@ -156,11 +157,7 @@ public OpenTelemetryManagedLedgerCacheStats(PulsarService pulsar) { .buildObserver(); - batchCallback = meter.batchCallback(() -> { - if (pulsar.getManagedLedgerFactory() instanceof ManagedLedgerFactoryImpl managedLedgerFactoryImpl) { - recordMetrics(managedLedgerFactoryImpl); - } - }, + batchCallback = meter.batchCallback(() -> recordMetrics(factory), managedLedgerCounter, cacheEvictionOperationCounter, cacheEntryCounter, diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index c8afd226156a0..7b25783483cf2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -85,6 +85,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.commons.lang3.tuple.Pair; +import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats; import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; @@ -120,6 +121,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private volatile long cacheEvictionTimeThresholdNanos; private final MetadataStore metadataStore; + private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats; + //indicate whether shutdown() is called. private volatile boolean closed; @@ -225,6 +228,8 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, closed = false; metadataStore.registerSessionListener(this::handleMetadataStoreNotification); + + openTelemetryCacheStats = new OpenTelemetryManagedLedgerCacheStats(openTelemetry, this); } static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy { @@ -617,6 +622,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { })); }).thenAcceptAsync(__ -> { //wait for tasks in scheduledExecutor executed. + openTelemetryCacheStats.close(); scheduledExecutor.shutdownNow(); entryCacheManager.clear(); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java index 83a891ce23e9b..22e7fceb9a833 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java @@ -22,10 +22,10 @@ import lombok.Cleanup; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.BrokerTestBase; -import org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CacheEntryStatus; -import org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CacheOperationStatus; -import org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.PoolArenaType; -import org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.PoolChunkAllocationType; +import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CacheEntryStatus; +import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CacheOperationStatus; +import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.PoolArenaType; +import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.PoolChunkAllocationType; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.awaitility.Awaitility; @@ -34,14 +34,14 @@ import org.testng.annotations.Test; import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; -import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_ENTRY_COUNTER; -import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_EVICTION_OPERATION_COUNTER; -import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_BYTES_COUNTER; -import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_COUNTER; -import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_COUNTER; -import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER; -import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.CACHE_SIZE_COUNTER; -import static org.apache.pulsar.broker.stats.OpenTelemetryManagedLedgerCacheStats.MANAGED_LEDGER_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_ENTRY_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_EVICTION_OPERATION_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_BYTES_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_SIZE_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.MANAGED_LEDGER_COUNTER; import static org.assertj.core.api.Assertions.assertThat; public class OpenTelemetryManagedLedgerCacheStatsTest extends BrokerTestBase { From 7d46f560ef60ed55b7c6eda06435d431192a4665 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Tue, 11 Jun 2024 11:21:06 -0700 Subject: [PATCH 04/10] Relocate attributes --- .../OpenTelemetryManagedLedgerCacheStats.java | 40 ++----------------- .../impl/ManagedLedgerFactoryImpl.java | 2 +- ...nTelemetryManagedLedgerCacheStatsTest.java | 29 +++++++------- .../OpenTelemetryAttributes.java | 33 +++++++++++++++ 4 files changed, 52 insertions(+), 52 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java index d936870507ead..13e7ed6ac6799 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java @@ -19,51 +19,19 @@ package org.apache.bookkeeper.mledger; import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.BatchCallback; import io.opentelemetry.api.metrics.ObservableLongMeasurement; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.cache.PooledByteBufAllocatorStats; import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl; import org.apache.pulsar.opentelemetry.Constants; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheEntryStatus; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheOperationStatus; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolArenaType; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolChunkAllocationType; public class OpenTelemetryManagedLedgerCacheStats implements AutoCloseable { - public static final AttributeKey POOL_ARENA_TYPE = - AttributeKey.stringKey("pulsar.managed_ledger.pool.arena.type"); - public enum PoolArenaType { - SMALL, - NORMAL, - HUGE; - public final Attributes attributes = Attributes.of(POOL_ARENA_TYPE, name().toLowerCase()); - } - - public static final AttributeKey POOL_CHUNK_ALLOCATION_TYPE = - AttributeKey.stringKey("pulsar.managed_ledger.pool.chunk.allocation.type"); - public enum PoolChunkAllocationType { - ALLOCATED, - USED; - public final Attributes attributes = Attributes.of(POOL_CHUNK_ALLOCATION_TYPE, name().toLowerCase()); - } - - public static final AttributeKey CACHE_ENTRY_STATUS = - AttributeKey.stringKey("pulsar.managed_ledger.entry.status"); - public enum CacheEntryStatus { - ACTIVE, - EVICTED, - INSERTED; - public final Attributes attributes = Attributes.of(CACHE_ENTRY_STATUS, name().toLowerCase()); - } - - public static final AttributeKey CACHE_OPERATION_STATUS = - AttributeKey.stringKey("pulsar.managed_ledger.cache.operation.status"); - public enum CacheOperationStatus { - HIT, - MISS; - public final Attributes attributes = Attributes.of(CACHE_OPERATION_STATUS, name().toLowerCase()); - } - // Replaces pulsar_ml_count public static final String MANAGED_LEDGER_COUNTER = "pulsar.broker.managed_ledger.count"; private final ObservableLongMeasurement managedLedgerCounter; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 7b25783483cf2..973ae364d60d0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -69,6 +69,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerInfo.MessageRangeInfo; import org.apache.bookkeeper.mledger.ManagedLedgerInfo.PositionInfo; import org.apache.bookkeeper.mledger.MetadataCompressionConfig; +import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.ReadOnlyCursor; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ManagedLedgerInitializeLedgerCallback; @@ -85,7 +86,6 @@ import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.commons.lang3.tuple.Pair; -import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats; import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java index 22e7fceb9a833..c3a4a2e054ef3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java @@ -18,32 +18,31 @@ */ package org.apache.pulsar.broker.stats; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_ENTRY_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_EVICTION_OPERATION_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_BYTES_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_SIZE_COUNTER; +import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.MANAGED_LEDGER_COUNTER; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; +import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.Attributes; import lombok.Cleanup; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.BrokerTestBase; -import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CacheEntryStatus; -import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CacheOperationStatus; -import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.PoolArenaType; -import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.PoolChunkAllocationType; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheEntryStatus; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheOperationStatus; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolArenaType; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolChunkAllocationType; import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; -import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_ENTRY_COUNTER; -import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_EVICTION_OPERATION_COUNTER; -import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_BYTES_COUNTER; -import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_COUNTER; -import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_COUNTER; -import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER; -import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_SIZE_COUNTER; -import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.MANAGED_LEDGER_COUNTER; -import static org.assertj.core.api.Assertions.assertThat; - public class OpenTelemetryManagedLedgerCacheStatsTest extends BrokerTestBase { @BeforeMethod(alwaysRun = true) diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java index 9783f0e754f63..5120864020043 100644 --- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java @@ -127,4 +127,37 @@ enum BacklogQuotaType { TIME; public final Attributes attributes = Attributes.of(PULSAR_BACKLOG_QUOTA_TYPE, name().toLowerCase()); } + + // Managed Ledger Attributes + AttributeKey ML_POOL_ARENA_TYPE = AttributeKey.stringKey("pulsar.managed_ledger.pool.arena.type"); + enum PoolArenaType { + SMALL, + NORMAL, + HUGE; + public final Attributes attributes = Attributes.of(ML_POOL_ARENA_TYPE, name().toLowerCase()); + } + + AttributeKey ML_POOL_CHUNK_ALLOCATION_TYPE = + AttributeKey.stringKey("pulsar.managed_ledger.pool.chunk.allocation.type"); + enum PoolChunkAllocationType { + ALLOCATED, + USED; + public final Attributes attributes = Attributes.of(ML_POOL_CHUNK_ALLOCATION_TYPE, name().toLowerCase()); + } + + AttributeKey ML_CACHE_ENTRY_STATUS = AttributeKey.stringKey("pulsar.managed_ledger.cache.entry.status"); + enum CacheEntryStatus { + ACTIVE, + EVICTED, + INSERTED; + public final Attributes attributes = Attributes.of(ML_CACHE_ENTRY_STATUS, name().toLowerCase()); + } + + AttributeKey ML_CACHE_OPERATION_STATUS = + AttributeKey.stringKey("pulsar.managed_ledger.cache.operation.status"); + enum CacheOperationStatus { + HIT, + MISS; + public final Attributes attributes = Attributes.of(ML_CACHE_OPERATION_STATUS, name().toLowerCase()); + } } From 5651c600894fea1f325abb5285a80914e5d966e2 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Tue, 11 Jun 2024 11:36:05 -0700 Subject: [PATCH 05/10] Cleanup Rate --- .../main/java/org/apache/pulsar/common/stats/Rate.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java index 41402985ceb37..faac9d7fb31d2 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Rate.java @@ -38,11 +38,15 @@ public class Rate { private long lastCalculatedTime = System.nanoTime(); public void recordEvent() { - recordMultipleEvents(1, 0); + countAdder.increment(); + totalCountAdder.increment(); } public void recordEvent(long value) { - recordMultipleEvents(1, value); + valueAdder.add(value); + totalValueAdder.add(value); + countAdder.increment(); + totalCountAdder.increment(); } public void recordMultipleEvents(long totalCount, long totalValue) { From 17ff477f8df832085de0fd8d1fc82496e9878421 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Wed, 12 Jun 2024 11:01:45 -0700 Subject: [PATCH 06/10] Cosmetic fixes --- .../opentelemetry/OpenTelemetryAttributes.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java index 5120864020043..0cc7cf245f11a 100644 --- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java @@ -129,6 +129,10 @@ enum BacklogQuotaType { } // Managed Ledger Attributes + + /** + * The type of the pool arena. + */ AttributeKey ML_POOL_ARENA_TYPE = AttributeKey.stringKey("pulsar.managed_ledger.pool.arena.type"); enum PoolArenaType { SMALL, @@ -137,6 +141,9 @@ enum PoolArenaType { public final Attributes attributes = Attributes.of(ML_POOL_ARENA_TYPE, name().toLowerCase()); } + /** + * The type of the pool chunk allocation. + */ AttributeKey ML_POOL_CHUNK_ALLOCATION_TYPE = AttributeKey.stringKey("pulsar.managed_ledger.pool.chunk.allocation.type"); enum PoolChunkAllocationType { @@ -145,6 +152,9 @@ enum PoolChunkAllocationType { public final Attributes attributes = Attributes.of(ML_POOL_CHUNK_ALLOCATION_TYPE, name().toLowerCase()); } + /** + * The status of the cache entry. + */ AttributeKey ML_CACHE_ENTRY_STATUS = AttributeKey.stringKey("pulsar.managed_ledger.cache.entry.status"); enum CacheEntryStatus { ACTIVE, @@ -153,6 +163,9 @@ enum CacheEntryStatus { public final Attributes attributes = Attributes.of(ML_CACHE_ENTRY_STATUS, name().toLowerCase()); } + /** + * The result of the cache operation. + */ AttributeKey ML_CACHE_OPERATION_STATUS = AttributeKey.stringKey("pulsar.managed_ledger.cache.operation.status"); enum CacheOperationStatus { From 95ae2e1658a4676511af24f2a6d846949ea284da Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Wed, 12 Jun 2024 13:29:19 -0700 Subject: [PATCH 07/10] Add ML cursor metrics draft --- .../bookkeeper/mledger/ManagedCursor.java | 9 ++ .../mledger/impl/ManagedCursorAttributes.java | 63 ++++++++ .../mledger/impl/ManagedCursorImpl.java | 13 ++ .../impl/OpenTelemetryManagedCursorStats.java | 135 ++++++++++++++++++ .../stats/ManagedCursorMetricsTest.java | 98 +++++++++++-- 5 files changed, 306 insertions(+), 12 deletions(-) create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorAttributes.java create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index 227b5429abf77..c42396f0ef2d8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -34,6 +34,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback; +import org.apache.bookkeeper.mledger.impl.ManagedCursorAttributes; import org.apache.bookkeeper.mledger.impl.PositionImpl; /** @@ -878,4 +879,12 @@ default boolean periodicRollover() { return false; } + /** + * Get the attributes associated with the cursor. + * + * @return the attributes associated with the cursor + */ + default ManagedCursorAttributes getManagedCursorAttributes() { + return new ManagedCursorAttributes(this); + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorAttributes.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorAttributes.java new file mode 100644 index 0000000000000..c746bc4409d2a --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorAttributes.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import com.google.common.annotations.VisibleForTesting; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import lombok.Data; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; + +@Data +public class ManagedCursorAttributes { + + public static final AttributeKey PULSAR_MANAGED_CURSOR_NAME = + AttributeKey.stringKey("pulsar.managed_cursor.name"); + + public static final AttributeKey PULSAR_MANAGED_CURSOR_OPERATION_STATUS = + AttributeKey.stringKey("pulsar.managed_cursor.operation.status"); + + @VisibleForTesting + public enum OperationStatus { + SUCCESS, + FAILURE; + public final Attributes attributes = + Attributes.of(PULSAR_MANAGED_CURSOR_OPERATION_STATUS, name().toLowerCase()); + } + + private final Attributes attributes; + private final Attributes attributesOperationSucceed; + private final Attributes attributesOperationFailure; + + public ManagedCursorAttributes(ManagedCursor cursor) { + var mlName = cursor.getManagedLedger().getName(); + var topicName = TopicName.get(TopicName.fromPersistenceNamingEncoding(mlName)); + attributes = Attributes.of( + PULSAR_MANAGED_CURSOR_NAME, cursor.getName(), + ManagedLedgerAttributes.PULSAR_MANAGER_LEDGER_NAME, mlName, + OpenTelemetryAttributes.PULSAR_NAMESPACE, topicName.getNamespace() + ); + attributesOperationSucceed = + Attributes.builder().putAll(attributes).putAll(OperationStatus.SUCCESS.attributes).build(); + attributesOperationFailure = + Attributes.builder().putAll(attributes).putAll(OperationStatus.FAILURE.attributes).build(); + } +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 1d2065ef8e392..9896c900b004d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -291,6 +291,11 @@ public enum State { protected final ManagedCursorMXBean mbean; + private volatile ManagedCursorAttributes managedCursorAttributes; + private static final AtomicReferenceFieldUpdater ATTRIBUTES_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, ManagedCursorAttributes.class, + "managedCursorAttributes"); + @SuppressWarnings("checkstyle:javadoctype") public interface VoidCallback { void operationComplete(); @@ -3681,4 +3686,12 @@ public ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) thro } return newNonDurableCursor; } + + @Override + public ManagedCursorAttributes getManagedCursorAttributes() { + if (managedCursorAttributes != null) { + return managedCursorAttributes; + } + return ATTRIBUTES_UPDATER.updateAndGet(this, old -> old != null ? old : new ManagedCursorAttributes(this)); + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java new file mode 100644 index 0000000000000..72f5bae0c38e2 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import com.google.common.collect.Streams; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.metrics.BatchCallback; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import org.apache.bookkeeper.mledger.ManagedCursor; + +public class OpenTelemetryManagedCursorStats implements AutoCloseable { + + // Replaces ['pulsar_ml_cursor_persistLedgerSucceed', 'pulsar_ml_cursor_persistLedgerErrors'] + public static final String PERSIST_OPERATION_COUNTER = "pulsar.broker.managed_ledger.persist.operation.count"; + private final ObservableLongMeasurement persistOperationCounter; + + // Replaces ['pulsar_ml_cursor_persistZookeeperSucceed', 'pulsar_ml_cursor_persistZookeeperErrors'] + public static final String PERSIST_OPERATION_METADATA_STORE_COUNTER = + "pulsar.broker.managed_ledger.persist.mds.operation.count"; + private final ObservableLongMeasurement persistOperationMetadataStoreCounter; + + // Replaces pulsar_ml_cursor_nonContiguousDeletedMessagesRange + public static final String NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER = + "pulsar.broker.managed_ledger.message_range.count"; + private final ObservableLongMeasurement nonContiguousMessageRangeCounter; + + // Replaces pulsar_ml_cursor_writeLedgerSize + public static final String OUTGOING_BYTE_COUNTER = "pulsar.broker.managed_ledger.cursor.outgoing.size"; + private final ObservableLongMeasurement outgoingByteCounter; + + // Replaces pulsar_ml_cursor_writeLedgerLogicalSize + public static final String OUTGOING_BYTE_LOGICAL_COUNTER = + "pulsar.broker.managed_ledger.cursor.outgoing.logical.size"; + private final ObservableLongMeasurement outgoingByteLogicalCounter; + + // Replaces pulsar_ml_cursor_readLedgerSize + public static final String INCOMING_BYTE_COUNTER = "pulsar.broker.managed_ledger.cursor.incoming.size"; + private final ObservableLongMeasurement incomingByteCounter; + + private final BatchCallback batchCallback; + + public OpenTelemetryManagedCursorStats(ManagedLedgerFactoryImpl factory, OpenTelemetry openTelemetry) { + var meter = openTelemetry.getMeter("pulsar.managed_ledger.cursor"); // TODO + + persistOperationCounter = meter + .counterBuilder(PERSIST_OPERATION_COUNTER) + .setUnit("{operation}") + .setDescription("The number of acknowledgment operations on the ledger.") + .buildObserver(); + + persistOperationMetadataStoreCounter = meter + .counterBuilder(PERSIST_OPERATION_METADATA_STORE_COUNTER) + .setUnit("{operation}") + .setDescription("The number of acknowledgment operations in the metadata store.") + .buildObserver(); + + nonContiguousMessageRangeCounter = meter + .upDownCounterBuilder(NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER) + .setUnit("{range}") + .setDescription("The number of non-contiguous deleted messages ranges.") + .buildObserver(); + + outgoingByteCounter = meter + .counterBuilder(OUTGOING_BYTE_COUNTER) + .setUnit("{By}") + .setDescription("The size of write to ledger.") + .buildObserver(); + + outgoingByteLogicalCounter = meter + .counterBuilder(OUTGOING_BYTE_LOGICAL_COUNTER) + .setUnit("{By}") + .setDescription("The size of write to ledger (not including replicas).") + .buildObserver(); + + incomingByteCounter = meter + .counterBuilder(INCOMING_BYTE_COUNTER) + .setUnit("{By}") + .setDescription("The size of read from ledger.") + .buildObserver(); + + batchCallback = meter.batchCallback(() -> factory.getManagedLedgers() + .values() + .stream() + .map(ManagedLedgerImpl::getCursors) + .flatMap(Streams::stream) + .forEach(this::recordMetrics), + persistOperationCounter, + persistOperationMetadataStoreCounter, + nonContiguousMessageRangeCounter, + outgoingByteCounter, + outgoingByteLogicalCounter, + incomingByteCounter); + } + + @Override + public void close() { + batchCallback.close(); + } + + private void recordMetrics(ManagedCursor cursor) { + var stats = cursor.getStats(); + var cursorAttributesSet = cursor.getManagedCursorAttributes(); + var attributes = cursorAttributesSet.getAttributes(); + var attributesSucceed = cursorAttributesSet.getAttributesOperationSucceed(); + var attributesFailed = cursorAttributesSet.getAttributesOperationFailure(); + + persistOperationCounter.record(stats.getPersistLedgerSucceed(), attributesSucceed); + persistOperationCounter.record(stats.getPersistLedgerErrors(), attributesFailed); + + persistOperationMetadataStoreCounter.record(stats.getPersistZookeeperSucceed(), attributesSucceed); + persistOperationMetadataStoreCounter.record(stats.getPersistZookeeperErrors(), attributesFailed); + + nonContiguousMessageRangeCounter.record(cursor.getTotalNonContiguousDeletedMessagesRange(), attributes); + + outgoingByteCounter.record(stats.getWriteCursorLedgerSize(), attributes); + outgoingByteLogicalCounter.record(stats.getWriteCursorLedgerLogicalSize(), attributes); + incomingByteCounter.record(stats.getReadCursorLedgerSize(), attributes); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java index baa4bea570155..17a111f48d282 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java @@ -18,20 +18,24 @@ */ package org.apache.pulsar.broker.stats; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; +import static org.assertj.core.api.Assertions.assertThat; import java.util.ArrayList; import java.util.List; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedCursorMXBean; -import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedCursorAttributes; +import org.apache.bookkeeper.mledger.impl.OpenTelemetryManagedCursorStats; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; @@ -80,6 +84,12 @@ protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws return PulsarTestClient.create(clientBuilder); } + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder pulsarTestContextBuilder) { + super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder); + pulsarTestContextBuilder.enableOpenTelemetry(true); + } + /*** * This method has overridden these case: * brk_ml_cursor_persistLedgerSucceed @@ -115,10 +125,7 @@ public void testManagedCursorMetrics() throws Exception { .topic(topicName) .enableBatching(false) .create(); - final PersistentSubscription persistentSubscription = - (PersistentSubscription) pulsar.getBrokerService() - .getTopic(topicName, false).get().get().getSubscription(subName); - final ManagedCursorImpl managedCursor = (ManagedCursorImpl) persistentSubscription.getCursor(); + var managedCursor = getManagedCursor(topicName, subName); ManagedCursorMXBean managedCursorMXBean = managedCursor.getStats(); // Assert. metricsList = metrics.generate(); @@ -128,6 +135,19 @@ public void testManagedCursorMetrics() throws Exception { Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L); Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L); Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L); + // Validate OpenTelemetry metrics as well + var attributesSet = new ManagedCursorAttributes(managedCursor); + var otelMetrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER, + attributesSet.getAttributesOperationSucceed(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER, + attributesSet.getAttributesOperationFailure(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER, + attributesSet.getAttributesOperationSucceed(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER, + attributesSet.getAttributesOperationFailure(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER, + attributesSet.getAttributes(), 0); /** * 1. Send many messages, and only ack half. After the cursor data is written to BK, * verify "brk_ml_cursor_persistLedgerSucceed" and "brk_ml_cursor_nonContiguousDeletedMessagesRange". @@ -156,6 +176,17 @@ public void testManagedCursorMetrics() throws Exception { Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L); Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L); + otelMetrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER, + attributesSet.getAttributesOperationSucceed(), value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER, + attributesSet.getAttributesOperationFailure(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER, + attributesSet.getAttributesOperationSucceed(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER, + attributesSet.getAttributesOperationFailure(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER, + attributesSet.getAttributes(), value -> assertThat(value).isPositive()); // Ack another half. for (MessageId messageId : keepsMessageIdList){ consumer.acknowledge(messageId); @@ -171,6 +202,17 @@ public void testManagedCursorMetrics() throws Exception { Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L); Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L); Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L); + otelMetrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER, + attributesSet.getAttributesOperationSucceed(), value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER, + attributesSet.getAttributesOperationFailure(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER, + attributesSet.getAttributesOperationSucceed(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER, + attributesSet.getAttributesOperationFailure(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER, + attributesSet.getAttributes(), 0); /** * Make BK error, and send many message, then wait cursor persistent finish. * After the cursor data is written to ZK, verify "brk_ml_cursor_persistLedgerErrors" and @@ -196,6 +238,17 @@ public void testManagedCursorMetrics() throws Exception { Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L); Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L); Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L); + otelMetrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER, + attributesSet.getAttributesOperationSucceed(), value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER, + attributesSet.getAttributesOperationFailure(), value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER, + attributesSet.getAttributesOperationSucceed(), value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER, + attributesSet.getAttributesOperationFailure(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER, + attributesSet.getAttributes(), 0); /** * TODO verify "brk_ml_cursor_persistZookeeperErrors". * This is not easy to implement, we can use {@link #mockZooKeeper} to fail ZK, but we cannot identify whether @@ -210,13 +263,16 @@ public void testManagedCursorMetrics() throws Exception { admin.topics().delete(topicName, true); } - private ManagedCursorMXBean getManagedCursorMXBean(String topicName, String subscriptionName) - throws ExecutionException, InterruptedException { + private ManagedCursorMXBean getManagedCursorMXBean(String topicName, String subscriptionName) throws Exception { + var managedCursor = getManagedCursor(topicName, subscriptionName); + return managedCursor.getStats(); + } + + private ManagedCursor getManagedCursor(String topicName, String subscriptionName) throws Exception { final PersistentSubscription persistentSubscription = (PersistentSubscription) pulsar.getBrokerService() .getTopic(topicName, false).get().get().getSubscription(subscriptionName); - final ManagedCursorImpl managedCursor = (ManagedCursorImpl) persistentSubscription.getCursor(); - return managedCursor.getStats(); + return persistentSubscription.getCursor(); } @Test @@ -265,9 +321,11 @@ public void testCursorReadWriteMetrics() throws Exception { } } + var managedCursor1 = getManagedCursor(topicName, subName1); + var cursorMXBean1 = managedCursor1.getStats(); + var managedCursor2 = getManagedCursor(topicName, subName2); + var cursorMXBean2 = managedCursor2.getStats(); // Wait for persistent cursor meta. - ManagedCursorMXBean cursorMXBean1 = getManagedCursorMXBean(topicName, subName1); - ManagedCursorMXBean cursorMXBean2 = getManagedCursorMXBean(topicName, subName2); Awaitility.await().until(() -> cursorMXBean1.getWriteCursorLedgerLogicalSize() > 0); Awaitility.await().until(() -> cursorMXBean2.getWriteCursorLedgerLogicalSize() > 0); @@ -281,6 +339,22 @@ public void testCursorReadWriteMetrics() throws Exception { Assert.assertNotEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 0L); Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L); + var otelMetrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + var attributes1 = new ManagedCursorAttributes(managedCursor1).getAttributes(); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.OUTGOING_BYTE_COUNTER, + attributes1, value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.OUTGOING_BYTE_LOGICAL_COUNTER, + attributes1, value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.INCOMING_BYTE_COUNTER, + attributes1, 0); + + var attributes2 = new ManagedCursorAttributes(managedCursor2).getAttributes(); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.OUTGOING_BYTE_COUNTER, + attributes2, value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.OUTGOING_BYTE_LOGICAL_COUNTER, + attributes2, value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.INCOMING_BYTE_COUNTER, + attributes2, 0); // cleanup. consumer.close(); consumer2.close(); From 2cd155d61b358e69f00459d661e0a49ace26cc9e Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Wed, 12 Jun 2024 13:45:49 -0700 Subject: [PATCH 08/10] Refactor attributes --- .../bookkeeper/mledger/ManagedCursor.java | 1 - .../{impl => }/ManagedCursorAttributes.java | 36 +++++++------------ .../mledger/impl/ManagedCursorImpl.java | 1 + .../stats/ManagedCursorMetricsTest.java | 2 +- .../OpenTelemetryAttributes.java | 21 +++++++++++ 5 files changed, 35 insertions(+), 26 deletions(-) rename managed-ledger/src/main/java/org/apache/bookkeeper/mledger/{impl => }/ManagedCursorAttributes.java (56%) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index c42396f0ef2d8..76f9109215e4b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -34,7 +34,6 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback; -import org.apache.bookkeeper.mledger.impl.ManagedCursorAttributes; import org.apache.bookkeeper.mledger.impl.PositionImpl; /** diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorAttributes.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorAttributes.java similarity index 56% rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorAttributes.java rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorAttributes.java index c746bc4409d2a..f6c2e811d28c3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorAttributes.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorAttributes.java @@ -16,33 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.bookkeeper.mledger.impl; +package org.apache.bookkeeper.mledger; -import com.google.common.annotations.VisibleForTesting; -import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import lombok.Data; -import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ManagedCursorOperationStatus; @Data public class ManagedCursorAttributes { - public static final AttributeKey PULSAR_MANAGED_CURSOR_NAME = - AttributeKey.stringKey("pulsar.managed_cursor.name"); - - public static final AttributeKey PULSAR_MANAGED_CURSOR_OPERATION_STATUS = - AttributeKey.stringKey("pulsar.managed_cursor.operation.status"); - - @VisibleForTesting - public enum OperationStatus { - SUCCESS, - FAILURE; - public final Attributes attributes = - Attributes.of(PULSAR_MANAGED_CURSOR_OPERATION_STATUS, name().toLowerCase()); - } - private final Attributes attributes; private final Attributes attributesOperationSucceed; private final Attributes attributesOperationFailure; @@ -51,13 +35,17 @@ public ManagedCursorAttributes(ManagedCursor cursor) { var mlName = cursor.getManagedLedger().getName(); var topicName = TopicName.get(TopicName.fromPersistenceNamingEncoding(mlName)); attributes = Attributes.of( - PULSAR_MANAGED_CURSOR_NAME, cursor.getName(), - ManagedLedgerAttributes.PULSAR_MANAGER_LEDGER_NAME, mlName, + OpenTelemetryAttributes.ML_CURSOR_NAME, cursor.getName(), + OpenTelemetryAttributes.ML_LEDGER_NAME, mlName, OpenTelemetryAttributes.PULSAR_NAMESPACE, topicName.getNamespace() ); - attributesOperationSucceed = - Attributes.builder().putAll(attributes).putAll(OperationStatus.SUCCESS.attributes).build(); - attributesOperationFailure = - Attributes.builder().putAll(attributes).putAll(OperationStatus.FAILURE.attributes).build(); + attributesOperationSucceed = Attributes.builder() + .putAll(attributes) + .putAll(ManagedCursorOperationStatus.SUCCESS.attributes) + .build(); + attributesOperationFailure = Attributes.builder() + .putAll(attributes) + .putAll(ManagedCursorOperationStatus.FAILURE.attributes) + .build(); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 9896c900b004d..868e570e87b07 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -77,6 +77,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedCursorAttributes; import org.apache.bookkeeper.mledger.ManagedCursorMXBean; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java index 17a111f48d282..8ddb5320588da 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java @@ -28,7 +28,7 @@ import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedCursorMXBean; -import org.apache.bookkeeper.mledger.impl.ManagedCursorAttributes; +import org.apache.bookkeeper.mledger.ManagedCursorAttributes; import org.apache.bookkeeper.mledger.impl.OpenTelemetryManagedCursorStats; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.ServiceConfiguration; diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java index 0cc7cf245f11a..957aa80f14396 100644 --- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java @@ -101,6 +101,7 @@ public interface OpenTelemetryAttributes { * The status of the Pulsar transaction. */ AttributeKey PULSAR_TRANSACTION_STATUS = AttributeKey.stringKey("pulsar.transaction.status"); + enum TransactionStatus { ACTIVE, COMMITTED, @@ -129,6 +130,26 @@ enum BacklogQuotaType { } // Managed Ledger Attributes + /** + * The name of the managed ledger. + */ + AttributeKey ML_LEDGER_NAME = AttributeKey.stringKey("pulsar.managed_ledger.name"); + + /** + * The name of the managed cursor. + */ + AttributeKey ML_CURSOR_NAME = AttributeKey.stringKey("pulsar.managed_ledger.cursor.name"); + + /** + * The status of the managed cursor operation. + */ + AttributeKey ML_CURSOR_OPERATION_STATUS = + AttributeKey.stringKey("pulsar.managed_ledger.cursor.operation.status"); + enum ManagedCursorOperationStatus { + SUCCESS, + FAILURE; + public final Attributes attributes = Attributes.of(ML_CURSOR_OPERATION_STATUS, name().toLowerCase()); + } /** * The type of the pool arena. From 11d51a89da9a06637b90e44667f88590cb9c16f1 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Wed, 12 Jun 2024 13:46:02 -0700 Subject: [PATCH 09/10] Instantiate stats object --- .../bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java | 3 +++ .../mledger/impl/OpenTelemetryManagedCursorStats.java | 5 +++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 973ae364d60d0..45ccc4c26c160 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -122,6 +122,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private final MetadataStore metadataStore; private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats; + private final OpenTelemetryManagedCursorStats openTelemetryManagedCursorStats; //indicate whether shutdown() is called. private volatile boolean closed; @@ -230,6 +231,7 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, metadataStore.registerSessionListener(this::handleMetadataStoreNotification); openTelemetryCacheStats = new OpenTelemetryManagedLedgerCacheStats(openTelemetry, this); + openTelemetryManagedCursorStats = new OpenTelemetryManagedCursorStats(openTelemetry, this); } static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy { @@ -622,6 +624,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { })); }).thenAcceptAsync(__ -> { //wait for tasks in scheduledExecutor executed. + openTelemetryManagedCursorStats.close(); openTelemetryCacheStats.close(); scheduledExecutor.shutdownNow(); entryCacheManager.clear(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java index 72f5bae0c38e2..d76ce3b45eb96 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java @@ -23,6 +23,7 @@ import io.opentelemetry.api.metrics.BatchCallback; import io.opentelemetry.api.metrics.ObservableLongMeasurement; import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.pulsar.opentelemetry.Constants; public class OpenTelemetryManagedCursorStats implements AutoCloseable { @@ -55,8 +56,8 @@ public class OpenTelemetryManagedCursorStats implements AutoCloseable { private final BatchCallback batchCallback; - public OpenTelemetryManagedCursorStats(ManagedLedgerFactoryImpl factory, OpenTelemetry openTelemetry) { - var meter = openTelemetry.getMeter("pulsar.managed_ledger.cursor"); // TODO + public OpenTelemetryManagedCursorStats(OpenTelemetry openTelemetry, ManagedLedgerFactoryImpl factory) { + var meter = openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME); persistOperationCounter = meter .counterBuilder(PERSIST_OPERATION_COUNTER) From 32080b07484524f24244e14a18d67fb10a3e4a29 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Wed, 26 Jun 2024 11:40:00 -0700 Subject: [PATCH 10/10] Update metric descriptions --- .../apache/bookkeeper/mledger/ManagedCursorAttributes.java | 4 ++-- .../mledger/impl/OpenTelemetryManagedCursorStats.java | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorAttributes.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorAttributes.java index f6c2e811d28c3..6c06e68d75e24 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorAttributes.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorAttributes.java @@ -19,12 +19,12 @@ package org.apache.bookkeeper.mledger; import io.opentelemetry.api.common.Attributes; -import lombok.Data; +import lombok.Getter; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ManagedCursorOperationStatus; -@Data +@Getter public class ManagedCursorAttributes { private final Attributes attributes; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java index d76ce3b45eb96..93a749d4aef51 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java @@ -80,19 +80,19 @@ public OpenTelemetryManagedCursorStats(OpenTelemetry openTelemetry, ManagedLedge outgoingByteCounter = meter .counterBuilder(OUTGOING_BYTE_COUNTER) .setUnit("{By}") - .setDescription("The size of write to ledger.") + .setDescription("The total amount of data written to the ledger.") .buildObserver(); outgoingByteLogicalCounter = meter .counterBuilder(OUTGOING_BYTE_LOGICAL_COUNTER) .setUnit("{By}") - .setDescription("The size of write to ledger (not including replicas).") + .setDescription("The total amount of data written to the ledger, not including replicas.") .buildObserver(); incomingByteCounter = meter .counterBuilder(INCOMING_BYTE_COUNTER) .setUnit("{By}") - .setDescription("The size of read from ledger.") + .setDescription("The total amount of data read from the ledger.") .buildObserver(); batchCallback = meter.batchCallback(() -> factory.getManagedLedgers()