Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

some codes opt #1214

Merged
merged 6 commits into from
Sep 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.springframework.context.event.EventListener;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import javax.persistence.criteria.Predicate;
Expand All @@ -64,7 +65,7 @@
public class CalculateAlarm {

private static final String SYSTEM_VALUE_ROW_COUNT = "system_value_row_count";

/**
* The alarm in the process is triggered
* 触发中告警信息
Expand Down Expand Up @@ -170,50 +171,54 @@ private void calculate(CollectRep.MetricsData metricsData) {
}
}
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
if (!valueRow.getColumnsList().isEmpty()) {
fieldValueMap.clear();
fieldValueMap.put(SYSTEM_VALUE_ROW_COUNT, valueRowCount);
String instance = valueRow.getInstance();
if (!"".equals(instance)) {
fieldValueMap.put("instance", instance);

if (CollectionUtils.isEmpty(valueRow.getColumnsList())) {
continue;
}
fieldValueMap.clear();
fieldValueMap.put(SYSTEM_VALUE_ROW_COUNT, valueRowCount);
String instance = valueRow.getInstance();
if (!"".equals(instance)) {
fieldValueMap.put("instance", instance);
}
for (int index = 0; index < valueRow.getColumnsList().size(); index++) {
String valueStr = valueRow.getColumns(index);
if (CommonConstants.NULL_VALUE.equals(valueStr)) {
continue;
}
for (int index = 0; index < valueRow.getColumnsList().size(); index++) {
String valueStr = valueRow.getColumns(index);
if (!CommonConstants.NULL_VALUE.equals(valueStr)) {
CollectRep.Field field = fields.get(index);
if (field.getType() == CommonConstants.TYPE_NUMBER) {
Double doubleValue = CommonUtil.parseStrDouble(valueStr);
if (doubleValue != null) {
fieldValueMap.put(field.getName(), doubleValue);
}
} else {
if (!"".equals(valueStr)) {
fieldValueMap.put(field.getName(), valueStr);
}
}
CollectRep.Field field = fields.get(index);

if (field.getType() == CommonConstants.TYPE_NUMBER) {
Double doubleValue = CommonUtil.parseStrDouble(valueStr);
if (doubleValue != null) {
fieldValueMap.put(field.getName(), doubleValue);
}
}
try {
boolean match = execAlertExpression(fieldValueMap, expr);
if (match) {
// If the threshold rule matches, the number of times the threshold has been triggered is determined and an alarm is triggered
// 阈值规则匹配,判断已触发阈值次数,触发告警
afterThresholdRuleMatch(currentTimeMilli, monitorId, app, metrics, fieldValueMap, define);
// 若此阈值已被触发,则其它数据行的触发忽略
break;
} else if (define.isRecoverNotice()) {
String notResolvedAlertKey = String.valueOf(monitorId) + define.getId() + (!"".equals(instance) ? instance : null);
handleRecoveredAlert(currentTimeMilli, monitorId, app, define, expr, notResolvedAlertKey);
} else {
if (!"".equals(valueStr)) {
fieldValueMap.put(field.getName(), valueStr);
}
} catch (Exception e) {
log.warn(e.getMessage(), e);
}
}
try {
boolean match = execAlertExpression(fieldValueMap, expr);
if (match) {
// If the threshold rule matches, the number of times the threshold has been triggered is determined and an alarm is triggered
// 阈值规则匹配,判断已触发阈值次数,触发告警
afterThresholdRuleMatch(currentTimeMilli, monitorId, app, metrics, fieldValueMap, define);
// 若此阈值已被触发,则其它数据行的触发忽略
break;
} else if (define.isRecoverNotice()) {
String notResolvedAlertKey = String.valueOf(monitorId) + define.getId() + (!"".equals(instance) ? instance : null);
handleRecoveredAlert(currentTimeMilli, monitorId, app, define, expr, notResolvedAlertKey);
}
} catch (Exception e) {
log.warn(e.getMessage(), e);
}
}
}
}
}

private void handleRecoveredAlert(long currentTimeMilli, long monitorId, String app, AlertDefine define, String expr, String notResolvedAlertKey) {
Alert notResolvedAlert = notRecoveredAlertMap.remove(notResolvedAlertKey);
if (notResolvedAlert != null) {
Expand All @@ -234,7 +239,7 @@ private void handleRecoveredAlert(long currentTimeMilli, long monitorId, String
alarmCommonReduce.reduceAndSendAlarm(resumeAlert);
}
}

private void afterThresholdRuleMatch(long currentTimeMilli, long monitorId, String app, String metrics, Map<String, Object> fieldValueMap, AlertDefine define) {
String monitorAlertKey = String.valueOf(monitorId) + define.getId();
Alert triggeredAlert = triggeredAlertMap.get(monitorAlertKey);
Expand All @@ -257,7 +262,7 @@ private void afterThresholdRuleMatch(long currentTimeMilli, long monitorId, Stri
Map<String, String> tags = new HashMap<>(6);
tags.put(CommonConstants.TAG_MONITOR_ID, String.valueOf(monitorId));
tags.put(CommonConstants.TAG_MONITOR_APP, app);
if (define.getTags() != null && !define.getTags().isEmpty()) {
if (!CollectionUtils.isEmpty(define.getTags())) {
for (TagItem tagItem : define.getTags()) {
fieldValueMap.put(tagItem.getName(), tagItem.getValue());
tags.put(tagItem.getName(), tagItem.getValue());
Expand Down Expand Up @@ -286,7 +291,7 @@ private void afterThresholdRuleMatch(long currentTimeMilli, long monitorId, Stri
}
}
}

private boolean execAlertExpression(Map<String, Object> fieldValueMap, String expr) {
Boolean match = false;
try {
Expand All @@ -302,7 +307,7 @@ private boolean execAlertExpression(Map<String, Object> fieldValueMap, String ex
}
return match;
}

private void handlerAvailableMetrics(long monitorId, String app, CollectRep.MetricsData metricsData) {
AlertDefine avaAlertDefine = alertDefineService.getMonitorBindAlertAvaDefine(monitorId, app, CommonConstants.AVAILABILITY);
if (avaAlertDefine == null) {
Expand All @@ -318,7 +323,8 @@ private void handlerAvailableMetrics(long monitorId, String app, CollectRep.Metr
tags.put("code", metricsData.getCode().name());
Map<String, Object> valueMap = tags.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (avaAlertDefine.getTags() != null && !avaAlertDefine.getTags().isEmpty()) {

if (!CollectionUtils.isEmpty(avaAlertDefine.getTags())) {
for (TagItem tagItem : avaAlertDefine.getTags()) {
valueMap.put(tagItem.getName(), tagItem.getValue());
tags.put(tagItem.getName(), tagItem.getValue());
Expand Down Expand Up @@ -433,17 +439,17 @@ private List<Alert> queryAvailabilityAlerts(long monitorId, Alert restoreAlert)
//query results
return alertService.getAlerts(specification);
}

@EventListener(SystemConfigChangeEvent.class)
public void onSystemConfigChangeEvent(SystemConfigChangeEvent event) {
log.info("calculate alarm receive system config change event: {}.", event.getSource());
this.bundle = ResourceBundleUtil.getBundle("alerter");
}

@EventListener(MonitorDeletedEvent.class)
public void onMonitorDeletedEvent(MonitorDeletedEvent event) {
log.info("calculate alarm receive monitor {} has been deleted.", event.getMonitorId());
this.triggeredAlertMap.remove(String.valueOf(event.getMonitorId()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.dromara.hertzbeat.collector.dispatch.timer.Timeout;
import org.dromara.hertzbeat.collector.dispatch.timer.TimerDispatch;
import org.dromara.hertzbeat.collector.dispatch.timer.WheelTimerTask;
Expand All @@ -29,26 +33,22 @@
import org.dromara.hertzbeat.common.entity.job.Metrics;
import org.dromara.hertzbeat.common.entity.message.CollectRep;
import org.dromara.hertzbeat.common.queue.CommonDataQueue;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* Indicator group collection task and response data scheduler
* 指标组采集任务与响应数据调度器
*
* @author tomsun28
*
*/
@Component
@Slf4j
Expand Down Expand Up @@ -125,7 +125,8 @@ public CommonDispatcher(MetricsCollectorQueue jobRequestQueue,
metricsCollect.setRunPriority((byte) (metricsCollect.getRunPriority() + 1));
jobRequestQueue.addJob(metricsCollect);
}
} catch (InterruptedException ignored) {}
} catch (InterruptedException ignored) {
}
} catch (Exception e) {
log.error("[Dispatcher]-{}.", e.getMessage(), e);
}
Expand Down Expand Up @@ -241,31 +242,33 @@ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.Met
// 当前级别指标组执行完成,开始执行下一级别的指标组
// use pre collect metrics data to replace next metrics config params
List<Map<String, Configmap>> configmapList = getConfigmapFromPreCollectData(metricsData);
metricsSet.forEach(metricItem -> {
if (configmapList != null && !configmapList.isEmpty() && CollectUtil.containCryPlaceholder(GSON.toJsonTree(metricItem))) {
int subTaskNum = Math.min(configmapList.size(), MAX_SUB_TASK_NUM);
AtomicInteger subTaskNumAtomic = new AtomicInteger(subTaskNum);
AtomicReference<CollectRep.MetricsData> metricsDataReference = new AtomicReference<>();
for (int index = 0; index < subTaskNum; index ++) {
Map<String, Configmap> configmap = configmapList.get(index);
JsonElement metricJson = GSON.toJsonTree(metricItem);
CollectUtil.replaceCryPlaceholder(metricJson, configmap);
Metrics metric = GSON.fromJson(metricJson, Metrics.class);
metric.setSubTaskNum(subTaskNumAtomic);
metric.setSubTaskId(index);
metric.setSubTaskDataRef(metricsDataReference);
MetricsCollect metricsCollect = new MetricsCollect(metric, timeout, this, unitConvertList);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + "-" + metric.getName() + "-sub-" + index,
new MetricsTime(System.currentTimeMillis(), metric, timeout));
}
} else {
for (Metrics metricItem : metricsSet) {
if (CollectionUtils.isEmpty(configmapList) || CollectUtil.notContainCryPlaceholder(GSON.toJsonTree(metricItem))) {
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this, unitConvertList);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + "-" + metricItem.getName(),
new MetricsTime(System.currentTimeMillis(), metricItem, timeout));
continue;
}
});

int subTaskNum = Math.min(configmapList.size(), MAX_SUB_TASK_NUM);
AtomicInteger subTaskNumAtomic = new AtomicInteger(subTaskNum);
AtomicReference<CollectRep.MetricsData> metricsDataReference = new AtomicReference<>();
for (int index = 0; index < subTaskNum; index++) {
Map<String, Configmap> configmap = configmapList.get(index);
JsonElement metricJson = GSON.toJsonTree(metricItem);
CollectUtil.replaceCryPlaceholder(metricJson, configmap);
Metrics metric = GSON.fromJson(metricJson, Metrics.class);
metric.setSubTaskNum(subTaskNumAtomic);
metric.setSubTaskId(index);
metric.setSubTaskDataRef(metricsDataReference);
MetricsCollect metricsCollect = new MetricsCollect(metric, timeout, this, unitConvertList);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + "-" + metric.getName() + "-sub-" + index,
new MetricsTime(System.currentTimeMillis(), metric, timeout));
}

}
} else {
// The list of indicator groups at the current execution level has not been fully executed.
// It needs to wait for the execution of other indicator groups of the same level to complete the execution and enter the next level for execution.
Expand Down Expand Up @@ -314,22 +317,17 @@ private List<Map<String, Configmap>> getConfigmapFromPreCollectData(CollectRep.M
if (metricsData.getValuesCount() <= 0 || metricsData.getFieldsCount() <= 0) {
return null;
}
List<Map<String, Configmap>> mapList = new LinkedList<>();
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
if (valueRow.getColumnsCount() != metricsData.getFieldsCount()) {
continue;
}
Map<String, Configmap> configmapMap = new HashMap<>(valueRow.getColumnsCount());
int index = 0;
for (CollectRep.Field field : metricsData.getFieldsList()) {
String value = valueRow.getColumns(index);
index++;
Configmap configmap = new Configmap(field.getName(), value, Integer.valueOf(field.getType()).byteValue());
configmapMap.put(field.getName(), configmap);
}
mapList.add(configmapMap);
}
return mapList;
return CollectionUtils.emptyIfNull(metricsData.getValuesList())
.stream()
.filter(ele -> ele.getColumnsCount() == metricsData.getFieldsCount())
.map(ele -> IntStream.range(0, metricsData.getFieldsList().size())
.boxed()
.map(index -> {
String value = ele.getColumns(index);
CollectRep.Field field = metricsData.getFieldsList().get(index);
return new Configmap(field.getName(), value, Integer.valueOf(field.getType()).byteValue());
}).collect(Collectors.toMap(Configmap::getKey, item -> item, (a, b) -> b)))
.collect(Collectors.toList());
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ public static boolean containCryPlaceholder(JsonElement jsonElement) {
return CRYING_PLACEHOLDER_REGEX_PATTERN.matcher(jsonStr).find();
}

public static boolean notContainCryPlaceholder(JsonElement jsonElement) {
return !containCryPlaceholder(jsonElement);
}

/**
* json parameter replacement
* json 参数替换
Expand Down
5 changes: 5 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,10 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
</dependencies>
</project>
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
<h2.version>2.1.214</h2.version>
<taos-jdbcdriver.version>3.0.0</taos-jdbcdriver.version>
<iotdb-session.version>0.13.3</iotdb-session.version>

<commons-collections4.version>4.4</commons-collections4.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -274,6 +274,13 @@
<!-- 3.x版本支持Java11及以上, 因此这边还是使用2.x的版本-->
<version>${caffeine.version}</version>
</dependency>
<!-- 通用集合操作 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>${commons-collections4.version}</version>
</dependency>

</dependencies>
</dependencyManagement>

Expand Down
Loading