Skip to content

Commit

Permalink
refactor some codes optimization (#1214)
Browse files Browse the repository at this point in the history
Signed-off-by: lingluojun <247677857yh@gmail.com>
  • Loading branch information
LINGLUOJUN authored Sep 4, 2023
1 parent 97ca5ab commit 8a6c2ab
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.springframework.context.event.EventListener;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import javax.persistence.criteria.Predicate;
Expand All @@ -64,7 +65,7 @@
public class CalculateAlarm {

private static final String SYSTEM_VALUE_ROW_COUNT = "system_value_row_count";

/**
* The alarm in the process is triggered
* 触发中告警信息
Expand Down Expand Up @@ -170,50 +171,54 @@ private void calculate(CollectRep.MetricsData metricsData) {
}
}
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
if (!valueRow.getColumnsList().isEmpty()) {
fieldValueMap.clear();
fieldValueMap.put(SYSTEM_VALUE_ROW_COUNT, valueRowCount);
String instance = valueRow.getInstance();
if (!"".equals(instance)) {
fieldValueMap.put("instance", instance);

if (CollectionUtils.isEmpty(valueRow.getColumnsList())) {
continue;
}
fieldValueMap.clear();
fieldValueMap.put(SYSTEM_VALUE_ROW_COUNT, valueRowCount);
String instance = valueRow.getInstance();
if (!"".equals(instance)) {
fieldValueMap.put("instance", instance);
}
for (int index = 0; index < valueRow.getColumnsList().size(); index++) {
String valueStr = valueRow.getColumns(index);
if (CommonConstants.NULL_VALUE.equals(valueStr)) {
continue;
}
for (int index = 0; index < valueRow.getColumnsList().size(); index++) {
String valueStr = valueRow.getColumns(index);
if (!CommonConstants.NULL_VALUE.equals(valueStr)) {
CollectRep.Field field = fields.get(index);
if (field.getType() == CommonConstants.TYPE_NUMBER) {
Double doubleValue = CommonUtil.parseStrDouble(valueStr);
if (doubleValue != null) {
fieldValueMap.put(field.getName(), doubleValue);
}
} else {
if (!"".equals(valueStr)) {
fieldValueMap.put(field.getName(), valueStr);
}
}
CollectRep.Field field = fields.get(index);

if (field.getType() == CommonConstants.TYPE_NUMBER) {
Double doubleValue = CommonUtil.parseStrDouble(valueStr);
if (doubleValue != null) {
fieldValueMap.put(field.getName(), doubleValue);
}
}
try {
boolean match = execAlertExpression(fieldValueMap, expr);
if (match) {
// If the threshold rule matches, the number of times the threshold has been triggered is determined and an alarm is triggered
// 阈值规则匹配,判断已触发阈值次数,触发告警
afterThresholdRuleMatch(currentTimeMilli, monitorId, app, metrics, fieldValueMap, define);
// 若此阈值已被触发,则其它数据行的触发忽略
break;
} else if (define.isRecoverNotice()) {
String notResolvedAlertKey = String.valueOf(monitorId) + define.getId() + (!"".equals(instance) ? instance : null);
handleRecoveredAlert(currentTimeMilli, monitorId, app, define, expr, notResolvedAlertKey);
} else {
if (!"".equals(valueStr)) {
fieldValueMap.put(field.getName(), valueStr);
}
} catch (Exception e) {
log.warn(e.getMessage(), e);
}
}
try {
boolean match = execAlertExpression(fieldValueMap, expr);
if (match) {
// If the threshold rule matches, the number of times the threshold has been triggered is determined and an alarm is triggered
// 阈值规则匹配,判断已触发阈值次数,触发告警
afterThresholdRuleMatch(currentTimeMilli, monitorId, app, metrics, fieldValueMap, define);
// 若此阈值已被触发,则其它数据行的触发忽略
break;
} else if (define.isRecoverNotice()) {
String notResolvedAlertKey = String.valueOf(monitorId) + define.getId() + (!"".equals(instance) ? instance : null);
handleRecoveredAlert(currentTimeMilli, monitorId, app, define, expr, notResolvedAlertKey);
}
} catch (Exception e) {
log.warn(e.getMessage(), e);
}
}
}
}
}

private void handleRecoveredAlert(long currentTimeMilli, long monitorId, String app, AlertDefine define, String expr, String notResolvedAlertKey) {
Alert notResolvedAlert = notRecoveredAlertMap.remove(notResolvedAlertKey);
if (notResolvedAlert != null) {
Expand All @@ -234,7 +239,7 @@ private void handleRecoveredAlert(long currentTimeMilli, long monitorId, String
alarmCommonReduce.reduceAndSendAlarm(resumeAlert);
}
}

private void afterThresholdRuleMatch(long currentTimeMilli, long monitorId, String app, String metrics, Map<String, Object> fieldValueMap, AlertDefine define) {
String monitorAlertKey = String.valueOf(monitorId) + define.getId();
Alert triggeredAlert = triggeredAlertMap.get(monitorAlertKey);
Expand All @@ -257,7 +262,7 @@ private void afterThresholdRuleMatch(long currentTimeMilli, long monitorId, Stri
Map<String, String> tags = new HashMap<>(6);
tags.put(CommonConstants.TAG_MONITOR_ID, String.valueOf(monitorId));
tags.put(CommonConstants.TAG_MONITOR_APP, app);
if (define.getTags() != null && !define.getTags().isEmpty()) {
if (!CollectionUtils.isEmpty(define.getTags())) {
for (TagItem tagItem : define.getTags()) {
fieldValueMap.put(tagItem.getName(), tagItem.getValue());
tags.put(tagItem.getName(), tagItem.getValue());
Expand Down Expand Up @@ -286,7 +291,7 @@ private void afterThresholdRuleMatch(long currentTimeMilli, long monitorId, Stri
}
}
}

private boolean execAlertExpression(Map<String, Object> fieldValueMap, String expr) {
Boolean match = false;
try {
Expand All @@ -302,7 +307,7 @@ private boolean execAlertExpression(Map<String, Object> fieldValueMap, String ex
}
return match;
}

private void handlerAvailableMetrics(long monitorId, String app, CollectRep.MetricsData metricsData) {
AlertDefine avaAlertDefine = alertDefineService.getMonitorBindAlertAvaDefine(monitorId, app, CommonConstants.AVAILABILITY);
if (avaAlertDefine == null) {
Expand All @@ -318,7 +323,8 @@ private void handlerAvailableMetrics(long monitorId, String app, CollectRep.Metr
tags.put("code", metricsData.getCode().name());
Map<String, Object> valueMap = tags.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (avaAlertDefine.getTags() != null && !avaAlertDefine.getTags().isEmpty()) {

if (!CollectionUtils.isEmpty(avaAlertDefine.getTags())) {
for (TagItem tagItem : avaAlertDefine.getTags()) {
valueMap.put(tagItem.getName(), tagItem.getValue());
tags.put(tagItem.getName(), tagItem.getValue());
Expand Down Expand Up @@ -433,17 +439,17 @@ private List<Alert> queryAvailabilityAlerts(long monitorId, Alert restoreAlert)
//query results
return alertService.getAlerts(specification);
}

@EventListener(SystemConfigChangeEvent.class)
public void onSystemConfigChangeEvent(SystemConfigChangeEvent event) {
log.info("calculate alarm receive system config change event: {}.", event.getSource());
this.bundle = ResourceBundleUtil.getBundle("alerter");
}

@EventListener(MonitorDeletedEvent.class)
public void onMonitorDeletedEvent(MonitorDeletedEvent event) {
log.info("calculate alarm receive monitor {} has been deleted.", event.getMonitorId());
this.triggeredAlertMap.remove(String.valueOf(event.getMonitorId()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.dromara.hertzbeat.collector.dispatch.timer.Timeout;
import org.dromara.hertzbeat.collector.dispatch.timer.TimerDispatch;
import org.dromara.hertzbeat.collector.dispatch.timer.WheelTimerTask;
Expand All @@ -29,26 +33,22 @@
import org.dromara.hertzbeat.common.entity.job.Metrics;
import org.dromara.hertzbeat.common.entity.message.CollectRep;
import org.dromara.hertzbeat.common.queue.CommonDataQueue;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* Indicator group collection task and response data scheduler
* 指标组采集任务与响应数据调度器
*
* @author tomsun28
*
*/
@Component
@Slf4j
Expand Down Expand Up @@ -125,7 +125,8 @@ public CommonDispatcher(MetricsCollectorQueue jobRequestQueue,
metricsCollect.setRunPriority((byte) (metricsCollect.getRunPriority() + 1));
jobRequestQueue.addJob(metricsCollect);
}
} catch (InterruptedException ignored) {}
} catch (InterruptedException ignored) {
}
} catch (Exception e) {
log.error("[Dispatcher]-{}.", e.getMessage(), e);
}
Expand Down Expand Up @@ -241,31 +242,33 @@ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.Met
// 当前级别指标组执行完成,开始执行下一级别的指标组
// use pre collect metrics data to replace next metrics config params
List<Map<String, Configmap>> configmapList = getConfigmapFromPreCollectData(metricsData);
metricsSet.forEach(metricItem -> {
if (configmapList != null && !configmapList.isEmpty() && CollectUtil.containCryPlaceholder(GSON.toJsonTree(metricItem))) {
int subTaskNum = Math.min(configmapList.size(), MAX_SUB_TASK_NUM);
AtomicInteger subTaskNumAtomic = new AtomicInteger(subTaskNum);
AtomicReference<CollectRep.MetricsData> metricsDataReference = new AtomicReference<>();
for (int index = 0; index < subTaskNum; index ++) {
Map<String, Configmap> configmap = configmapList.get(index);
JsonElement metricJson = GSON.toJsonTree(metricItem);
CollectUtil.replaceCryPlaceholder(metricJson, configmap);
Metrics metric = GSON.fromJson(metricJson, Metrics.class);
metric.setSubTaskNum(subTaskNumAtomic);
metric.setSubTaskId(index);
metric.setSubTaskDataRef(metricsDataReference);
MetricsCollect metricsCollect = new MetricsCollect(metric, timeout, this, unitConvertList);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + "-" + metric.getName() + "-sub-" + index,
new MetricsTime(System.currentTimeMillis(), metric, timeout));
}
} else {
for (Metrics metricItem : metricsSet) {
if (CollectionUtils.isEmpty(configmapList) || CollectUtil.notContainCryPlaceholder(GSON.toJsonTree(metricItem))) {
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this, unitConvertList);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + "-" + metricItem.getName(),
new MetricsTime(System.currentTimeMillis(), metricItem, timeout));
continue;
}
});

int subTaskNum = Math.min(configmapList.size(), MAX_SUB_TASK_NUM);
AtomicInteger subTaskNumAtomic = new AtomicInteger(subTaskNum);
AtomicReference<CollectRep.MetricsData> metricsDataReference = new AtomicReference<>();
for (int index = 0; index < subTaskNum; index++) {
Map<String, Configmap> configmap = configmapList.get(index);
JsonElement metricJson = GSON.toJsonTree(metricItem);
CollectUtil.replaceCryPlaceholder(metricJson, configmap);
Metrics metric = GSON.fromJson(metricJson, Metrics.class);
metric.setSubTaskNum(subTaskNumAtomic);
metric.setSubTaskId(index);
metric.setSubTaskDataRef(metricsDataReference);
MetricsCollect metricsCollect = new MetricsCollect(metric, timeout, this, unitConvertList);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + "-" + metric.getName() + "-sub-" + index,
new MetricsTime(System.currentTimeMillis(), metric, timeout));
}

}
} else {
// The list of indicator groups at the current execution level has not been fully executed.
// It needs to wait for the execution of other indicator groups of the same level to complete the execution and enter the next level for execution.
Expand Down Expand Up @@ -314,22 +317,17 @@ private List<Map<String, Configmap>> getConfigmapFromPreCollectData(CollectRep.M
if (metricsData.getValuesCount() <= 0 || metricsData.getFieldsCount() <= 0) {
return null;
}
List<Map<String, Configmap>> mapList = new LinkedList<>();
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
if (valueRow.getColumnsCount() != metricsData.getFieldsCount()) {
continue;
}
Map<String, Configmap> configmapMap = new HashMap<>(valueRow.getColumnsCount());
int index = 0;
for (CollectRep.Field field : metricsData.getFieldsList()) {
String value = valueRow.getColumns(index);
index++;
Configmap configmap = new Configmap(field.getName(), value, Integer.valueOf(field.getType()).byteValue());
configmapMap.put(field.getName(), configmap);
}
mapList.add(configmapMap);
}
return mapList;
return CollectionUtils.emptyIfNull(metricsData.getValuesList())
.stream()
.filter(ele -> ele.getColumnsCount() == metricsData.getFieldsCount())
.map(ele -> IntStream.range(0, metricsData.getFieldsList().size())
.boxed()
.map(index -> {
String value = ele.getColumns(index);
CollectRep.Field field = metricsData.getFieldsList().get(index);
return new Configmap(field.getName(), value, Integer.valueOf(field.getType()).byteValue());
}).collect(Collectors.toMap(Configmap::getKey, item -> item, (a, b) -> b)))
.collect(Collectors.toList());
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ public static boolean containCryPlaceholder(JsonElement jsonElement) {
return CRYING_PLACEHOLDER_REGEX_PATTERN.matcher(jsonStr).find();
}

public static boolean notContainCryPlaceholder(JsonElement jsonElement) {
return !containCryPlaceholder(jsonElement);
}

/**
* json parameter replacement
* json 参数替换
Expand Down
5 changes: 5 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,10 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
</dependencies>
</project>
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
<h2.version>2.1.214</h2.version>
<taos-jdbcdriver.version>3.0.0</taos-jdbcdriver.version>
<iotdb-session.version>0.13.3</iotdb-session.version>

<commons-collections4.version>4.4</commons-collections4.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -274,6 +274,13 @@
<!-- 3.x版本支持Java11及以上, 因此这边还是使用2.x的版本-->
<version>${caffeine.version}</version>
</dependency>
<!-- 通用集合操作 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>${commons-collections4.version}</version>
</dependency>

</dependencies>
</dependencyManagement>

Expand Down

0 comments on commit 8a6c2ab

Please sign in to comment.