Skip to content

Commit

Permalink
add time type to support query_time of mysql and mariadb (#1364)
Browse files Browse the repository at this point in the history
Signed-off-by: Clownsw <msmliexx1@gmail.com>
  • Loading branch information
Clownsw authored Nov 30, 2023
1 parent b469419 commit 628cb4c
Show file tree
Hide file tree
Showing 11 changed files with 204 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,29 @@
import com.googlecode.aviator.exception.CompileExpressionErrorException;
import com.googlecode.aviator.exception.ExpressionRuntimeException;
import com.googlecode.aviator.exception.ExpressionSyntaxErrorException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.dromara.hertzbeat.alert.AlerterWorkerPool;
import org.dromara.hertzbeat.alert.dao.AlertMonitorDao;
import org.dromara.hertzbeat.alert.reduce.AlarmCommonReduce;
import org.dromara.hertzbeat.alert.service.AlertDefineService;
import org.dromara.hertzbeat.alert.service.AlertService;
import org.dromara.hertzbeat.common.entity.manager.TagItem;
import org.dromara.hertzbeat.common.queue.CommonDataQueue;
import org.dromara.hertzbeat.alert.dao.AlertMonitorDao;
import org.dromara.hertzbeat.alert.util.AlertTemplateUtil;
import org.dromara.hertzbeat.common.constants.CommonConstants;
import org.dromara.hertzbeat.common.entity.alerter.Alert;
import org.dromara.hertzbeat.common.entity.alerter.AlertDefine;
import org.dromara.hertzbeat.alert.service.AlertDefineService;
import org.dromara.hertzbeat.alert.util.AlertTemplateUtil;
import org.dromara.hertzbeat.common.entity.manager.Monitor;
import org.dromara.hertzbeat.common.entity.manager.TagItem;
import org.dromara.hertzbeat.common.entity.message.CollectRep;
import org.dromara.hertzbeat.common.constants.CommonConstants;
import org.dromara.hertzbeat.common.queue.CommonDataQueue;
import org.dromara.hertzbeat.common.support.event.MonitorDeletedEvent;
import org.dromara.hertzbeat.common.support.event.SystemConfigChangeEvent;
import org.dromara.hertzbeat.common.util.CommonUtil;
import org.dromara.hertzbeat.common.util.ResourceBundleUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import javax.persistence.criteria.Predicate;
import java.util.*;
Expand Down Expand Up @@ -86,7 +86,7 @@ public class CalculateAlarm {

public CalculateAlarm(AlerterWorkerPool workerPool, CommonDataQueue dataQueue,
AlertDefineService alertDefineService, AlertMonitorDao monitorDao,
AlarmCommonReduce alarmCommonReduce, AlertService alertService) {
AlarmCommonReduce alarmCommonReduce, AlertService alertService) {
this.workerPool = workerPool;
this.dataQueue = dataQueue;
this.alarmCommonReduce = alarmCommonReduce;
Expand Down Expand Up @@ -120,7 +120,7 @@ private void startCalculate() {
calculate(metricsData);
}
} catch (InterruptedException ignored) {

} catch (Exception e) {
log.error("calculate alarm error: {}.", e.getMessage(), e);
}
Expand Down Expand Up @@ -154,10 +154,12 @@ private void calculate(CollectRep.MetricsData metricsData) {
for (Map.Entry<String, List<AlertDefine>> entry : defineMap.entrySet()) {
List<AlertDefine> defines = entry.getValue();
for (AlertDefine define : defines) {
String expr = define.getExpr();
if (!StringUtils.hasText(expr)) {
final String expr = define.getExpr();

if (StringUtils.isBlank(expr)) {
continue;
}

if (expr.contains(SYSTEM_VALUE_ROW_COUNT)) {
fieldValueMap.put(SYSTEM_VALUE_ROW_COUNT, valueRowCount);
try {
Expand Down Expand Up @@ -189,18 +191,26 @@ private void calculate(CollectRep.MetricsData metricsData) {
if (CommonConstants.NULL_VALUE.equals(valueStr)) {
continue;
}
CollectRep.Field field = fields.get(index);

if (field.getType() == CommonConstants.TYPE_NUMBER) {
Double doubleValue = CommonUtil.parseStrDouble(valueStr);
if (doubleValue != null) {
final CollectRep.Field field = fields.get(index);
final int fieldType = field.getType();

if (fieldType == CommonConstants.TYPE_NUMBER) {
final Double doubleValue;
if ((doubleValue = CommonUtil.parseStrDouble(valueStr)) != null) {
fieldValueMap.put(field.getName(), doubleValue);
}
} else if (fieldType == CommonConstants.TYPE_TIME) {
final Integer integerValue;
if ((integerValue = CommonUtil.parseStrInteger(valueStr)) != null) {
fieldValueMap.put(field.getName(), integerValue);
}
} else {
if (!"".equals(valueStr)) {
if (StringUtils.isNotEmpty(valueStr)) {
fieldValueMap.put(field.getName(), valueStr);
}
}

if (field.getLabel()) {
instanceBuilder.append(valueStr).append("-");
}
Expand Down Expand Up @@ -232,15 +242,15 @@ private void handleRecoveredAlert(long currentTimeMilli, long monitorId, String
Map<String, String> tags = notResolvedAlert.getTags();
String content = this.bundle.getString("alerter.alarm.recover") + " : " + expr;
Alert resumeAlert = Alert.builder()
.tags(tags)
.target(define.getApp() + "." + define.getMetric() + "." + define.getField())
.content(content)
.priority(CommonConstants.ALERT_PRIORITY_CODE_WARNING)
.status(CommonConstants.ALERT_STATUS_CODE_RESTORED)
.firstAlarmTime(currentTimeMilli)
.lastAlarmTime(currentTimeMilli)
.triggerTimes(1)
.build();
.tags(tags)
.target(define.getApp() + "." + define.getMetric() + "." + define.getField())
.content(content)
.priority(CommonConstants.ALERT_PRIORITY_CODE_WARNING)
.status(CommonConstants.ALERT_STATUS_CODE_RESTORED)
.firstAlarmTime(currentTimeMilli)
.lastAlarmTime(currentTimeMilli)
.triggerTimes(1)
.build();
alarmCommonReduce.reduceAndSendAlarm(resumeAlert);
}
}
Expand Down Expand Up @@ -275,17 +285,17 @@ private void afterThresholdRuleMatch(long currentTimeMilli, long monitorId, Stri
}
}
Alert alert = Alert.builder()
.tags(tags)
.priority(define.getPriority())
.status(ALERT_STATUS_CODE_PENDING)
.target(app + "." + metrics + "." + define.getField())
.triggerTimes(1)
.firstAlarmTime(currentTimeMilli)
.lastAlarmTime(currentTimeMilli)
// Keyword matching and substitution in the template
// 模板中关键字匹配替换
.content(AlertTemplateUtil.render(define.getTemplate(), fieldValueMap))
.build();
.tags(tags)
.priority(define.getPriority())
.status(ALERT_STATUS_CODE_PENDING)
.target(app + "." + metrics + "." + define.getField())
.triggerTimes(1)
.firstAlarmTime(currentTimeMilli)
.lastAlarmTime(currentTimeMilli)
// Keyword matching and substitution in the template
// 模板中关键字匹配替换
.content(AlertTemplateUtil.render(define.getTemplate(), fieldValueMap))
.build();
int defineTimes = define.getTimes() == null ? 1 : define.getTimes();
if (1 >= defineTimes) {
String notResolvedAlertKey = String.valueOf(monitorId) + define.getId() + fieldValueMap.get("instance");
Expand Down Expand Up @@ -329,7 +339,7 @@ private void handlerAvailableMetrics(long monitorId, String app, CollectRep.Metr
tags.put("metrics", CommonConstants.AVAILABILITY);
tags.put("code", metricsData.getCode().name());
Map<String, Object> valueMap = tags.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

if (!CollectionUtils.isEmpty(avaAlertDefine.getTags())) {
for (TagItem tagItem : avaAlertDefine.getTags()) {
Expand All @@ -339,14 +349,14 @@ private void handlerAvailableMetrics(long monitorId, String app, CollectRep.Metr
}
if (preAlert == null) {
Alert.AlertBuilder alertBuilder = Alert.builder()
.tags(tags)
.priority(avaAlertDefine.getPriority())
.status(ALERT_STATUS_CODE_PENDING)
.target(CommonConstants.AVAILABILITY)
.content(AlertTemplateUtil.render(avaAlertDefine.getTemplate(), valueMap))
.firstAlarmTime(currentTimeMill)
.lastAlarmTime(currentTimeMill)
.triggerTimes(1);
.tags(tags)
.priority(avaAlertDefine.getPriority())
.status(ALERT_STATUS_CODE_PENDING)
.target(CommonConstants.AVAILABILITY)
.content(AlertTemplateUtil.render(avaAlertDefine.getTemplate(), valueMap))
.firstAlarmTime(currentTimeMill)
.lastAlarmTime(currentTimeMill)
.triggerTimes(1);
if (avaAlertDefine.getTimes() == null || avaAlertDefine.getTimes() <= 1) {
String notResolvedAlertKey = monitorId + CommonConstants.AVAILABILITY;
notRecoveredAlertMap.put(notResolvedAlertKey, alertBuilder.build());
Expand Down Expand Up @@ -389,15 +399,15 @@ private void handlerAvailableMetrics(long monitorId, String app, CollectRep.Metr
}
String content = this.bundle.getString("alerter.availability.recover");
Alert resumeAlert = Alert.builder()
.tags(tags)
.target(CommonConstants.AVAILABILITY)
.content(content)
.priority(CommonConstants.ALERT_PRIORITY_CODE_WARNING)
.status(CommonConstants.ALERT_STATUS_CODE_RESTORED)
.firstAlarmTime(currentTimeMill)
.lastAlarmTime(currentTimeMill)
.triggerTimes(1)
.build();
.tags(tags)
.target(CommonConstants.AVAILABILITY)
.content(content)
.priority(CommonConstants.ALERT_PRIORITY_CODE_WARNING)
.status(CommonConstants.ALERT_STATUS_CODE_RESTORED)
.firstAlarmTime(currentTimeMill)
.lastAlarmTime(currentTimeMill)
.triggerTimes(1)
.build();
alarmCommonReduce.reduceAndSendAlarm(resumeAlert);
Runnable updateStatusJob = () -> {
// todo update pre all type alarm status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,8 @@
import org.dromara.hertzbeat.common.entity.job.Job;
import org.dromara.hertzbeat.common.entity.job.Metrics;
import org.dromara.hertzbeat.common.entity.message.CollectRep;
import org.dromara.hertzbeat.common.constants.CommonConstants;
import org.dromara.hertzbeat.common.util.CommonUtil;
import org.dromara.hertzbeat.common.util.Pair;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -282,7 +279,7 @@ private void calculateFields(Metrics metrics, CollectRep.MetricsData.Builder col
value = String.valueOf(objValue);
}
} catch (Exception e) {
log.info("[calculates execute warning] {}.", e.getMessage());
log.info("[calculates execute warning] {}.", e.getMessage());
}
} else {
// does not exist then map the alias value
Expand All @@ -293,13 +290,23 @@ private void calculateFields(Metrics metrics, CollectRep.MetricsData.Builder col
} else {
value = aliasFieldValueMap.get(realField);
}
if (CommonConstants.TYPE_NUMBER == field.getType() && value != null) {
CollectUtil.DoubleAndUnit doubleAndUnit = CollectUtil
.extractDoubleAndUnitFromStr(value);
value = String.valueOf(doubleAndUnit.getValue());
aliasFieldUnit = doubleAndUnit.getUnit();

if (value != null) {
final byte fieldType = field.getType();

if (fieldType == CommonConstants.TYPE_NUMBER) {
CollectUtil.DoubleAndUnit doubleAndUnit = CollectUtil
.extractDoubleAndUnitFromStr(value);
final Double tempValue = doubleAndUnit.getValue();
value = tempValue == null ? null : String.valueOf(tempValue);
aliasFieldUnit = doubleAndUnit.getUnit();
} else if (fieldType == CommonConstants.TYPE_TIME) {
final int tempValue;
value = (tempValue = CommonUtil.parseTimeStrToSecond(value)) == -1 ? null : String.valueOf(tempValue);
}
}
}

// 单位处理
Pair<String, String> unitPair = fieldUnitMap.get(realField);
if (aliasFieldUnit != null) {
Expand Down Expand Up @@ -335,15 +342,14 @@ private void calculateFields(Metrics metrics, CollectRep.MetricsData.Builder col


/**
*
* @param cal
* @param fieldAliasMap
* @return
*/
private Object[] transformCal(String cal, Map<String, String> fieldAliasMap) {
int splitIndex = cal.indexOf("=");
String field = cal.substring(0, splitIndex).trim();
String expressionStr = cal.substring(splitIndex + 1).trim().replace("\\#","#");
String expressionStr = cal.substring(splitIndex + 1).trim().replace("\\#", "#");
Expression expression;
try {
expression = AviatorEvaluator.compile(expressionStr, true);
Expand All @@ -357,6 +363,7 @@ private Object[] transformCal(String cal, Map<String, String> fieldAliasMap) {

/**
* transform unit
*
* @param unit
* @return
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
/**
* Public Constant
*
*
*/
public interface CommonConstants {

Expand Down Expand Up @@ -150,20 +151,24 @@ public interface CommonConstants {
*/
byte TYPE_SECRET = 2;

/**
* Field parameter type: time
* 字段参数类型: 时间
*/
byte TYPE_TIME = 3;

/**
* Collection indicator value: null placeholder for empty value
* 采集指标值:null空值占位符
*/
String NULL_VALUE = "&nbsp;";

/**
*
*
*/
String PROM_TIME = "timestamp";

/**
*
*
*/
String PROM_VALUE = "value";
Expand All @@ -190,12 +195,12 @@ public interface CommonConstants {
* 参数类型 密码
*/
byte PARAM_TYPE_PASSWORD = 2;

/**
* Parameter Type Map values
*/
byte PARAM_TYPE_MAP = 3;

/**
* Parameter Type arrays values
*/
Expand Down Expand Up @@ -244,7 +249,7 @@ public interface CommonConstants {
* 内有标签: app 监控类型
*/
String TAG_MONITOR_APP = "app";

/**
* 内有标签: alarm type
*/
Expand All @@ -264,37 +269,37 @@ public interface CommonConstants {
* cache key notice_rule
*/
String CACHE_NOTICE_RULE = "notice_rule";

/**
* cache key alert silence
*/
String CACHE_ALERT_SILENCE = "alert_silence";

/**
* cache key alert converge
*/
String CACHE_ALERT_CONVERGE = "alert_converge";

/**
* collector status online 0
*/
byte COLLECTOR_STATUS_ONLINE = 0;

/**
* collector status offline 1
*/
byte COLLECTOR_STATUS_OFFLINE = 1;

/**
* default main collector name
*/
String MAIN_COLLECTOR_NODE = "main-default-collector";

/**
* locale spilt
*/
String LOCALE_SEPARATOR = "_";

/**
* ignore label
* 处理未配置恢复告警,但需要使用恢复告警变更任务状态的情况
Expand Down
Loading

0 comments on commit 628cb4c

Please sign in to comment.