diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java index fd4f3befb805f..406a533e7d7e4 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java @@ -26,4 +26,12 @@ public interface ControllerMetrics { void updateEventQueueTime(long durationMs); void updateEventQueueProcessingTime(long durationMs); + + void setGlobalTopicsCount(int topicCount); + + int globalTopicsCount(); + + void setGlobalPartitionCount(int partitionCount); + + int globalPartitionCount(); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 86faa5ede8e6c..42bc308c24160 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -928,7 +928,7 @@ private QuorumController(LogContext logContext, this.snapshotGeneratorManager = new SnapshotGeneratorManager(snapshotWriterBuilder); this.replicationControl = new ReplicationControlManager(snapshotRegistry, logContext, defaultReplicationFactor, defaultNumPartitions, - configurationControl, clusterControl); + configurationControl, clusterControl, controllerMetrics); this.logManager = logManager; this.metaLogListener = new QuorumMetaLogListener(); this.curClaimEpoch = -1L; diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java index ad56faf3da99c..a9de1ff0a6836 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java @@ -30,14 +30,24 @@ public final class QuorumControllerMetrics implements ControllerMetrics { "kafka.controller", "ControllerEventManager", "EventQueueTimeMs", null); private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = new MetricName( "kafka.controller", "ControllerEventManager", "EventQueueProcessingTimeMs", null); + private final static MetricName GLOBAL_TOPIC_COUNT = new MetricName( + "kafka.controller", "KafkaController", "GlobalTopicCount", null); + private final static MetricName GLOBAL_PARTITION_COUNT = new MetricName( + "kafka.controller", "KafkaController", "GlobalPartitionCount", null); private volatile boolean active; + private volatile int globalTopicCount; + private volatile int globalPartitionCount; private final Gauge activeControllerCount; + private final Gauge globalPartitionCountGauge; + private final Gauge globalTopicCountGauge; private final Histogram eventQueueTime; private final Histogram eventQueueProcessingTime; public QuorumControllerMetrics(MetricsRegistry registry) { this.active = false; + this.globalTopicCount = 0; + this.globalPartitionCount = 0; this.activeControllerCount = registry.newGauge(ACTIVE_CONTROLLER_COUNT, new Gauge() { @Override public Integer value() { @@ -46,6 +56,18 @@ public Integer value() { }); this.eventQueueTime = registry.newHistogram(EVENT_QUEUE_TIME_MS, true); this.eventQueueProcessingTime = registry.newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true); + this.globalTopicCountGauge = registry.newGauge(GLOBAL_TOPIC_COUNT, new Gauge() { + @Override + public Integer value() { + return globalTopicCount; + } + }); + this.globalPartitionCountGauge = registry.newGauge(GLOBAL_PARTITION_COUNT, new Gauge() { + @Override + public Integer value() { + return globalPartitionCount; + } + }); } @Override @@ -67,4 +89,24 @@ public void updateEventQueueTime(long durationMs) { public void updateEventQueueProcessingTime(long durationMs) { eventQueueTime.update(durationMs); } + + @Override + public void setGlobalTopicsCount(int topicCount) { + this.globalTopicCount = topicCount; + } + + @Override + public int globalTopicsCount() { + return this.globalTopicCount; + } + + @Override + public void setGlobalPartitionCount(int partitionCount) { + this.globalPartitionCount = partitionCount; + } + + @Override + public int globalPartitionCount() { + return this.globalPartitionCount; + } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index ea94a0052d34b..d039c2c7c51aa 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -279,6 +279,11 @@ public String toString() { */ private final int defaultNumPartitions; + /** + * A count of the total number of partitions in the cluster. + */ + private int globalPartitionCount; + /** * A reference to the controller's configuration control manager. */ @@ -289,6 +294,11 @@ public String toString() { */ private final ClusterControlManager clusterControl; + /** + * A reference to the controller's metrics registry. + */ + private final ControllerMetrics controllerMetrics; + /** * Maps topic names to topic UUIDs. */ @@ -309,13 +319,16 @@ public String toString() { short defaultReplicationFactor, int defaultNumPartitions, ConfigurationControlManager configurationControl, - ClusterControlManager clusterControl) { + ClusterControlManager clusterControl, + ControllerMetrics controllerMetrics) { this.snapshotRegistry = snapshotRegistry; this.log = logContext.logger(ReplicationControlManager.class); this.defaultReplicationFactor = defaultReplicationFactor; this.defaultNumPartitions = defaultNumPartitions; this.configurationControl = configurationControl; + this.controllerMetrics = controllerMetrics; this.clusterControl = clusterControl; + this.globalPartitionCount = 0; this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0); this.topics = new TimelineHashMap<>(snapshotRegistry, 0); this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry); @@ -325,6 +338,7 @@ public void replay(TopicRecord record) { topicsByName.put(record.name(), record.topicId()); topics.put(record.topicId(), new TopicControlInfo(record.name(), snapshotRegistry, record.topicId())); + controllerMetrics.setGlobalTopicsCount(topics.size()); log.info("Created topic {} with topic ID {}.", record.name(), record.topicId()); } @@ -343,6 +357,8 @@ public void replay(PartitionRecord record) { topicInfo.parts.put(record.partitionId(), newPartInfo); brokersToIsrs.update(record.topicId(), record.partitionId(), null, newPartInfo.isr, NO_LEADER, newPartInfo.leader); + globalPartitionCount++; + controllerMetrics.setGlobalPartitionCount(globalPartitionCount); } else if (!newPartInfo.equals(prevPartInfo)) { newPartInfo.maybeLogPartitionChange(log, description, prevPartInfo); topicInfo.parts.put(record.partitionId(), newPartInfo); @@ -389,9 +405,11 @@ public void replay(RemoveTopicRecord record) { for (int i = 0; i < partition.isr.length; i++) { brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]); } + globalPartitionCount--; } brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER); - + controllerMetrics.setGlobalTopicsCount(topics.size()); + controllerMetrics.setGlobalPartitionCount(globalPartitionCount); log.info("Removed topic {} with ID {}.", topic.name, record.topicId()); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java index 4e6523e37f286..45a69d74c5573 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java +++ b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java @@ -17,12 +17,15 @@ package org.apache.kafka.controller; - public final class MockControllerMetrics implements ControllerMetrics { private volatile boolean active; + private volatile int topics; + private volatile int partitions; public MockControllerMetrics() { this.active = false; + this.topics = 0; + this.partitions = 0; } @Override @@ -44,4 +47,24 @@ public void updateEventQueueTime(long durationMs) { public void updateEventQueueProcessingTime(long durationMs) { // nothing to do } + + @Override + public void setGlobalTopicsCount(int topicCount) { + this.topics = topicCount; + } + + @Override + public int globalTopicsCount() { + return this.topics; + } + + @Override + public void setGlobalPartitionCount(int partitionCount) { + this.partitions = partitionCount; + } + + @Override + public int globalPartitionCount() { + return this.partitions; + } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 2a456be830bb9..38d2cd9cd5fcf 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -88,6 +88,7 @@ private static class ReplicationControlTestContext { final ClusterControlManager clusterControl = new ClusterControlManager( logContext, time, snapshotRegistry, 1000, new SimpleReplicaPlacementPolicy(random)); + final ControllerMetrics metrics = new MockControllerMetrics(); final ConfigurationControlManager configurationControl = new ConfigurationControlManager( new LogContext(), snapshotRegistry, Collections.emptyMap()); final ReplicationControlManager replicationControl = new ReplicationControlManager(snapshotRegistry, @@ -95,7 +96,8 @@ private static class ReplicationControlTestContext { (short) 3, 1, configurationControl, - clusterControl); + clusterControl, + metrics); void replay(List records) throws Exception { ControllerTestUtils.replayAll(clusterControl, records); @@ -203,6 +205,53 @@ public void testCreateTopics() throws Exception { ctx.replicationControl.iterator(Long.MAX_VALUE)); } + @Test + public void testGlobalTopicAndPartitionMetrics() throws Exception { + ReplicationControlTestContext ctx = new ReplicationControlTestContext(); + ReplicationControlManager replicationControl = ctx.replicationControl; + CreateTopicsRequestData request = new CreateTopicsRequestData(); + request.topics().add(new CreatableTopic().setName("foo"). + setNumPartitions(1).setReplicationFactor((short) -1)); + + registerBroker(0, ctx); + unfenceBroker(0, ctx); + registerBroker(1, ctx); + unfenceBroker(1, ctx); + registerBroker(2, ctx); + unfenceBroker(2, ctx); + + List topicsToDelete = new ArrayList<>(); + + ControllerResult result = + replicationControl.createTopics(request); + topicsToDelete.add(result.response().topics().find("foo").topicId()); + + ControllerTestUtils.replayAll(replicationControl, result.records()); + assertEquals(1, ctx.metrics.globalTopicsCount()); + + request = new CreateTopicsRequestData(); + request.topics().add(new CreatableTopic().setName("bar"). + setNumPartitions(1).setReplicationFactor((short) -1)); + request.topics().add(new CreatableTopic().setName("baz"). + setNumPartitions(2).setReplicationFactor((short) -1)); + result = replicationControl.createTopics(request); + ControllerTestUtils.replayAll(replicationControl, result.records()); + assertEquals(3, ctx.metrics.globalTopicsCount()); + assertEquals(4, ctx.metrics.globalPartitionCount()); + + topicsToDelete.add(result.response().topics().find("baz").topicId()); + ControllerResult> deleteResult = replicationControl.deleteTopics(topicsToDelete); + ControllerTestUtils.replayAll(replicationControl, deleteResult.records()); + assertEquals(1, ctx.metrics.globalTopicsCount()); + assertEquals(1, ctx.metrics.globalPartitionCount()); + + Uuid topicToDelete = result.response().topics().find("bar").topicId(); + deleteResult = replicationControl.deleteTopics(Collections.singletonList(topicToDelete)); + ControllerTestUtils.replayAll(replicationControl, deleteResult.records()); + assertEquals(0, ctx.metrics.globalTopicsCount()); + assertEquals(0, ctx.metrics.globalPartitionCount()); + } + @Test public void testValidateNewTopicNames() { Map topicErrors = new HashMap<>();