Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-12697: Add Global Topic and Partition count metrics to the Quorum Controller #10679

Merged
merged 9 commits into from
May 13, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,12 @@ public interface ControllerMetrics {
void updateEventQueueTime(long durationMs);

void updateEventQueueProcessingTime(long durationMs);

/**
 * Sets the value reported as the global topic count.
 *
 * @param topicCount the new topic total
 */
void setGlobalTopicsCount(int topicCount);

/**
 * Returns the global topic count most recently supplied via
 * {@code setGlobalTopicsCount}.
 *
 * @return the current topic total
 */
int globalTopicsCount();

/**
 * Sets the value reported as the global partition count.
 *
 * @param partitionCount the new partition total
 */
void setGlobalPartitionCount(int partitionCount);

/**
 * Returns the global partition count most recently supplied via
 * {@code setGlobalPartitionCount}.
 *
 * @return the current partition total
 */
int globalPartitionCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ private QuorumController(LogContext logContext,
this.snapshotGeneratorManager = new SnapshotGeneratorManager(snapshotWriterBuilder);
this.replicationControl = new ReplicationControlManager(snapshotRegistry,
logContext, defaultReplicationFactor, defaultNumPartitions,
configurationControl, clusterControl);
configurationControl, clusterControl, controllerMetrics);
this.logManager = logManager;
this.metaLogListener = new QuorumMetaLogListener();
this.curClaimEpoch = -1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,24 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
"kafka.controller", "ControllerEventManager", "EventQueueTimeMs", null);
private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = new MetricName(
"kafka.controller", "ControllerEventManager", "EventQueueProcessingTimeMs", null);
private final static MetricName GLOBAL_TOPIC_COUNT = new MetricName(
"kafka.controller", "KafkaController", "GlobalTopicCount", null);
private final static MetricName GLOBAL_PARTITION_COUNT = new MetricName(
dielhennr marked this conversation as resolved.
Show resolved Hide resolved
"kafka.controller", "KafkaController", "GlobalPartitionCount", null);

private volatile boolean active;
private volatile int globalTopicCount;
private volatile int globalPartitionCount;
private final Gauge<Integer> activeControllerCount;
private final Gauge<Integer> globalPartitionCountGauge;
private final Gauge<Integer> globalTopicCountGauge;
private final Histogram eventQueueTime;
private final Histogram eventQueueProcessingTime;

public QuorumControllerMetrics(MetricsRegistry registry) {
this.active = false;
this.globalTopicCount = 0;
this.globalPartitionCount = 0;
this.activeControllerCount = registry.newGauge(ACTIVE_CONTROLLER_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
Expand All @@ -46,6 +56,18 @@ public Integer value() {
});
this.eventQueueTime = registry.newHistogram(EVENT_QUEUE_TIME_MS, true);
this.eventQueueProcessingTime = registry.newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
this.globalTopicCountGauge = registry.newGauge(GLOBAL_TOPIC_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return globalTopicCount;
}
});
this.globalPartitionCountGauge = registry.newGauge(GLOBAL_PARTITION_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return globalPartitionCount;
}
});
}

@Override
Expand All @@ -67,4 +89,24 @@ public void updateEventQueueTime(long durationMs) {
public void updateEventQueueProcessingTime(long durationMs) {
    // Record the sample in the EventQueueProcessingTimeMs histogram. The
    // EventQueueTimeMs histogram belongs to updateEventQueueTime; sending
    // processing samples there would skew it and leave this metric empty.
    eventQueueProcessingTime.update(durationMs);
}

/**
 * Stores the latest topic total; the GlobalTopicCount gauge reads this field.
 *
 * @param topicCount the new topic total
 */
@Override
public void setGlobalTopicsCount(int topicCount) {
    globalTopicCount = topicCount;
}

/**
 * @return the topic total currently held by this metrics object
 */
@Override
public int globalTopicsCount() {
    return globalTopicCount;
}

/**
 * Stores the latest partition total; the GlobalPartitionCount gauge reads this field.
 *
 * @param partitionCount the new partition total
 */
@Override
public void setGlobalPartitionCount(int partitionCount) {
    globalPartitionCount = partitionCount;
}

/**
 * @return the partition total currently held by this metrics object
 */
@Override
public int globalPartitionCount() {
    return globalPartitionCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,11 @@ public String toString() {
*/
private final int defaultNumPartitions;

/**
* A count of the total number of partitions in the cluster.
*/
private int globalPartitionCount;
dielhennr marked this conversation as resolved.
Show resolved Hide resolved

/**
* A reference to the controller's configuration control manager.
*/
Expand All @@ -289,6 +294,11 @@ public String toString() {
*/
private final ClusterControlManager clusterControl;

/**
* A reference to the controller's metrics registry.
*/
private final ControllerMetrics controllerMetrics;

/**
* Maps topic names to topic UUIDs.
*/
Expand All @@ -309,13 +319,16 @@ public String toString() {
short defaultReplicationFactor,
int defaultNumPartitions,
ConfigurationControlManager configurationControl,
ClusterControlManager clusterControl) {
ClusterControlManager clusterControl,
ControllerMetrics controllerMetrics) {
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(ReplicationControlManager.class);
this.defaultReplicationFactor = defaultReplicationFactor;
this.defaultNumPartitions = defaultNumPartitions;
this.configurationControl = configurationControl;
this.controllerMetrics = controllerMetrics;
this.clusterControl = clusterControl;
this.globalPartitionCount = 0;
this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
Expand All @@ -325,6 +338,7 @@ public void replay(TopicRecord record) {
topicsByName.put(record.name(), record.topicId());
topics.put(record.topicId(),
new TopicControlInfo(record.name(), snapshotRegistry, record.topicId()));
controllerMetrics.setGlobalTopicsCount(topics.size());
log.info("Created topic {} with topic ID {}.", record.name(), record.topicId());
}

Expand All @@ -343,6 +357,8 @@ public void replay(PartitionRecord record) {
topicInfo.parts.put(record.partitionId(), newPartInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(), null,
newPartInfo.isr, NO_LEADER, newPartInfo.leader);
globalPartitionCount++;
controllerMetrics.setGlobalPartitionCount(globalPartitionCount);
} else if (!newPartInfo.equals(prevPartInfo)) {
newPartInfo.maybeLogPartitionChange(log, description, prevPartInfo);
topicInfo.parts.put(record.partitionId(), newPartInfo);
Expand Down Expand Up @@ -389,9 +405,11 @@ public void replay(RemoveTopicRecord record) {
for (int i = 0; i < partition.isr.length; i++) {
brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]);
}
globalPartitionCount--;
}
brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);

controllerMetrics.setGlobalTopicsCount(topics.size());
controllerMetrics.setGlobalPartitionCount(globalPartitionCount);
log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package org.apache.kafka.controller;


public final class MockControllerMetrics implements ControllerMetrics {
private volatile boolean active;
private volatile int topics;
private volatile int partitions;

/**
 * Creates a mock that starts out inactive with zero topics and zero partitions.
 */
public MockControllerMetrics() {
    active = false;
    topics = 0;
    partitions = 0;
}

@Override
Expand All @@ -44,4 +47,24 @@ public void updateEventQueueTime(long durationMs) {
public void updateEventQueueProcessingTime(long durationMs) {
// Intentionally a no-op: durationMs is discarded and no state in this mock changes.
}

/**
 * Remembers the supplied topic total so tests can read it back.
 *
 * @param topicCount the new topic total
 */
@Override
public void setGlobalTopicsCount(int topicCount) {
    topics = topicCount;
}

/**
 * @return the topic total last stored in this mock
 */
@Override
public int globalTopicsCount() {
    return topics;
}

/**
 * Remembers the supplied partition total so tests can read it back.
 *
 * @param partitionCount the new partition total
 */
@Override
public void setGlobalPartitionCount(int partitionCount) {
    partitions = partitionCount;
}

/**
 * @return the partition total last stored in this mock
 */
@Override
public int globalPartitionCount() {
    return partitions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,16 @@ private static class ReplicationControlTestContext {
final ClusterControlManager clusterControl = new ClusterControlManager(
logContext, time, snapshotRegistry, 1000,
new SimpleReplicaPlacementPolicy(random));
final ControllerMetrics metrics = new MockControllerMetrics();
final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
new LogContext(), snapshotRegistry, Collections.emptyMap());
final ReplicationControlManager replicationControl = new ReplicationControlManager(snapshotRegistry,
new LogContext(),
(short) 3,
1,
configurationControl,
clusterControl);
clusterControl,
metrics);

void replay(List<ApiMessageAndVersion> records) throws Exception {
ControllerTestUtils.replayAll(clusterControl, records);
Expand Down Expand Up @@ -203,6 +205,53 @@ public void testCreateTopics() throws Exception {
ctx.replicationControl.iterator(Long.MAX_VALUE));
}

/**
 * Verifies that replaying topic creation and deletion records keeps the
 * global topic and partition counts on the metrics object in step.
 */
@Test
public void testGlobalTopicAndPartitionMetrics() throws Exception {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext();
    ReplicationControlManager replicationControl = ctx.replicationControl;
    CreateTopicsRequestData request = new CreateTopicsRequestData();
    request.topics().add(new CreatableTopic().setName("foo").
        setNumPartitions(1).setReplicationFactor((short) -1));

    // Register and unfence three brokers before creating any topic.
    registerBroker(0, ctx);
    unfenceBroker(0, ctx);
    registerBroker(1, ctx);
    unfenceBroker(1, ctx);
    registerBroker(2, ctx);
    unfenceBroker(2, ctx);

    List<Uuid> topicsToDelete = new ArrayList<>();

    ControllerResult<CreateTopicsResponseData> result =
        replicationControl.createTopics(request);
    topicsToDelete.add(result.response().topics().find("foo").topicId());

    // "foo" has one partition, so both counters should read 1 after replay.
    ControllerTestUtils.replayAll(replicationControl, result.records());
    assertEquals(1, ctx.metrics.globalTopicsCount());
    assertEquals(1, ctx.metrics.globalPartitionCount());

    // Add "bar" (1 partition) and "baz" (2 partitions): 3 topics, 4 partitions in total.
    request = new CreateTopicsRequestData();
    request.topics().add(new CreatableTopic().setName("bar").
        setNumPartitions(1).setReplicationFactor((short) -1));
    request.topics().add(new CreatableTopic().setName("baz").
        setNumPartitions(2).setReplicationFactor((short) -1));
    result = replicationControl.createTopics(request);
    ControllerTestUtils.replayAll(replicationControl, result.records());
    assertEquals(3, ctx.metrics.globalTopicsCount());
    assertEquals(4, ctx.metrics.globalPartitionCount());

    // Delete "foo" and "baz" together; only "bar" with its single partition remains.
    topicsToDelete.add(result.response().topics().find("baz").topicId());
    ControllerResult<Map<Uuid, ApiError>> deleteResult = replicationControl.deleteTopics(topicsToDelete);
    ControllerTestUtils.replayAll(replicationControl, deleteResult.records());
    assertEquals(1, ctx.metrics.globalTopicsCount());
    assertEquals(1, ctx.metrics.globalPartitionCount());

    // Delete the last topic; both counters must return to zero.
    Uuid topicToDelete = result.response().topics().find("bar").topicId();
    deleteResult = replicationControl.deleteTopics(Collections.singletonList(topicToDelete));
    ControllerTestUtils.replayAll(replicationControl, deleteResult.records());
    assertEquals(0, ctx.metrics.globalTopicsCount());
    assertEquals(0, ctx.metrics.globalPartitionCount());
}

@Test
public void testValidateNewTopicNames() {
Map<String, ApiError> topicErrors = new HashMap<>();
Expand Down