[collector] add monitoring metrics for consumer groups in Kafka client (#2887)

Co-authored-by: tomsun28 <tomsun28@outlook.com>
doveLin0818 and tomsun28 authored Dec 20, 2024
1 parent 5ba09cf commit 6aff6ed
Showing 4 changed files with 168 additions and 10 deletions.
@@ -24,26 +24,41 @@
import org.apache.hertzbeat.common.entity.job.protocol.KafkaProtocol;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.springframework.util.Assert;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

@Slf4j
public class KafkaCollectImpl extends AbstractCollect {

private static final String LAG_NUM = "lag_num";
private static final String PARTITION_OFFSET = "Partition_offset";

@Override
public void preCheck(Metrics metrics) throws IllegalArgumentException {
KafkaProtocol kafkaProtocol = metrics.getKclient();
@@ -79,6 +94,9 @@ public void collect(CollectRep.MetricsData.Builder builder, long monitorId, Stri
case TOPIC_OFFSET:
collectTopicOffset(builder, adminClient);
break;
case CONSUMER_DETAIL:
collectTopicConsumerGroups(builder, adminClient);
break;
default:
log.error("Unsupported command: {}", command);
break;
@@ -203,6 +221,98 @@ private static void collectTopicDescribe(CollectRep.MetricsData.Builder builder,
});
}

/**
* Collect consumer group metrics for each topic
*
* @param builder The MetricsData builder
* @param adminClient The AdminClient
*/
private static void collectTopicConsumerGroups(CollectRep.MetricsData.Builder builder, AdminClient adminClient) throws InterruptedException, ExecutionException {
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
// Get all consumer groups
ListConsumerGroupsResult consumerGroupsResult = adminClient.listConsumerGroups();
Collection<ConsumerGroupListing> consumerGroups = consumerGroupsResult.all().get();
// Get the list of consumer groups for each topic
Map<String, Set<String>> topicConsumerGroupsMap = getTopicConsumerGroupsMap(consumerGroups, adminClient);
topicConsumerGroupsMap.entrySet().stream()
.flatMap(entry -> entry.getValue().stream()
.map(groupId -> {
try {
String topic = entry.getKey();
DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(Collections.singletonList(groupId));
Map<String, ConsumerGroupDescription> consumerGroupDescriptions = describeResult.all().get();
ConsumerGroupDescription description = consumerGroupDescriptions.get(groupId);
Map<String, String> offsetAndLagNum = getConsumerGroupMetrics(topic, groupId, adminClient);
return CollectRep.ValueRow.newBuilder()
.addColumns(groupId)
.addColumns(String.valueOf(description.members().size()))
.addColumns(topic)
.addColumns(offsetAndLagNum.get(PARTITION_OFFSET))
.addColumns(offsetAndLagNum.get(LAG_NUM))
.build();
} catch (InterruptedException | ExecutionException e) {
if (e instanceof InterruptedException) {
// Restore the interrupt flag so the collecting thread can observe the interruption
Thread.currentThread().interrupt();
}
log.warn("Failed to collect metrics for consumer group {}", groupId, e);
return null;
}
})
)
.filter(Objects::nonNull)
.forEach(builder::addValues);
}
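The stream above issues one describeConsumerGroups round trip per (topic, group) pair. A minimal sketch of a batched alternative (not part of this commit, reusing the same AdminClient) would describe every group in one call:

    // Sketch only: describe all consumer groups in a single AdminClient round trip,
    // instead of one describeConsumerGroups call per (topic, group) pair.
    Set<String> allGroupIds = topicConsumerGroupsMap.values().stream()
            .flatMap(Set::stream)
            .collect(Collectors.toSet());
    Map<String, ConsumerGroupDescription> descriptions =
            adminClient.describeConsumerGroups(allGroupIds).all().get();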

private static Map<String, Set<String>> getTopicConsumerGroupsMap(Collection<ConsumerGroupListing> consumerGroups,
AdminClient adminClient)
throws ExecutionException, InterruptedException {
Map<String, Set<String>> topicConsumerGroupsMap = new HashMap<>();
for (ConsumerGroupListing consumerGroup : consumerGroups) {
String groupId = consumerGroup.groupId();
// Get the offset information for the consumer group
ListConsumerGroupOffsetsResult consumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(groupId);
Map<TopicPartition, OffsetAndMetadata> topicOffsets = consumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get();
// Iterate over all TopicPartitions consumed by the consumer group
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicOffsets.entrySet()) {
String topic = entry.getKey().topic();
topicConsumerGroupsMap.computeIfAbsent(topic, k -> new HashSet<>()).add(groupId);
}
}
return topicConsumerGroupsMap;
}

private static Map<String, String> getConsumerGroupMetrics(String topic, String groupId, AdminClient adminClient)
throws ExecutionException, InterruptedException {
// Get the offset for each groupId for the specified topic
ListConsumerGroupOffsetsResult consumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(groupId);
Map<TopicPartition, OffsetAndMetadata> topicOffsets = consumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get();
long totalLag = 0L;
for (Entry<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataEntry : topicOffsets.entrySet()) {
if (topicPartitionOffsetAndMetadataEntry.getKey().topic().equals(topic)) {
OffsetAndMetadata offsetMetadata = topicPartitionOffsetAndMetadataEntry.getValue();
TopicPartition partition = topicPartitionOffsetAndMetadataEntry.getKey();
// Get the latest offset for each TopicPartition
ListOffsetsResultInfo resultInfo = adminClient.listOffsets(
Collections.singletonMap(partition, OffsetSpec.latest())).all().get().get(partition);
long latestOffset = resultInfo.offset();
// Accumulate the lag for each partition
totalLag += latestOffset - offsetMetadata.offset();
}
}
// Collect the committed offset of each of the topic's partitions into a comma-separated string wrapped in brackets, e.g. "[120,98]"
String partitionOffsets = topicOffsets.entrySet().stream()
.filter(entry -> entry.getKey().topic().equals(topic))
.map(entry -> String.valueOf(entry.getValue().offset()))
.collect(Collectors.collectingAndThen(
Collectors.joining(","),
result -> "[" + result + "]"
));
Map<String, String> res = new HashMap<>();
res.put(LAG_NUM, String.valueOf(totalLag));
res.put(PARTITION_OFFSET, partitionOffsets);
return res;
}
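The lag loop above makes one listOffsets request per partition. A hedged sketch (an assumption, not what this commit ships) resolves all of the topic's end offsets in a single round trip and computes the same total:

    // Sketch only: one batched listOffsets call for all committed partitions of the topic.
    Map<TopicPartition, OffsetSpec> latestSpecs = topicOffsets.keySet().stream()
            .filter(tp -> tp.topic().equals(topic))
            .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
    Map<TopicPartition, ListOffsetsResultInfo> endOffsets =
            adminClient.listOffsets(latestSpecs).all().get();
    long batchedTotalLag = 0L;
    for (Map.Entry<TopicPartition, ListOffsetsResultInfo> offsetEntry : endOffsets.entrySet()) {
        // Per-partition lag = log end offset minus the group's committed offset
        batchedTotalLag += offsetEntry.getValue().offset() - topicOffsets.get(offsetEntry.getKey()).offset();
    }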


@Override
public String supportProtocol() {
return DispatchConstants.PROTOCOL_KAFKA;
@@ -27,7 +27,8 @@ public enum SupportedCommand {

TOPIC_DESCRIBE("topic-describe"),
TOPIC_LIST("topic-list"),
TOPIC_OFFSET("topic-offset");
TOPIC_OFFSET("topic-offset"),
CONSUMER_DETAIL("consumer-detail");

private static Set<String> SUPPORTED_COMMAND = new HashSet<>();

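The hunk does not show how the YAML command string is resolved to this enum; a hypothetical lookup helper (assuming a getCommand() accessor, which is not visible in this diff) might look like:

    // Hypothetical sketch; the actual resolution logic in SupportedCommand is not shown in this diff.
    public static SupportedCommand fromCommand(String command) {
        for (SupportedCommand supported : values()) {
            if (supported.getCommand().equals(command)) {
                return supported;
            }
        }
        throw new IllegalArgumentException("Unsupported command: " + command);
    }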
41 changes: 39 additions & 2 deletions hertzbeat-manager/src/main/resources/define/app-kafka_client.yml
@@ -123,7 +123,7 @@ metrics:
- field: PartitionNum
type: 1
i18n:
zh-CN: 分区数量
zh-CN: 分区号
en-US: Partition Num
- field: earliest
type: 0
@@ -140,4 +140,41 @@ metrics:
host: ^_^host^_^
port: ^_^port^_^
command: topic-offset

- name: consumer_detail
i18n:
zh-CN: 消费者组情况
en-US: Consumer Detail Info
priority: 3
# Consumer group offsets do not need to be collected frequently; polling too often can affect performance
interval: 300
fields:
- field: GroupId
type: 1
i18n:
zh-CN: 消费者组ID
en-US: Consumer Group ID
- field: Group Member Num
type: 1
i18n:
zh-CN: 消费者实例数量
en-US: Group Member Num
- field: Topic
type: 1
i18n:
zh-CN: 订阅主题名称
en-US: Subscribed Topic Name
- field: Offset of Each Partition
type: 1
i18n:
zh-CN: 各分区偏移量
en-US: Offset of Each Partition
- field: Lag
type: 0
i18n:
zh-CN: 落后偏移量
en-US: Total Lag
protocol: kclient
kclient:
host: ^_^host^_^
port: ^_^port^_^
command: consumer-detail
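These five fields map one-to-one onto the columns built in collectTopicConsumerGroups (group ID, member count, topic, per-partition offsets, total lag). An illustrative row, with made-up values:

    GroupId: order-consumers | Group Member Num: 2 | Topic: orders | Offset of Each Partition: [120,98] | Lag: 7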
@@ -39,9 +39,19 @@ keywords: [Open Source Monitoring System, Open Source Message Middleware Monitoring, Kafka Monitoring]

#### Metric set: topic_offset

| Metric Name | Unit | Description |
|-------|---|---------|
| TopicName || Topic name |
| PartitionNum || Partition count |
| earliest || Earliest offset |
| latest || Latest offset |
| Metric Name | Unit | Description |
|-------|---|--------|
| TopicName || Topic name |
| PartitionNum || Partition number |
| earliest || Earliest offset |
| latest || Latest offset |

#### Metric set: consumer_detail

| Metric Name | Unit | Description |
|-----------|------|-------|
| GroupId || Consumer group ID |
| Group Member Num || Number of consumer instances |
| Topic || Subscribed topic name |
| Offset of Each Partition || Offset of each partition |
| Lag || Total lag |
