Skip to content

Commit

Permalink
[hertzbeat] refactor common collect metrics data and alert data queue (
Browse files Browse the repository at this point in the history
…#320)

  [hertzbeat] refactor alert data queue

  [hertzbeat] refactor collect metrics data queue
  • Loading branch information
tomsun28 authored Oct 2, 2022
1 parent 30e9dfa commit e949f5a
Show file tree
Hide file tree
Showing 20 changed files with 233 additions and 163 deletions.
6 changes: 0 additions & 6 deletions alerter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@
<groupId>com.usthe.tancloud</groupId>
<artifactId>common</artifactId>
<version>1.0</version>
</dependency>
<!-- collector -->
<dependency>
<groupId>com.usthe.tancloud</groupId>
<artifactId>collector</artifactId>
<version>1.0</version>
<scope>provided</scope>
</dependency>
<!-- spring -->
Expand Down
50 changes: 0 additions & 50 deletions alerter/src/main/java/com/usthe/alert/AlerterDataQueue.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@
import com.googlecode.aviator.Expression;
import com.usthe.alert.AlerterProperties;
import com.usthe.alert.AlerterWorkerPool;
import com.usthe.alert.AlerterDataQueue;
import com.usthe.common.queue.CommonDataQueue;
import com.usthe.alert.dao.AlertMonitorDao;
import com.usthe.common.entity.alerter.Alert;
import com.usthe.common.entity.alerter.AlertDefine;
import com.usthe.alert.service.AlertDefineService;
import com.usthe.alert.util.AlertTemplateUtil;
import com.usthe.collector.dispatch.export.MetricsDataExporter;
import com.usthe.common.entity.manager.Monitor;
import com.usthe.common.entity.message.CollectRep;
import com.usthe.common.util.CommonConstants;
Expand All @@ -49,8 +48,7 @@
public class CalculateAlarm {

private AlerterWorkerPool workerPool;
private AlerterDataQueue dataQueue;
private MetricsDataExporter dataExporter;
private CommonDataQueue dataQueue;
private AlertDefineService alertDefineService;
private AlerterProperties alerterProperties;
/**
Expand All @@ -62,12 +60,11 @@ public class CalculateAlarm {

private ResourceBundle bundle;

public CalculateAlarm (AlerterWorkerPool workerPool, AlerterDataQueue dataQueue,
AlertDefineService alertDefineService, MetricsDataExporter dataExporter,
AlertMonitorDao monitorDao, AlerterProperties alerterProperties) {
public CalculateAlarm (AlerterWorkerPool workerPool, CommonDataQueue dataQueue,
AlertDefineService alertDefineService, AlertMonitorDao monitorDao,
AlerterProperties alerterProperties) {
this.workerPool = workerPool;
this.dataQueue = dataQueue;
this.dataExporter = dataExporter;
this.alertDefineService = alertDefineService;
this.alerterProperties = alerterProperties;
this.bundle = ResourceBundleUtil.getBundle("alerter");
Expand Down Expand Up @@ -105,7 +102,7 @@ private void startCalculate() {
Runnable runnable = () -> {
while (!Thread.currentThread().isInterrupted()) {
try {
CollectRep.MetricsData metricsData = dataExporter.pollAlertMetricsData();
CollectRep.MetricsData metricsData = dataQueue.pollAlertMetricsData();
if (metricsData != null) {
calculate(metricsData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.usthe.alert.service.impl;

import com.usthe.alert.AlerterDataQueue;
import com.usthe.common.queue.CommonDataQueue;
import com.usthe.alert.dao.AlertDao;
import com.usthe.alert.dto.AlertPriorityNum;
import com.usthe.alert.dto.AlertSummary;
Expand Down Expand Up @@ -57,7 +57,7 @@ public class AlertServiceImpl implements AlertService {
private AlertDao alertDao;

@Autowired
private AlerterDataQueue alerterDataQueue;
private CommonDataQueue commonDataQueue;

@Override
public void addAlert(Alert alert) throws RuntimeException {
Expand Down Expand Up @@ -129,7 +129,7 @@ public AlertSummary getAlertsSummary() {

@Override
public void addNewAlertReport(AlertReport alertReport) {
alerterDataQueue.addAlertData(buildAlertData(alertReport));
commonDataQueue.addAlertData(buildAlertData(alertReport));
}

/**
Expand Down
1 change: 0 additions & 1 deletion alerter/src/main/resources/META-INF/spring.factories
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ com.usthe.alert.service.impl.AlertServiceImpl,\
com.usthe.alert.controller.AlertDefineController,\
com.usthe.alert.AlerterWorkerPool,\
com.usthe.alert.AlerterProperties,\
com.usthe.alert.AlerterDataQueue,\
com.usthe.alert.AlerterConfiguration,\
com.usthe.alert.calculate.CalculateAlarm,\
com.usthe.alert.controller.AlertsController,\
Expand Down
1 change: 1 addition & 0 deletions collector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
<groupId>com.usthe.tancloud</groupId>
<artifactId>common</artifactId>
<version>1.0</version>
<scope>provided</scope>
</dependency>
<!-- etcd -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.usthe.collector.dispatch.export.MetricsDataExporter;
import com.usthe.collector.dispatch.timer.Timeout;
import com.usthe.collector.dispatch.timer.TimerDispatch;
import com.usthe.collector.dispatch.timer.WheelTimerTask;
Expand All @@ -28,6 +27,7 @@
import com.usthe.common.entity.job.Job;
import com.usthe.common.entity.job.Metrics;
import com.usthe.common.entity.message.CollectRep;
import com.usthe.common.queue.CommonDataQueue;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -70,19 +70,19 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
*/
private TimerDispatch timerDispatch;
/**
* kafka collection data exporter
* kafka采集数据导出器
* collection data exporter
* 采集数据导出器
*/
private MetricsDataExporter kafkaDataExporter;
private CommonDataQueue commonDataQueue;
/**
* Metric group task and start time mapping map
* 指标组任务与开始时间映射map
*/
private Map<String, MetricsTime> metricsTimeoutMonitorMap;

public CommonDispatcher(MetricsCollectorQueue jobRequestQueue, TimerDispatch timerDispatch,
MetricsDataExporter kafkaDataExporter, WorkerPool workerPool) {
this.kafkaDataExporter = kafkaDataExporter;
CommonDataQueue commonDataQueue, WorkerPool workerPool) {
this.commonDataQueue = commonDataQueue;
this.jobRequestQueue = jobRequestQueue;
this.timerDispatch = timerDispatch;
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 2, 1,
Expand Down Expand Up @@ -184,7 +184,7 @@ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.Met
if (job.isCyclic()) {
// If it is an asynchronous periodic cyclic task, directly send the collected data of the indicator group to the message middleware
// 若是异步的周期性循环任务,直接发送指标组的采集数据到消息中间件
kafkaDataExporter.send(metricsData);
commonDataQueue.sendMetricsData(metricsData);
if (log.isDebugEnabled()) {
log.debug("Cyclic Job: {}",metricsData.getMetrics());
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.usthe.common.entity.message.CollectRep;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.stereotype.Component;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand All @@ -30,7 +29,6 @@
* @author tomsun28
* @date 2021/11/3 15:22
*/
@Component
@Slf4j
public class MetricsDataExporter implements DisposableBean {

Expand Down
1 change: 0 additions & 1 deletion collector/src/main/resources/META-INF/spring.factories
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ com.usthe.collector.dispatch.DispatchProperties,\
com.usthe.collector.dispatch.MetricsCollectorQueue,\
com.usthe.collector.dispatch.WorkerPool,\
com.usthe.collector.dispatch.entrance.internal.CollectJobService,\
com.usthe.collector.dispatch.export.MetricsDataExporter,\
com.usthe.collector.util.SpringContextHolder,\
com.usthe.collector.collect.database.JdbcSpiLoader,\
com.usthe.collector.collect.http.promethus.PrometheusParseCreater
Expand Down
35 changes: 35 additions & 0 deletions common/src/main/java/com/usthe/common/config/CommonProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,46 @@ public class CommonProperties {
*/
private String secretKey;

/**
* data queue impl
*/
private DataQueueProperties queue;

public String getSecretKey() {
return secretKey;
}

public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}

public DataQueueProperties getQueue() {
return queue;
}

public void setQueue(DataQueueProperties queue) {
this.queue = queue;
}

public static class DataQueueProperties {

private QueueType type = QueueType.Memory;

public QueueType getType() {
return type;
}

public void setType(QueueType type) {
this.type = type;
}
}

public static enum QueueType {
/** in memory **/
Memory,
/** kafka **/
Kafka,
/** rabbit mq **/
Rabbit_Mq
}
}
69 changes: 69 additions & 0 deletions common/src/main/java/com/usthe/common/queue/CommonDataQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.usthe.common.queue;

import com.usthe.common.entity.alerter.Alert;
import com.usthe.common.entity.message.CollectRep;

/**
* common data queue
* @author tom
* @date 2021/11/24 17:58
*/
public interface CommonDataQueue {

/**
* offer alert data
* @param alert alert data
*/
void addAlertData(Alert alert);

/**
* poll alert data
* @return alert data
* @throws InterruptedException when poll timeout
*/
Alert pollAlertData() throws InterruptedException;

/**
* poll collect metrics data for alerter
* @return metrics data
* @throws InterruptedException when poll timeout
*/
CollectRep.MetricsData pollAlertMetricsData() throws InterruptedException;

/**
* poll collect metrics data for Persistent Storage
* @return metrics data
* @throws InterruptedException when poll timeout
*/
CollectRep.MetricsData pollPersistentStorageMetricsData() throws InterruptedException;

/**
* poll collect metrics data for real-time Storage
* @return metrics data
* @throws InterruptedException when poll timeout
*/
CollectRep.MetricsData pollRealTimeStorageMetricsData() throws InterruptedException;

/**
* send collect metrics data
* @param metricsData metrics data
*/
void sendMetricsData(CollectRep.MetricsData metricsData);
}
Loading

0 comments on commit e949f5a

Please sign in to comment.