From b9bfaf1eb511b450bd5c58cd20e43c7f33b8ebae Mon Sep 17 00:00:00 2001
From: TableRow <59152619+baiban114@users.noreply.github.com>
Date: Fri, 19 May 2023 11:03:36 +0800
Subject: [PATCH] [common] feature:add kafka common queue (#966)
---
common/pom.xml | 5 +
.../queue/impl/KafkaCommonDataQueue.java | 251 ++++++++++++++++++
.../common/serialize/AlertDeserializer.java | 34 +++
.../common/serialize/AlertSerializer.java | 39 +++
.../KafkaMetricsDataDeserializer.java | 39 +++
.../serialize/KafkaMetricsDataSerializer.java | 36 +++
.../src/main/resources/application-test.yml | 10 +
manager/src/main/resources/application.yml | 12 +
8 files changed, 426 insertions(+)
create mode 100644 common/src/main/java/org/dromara/hertzbeat/common/queue/impl/KafkaCommonDataQueue.java
create mode 100644 common/src/main/java/org/dromara/hertzbeat/common/serialize/AlertDeserializer.java
create mode 100644 common/src/main/java/org/dromara/hertzbeat/common/serialize/AlertSerializer.java
create mode 100644 common/src/main/java/org/dromara/hertzbeat/common/serialize/KafkaMetricsDataDeserializer.java
create mode 100644 common/src/main/java/org/dromara/hertzbeat/common/serialize/KafkaMetricsDataSerializer.java
diff --git a/common/pom.xml b/common/pom.xml
index e89deb3f2ca..2bed925cc40 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -102,5 +102,10 @@
com.github.ben-manes.caffeine
caffeine
+
+
+ org.apache.kafka
+ kafka-clients
+
diff --git a/common/src/main/java/org/dromara/hertzbeat/common/queue/impl/KafkaCommonDataQueue.java b/common/src/main/java/org/dromara/hertzbeat/common/queue/impl/KafkaCommonDataQueue.java
new file mode 100644
index 00000000000..dec5ef889ef
--- /dev/null
+++ b/common/src/main/java/org/dromara/hertzbeat/common/queue/impl/KafkaCommonDataQueue.java
@@ -0,0 +1,251 @@
+package org.dromara.hertzbeat.common.queue.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.dromara.hertzbeat.common.entity.alerter.Alert;
+import org.dromara.hertzbeat.common.entity.message.CollectRep;
+import org.dromara.hertzbeat.common.queue.CommonDataQueue;
+import org.dromara.hertzbeat.common.serialize.AlertDeserializer;
+import org.dromara.hertzbeat.common.serialize.AlertSerializer;
+import org.dromara.hertzbeat.common.serialize.KafkaMetricsDataDeserializer;
+import org.dromara.hertzbeat.common.serialize.KafkaMetricsDataSerializer;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Kafka implementation of the common data queue: collected metrics data and
+ * alerts are transported through Kafka topics instead of in-memory queues.
+ * Active only when {@code common.queue.type=kafka}.
+ * @author tablerow
+ */
+@Configuration
+@ConditionalOnProperty(prefix = "common.queue", name = "type", havingValue = "kafka")
+@Slf4j
+public class KafkaCommonDataQueue implements CommonDataQueue, DisposableBean {
+
+    /**
+     * fallback consumer group id when none is configured
+     */
+    private static final String DEFAULT_GROUP_ID = "default-consumer";
+
+    private KafkaProducer<Long, CollectRep.MetricsData> metricsDataProducer;
+    private KafkaProducer<Long, Alert> kafkaAlertProducer;
+    private KafkaConsumer<Long, Alert> alertConsumer;
+    private KafkaConsumer<Long, CollectRep.MetricsData> metricsDataToAlertConsumer;
+    private KafkaConsumer<Long, CollectRep.MetricsData> metricsDataToPersistentStorageConsumer;
+    private KafkaConsumer<Long, CollectRep.MetricsData> metricsDataToMemoryStorageConsumer;
+    @Autowired
+    private KafkaProperties kafka;
+
+    /**
+     * Builds the Kafka producers and consumers and subscribes every consumer
+     * to its topic. Failures are logged instead of failing application startup.
+     */
+    @PostConstruct
+    public void initDataQueue() {
+        try {
+            Map<String, Object> producerConfig = new HashMap<>(8);
+            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getServers());
+            producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+            producerConfig.put(ProducerConfig.RETRIES_CONFIG, 3);
+            metricsDataProducer = new KafkaProducer<>(producerConfig, new LongSerializer(), new KafkaMetricsDataSerializer());
+            kafkaAlertProducer = new KafkaProducer<>(producerConfig, new LongSerializer(), new AlertSerializer());
+
+            Map<String, Object> consumerConfig = new HashMap<>(8);
+            consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getServers());
+            consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
+            // use the configured group id, falling back to a fixed default
+            consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,
+                    kafka.getGroupId() == null ? DEFAULT_GROUP_ID : kafka.getGroupId());
+            // offsets are committed manually after each successful poll
+            consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+            alertConsumer = new KafkaConsumer<>(consumerConfig, new LongDeserializer(), new AlertDeserializer());
+            metricsDataToAlertConsumer = new KafkaConsumer<>(consumerConfig, new LongDeserializer(), new KafkaMetricsDataDeserializer());
+            metricsDataToMemoryStorageConsumer = new KafkaConsumer<>(consumerConfig, new LongDeserializer(), new KafkaMetricsDataDeserializer());
+            metricsDataToPersistentStorageConsumer = new KafkaConsumer<>(consumerConfig, new LongDeserializer(), new KafkaMetricsDataDeserializer());
+
+            alertConsumer.subscribe(Collections.singletonList(kafka.getAlertTopic()));
+            metricsDataToAlertConsumer.subscribe(Collections.singletonList(kafka.getAlertMetricTopic()));
+            metricsDataToPersistentStorageConsumer.subscribe(Collections.singletonList(kafka.getPersistentStorageTopic()));
+            metricsDataToMemoryStorageConsumer.subscribe(Collections.singletonList(kafka.getRealTimeMemoryStorageTopic()));
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void addAlertData(Alert alert) {
+        if (kafkaAlertProducer != null) {
+            kafkaAlertProducer.send(new ProducerRecord<>(kafka.getAlertTopic(), alert));
+        } else {
+            log.error("kafkaAlertProducer is not enabled");
+        }
+    }
+
+    @Override
+    public Alert pollAlertData() throws InterruptedException {
+        return pollLastRecord(alertConsumer);
+    }
+
+    @Override
+    public CollectRep.MetricsData pollAlertMetricsData() throws InterruptedException {
+        return pollLastRecord(metricsDataToAlertConsumer);
+    }
+
+    @Override
+    public CollectRep.MetricsData pollPersistentStorageMetricsData() throws InterruptedException {
+        return pollLastRecord(metricsDataToPersistentStorageConsumer);
+    }
+
+    @Override
+    public CollectRep.MetricsData pollRealTimeStorageMetricsData() throws InterruptedException {
+        return pollLastRecord(metricsDataToMemoryStorageConsumer);
+    }
+
+    /**
+     * Polls the given consumer once (1 second timeout), returns the value of
+     * the last record seen (MAX_POLL_RECORDS is 1, so at most one record) and
+     * commits the offset asynchronously.
+     * @param consumer consumer to poll
+     * @return the polled value, or null when nothing was available
+     */
+    private <T> T pollLastRecord(KafkaConsumer<Long, T> consumer) {
+        T value = null;
+        try {
+            ConsumerRecords<Long, T> records = consumer.poll(Duration.ofSeconds(1));
+            for (ConsumerRecord<Long, T> record : records) {
+                value = record.value();
+            }
+            consumer.commitAsync();
+        } catch (ConcurrentModificationException e) {
+            // KafkaConsumer is not thread safe; surface the misuse in the log
+            // instead of silently swallowing it
+            log.warn("kafka consumer accessed from multiple threads: {}", e.getMessage());
+        }
+        return value;
+    }
+
+    @Override
+    public void sendMetricsData(CollectRep.MetricsData metricsData) {
+        if (metricsDataProducer != null) {
+            // fan the same metrics data out to the three downstream topics
+            metricsDataProducer.send(new ProducerRecord<>(kafka.getAlertMetricTopic(), metricsData));
+            metricsDataProducer.send(new ProducerRecord<>(kafka.getPersistentStorageTopic(), metricsData));
+            metricsDataProducer.send(new ProducerRecord<>(kafka.getRealTimeMemoryStorageTopic(), metricsData));
+        } else {
+            log.error("metricsDataProducer is not enabled");
+        }
+    }
+
+    @Override
+    public void destroy() throws Exception {
+        closeIfNotNull(metricsDataProducer);
+        closeIfNotNull(kafkaAlertProducer);
+        closeIfNotNull(alertConsumer);
+        closeIfNotNull(metricsDataToAlertConsumer);
+        closeIfNotNull(metricsDataToPersistentStorageConsumer);
+        closeIfNotNull(metricsDataToMemoryStorageConsumer);
+    }
+
+    /**
+     * Null-safe close for producers and consumers.
+     */
+    private static void closeIfNotNull(AutoCloseable closeable) throws Exception {
+        if (closeable != null) {
+            closeable.close();
+        }
+    }
+
+    /**
+     * Binding for the {@code common.queue.kafka.*} configuration properties.
+     */
+    @Component
+    @ConfigurationProperties("common.queue.kafka")
+    public static class KafkaProperties {
+        /**
+         * kafka bootstrap servers url, e.g. 127.0.0.1:9092
+         */
+        private String servers;
+        /**
+         * topic carrying alert records
+         */
+        private String alertTopic;
+        /**
+         * topic carrying metrics data consumed by the alert calculator
+         */
+        private String alertMetricTopic;
+        /**
+         * topic carrying metrics data bound for persistent storage
+         */
+        private String persistentStorageTopic;
+        /**
+         * topic carrying metrics data bound for real-time memory storage
+         */
+        private String realTimeMemoryStorageTopic;
+        /**
+         * consumer group id shared by all consumers of this queue
+         */
+        private String groupId;
+
+        public String getServers() {
+            return servers;
+        }
+
+        public void setServers(String servers) {
+            this.servers = servers;
+        }
+
+        public String getAlertTopic() {
+            return alertTopic;
+        }
+
+        public void setAlertTopic(String alertTopic) {
+            this.alertTopic = alertTopic;
+        }
+
+        public String getAlertMetricTopic() {
+            return alertMetricTopic;
+        }
+
+        public void setAlertMetricTopic(String alertMetricTopic) {
+            this.alertMetricTopic = alertMetricTopic;
+        }
+
+        public String getPersistentStorageTopic() {
+            return persistentStorageTopic;
+        }
+
+        public void setPersistentStorageTopic(String persistentStorageTopic) {
+            this.persistentStorageTopic = persistentStorageTopic;
+        }
+
+        public String getRealTimeMemoryStorageTopic() {
+            return realTimeMemoryStorageTopic;
+        }
+
+        public void setRealTimeMemoryStorageTopic(String realTimeMemoryStorageTopic) {
+            this.realTimeMemoryStorageTopic = realTimeMemoryStorageTopic;
+        }
+
+        public String getGroupId() {
+            return groupId;
+        }
+
+        public void setGroupId(String groupId) {
+            this.groupId = groupId;
+        }
+    }
+}
diff --git a/common/src/main/java/org/dromara/hertzbeat/common/serialize/AlertDeserializer.java b/common/src/main/java/org/dromara/hertzbeat/common/serialize/AlertDeserializer.java
new file mode 100644
index 00000000000..c0f940aa6c9
--- /dev/null
+++ b/common/src/main/java/org/dromara/hertzbeat/common/serialize/AlertDeserializer.java
@@ -0,0 +1,34 @@
+package org.dromara.hertzbeat.common.serialize;
+
+import org.dromara.hertzbeat.common.entity.alerter.Alert;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.dromara.hertzbeat.common.util.JsonUtil;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Kafka deserializer for {@link Alert} records, JSON encoded
+ * (counterpart of {@code AlertSerializer}).
+ * @author tablerow
+ */
+public class AlertDeserializer implements Deserializer<Alert> {
+
+    /**
+     * Decodes a UTF-8 JSON payload into an {@link Alert}.
+     * @param topic topic the record came from (unused)
+     * @param bytes raw record value; may be null for tombstone records
+     * @return the decoded alert, or null when bytes is null
+     */
+    @Override
+    public Alert deserialize(String topic, byte[] bytes) {
+        // Kafka delivers null for absent/tombstone values; guard against NPE
+        if (bytes == null) {
+            return null;
+        }
+        return JsonUtil.fromJson(new String(bytes, StandardCharsets.UTF_8), Alert.class);
+    }
+}
diff --git a/common/src/main/java/org/dromara/hertzbeat/common/serialize/AlertSerializer.java b/common/src/main/java/org/dromara/hertzbeat/common/serialize/AlertSerializer.java
new file mode 100644
index 00000000000..924921686c7
--- /dev/null
+++ b/common/src/main/java/org/dromara/hertzbeat/common/serialize/AlertSerializer.java
@@ -0,0 +1,39 @@
+package org.dromara.hertzbeat.common.serialize;
+
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.dromara.hertzbeat.common.entity.alerter.Alert;
+import org.dromara.hertzbeat.common.util.JsonUtil;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Kafka serializer for {@link Alert} records, JSON encoded
+ * (counterpart of {@code AlertDeserializer}).
+ * @author tablerow
+ */
+public class AlertSerializer implements Serializer<Alert> {
+
+    /**
+     * Encodes an {@link Alert} as UTF-8 JSON bytes.
+     * @param topic topic the record is bound for (unused)
+     * @param alert alert to serialize; may be null
+     * @return JSON bytes, or null when alert is null (Kafka tombstone)
+     */
+    @Override
+    public byte[] serialize(String topic, Alert alert) {
+        if (alert == null) {
+            return null;
+        }
+        // fix the charset explicitly instead of relying on the platform default
+        return JsonUtil.toJson(alert).getBytes(StandardCharsets.UTF_8);
+    }
+}
diff --git a/common/src/main/java/org/dromara/hertzbeat/common/serialize/KafkaMetricsDataDeserializer.java b/common/src/main/java/org/dromara/hertzbeat/common/serialize/KafkaMetricsDataDeserializer.java
new file mode 100644
index 00000000000..7a4a0198464
--- /dev/null
+++ b/common/src/main/java/org/dromara/hertzbeat/common/serialize/KafkaMetricsDataDeserializer.java
@@ -0,0 +1,39 @@
+package org.dromara.hertzbeat.common.serialize;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.dromara.hertzbeat.common.entity.message.CollectRep;
+
+/**
+ * Kafka deserializer for {@link CollectRep.MetricsData} records, protobuf
+ * encoded (counterpart of {@code KafkaMetricsDataSerializer}).
+ * @author tablerow
+ */
+public class KafkaMetricsDataDeserializer implements Deserializer<CollectRep.MetricsData> {
+
+    /**
+     * Parses a protobuf payload into {@link CollectRep.MetricsData}.
+     * @param topic topic the record came from (unused)
+     * @param bytes raw record value; may be null for tombstone records
+     * @return the parsed metrics data, or null when bytes is null
+     * @throws IllegalArgumentException when the payload is not valid protobuf
+     */
+    @Override
+    public CollectRep.MetricsData deserialize(String topic, byte[] bytes) {
+        // Kafka delivers null for absent/tombstone values; guard against NPE
+        if (bytes == null) {
+            return null;
+        }
+        try {
+            return CollectRep.MetricsData.parseFrom(bytes);
+        } catch (InvalidProtocolBufferException e) {
+            // keep the cause and add context instead of a bare RuntimeException
+            throw new IllegalArgumentException("failed to parse MetricsData protobuf from topic " + topic, e);
+        }
+    }
+}
diff --git a/common/src/main/java/org/dromara/hertzbeat/common/serialize/KafkaMetricsDataSerializer.java b/common/src/main/java/org/dromara/hertzbeat/common/serialize/KafkaMetricsDataSerializer.java
new file mode 100644
index 00000000000..45718af0473
--- /dev/null
+++ b/common/src/main/java/org/dromara/hertzbeat/common/serialize/KafkaMetricsDataSerializer.java
@@ -0,0 +1,36 @@
+package org.dromara.hertzbeat.common.serialize;
+
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.dromara.hertzbeat.common.entity.message.CollectRep;
+
+
+/**
+ * Kafka serializer for {@link CollectRep.MetricsData} records, protobuf
+ * encoded (counterpart of {@code KafkaMetricsDataDeserializer}).
+ * @author tablerow
+ */
+public class KafkaMetricsDataSerializer implements Serializer<CollectRep.MetricsData> {
+
+    /**
+     * Encodes {@link CollectRep.MetricsData} as protobuf bytes.
+     * @param topic topic the record is bound for (unused)
+     * @param metricsData metrics data to serialize; may be null
+     * @return protobuf bytes, or null when metricsData is null (Kafka tombstone)
+     */
+    @Override
+    public byte[] serialize(String topic, CollectRep.MetricsData metricsData) {
+        // Kafka serializers must tolerate null values; avoid NPE on toByteArray()
+        if (metricsData == null) {
+            return null;
+        }
+        return metricsData.toByteArray();
+    }
+}
diff --git a/manager/src/main/resources/application-test.yml b/manager/src/main/resources/application-test.yml
index 431306d5a98..96de70b0194 100644
--- a/manager/src/main/resources/application-test.yml
+++ b/manager/src/main/resources/application-test.yml
@@ -39,6 +39,16 @@ spring:
enable: true
debug: false
+common:
+ queue:
+ type: memory
+ kafka:
+ servers: 127.0.0.1:9092
+ alert-topic: async-collect-data-debug
+ alert-metric-topic: temp-alert-data
+ persistent-storage-topic: persistent-storage-data
+ real-time-memory-storage-topic: memory-data
+
warehouse:
store:
jpa:
diff --git a/manager/src/main/resources/application.yml b/manager/src/main/resources/application.yml
index 3c2de16854d..71e91cc1e21 100644
--- a/manager/src/main/resources/application.yml
+++ b/manager/src/main/resources/application.yml
@@ -101,6 +101,18 @@ spring:
enable: true
debug: false
+common:
+ queue:
+ # memory or kafka
+ type: memory
+ # properties when queue type is kafka
+ kafka:
+ servers: 127.0.0.1:9092
+ alert-topic: async-collect-data-debug
+ alert-metric-topic: temp-alert-data
+ persistent-storage-topic: persistent-storage-data
+ real-time-memory-storage-topic: memory-data
+
warehouse:
store:
# store history metrics data, enable only one below