Skip to content

Commit

Permalink
refactor common kafka data queue (#1017)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomsun28 authored Jun 7, 2023
1 parent c48bbeb commit 510f0e4
Show file tree
Hide file tree
Showing 13 changed files with 233 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private void startCalculate() {
Runnable runnable = () -> {
while (!Thread.currentThread().isInterrupted()) {
try {
CollectRep.MetricsData metricsData = dataQueue.pollAlertMetricsData();
CollectRep.MetricsData metricsData = dataQueue.pollMetricsDataToAlerter();
if (metricsData != null) {
calculate(metricsData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,6 @@ public void filterSilenceAndSendData(Alert alert) {
}
}
}
dataQueue.addAlertData(alert);
dataQueue.sendAlertsData(alert);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public AlertSummary getAlertsSummary() {

@Override
public void addNewAlertReport(AlertReport alertReport) {
commonDataQueue.addAlertData(buildAlertData(alertReport));
commonDataQueue.sendAlertsData(buildAlertData(alertReport));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public void setSms(SmsProperties sms) {
public static class DataQueueProperties {

private QueueType type = QueueType.Memory;

private KafkaProperties kafka;

public QueueType getType() {
return type;
Expand All @@ -78,6 +80,14 @@ public QueueType getType() {
public void setType(QueueType type) {
this.type = type;
}

public KafkaProperties getKafka() {
return kafka;
}

public void setKafka(KafkaProperties kafka) {
this.kafka = kafka;
}
}

public static enum QueueType {
Expand All @@ -88,6 +98,44 @@ public static enum QueueType {
/** rabbit mq **/
Rabbit_Mq
}

public static class KafkaProperties {
/**
* kafka的连接服务器url
*/
private String servers;
/**
* metrics data topic
*/
private String metricsDataTopic;
/**
* alerts data topic
*/
private String alertsDataTopic;
public String getServers() {
return servers;
}

public void setServers(String servers) {
this.servers = servers;
}

public String getMetricsDataTopic() {
return metricsDataTopic;
}

public void setMetricsDataTopic(String metricsDataTopic) {
this.metricsDataTopic = metricsDataTopic;
}

public String getAlertsDataTopic() {
return alertsDataTopic;
}

public void setAlertsDataTopic(String alertsDataTopic) {
this.alertsDataTopic = alertsDataTopic;
}
}

public static class SmsProperties {
private TencentSmsProperties tencent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,40 @@
*/
public interface CommonDataQueue {

/**
* offer alert data
* @param alert alert data
*/
void addAlertData(Alert alert);

/**
* poll alert data
* @return alert data
* @throws InterruptedException when poll timeout
*/
Alert pollAlertData() throws InterruptedException;
Alert pollAlertsData() throws InterruptedException;

/**
* poll collect metrics data for alerter
* @return metrics data
* @throws InterruptedException when poll timeout
*/
CollectRep.MetricsData pollAlertMetricsData() throws InterruptedException;
CollectRep.MetricsData pollMetricsDataToAlerter() throws InterruptedException;

/**
* poll collect metrics data for Persistent Storage
* @return metrics data
* @throws InterruptedException when poll timeout
*/
CollectRep.MetricsData pollPersistentStorageMetricsData() throws InterruptedException;
CollectRep.MetricsData pollMetricsDataToPersistentStorage() throws InterruptedException;

/**
* poll collect metrics data for real-time Storage
* @return metrics data
* @throws InterruptedException when poll timeout
*/
CollectRep.MetricsData pollRealTimeStorageMetricsData() throws InterruptedException;

CollectRep.MetricsData pollMetricsDataToRealTimeStorage() throws InterruptedException;

/**
* offer alert data
* @param alert alert data
*/
void sendAlertsData(Alert alert);

/**
* send collect metrics data
* @param metricsData metrics data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,69 +36,68 @@
*
*/
@Configuration
@ConditionalOnProperty(prefix = "common.queue", name = "type", havingValue = "Memory",
matchIfMissing = true)
@ConditionalOnProperty(prefix = "common.queue", name = "type", havingValue = "memory", matchIfMissing = true)
@Slf4j
public class InMemoryCommonDataQueue implements CommonDataQueue, DisposableBean {

private final LinkedBlockingQueue<Alert> alertDataQueue;
private final LinkedBlockingQueue<CollectRep.MetricsData> metricsDataToAlertQueue;
private final LinkedBlockingQueue<CollectRep.MetricsData> metricsDataToPersistentStorageQueue;
private final LinkedBlockingQueue<CollectRep.MetricsData> metricsDataToMemoryStorageQueue;
private final LinkedBlockingQueue<CollectRep.MetricsData> metricsDataToRealTimeStorageQueue;

public InMemoryCommonDataQueue() {
alertDataQueue = new LinkedBlockingQueue<>();
metricsDataToAlertQueue = new LinkedBlockingQueue<>();
metricsDataToPersistentStorageQueue = new LinkedBlockingQueue<>();
metricsDataToMemoryStorageQueue = new LinkedBlockingQueue<>();
metricsDataToRealTimeStorageQueue = new LinkedBlockingQueue<>();
}

public Map<String, Integer> getQueueSizeMetricsInfo() {
Map<String, Integer> metrics = new HashMap<>(8);
metrics.put("alertDataQueue", alertDataQueue.size());
metrics.put("metricsDataToAlertQueue", metricsDataToAlertQueue.size());
metrics.put("metricsDataToPersistentStorageQueue", metricsDataToPersistentStorageQueue.size());
metrics.put("metricsDataToMemoryStorageQueue", metricsDataToMemoryStorageQueue.size());
metrics.put("metricsDataToMemoryStorageQueue", metricsDataToRealTimeStorageQueue.size());
return metrics;
}

@Override
public void addAlertData(Alert alert) {
public void sendAlertsData(Alert alert) {
alertDataQueue.offer(alert);
}

@Override
public Alert pollAlertData() throws InterruptedException {
public Alert pollAlertsData() throws InterruptedException {
return alertDataQueue.poll(2, TimeUnit.SECONDS);
}

@Override
public CollectRep.MetricsData pollAlertMetricsData() throws InterruptedException {
public CollectRep.MetricsData pollMetricsDataToAlerter() throws InterruptedException {
return metricsDataToAlertQueue.poll(2, TimeUnit.SECONDS);
}

@Override
public CollectRep.MetricsData pollPersistentStorageMetricsData() throws InterruptedException {
public CollectRep.MetricsData pollMetricsDataToPersistentStorage() throws InterruptedException {
return metricsDataToPersistentStorageQueue.poll(2, TimeUnit.SECONDS);
}

@Override
public CollectRep.MetricsData pollRealTimeStorageMetricsData() throws InterruptedException {
return metricsDataToMemoryStorageQueue.poll(2, TimeUnit.SECONDS);
public CollectRep.MetricsData pollMetricsDataToRealTimeStorage() throws InterruptedException {
return metricsDataToRealTimeStorageQueue.poll(2, TimeUnit.SECONDS);
}

@Override
public void sendMetricsData(CollectRep.MetricsData metricsData) {
metricsDataToAlertQueue.offer(metricsData);
metricsDataToPersistentStorageQueue.offer(metricsData);
metricsDataToMemoryStorageQueue.offer(metricsData);
metricsDataToRealTimeStorageQueue.offer(metricsData);
}

@Override
public void destroy() {
alertDataQueue.clear();
metricsDataToAlertQueue.clear();
metricsDataToPersistentStorageQueue.clear();
metricsDataToMemoryStorageQueue.clear();
metricsDataToRealTimeStorageQueue.clear();
}
}
Loading

0 comments on commit 510f0e4

Please sign in to comment.