[common]feature:add kafka common queue #966

Merged: 4 commits, May 19, 2023
common/pom.xml (5 additions, 0 deletions)
@@ -102,5 +102,10 @@
            <groupId>com.github.ben-manes.caffeine</groupId>
            <artifactId>caffeine</artifactId>
        </dependency>
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
    </dependencies>
</project>
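Note that the kafka-clients dependency declares no <version> element, so Maven resolves the version from dependencyManagement, presumably defined in the project's parent POM.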
common/src/main/java/org/dromara/hertzbeat/common/queue/impl/KafkaCommonDataQueue.java (new file, 251 additions)
@@ -0,0 +1,251 @@
package org.dromara.hertzbeat.common.queue.impl;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.dromara.hertzbeat.common.entity.alerter.Alert;
import org.dromara.hertzbeat.common.entity.message.CollectRep;
import org.dromara.hertzbeat.common.queue.CommonDataQueue;
import org.dromara.hertzbeat.common.serialize.AlertDeserializer;
import org.dromara.hertzbeat.common.serialize.AlertSerializer;
import org.dromara.hertzbeat.common.serialize.KafkaMetricsDataDeserializer;
import org.dromara.hertzbeat.common.serialize.KafkaMetricsDataSerializer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;


/**
 * Kafka implementation of the common data queue for collected metrics and alerts.
 *
 * @author tablerow
 */
@Configuration
@ConditionalOnProperty(prefix = "common.queue", name = "type", havingValue = "kafka")
@Slf4j
public class KafkaCommonDataQueue implements CommonDataQueue, DisposableBean {

    private KafkaProducer<Long, CollectRep.MetricsData> metricsDataProducer;
    private KafkaProducer<Long, Alert> kafkaAlertProducer;
    private KafkaConsumer<Long, Alert> alertConsumer;
    private KafkaConsumer<Long, CollectRep.MetricsData> metricsDataToAlertConsumer;
    private KafkaConsumer<Long, CollectRep.MetricsData> metricsDataToPersistentStorageConsumer;
    private KafkaConsumer<Long, CollectRep.MetricsData> metricsDataToMemoryStorageConsumer;

    @Autowired
    private KafkaProperties kafka;

    @PostConstruct
    public void initDataQueue() {
        try {
            Map<String, Object> producerConfig = new HashMap<>(3);
            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getServers());
            producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
            producerConfig.put(ProducerConfig.RETRIES_CONFIG, 3);
            metricsDataProducer = new KafkaProducer<>(producerConfig, new LongSerializer(), new KafkaMetricsDataSerializer());
            kafkaAlertProducer = new KafkaProducer<>(producerConfig, new LongSerializer(), new AlertSerializer());

            Map<String, Object> consumerConfig = new HashMap<>(4);
            consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getServers());
            consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
            // use the configured consumer group id, falling back to a default
            consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,
                    kafka.getGroupId() != null ? kafka.getGroupId() : "default-consumer");
            consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

            alertConsumer = new KafkaConsumer<>(consumerConfig, new LongDeserializer(), new AlertDeserializer());
            metricsDataToAlertConsumer = new KafkaConsumer<>(consumerConfig, new LongDeserializer(), new KafkaMetricsDataDeserializer());
            metricsDataToMemoryStorageConsumer = new KafkaConsumer<>(consumerConfig, new LongDeserializer(), new KafkaMetricsDataDeserializer());
            metricsDataToPersistentStorageConsumer = new KafkaConsumer<>(consumerConfig, new LongDeserializer(), new KafkaMetricsDataDeserializer());

            alertConsumer.subscribe(Collections.singletonList(kafka.getAlertTopic()));
            metricsDataToAlertConsumer.subscribe(Collections.singletonList(kafka.getAlertMetricTopic()));
            metricsDataToPersistentStorageConsumer.subscribe(Collections.singletonList(kafka.getPersistentStorageTopic()));
            metricsDataToMemoryStorageConsumer.subscribe(Collections.singletonList(kafka.getRealTimeMemoryStorageTopic()));
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    @Override
    public void addAlertData(Alert alert) {
        if (kafkaAlertProducer != null) {
            kafkaAlertProducer.send(new ProducerRecord<>(kafka.getAlertTopic(), alert));
        } else {
            log.error("kafkaAlertProducer is not enabled");
        }
    }

    @Override
    public Alert pollAlertData() throws InterruptedException {
        Alert alert = null;
        try {
            ConsumerRecords<Long, Alert> records = alertConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<Long, Alert> record : records) {
                alert = record.value();
            }
            alertConsumer.commitAsync();
        } catch (ConcurrentModificationException e) {
            // KafkaConsumer is not thread-safe; concurrent access from multiple threads throws this exception
        }
        return alert;
    }

    @Override
    public CollectRep.MetricsData pollAlertMetricsData() throws InterruptedException {
        CollectRep.MetricsData metricsData = null;
        try {
            ConsumerRecords<Long, CollectRep.MetricsData> records = metricsDataToAlertConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<Long, CollectRep.MetricsData> record : records) {
                metricsData = record.value();
            }
            metricsDataToAlertConsumer.commitAsync();
        } catch (ConcurrentModificationException e) {
            // KafkaConsumer is not thread-safe; concurrent access from multiple threads throws this exception
        }
        return metricsData;
    }

    @Override
    public CollectRep.MetricsData pollPersistentStorageMetricsData() throws InterruptedException {
        CollectRep.MetricsData persistentStorageMetricsData = null;
        try {
            ConsumerRecords<Long, CollectRep.MetricsData> records = metricsDataToPersistentStorageConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<Long, CollectRep.MetricsData> record : records) {
                persistentStorageMetricsData = record.value();
            }
            metricsDataToPersistentStorageConsumer.commitAsync();
        } catch (ConcurrentModificationException e) {
            // KafkaConsumer is not thread-safe; concurrent access from multiple threads throws this exception
        }
        return persistentStorageMetricsData;
    }

    @Override
    public CollectRep.MetricsData pollRealTimeStorageMetricsData() throws InterruptedException {
        CollectRep.MetricsData memoryMetricsData = null;
        try {
            ConsumerRecords<Long, CollectRep.MetricsData> records = metricsDataToMemoryStorageConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<Long, CollectRep.MetricsData> record : records) {
                memoryMetricsData = record.value();
            }
            metricsDataToMemoryStorageConsumer.commitAsync();
        } catch (ConcurrentModificationException e) {
            // KafkaConsumer is not thread-safe; concurrent access from multiple threads throws this exception
        }
        return memoryMetricsData;
    }

    @Override
    public void sendMetricsData(CollectRep.MetricsData metricsData) {
        if (metricsDataProducer != null) {
            metricsDataProducer.send(new ProducerRecord<>(kafka.getAlertMetricTopic(), metricsData));
            metricsDataProducer.send(new ProducerRecord<>(kafka.getPersistentStorageTopic(), metricsData));
            metricsDataProducer.send(new ProducerRecord<>(kafka.getRealTimeMemoryStorageTopic(), metricsData));
        } else {
            log.error("metricsDataProducer is not enabled");
        }
    }

    @Override
    public void destroy() throws Exception {
        if (metricsDataProducer != null) {
            metricsDataProducer.close();
        }
        if (kafkaAlertProducer != null) {
            kafkaAlertProducer.close();
        }
        if (alertConsumer != null) {
            alertConsumer.close();
        }
        if (metricsDataToAlertConsumer != null) {
            metricsDataToAlertConsumer.close();
        }
        if (metricsDataToPersistentStorageConsumer != null) {
            metricsDataToPersistentStorageConsumer.close();
        }
        if (metricsDataToMemoryStorageConsumer != null) {
            metricsDataToMemoryStorageConsumer.close();
        }
    }

    @Component
    @ConfigurationProperties("common.queue.kafka")
    public static class KafkaProperties {
        /**
         * kafka bootstrap servers url
         */
        private String servers;
        /**
         * topic names for the data exchanged through the queue
         */
        private String alertTopic;
        private String alertMetricTopic;
        private String persistentStorageTopic;
        private String realTimeMemoryStorageTopic;
        /**
         * consumer group id
         */
        private String groupId;

        public String getServers() {
            return servers;
        }

        public void setServers(String servers) {
            this.servers = servers;
        }

        public String getAlertTopic() {
            return alertTopic;
        }

        public void setAlertTopic(String alertTopic) {
            this.alertTopic = alertTopic;
        }

        public String getAlertMetricTopic() {
            return alertMetricTopic;
        }

        public void setAlertMetricTopic(String alertMetricTopic) {
            this.alertMetricTopic = alertMetricTopic;
        }

        public String getPersistentStorageTopic() {
            return persistentStorageTopic;
        }

        public void setPersistentStorageTopic(String persistentStorageTopic) {
            this.persistentStorageTopic = persistentStorageTopic;
        }

        public String getRealTimeMemoryStorageTopic() {
            return realTimeMemoryStorageTopic;
        }

        public void setRealTimeMemoryStorageTopic(String realTimeMemoryStorageTopic) {
            this.realTimeMemoryStorageTopic = realTimeMemoryStorageTopic;
        }

        public String getGroupId() {
            return groupId;
        }

        public void setGroupId(String groupId) {
            this.groupId = groupId;
        }
    }
}
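For reference, enabling this queue would take configuration along these lines in application.yml; the property names follow Spring's relaxed binding of the KafkaProperties fields above, while the server address and topic names here are illustrative placeholders, not values from the PR:

common:
  queue:
    type: kafka
    kafka:
      servers: 127.0.0.1:9092
      alert-topic: alerts-data
      alert-metric-topic: async-metrics-data-to-alerter
      persistent-storage-topic: async-metrics-data-to-persistent-storage
      real-time-memory-storage-topic: async-metrics-data-to-memory-storage
      group-id: hertzbeat-consumer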
common/src/main/java/org/dromara/hertzbeat/common/serialize/AlertDeserializer.java (new file, 34 additions)
@@ -0,0 +1,34 @@
package org.dromara.hertzbeat.common.serialize;

import org.dromara.hertzbeat.common.entity.alerter.Alert;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.dromara.hertzbeat.common.util.JsonUtil;

import java.util.Map;

/**
 * Kafka deserializer for alert records.
 * @author tablerow
 */
public class AlertDeserializer implements Deserializer<Alert> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Deserializer.super.configure(configs, isKey);
    }

    @Override
    public Alert deserialize(String s, byte[] bytes) {
        return JsonUtil.fromJson(new String(bytes), Alert.class);
    }

    @Override
    public Alert deserialize(String topic, Headers headers, byte[] data) {
        return Deserializer.super.deserialize(topic, headers, data);
    }

    @Override
    public void close() {
        Deserializer.super.close();
    }
}
common/src/main/java/org/dromara/hertzbeat/common/serialize/AlertSerializer.java (new file, 39 additions)
@@ -0,0 +1,39 @@
package org.dromara.hertzbeat.common.serialize;


import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.dromara.hertzbeat.common.entity.alerter.Alert;
import org.dromara.hertzbeat.common.util.JsonUtil;

import java.util.Map;

/**
 * Kafka serializer for alert records.
 * @author tablerow
 */
public class AlertSerializer implements Serializer<Alert> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Serializer.super.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String s, Alert alert) {
        if (alert == null) {
            return null;
        }
        return JsonUtil.toJson(alert).getBytes();
    }

    @Override
    public byte[] serialize(String topic, Headers headers, Alert data) {
        return Serializer.super.serialize(topic, headers, data);
    }

    @Override
    public void close() {
        Serializer.super.close();
    }
}
common/src/main/java/org/dromara/hertzbeat/common/serialize/KafkaMetricsDataDeserializer.java (new file, 39 additions)
@@ -0,0 +1,39 @@
package org.dromara.hertzbeat.common.serialize;

import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.dromara.hertzbeat.common.entity.message.CollectRep;

import java.util.Map;

/**
 * Kafka deserializer for collected metrics data.
 * @author tablerow
 */
public class KafkaMetricsDataDeserializer implements Deserializer<CollectRep.MetricsData> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Deserializer.super.configure(configs, isKey);
    }

    @Override
    public CollectRep.MetricsData deserialize(String s, byte[] bytes) {
        try {
            return CollectRep.MetricsData.parseFrom(bytes);
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public CollectRep.MetricsData deserialize(String topic, Headers headers, byte[] data) {
        return Deserializer.super.deserialize(topic, headers, data);
    }

    @Override
    public void close() {
        Deserializer.super.close();
    }
}
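The diff for KafkaMetricsDataSerializer, which KafkaCommonDataQueue imports above, was not captured in this view. Since the deserializer calls CollectRep.MetricsData.parseFrom, the serializer presumably reduces to protobuf's toByteArray(); a minimal sketch under that assumption, not the PR's actual file:

package org.dromara.hertzbeat.common.serialize;

import org.apache.kafka.common.serialization.Serializer;
import org.dromara.hertzbeat.common.entity.message.CollectRep;

/**
 * Sketch of the metrics-data serializer (assumed implementation, not from the PR diff).
 */
public class KafkaMetricsDataSerializer implements Serializer<CollectRep.MetricsData> {

    @Override
    public byte[] serialize(String topic, CollectRep.MetricsData metricsData) {
        if (metricsData == null) {
            return null;
        }
        // CollectRep.MetricsData is a protobuf message, so toByteArray() is the natural inverse of parseFrom()
        return metricsData.toByteArray();
    }
}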