Skip to content

Commit

Permalink
KAFKA-12697: Add Global Topic and Partition count metrics to the Quorum Controller (#10679)
Browse files Browse the repository at this point in the history

Reviewers: Colin P. McCabe <cmccabe@apache.org>
  • Loading branch information
dielhennr authored May 13, 2021
1 parent 4b27365 commit e69571a
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,12 @@ public interface ControllerMetrics {
/**
 * Reports one sample for the event-queue-time metric.
 *
 * @param durationMs the duration to record, in milliseconds
 */
void updateEventQueueTime(long durationMs);

/**
 * Reports one sample for the event-queue-processing-time metric.
 *
 * @param durationMs the duration to record, in milliseconds
 */
void updateEventQueueProcessingTime(long durationMs);

/**
 * Sets the value reported as the global topic count.
 *
 * @param topicCount the new topic count
 */
void setGlobalTopicsCount(int topicCount);

/**
 * Returns the value most recently passed to {@link #setGlobalTopicsCount(int)}.
 *
 * @return the current global topic count
 */
int globalTopicsCount();

/**
 * Sets the value reported as the global partition count.
 *
 * @param partitionCount the new partition count
 */
void setGlobalPartitionCount(int partitionCount);

/**
 * Returns the value most recently passed to {@link #setGlobalPartitionCount(int)}.
 *
 * @return the current global partition count
 */
int globalPartitionCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ private QuorumController(LogContext logContext,
this.snapshotGeneratorManager = new SnapshotGeneratorManager(snapshotWriterBuilder);
this.replicationControl = new ReplicationControlManager(snapshotRegistry,
logContext, defaultReplicationFactor, defaultNumPartitions,
configurationControl, clusterControl);
configurationControl, clusterControl, controllerMetrics);
this.logManager = logManager;
this.metaLogListener = new QuorumMetaLogListener();
this.curClaimEpoch = -1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,24 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
"kafka.controller", "ControllerEventManager", "EventQueueTimeMs", null);
// Name under which the event-queue processing-time histogram is registered.
private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = new MetricName(
"kafka.controller", "ControllerEventManager", "EventQueueProcessingTimeMs", null);
// Name under which the global topic count gauge is registered.
private final static MetricName GLOBAL_TOPIC_COUNT = new MetricName(
"kafka.controller", "KafkaController", "GlobalTopicCount", null);
// Name under which the global partition count gauge is registered.
private final static MetricName GLOBAL_PARTITION_COUNT = new MetricName(
"kafka.controller", "KafkaController", "GlobalPartitionCount", null);

// Controller-active flag; the constructor initializes it to false.
private volatile boolean active;
// Latest topic count; written by setGlobalTopicsCount and read by the GlobalTopicCount gauge.
private volatile int globalTopicCount;
// Latest partition count; written by setGlobalPartitionCount and read by the
// GlobalPartitionCount gauge.
private volatile int globalPartitionCount;
// Gauge handles returned by the registry when the gauges are registered in the constructor.
private final Gauge<Integer> activeControllerCount;
private final Gauge<Integer> globalPartitionCountGauge;
private final Gauge<Integer> globalTopicCountGauge;
// Histogram registered under EVENT_QUEUE_TIME_MS.
private final Histogram eventQueueTime;
// Histogram registered under EVENT_QUEUE_PROCESSING_TIME_MS.
private final Histogram eventQueueProcessingTime;

public QuorumControllerMetrics(MetricsRegistry registry) {
this.active = false;
this.globalTopicCount = 0;
this.globalPartitionCount = 0;
this.activeControllerCount = registry.newGauge(ACTIVE_CONTROLLER_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
Expand All @@ -46,6 +56,18 @@ public Integer value() {
});
this.eventQueueTime = registry.newHistogram(EVENT_QUEUE_TIME_MS, true);
this.eventQueueProcessingTime = registry.newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
this.globalTopicCountGauge = registry.newGauge(GLOBAL_TOPIC_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return globalTopicCount;
}
});
this.globalPartitionCountGauge = registry.newGauge(GLOBAL_PARTITION_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return globalPartitionCount;
}
});
}

@Override
Expand All @@ -67,4 +89,24 @@ public void updateEventQueueTime(long durationMs) {
// Records one event-processing duration, in milliseconds, into the
// EventQueueProcessingTimeMs histogram.
//
// The sample must go to eventQueueProcessingTime. Writing it to eventQueueTime
// (as this method previously did) leaves the processing-time histogram empty and
// mixes processing durations into the queue-time histogram.
public void updateEventQueueProcessingTime(long durationMs) {
    eventQueueProcessingTime.update(durationMs);
}

/**
 * Stores the latest topic count so the {@code GlobalTopicCount} gauge can report it.
 *
 * @param topicCount the new topic count
 */
@Override
public void setGlobalTopicsCount(int topicCount) {
    globalTopicCount = topicCount;
}

/**
 * Returns the topic count most recently stored by {@link #setGlobalTopicsCount(int)}.
 *
 * @return the current global topic count
 */
@Override
public int globalTopicsCount() {
    return globalTopicCount;
}

/**
 * Stores the latest partition count so the {@code GlobalPartitionCount} gauge can report it.
 *
 * @param partitionCount the new partition count
 */
@Override
public void setGlobalPartitionCount(int partitionCount) {
    globalPartitionCount = partitionCount;
}

/**
 * Returns the partition count most recently stored by
 * {@link #setGlobalPartitionCount(int)}.
 *
 * @return the current global partition count
 */
@Override
public int globalPartitionCount() {
    return globalPartitionCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,11 @@ public String toString() {
*/
private final int defaultNumPartitions;

/**
* A count of the total number of partitions in the cluster.
*/
private int globalPartitionCount;

/**
* A reference to the controller's configuration control manager.
*/
Expand All @@ -289,6 +294,11 @@ public String toString() {
*/
private final ClusterControlManager clusterControl;

/**
* A reference to the controller's metrics, used to publish the topic and partition counts.
*/
private final ControllerMetrics controllerMetrics;

/**
* Maps topic names to topic UUIDs.
*/
Expand All @@ -309,13 +319,16 @@ public String toString() {
short defaultReplicationFactor,
int defaultNumPartitions,
ConfigurationControlManager configurationControl,
ClusterControlManager clusterControl) {
ClusterControlManager clusterControl,
ControllerMetrics controllerMetrics) {
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(ReplicationControlManager.class);
this.defaultReplicationFactor = defaultReplicationFactor;
this.defaultNumPartitions = defaultNumPartitions;
this.configurationControl = configurationControl;
this.controllerMetrics = controllerMetrics;
this.clusterControl = clusterControl;
this.globalPartitionCount = 0;
this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
Expand All @@ -325,6 +338,7 @@ public void replay(TopicRecord record) {
topicsByName.put(record.name(), record.topicId());
topics.put(record.topicId(),
new TopicControlInfo(record.name(), snapshotRegistry, record.topicId()));
controllerMetrics.setGlobalTopicsCount(topics.size());
log.info("Created topic {} with topic ID {}.", record.name(), record.topicId());
}

Expand All @@ -343,6 +357,8 @@ public void replay(PartitionRecord record) {
topicInfo.parts.put(record.partitionId(), newPartInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(), null,
newPartInfo.isr, NO_LEADER, newPartInfo.leader);
globalPartitionCount++;
controllerMetrics.setGlobalPartitionCount(globalPartitionCount);
} else if (!newPartInfo.equals(prevPartInfo)) {
newPartInfo.maybeLogPartitionChange(log, description, prevPartInfo);
topicInfo.parts.put(record.partitionId(), newPartInfo);
Expand Down Expand Up @@ -389,9 +405,11 @@ public void replay(RemoveTopicRecord record) {
for (int i = 0; i < partition.isr.length; i++) {
brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]);
}
globalPartitionCount--;
}
brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);

controllerMetrics.setGlobalTopicsCount(topics.size());
controllerMetrics.setGlobalPartitionCount(globalPartitionCount);
log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package org.apache.kafka.controller;


public final class MockControllerMetrics implements ControllerMetrics {
private volatile boolean active;
private volatile int topics;
private volatile int partitions;

/**
 * Creates a mock that starts out inactive with zero topics and zero partitions.
 */
public MockControllerMetrics() {
    partitions = 0;
    topics = 0;
    active = false;
}

@Override
Expand All @@ -44,4 +47,24 @@ public void updateEventQueueTime(long durationMs) {
public void updateEventQueueProcessingTime(long durationMs) {
// Intentionally a no-op: this mock does not record event processing times.
}

/**
 * Remembers the topic count so tests can read it back through
 * {@link #globalTopicsCount()}.
 *
 * @param topicCount the new topic count
 */
@Override
public void setGlobalTopicsCount(int topicCount) {
    topics = topicCount;
}

/**
 * Returns the topic count most recently stored by {@link #setGlobalTopicsCount(int)}.
 *
 * @return the recorded topic count
 */
@Override
public int globalTopicsCount() {
    return topics;
}

/**
 * Remembers the partition count so tests can read it back through
 * {@link #globalPartitionCount()}.
 *
 * @param partitionCount the new partition count
 */
@Override
public void setGlobalPartitionCount(int partitionCount) {
    partitions = partitionCount;
}

/**
 * Returns the partition count most recently stored by
 * {@link #setGlobalPartitionCount(int)}.
 *
 * @return the recorded partition count
 */
@Override
public int globalPartitionCount() {
    return partitions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,16 @@ private static class ReplicationControlTestContext {
final ClusterControlManager clusterControl = new ClusterControlManager(
logContext, time, snapshotRegistry, 1000,
new SimpleReplicaPlacementPolicy(random));
final ControllerMetrics metrics = new MockControllerMetrics();
final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
new LogContext(), snapshotRegistry, Collections.emptyMap());
final ReplicationControlManager replicationControl = new ReplicationControlManager(snapshotRegistry,
new LogContext(),
(short) 3,
1,
configurationControl,
clusterControl);
clusterControl,
metrics);

void replay(List<ApiMessageAndVersion> records) throws Exception {
ControllerTestUtils.replayAll(clusterControl, records);
Expand Down Expand Up @@ -203,6 +205,53 @@ public void testCreateTopics() throws Exception {
ctx.replicationControl.iterator(Long.MAX_VALUE));
}

/**
 * Checks that the global topic and partition count metrics track topic creation
 * and deletion as the resulting records are replayed.
 */
@Test
public void testGlobalTopicAndPartitionMetrics() throws Exception {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext();
    ReplicationControlManager replicationControl = ctx.replicationControl;

    // Bring three brokers online before any topic is created.
    for (int brokerId = 0; brokerId < 3; brokerId++) {
        registerBroker(brokerId, ctx);
        unfenceBroker(brokerId, ctx);
    }

    // Stage 1: a single topic "foo" with one partition.
    CreateTopicsRequestData fooRequest = new CreateTopicsRequestData();
    fooRequest.topics().add(new CreatableTopic().setName("foo").
        setNumPartitions(1).setReplicationFactor((short) -1));
    ControllerResult<CreateTopicsResponseData> fooResult =
        replicationControl.createTopics(fooRequest);

    List<Uuid> firstDeletionBatch = new ArrayList<>();
    firstDeletionBatch.add(fooResult.response().topics().find("foo").topicId());

    ControllerTestUtils.replayAll(replicationControl, fooResult.records());
    assertEquals(1, ctx.metrics.globalTopicsCount());

    // Stage 2: add "bar" (one partition) and "baz" (two partitions).
    CreateTopicsRequestData barBazRequest = new CreateTopicsRequestData();
    barBazRequest.topics().add(new CreatableTopic().setName("bar").
        setNumPartitions(1).setReplicationFactor((short) -1));
    barBazRequest.topics().add(new CreatableTopic().setName("baz").
        setNumPartitions(2).setReplicationFactor((short) -1));
    ControllerResult<CreateTopicsResponseData> barBazResult =
        replicationControl.createTopics(barBazRequest);
    ControllerTestUtils.replayAll(replicationControl, barBazResult.records());
    assertEquals(3, ctx.metrics.globalTopicsCount());
    assertEquals(4, ctx.metrics.globalPartitionCount());

    // Stage 3: delete "foo" and "baz"; only "bar" and its single partition remain.
    firstDeletionBatch.add(barBazResult.response().topics().find("baz").topicId());
    ControllerResult<Map<Uuid, ApiError>> firstDeletion =
        replicationControl.deleteTopics(firstDeletionBatch);
    ControllerTestUtils.replayAll(replicationControl, firstDeletion.records());
    assertEquals(1, ctx.metrics.globalTopicsCount());
    assertEquals(1, ctx.metrics.globalPartitionCount());

    // Stage 4: delete "bar"; both metrics return to zero.
    Uuid barId = barBazResult.response().topics().find("bar").topicId();
    ControllerResult<Map<Uuid, ApiError>> secondDeletion =
        replicationControl.deleteTopics(Collections.singletonList(barId));
    ControllerTestUtils.replayAll(replicationControl, secondDeletion.records());
    assertEquals(0, ctx.metrics.globalTopicsCount());
    assertEquals(0, ctx.metrics.globalPartitionCount());
}

@Test
public void testValidateNewTopicNames() {
Map<String, ApiError> topicErrors = new HashMap<>();
Expand Down

0 comments on commit e69571a

Please sign in to comment.