Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor common kafka data queue #1017

Merged
merged 3 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private void startCalculate() {
Runnable runnable = () -> {
while (!Thread.currentThread().isInterrupted()) {
try {
CollectRep.MetricsData metricsData = dataQueue.pollAlertMetricsData();
CollectRep.MetricsData metricsData = dataQueue.pollMetricsDataToAlerter();
if (metricsData != null) {
calculate(metricsData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,6 @@ public void filterSilenceAndSendData(Alert alert) {
}
}
}
dataQueue.addAlertData(alert);
dataQueue.sendAlertsData(alert);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public AlertSummary getAlertsSummary() {

@Override
public void addNewAlertReport(AlertReport alertReport) {
commonDataQueue.addAlertData(buildAlertData(alertReport));
commonDataQueue.sendAlertsData(buildAlertData(alertReport));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public void setSms(SmsProperties sms) {
public static class DataQueueProperties {

private QueueType type = QueueType.Memory;

private KafkaProperties kafka;

public QueueType getType() {
return type;
Expand All @@ -78,6 +80,14 @@ public QueueType getType() {
public void setType(QueueType type) {
this.type = type;
}

public KafkaProperties getKafka() {
return kafka;
}

public void setKafka(KafkaProperties kafka) {
this.kafka = kafka;
}
}

public static enum QueueType {
Expand All @@ -88,6 +98,44 @@ public static enum QueueType {
/** rabbit mq **/
Rabbit_Mq
}

public static class KafkaProperties {
/**
* kafka的连接服务器url
*/
private String servers;
/**
* metrics data topic
*/
private String metricsDataTopic;
/**
* alerts data topic
*/
private String alertsDataTopic;
public String getServers() {
return servers;
}

public void setServers(String servers) {
this.servers = servers;
}

public String getMetricsDataTopic() {
return metricsDataTopic;
}

public void setMetricsDataTopic(String metricsDataTopic) {
this.metricsDataTopic = metricsDataTopic;
}

public String getAlertsDataTopic() {
return alertsDataTopic;
}

public void setAlertsDataTopic(String alertsDataTopic) {
this.alertsDataTopic = alertsDataTopic;
}
}

public static class SmsProperties {
private TencentSmsProperties tencent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,40 @@
*/
public interface CommonDataQueue {

/**
* offer alert data
* @param alert alert data
*/
void addAlertData(Alert alert);

/**
* poll alert data
* @return alert data
* @throws InterruptedException when poll timeout
*/
Alert pollAlertData() throws InterruptedException;
Alert pollAlertsData() throws InterruptedException;

/**
* poll collect metrics data for alerter
* @return metrics data
* @throws InterruptedException when poll timeout
*/
CollectRep.MetricsData pollAlertMetricsData() throws InterruptedException;
CollectRep.MetricsData pollMetricsDataToAlerter() throws InterruptedException;

/**
* poll collect metrics data for Persistent Storage
* @return metrics data
* @throws InterruptedException when poll timeout
*/
CollectRep.MetricsData pollPersistentStorageMetricsData() throws InterruptedException;
CollectRep.MetricsData pollMetricsDataToPersistentStorage() throws InterruptedException;

/**
* poll collect metrics data for real-time Storage
* @return metrics data
* @throws InterruptedException when poll timeout
*/
CollectRep.MetricsData pollRealTimeStorageMetricsData() throws InterruptedException;

CollectRep.MetricsData pollMetricsDataToRealTimeStorage() throws InterruptedException;

/**
* offer alert data
* @param alert alert data
*/
void sendAlertsData(Alert alert);

/**
* send collect metrics data
* @param metricsData metrics data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,69 +36,68 @@
*
*/
@Configuration
@ConditionalOnProperty(prefix = "common.queue", name = "type", havingValue = "Memory",
matchIfMissing = true)
@ConditionalOnProperty(prefix = "common.queue", name = "type", havingValue = "memory", matchIfMissing = true)
@Slf4j
public class InMemoryCommonDataQueue implements CommonDataQueue, DisposableBean {

private final LinkedBlockingQueue<Alert> alertDataQueue;
private final LinkedBlockingQueue<CollectRep.MetricsData> metricsDataToAlertQueue;
private final LinkedBlockingQueue<CollectRep.MetricsData> metricsDataToPersistentStorageQueue;
private final LinkedBlockingQueue<CollectRep.MetricsData> metricsDataToMemoryStorageQueue;
private final LinkedBlockingQueue<CollectRep.MetricsData> metricsDataToRealTimeStorageQueue;

public InMemoryCommonDataQueue() {
alertDataQueue = new LinkedBlockingQueue<>();
metricsDataToAlertQueue = new LinkedBlockingQueue<>();
metricsDataToPersistentStorageQueue = new LinkedBlockingQueue<>();
metricsDataToMemoryStorageQueue = new LinkedBlockingQueue<>();
metricsDataToRealTimeStorageQueue = new LinkedBlockingQueue<>();
}

public Map<String, Integer> getQueueSizeMetricsInfo() {
Map<String, Integer> metrics = new HashMap<>(8);
metrics.put("alertDataQueue", alertDataQueue.size());
metrics.put("metricsDataToAlertQueue", metricsDataToAlertQueue.size());
metrics.put("metricsDataToPersistentStorageQueue", metricsDataToPersistentStorageQueue.size());
metrics.put("metricsDataToMemoryStorageQueue", metricsDataToMemoryStorageQueue.size());
metrics.put("metricsDataToMemoryStorageQueue", metricsDataToRealTimeStorageQueue.size());
return metrics;
}

@Override
public void addAlertData(Alert alert) {
public void sendAlertsData(Alert alert) {
alertDataQueue.offer(alert);
}

@Override
public Alert pollAlertData() throws InterruptedException {
public Alert pollAlertsData() throws InterruptedException {
return alertDataQueue.poll(2, TimeUnit.SECONDS);
}

@Override
public CollectRep.MetricsData pollAlertMetricsData() throws InterruptedException {
public CollectRep.MetricsData pollMetricsDataToAlerter() throws InterruptedException {
return metricsDataToAlertQueue.poll(2, TimeUnit.SECONDS);
}

@Override
public CollectRep.MetricsData pollPersistentStorageMetricsData() throws InterruptedException {
public CollectRep.MetricsData pollMetricsDataToPersistentStorage() throws InterruptedException {
return metricsDataToPersistentStorageQueue.poll(2, TimeUnit.SECONDS);
}

@Override
public CollectRep.MetricsData pollRealTimeStorageMetricsData() throws InterruptedException {
return metricsDataToMemoryStorageQueue.poll(2, TimeUnit.SECONDS);
public CollectRep.MetricsData pollMetricsDataToRealTimeStorage() throws InterruptedException {
return metricsDataToRealTimeStorageQueue.poll(2, TimeUnit.SECONDS);
}

@Override
public void sendMetricsData(CollectRep.MetricsData metricsData) {
metricsDataToAlertQueue.offer(metricsData);
metricsDataToPersistentStorageQueue.offer(metricsData);
metricsDataToMemoryStorageQueue.offer(metricsData);
metricsDataToRealTimeStorageQueue.offer(metricsData);
}

@Override
public void destroy() {
alertDataQueue.clear();
metricsDataToAlertQueue.clear();
metricsDataToPersistentStorageQueue.clear();
metricsDataToMemoryStorageQueue.clear();
metricsDataToRealTimeStorageQueue.clear();
}
}
Loading