-
Notifications
You must be signed in to change notification settings - Fork 994
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[common] feature:add kafka common queue (#966)
- Loading branch information
Showing
8 changed files
with
426 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
251 changes: 251 additions & 0 deletions
251
common/src/main/java/org/dromara/hertzbeat/common/queue/impl/KafkaCommonDataQueue.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,251 @@ | ||
package org.dromara.hertzbeat.common.queue.impl; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.kafka.clients.consumer.ConsumerConfig; | ||
import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
import org.apache.kafka.clients.consumer.ConsumerRecords; | ||
import org.apache.kafka.clients.consumer.KafkaConsumer; | ||
import org.apache.kafka.clients.producer.KafkaProducer; | ||
import org.apache.kafka.clients.producer.ProducerConfig; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
import org.apache.kafka.common.serialization.LongDeserializer; | ||
import org.apache.kafka.common.serialization.LongSerializer; | ||
import org.dromara.hertzbeat.common.entity.alerter.Alert; | ||
import org.dromara.hertzbeat.common.entity.message.CollectRep; | ||
import org.dromara.hertzbeat.common.queue.CommonDataQueue; | ||
import org.dromara.hertzbeat.common.serialize.AlertDeserializer; | ||
import org.dromara.hertzbeat.common.serialize.AlertSerializer; | ||
import org.dromara.hertzbeat.common.serialize.KafkaMetricsDataDeserializer; | ||
import org.dromara.hertzbeat.common.serialize.KafkaMetricsDataSerializer; | ||
import org.springframework.beans.factory.DisposableBean; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; | ||
import org.springframework.boot.context.properties.ConfigurationProperties; | ||
import org.springframework.context.annotation.Configuration; | ||
import org.springframework.stereotype.Component; | ||
|
||
import javax.annotation.PostConstruct; | ||
import java.time.Duration; | ||
import java.util.*; | ||
|
||
|
||
/** | ||
* kafka采集数据队列实现 | ||
* @author tablerow | ||
* | ||
*/ | ||
@Configuration | ||
@ConditionalOnProperty(prefix = "common.queue", name = "type", havingValue = "kafka", | ||
matchIfMissing = false) | ||
@Slf4j | ||
public class KafkaCommonDataQueue implements CommonDataQueue, DisposableBean { | ||
|
||
private KafkaProducer<Long, CollectRep.MetricsData> metricsDataProducer; | ||
private KafkaProducer<Long, Alert> kafkaAlertProducer; | ||
private KafkaConsumer<Long, Alert> alertConsumer; | ||
private KafkaConsumer<Long, CollectRep.MetricsData> metricsDataToAlertConsumer; | ||
private KafkaConsumer<Long, CollectRep.MetricsData> metricsDataToPersistentStorageConsumer; | ||
private KafkaConsumer<Long, CollectRep.MetricsData> metricsDataToMemoryStorageConsumer; | ||
@Autowired | ||
private KafkaProperties kafka; | ||
|
||
@PostConstruct | ||
public void initDataQueue(){ | ||
try { | ||
Map<String, Object> producerConfig = new HashMap<String, Object>(3); | ||
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getServers()); | ||
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); | ||
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 3); | ||
metricsDataProducer = new KafkaProducer<>(producerConfig, new LongSerializer(), new KafkaMetricsDataSerializer()); | ||
kafkaAlertProducer = new KafkaProducer<>(producerConfig, new LongSerializer(), new AlertSerializer()); | ||
|
||
Map<String, Object> consumerConfig = new HashMap<String, Object>(4); | ||
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,kafka.getServers()); | ||
consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); | ||
consumerConfig.put("group.id", "default-consumer"); | ||
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); | ||
|
||
alertConsumer = new KafkaConsumer<>(consumerConfig, new LongDeserializer(), new AlertDeserializer()); | ||
|
||
metricsDataToAlertConsumer = new KafkaConsumer<>(consumerConfig, new LongDeserializer(), new KafkaMetricsDataDeserializer()); | ||
metricsDataToMemoryStorageConsumer = new KafkaConsumer<>(consumerConfig, new LongDeserializer(), new KafkaMetricsDataDeserializer()); | ||
metricsDataToPersistentStorageConsumer = new KafkaConsumer<>(consumerConfig, new LongDeserializer(), new KafkaMetricsDataDeserializer()); | ||
|
||
alertConsumer.subscribe(Collections.singletonList(kafka.getAlertTopic())); | ||
metricsDataToAlertConsumer.subscribe(Collections.singletonList(kafka.getAlertMetricTopic())); | ||
metricsDataToPersistentStorageConsumer.subscribe(Collections.singletonList(kafka.getPersistentStorageTopic())); | ||
metricsDataToMemoryStorageConsumer.subscribe(Collections.singletonList(kafka.getRealTimeMemoryStorageTopic())); | ||
} catch (Exception e) { | ||
log.error(e.getMessage(), e); | ||
} | ||
} | ||
|
||
@Override | ||
public void addAlertData(Alert alert) { | ||
if (kafkaAlertProducer != null) { | ||
kafkaAlertProducer.send(new ProducerRecord<>(kafka.getAlertTopic(), alert)); | ||
} else { | ||
log.error("kafkaAlertProducer is not enable"); | ||
} | ||
} | ||
|
||
@Override | ||
public Alert pollAlertData() throws InterruptedException { | ||
Alert alert = null; | ||
try { | ||
ConsumerRecords<Long, Alert> records = alertConsumer.poll(Duration.ofSeconds(1)); | ||
for (ConsumerRecord<Long, Alert> record : records) { | ||
alert = record.value(); | ||
} | ||
alertConsumer.commitAsync(); | ||
}catch (ConcurrentModificationException e){ | ||
//kafka多线程下线程不安全异常 | ||
} | ||
return alert; | ||
} | ||
|
||
@Override | ||
public CollectRep.MetricsData pollAlertMetricsData() throws InterruptedException { | ||
CollectRep.MetricsData metricsData = null; | ||
try { | ||
ConsumerRecords<Long, CollectRep.MetricsData> records = metricsDataToAlertConsumer.poll(Duration.ofSeconds(1)); | ||
for ( ConsumerRecord<Long, CollectRep.MetricsData> record : records) { | ||
metricsData = record.value(); | ||
} | ||
metricsDataToAlertConsumer.commitAsync(); | ||
}catch (ConcurrentModificationException e){ | ||
//kafka多线程下线程不安全异常 | ||
} | ||
return metricsData; | ||
} | ||
|
||
@Override | ||
public CollectRep.MetricsData pollPersistentStorageMetricsData() throws InterruptedException { | ||
CollectRep.MetricsData persistentStorageMetricsData = null; | ||
try { | ||
ConsumerRecords<Long, CollectRep.MetricsData> records = metricsDataToPersistentStorageConsumer.poll(Duration.ofSeconds(1)); | ||
for ( ConsumerRecord<Long, CollectRep.MetricsData> record : records) { | ||
persistentStorageMetricsData = record.value(); | ||
} | ||
metricsDataToPersistentStorageConsumer.commitAsync(); | ||
}catch (ConcurrentModificationException e){ | ||
//kafka多线程下线程不安全异常 | ||
} | ||
return persistentStorageMetricsData; | ||
} | ||
|
||
@Override | ||
public CollectRep.MetricsData pollRealTimeStorageMetricsData() throws InterruptedException { | ||
CollectRep.MetricsData memoryMetricsData = null; | ||
try { | ||
ConsumerRecords<Long, CollectRep.MetricsData> records = metricsDataToMemoryStorageConsumer.poll(Duration.ofSeconds(1)); | ||
for ( ConsumerRecord<Long, CollectRep.MetricsData> record : records) { | ||
memoryMetricsData = record.value(); | ||
} | ||
metricsDataToMemoryStorageConsumer.commitAsync(); | ||
}catch (ConcurrentModificationException e){ | ||
//kafka多线程下线程不安全异常 | ||
} | ||
return memoryMetricsData; | ||
} | ||
|
||
@Override | ||
public void sendMetricsData(CollectRep.MetricsData metricsData) { | ||
if (metricsDataProducer != null) { | ||
metricsDataProducer.send(new ProducerRecord<>(kafka.getAlertMetricTopic(), metricsData)); | ||
metricsDataProducer.send(new ProducerRecord<>(kafka.getPersistentStorageTopic(), metricsData)); | ||
metricsDataProducer.send(new ProducerRecord<>(kafka.getRealTimeMemoryStorageTopic(), metricsData)); | ||
} else { | ||
log.error("metricsDataProducer is not enabled"); | ||
} | ||
|
||
} | ||
|
||
@Override | ||
public void destroy() throws Exception { | ||
if (metricsDataProducer != null) { | ||
metricsDataProducer.close(); | ||
} | ||
if (kafkaAlertProducer != null) { | ||
kafkaAlertProducer.close(); | ||
} | ||
if (alertConsumer != null) { | ||
alertConsumer.close(); | ||
} | ||
if (metricsDataToAlertConsumer != null) { | ||
metricsDataToAlertConsumer.close(); | ||
} | ||
if (metricsDataToPersistentStorageConsumer != null) { | ||
metricsDataToPersistentStorageConsumer.close(); | ||
} | ||
if (metricsDataToMemoryStorageConsumer != null) { | ||
metricsDataToMemoryStorageConsumer.close(); | ||
} | ||
} | ||
@Component | ||
@ConfigurationProperties("common.queue.kafka") | ||
public static class KafkaProperties { | ||
/** | ||
* kafka的连接服务器url | ||
*/ | ||
private String servers; | ||
/** | ||
* 接收数据的topic名称 | ||
*/ | ||
private String alertTopic; | ||
private String alertMetricTopic; | ||
private String persistentStorageTopic; | ||
private String realTimeMemoryStorageTopic; | ||
/** | ||
* 消费者组ID | ||
*/ | ||
private String groupId; | ||
public String getServers() { | ||
return servers; | ||
} | ||
|
||
public void setServers(String servers) { | ||
this.servers = servers; | ||
} | ||
|
||
public String getAlertTopic() { | ||
return alertTopic; | ||
} | ||
|
||
public void setAlertTopic(String alertTopic) { | ||
this.alertTopic = alertTopic; | ||
} | ||
|
||
public String getAlertMetricTopic() { | ||
return alertMetricTopic; | ||
} | ||
|
||
public void setAlertMetricTopic(String alertMetricTopic) { | ||
this.alertMetricTopic = alertMetricTopic; | ||
} | ||
|
||
public String getPersistentStorageTopic() { | ||
return persistentStorageTopic; | ||
} | ||
|
||
public void setPersistentStorageTopic(String persistentStorageTopic) { | ||
this.persistentStorageTopic = persistentStorageTopic; | ||
} | ||
|
||
public String getRealTimeMemoryStorageTopic() { | ||
return realTimeMemoryStorageTopic; | ||
} | ||
|
||
public void setRealTimeMemoryStorageTopic(String realTimeMemoryStorageTopic) { | ||
this.realTimeMemoryStorageTopic = realTimeMemoryStorageTopic; | ||
} | ||
|
||
public String getGroupId() { | ||
return groupId; | ||
} | ||
|
||
public void setGroupId(String groupId) { | ||
this.groupId = groupId; | ||
} | ||
} | ||
} |
34 changes: 34 additions & 0 deletions
34
common/src/main/java/org/dromara/hertzbeat/common/serialize/AlertDeserializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
package org.dromara.hertzbeat.common.serialize; | ||
|
||
import org.dromara.hertzbeat.common.entity.alerter.Alert; | ||
import org.apache.kafka.common.header.Headers; | ||
import org.apache.kafka.common.serialization.Deserializer; | ||
import org.dromara.hertzbeat.common.util.JsonUtil; | ||
|
||
import java.util.Map; | ||
|
||
/** | ||
* kafka告警记录反序列化类 | ||
* @author tablerow | ||
*/ | ||
public class AlertDeserializer implements Deserializer { | ||
@Override | ||
public void configure(Map configs, boolean isKey) { | ||
Deserializer.super.configure(configs, isKey); | ||
} | ||
|
||
@Override | ||
public Object deserialize(String s, byte[] bytes) { | ||
return JsonUtil.fromJson(new String(bytes), Alert.class); | ||
} | ||
|
||
@Override | ||
public Object deserialize(String topic, Headers headers, byte[] data) { | ||
return Deserializer.super.deserialize(topic, headers, data); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
Deserializer.super.close(); | ||
} | ||
} |
39 changes: 39 additions & 0 deletions
39
common/src/main/java/org/dromara/hertzbeat/common/serialize/AlertSerializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
package org.dromara.hertzbeat.common.serialize; | ||
|
||
|
||
import org.apache.kafka.common.header.Headers; | ||
import org.apache.kafka.common.serialization.Serializer; | ||
import org.dromara.hertzbeat.common.entity.alerter.Alert; | ||
import org.dromara.hertzbeat.common.util.JsonUtil; | ||
|
||
import java.util.Map; | ||
|
||
/** | ||
* kafka告警记录序列化类 | ||
* @author tablerow | ||
*/ | ||
public class AlertSerializer implements Serializer<Alert> { | ||
|
||
@Override | ||
public void configure(Map<String, ?> configs, boolean isKey) { | ||
Serializer.super.configure(configs, isKey); | ||
} | ||
|
||
@Override | ||
public byte[] serialize(String s, Alert alert) { | ||
if (alert == null){ | ||
return null; | ||
} | ||
return JsonUtil.toJson(alert).getBytes(); | ||
} | ||
|
||
@Override | ||
public byte[] serialize(String topic, Headers headers, Alert data) { | ||
return Serializer.super.serialize(topic, headers, data); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
Serializer.super.close(); | ||
} | ||
} |
39 changes: 39 additions & 0 deletions
39
...on/src/main/java/org/dromara/hertzbeat/common/serialize/KafkaMetricsDataDeserializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
package org.dromara.hertzbeat.common.serialize; | ||
|
||
import com.google.protobuf.InvalidProtocolBufferException; | ||
import org.apache.kafka.common.header.Headers; | ||
import org.apache.kafka.common.serialization.Deserializer; | ||
import org.dromara.hertzbeat.common.entity.message.CollectRep; | ||
|
||
import java.util.Map; | ||
|
||
/** | ||
* kafka指标组监控数据反序列化类 | ||
* @author tablerow | ||
*/ | ||
public class KafkaMetricsDataDeserializer implements Deserializer<CollectRep.MetricsData> { | ||
|
||
@Override | ||
public void configure(Map<String, ?> configs, boolean isKey) { | ||
Deserializer.super.configure(configs, isKey); | ||
} | ||
|
||
@Override | ||
public CollectRep.MetricsData deserialize(String s, byte[] bytes){ | ||
try { | ||
return CollectRep.MetricsData.parseFrom(bytes); | ||
} catch (InvalidProtocolBufferException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
@Override | ||
public CollectRep.MetricsData deserialize(String topic, Headers headers, byte[] data) { | ||
return Deserializer.super.deserialize(topic, headers, data); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
Deserializer.super.close(); | ||
} | ||
} |
Oops, something went wrong.