diff --git a/alerter/src/main/java/org/dromara/hertzbeat/alert/calculate/CalculateAlarm.java b/alerter/src/main/java/org/dromara/hertzbeat/alert/calculate/CalculateAlarm.java index 77142c245a2..1cf27367db8 100644 --- a/alerter/src/main/java/org/dromara/hertzbeat/alert/calculate/CalculateAlarm.java +++ b/alerter/src/main/java/org/dromara/hertzbeat/alert/calculate/CalculateAlarm.java @@ -43,6 +43,7 @@ import org.springframework.context.event.EventListener; import org.springframework.data.jpa.domain.Specification; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import javax.persistence.criteria.Predicate; @@ -64,7 +65,7 @@ public class CalculateAlarm { private static final String SYSTEM_VALUE_ROW_COUNT = "system_value_row_count"; - + /** * The alarm in the process is triggered * 触发中告警信息 @@ -170,50 +171,54 @@ private void calculate(CollectRep.MetricsData metricsData) { } } for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) { - if (!valueRow.getColumnsList().isEmpty()) { - fieldValueMap.clear(); - fieldValueMap.put(SYSTEM_VALUE_ROW_COUNT, valueRowCount); - String instance = valueRow.getInstance(); - if (!"".equals(instance)) { - fieldValueMap.put("instance", instance); + + if (CollectionUtils.isEmpty(valueRow.getColumnsList())) { + continue; + } + fieldValueMap.clear(); + fieldValueMap.put(SYSTEM_VALUE_ROW_COUNT, valueRowCount); + String instance = valueRow.getInstance(); + if (!"".equals(instance)) { + fieldValueMap.put("instance", instance); + } + for (int index = 0; index < valueRow.getColumnsList().size(); index++) { + String valueStr = valueRow.getColumns(index); + if (CommonConstants.NULL_VALUE.equals(valueStr)) { + continue; } - for (int index = 0; index < valueRow.getColumnsList().size(); index++) { - String valueStr = valueRow.getColumns(index); - if (!CommonConstants.NULL_VALUE.equals(valueStr)) { - CollectRep.Field field = fields.get(index); - if (field.getType() == CommonConstants.TYPE_NUMBER) { - Double doubleValue = CommonUtil.parseStrDouble(valueStr); - if (doubleValue != null) { - fieldValueMap.put(field.getName(), doubleValue); - } - } else { - if (!"".equals(valueStr)) { - fieldValueMap.put(field.getName(), valueStr); - } - } + CollectRep.Field field = fields.get(index); + + if (field.getType() == CommonConstants.TYPE_NUMBER) { + Double doubleValue = CommonUtil.parseStrDouble(valueStr); + if (doubleValue != null) { + fieldValueMap.put(field.getName(), doubleValue); } - } - try { - boolean match = execAlertExpression(fieldValueMap, expr); - if (match) { - // If the threshold rule matches, the number of times the threshold has been triggered is determined and an alarm is triggered - // 阈值规则匹配,判断已触发阈值次数,触发告警 - afterThresholdRuleMatch(currentTimeMilli, monitorId, app, metrics, fieldValueMap, define); - // 若此阈值已被触发,则其它数据行的触发忽略 - break; - } else if (define.isRecoverNotice()) { - String notResolvedAlertKey = String.valueOf(monitorId) + define.getId() + (!"".equals(instance) ? instance : null); - handleRecoveredAlert(currentTimeMilli, monitorId, app, define, expr, notResolvedAlertKey); + } else { + if (!"".equals(valueStr)) { + fieldValueMap.put(field.getName(), valueStr); } - } catch (Exception e) { - log.warn(e.getMessage(), e); } } + try { + boolean match = execAlertExpression(fieldValueMap, expr); + if (match) { + // If the threshold rule matches, the number of times the threshold has been triggered is determined and an alarm is triggered + // 阈值规则匹配,判断已触发阈值次数,触发告警 + afterThresholdRuleMatch(currentTimeMilli, monitorId, app, metrics, fieldValueMap, define); + // 若此阈值已被触发,则其它数据行的触发忽略 + break; + } else if (define.isRecoverNotice()) { + String notResolvedAlertKey = String.valueOf(monitorId) + define.getId() + (!"".equals(instance) ? instance : null); + handleRecoveredAlert(currentTimeMilli, monitorId, app, define, expr, notResolvedAlertKey); + } + } catch (Exception e) { + log.warn(e.getMessage(), e); + } } } } } - + private void handleRecoveredAlert(long currentTimeMilli, long monitorId, String app, AlertDefine define, String expr, String notResolvedAlertKey) { Alert notResolvedAlert = notRecoveredAlertMap.remove(notResolvedAlertKey); if (notResolvedAlert != null) { @@ -234,7 +239,7 @@ private void handleRecoveredAlert(long currentTimeMilli, long monitorId, String alarmCommonReduce.reduceAndSendAlarm(resumeAlert); } } - + private void afterThresholdRuleMatch(long currentTimeMilli, long monitorId, String app, String metrics, Map fieldValueMap, AlertDefine define) { String monitorAlertKey = String.valueOf(monitorId) + define.getId(); Alert triggeredAlert = triggeredAlertMap.get(monitorAlertKey); @@ -257,7 +262,7 @@ private void afterThresholdRuleMatch(long currentTimeMilli, long monitorId, Stri Map tags = new HashMap<>(6); tags.put(CommonConstants.TAG_MONITOR_ID, String.valueOf(monitorId)); tags.put(CommonConstants.TAG_MONITOR_APP, app); - if (define.getTags() != null && !define.getTags().isEmpty()) { + if (!CollectionUtils.isEmpty(define.getTags())) { for (TagItem tagItem : define.getTags()) { fieldValueMap.put(tagItem.getName(), tagItem.getValue()); tags.put(tagItem.getName(), tagItem.getValue()); @@ -286,7 +291,7 @@ private void afterThresholdRuleMatch(long currentTimeMilli, long monitorId, Stri } } } - + private boolean execAlertExpression(Map fieldValueMap, String expr) { Boolean match = false; try { @@ -302,7 +307,7 @@ private boolean execAlertExpression(Map fieldValueMap, String ex } return match; } - + private void handlerAvailableMetrics(long monitorId, String app, CollectRep.MetricsData metricsData) { AlertDefine avaAlertDefine = alertDefineService.getMonitorBindAlertAvaDefine(monitorId, app, CommonConstants.AVAILABILITY); if (avaAlertDefine == null) { @@ -318,7 +323,8 @@ private void handlerAvailableMetrics(long monitorId, String app, CollectRep.Metr tags.put("code", metricsData.getCode().name()); Map valueMap = tags.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - if (avaAlertDefine.getTags() != null && !avaAlertDefine.getTags().isEmpty()) { + + if (!CollectionUtils.isEmpty(avaAlertDefine.getTags())) { for (TagItem tagItem : avaAlertDefine.getTags()) { valueMap.put(tagItem.getName(), tagItem.getValue()); tags.put(tagItem.getName(), tagItem.getValue()); @@ -433,17 +439,17 @@ private List queryAvailabilityAlerts(long monitorId, Alert restoreAlert) //query results return alertService.getAlerts(specification); } - + @EventListener(SystemConfigChangeEvent.class) public void onSystemConfigChangeEvent(SystemConfigChangeEvent event) { log.info("calculate alarm receive system config change event: {}.", event.getSource()); this.bundle = ResourceBundleUtil.getBundle("alerter"); } - + @EventListener(MonitorDeletedEvent.class) public void onMonitorDeletedEvent(MonitorDeletedEvent event) { log.info("calculate alarm receive monitor {} has been deleted.", event.getMonitorId()); this.triggeredAlertMap.remove(String.valueOf(event.getMonitorId())); } - + } diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/CommonDispatcher.java b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/CommonDispatcher.java index fdc5f1278bb..4d872da14cc 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/CommonDispatcher.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/CommonDispatcher.java @@ -19,6 +19,10 @@ import com.google.gson.Gson; import com.google.gson.JsonElement; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; import org.dromara.hertzbeat.collector.dispatch.timer.Timeout; import org.dromara.hertzbeat.collector.dispatch.timer.TimerDispatch; import org.dromara.hertzbeat.collector.dispatch.timer.WheelTimerTask; @@ -29,26 +33,22 @@ import org.dromara.hertzbeat.common.entity.job.Metrics; import org.dromara.hertzbeat.common.entity.message.CollectRep; import org.dromara.hertzbeat.common.queue.CommonDataQueue; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * Indicator group collection task and response data scheduler * 指标组采集任务与响应数据调度器 * * @author tomsun28 - * */ @Component @Slf4j @@ -125,7 +125,8 @@ public CommonDispatcher(MetricsCollectorQueue jobRequestQueue, metricsCollect.setRunPriority((byte) (metricsCollect.getRunPriority() + 1)); jobRequestQueue.addJob(metricsCollect); } - } catch (InterruptedException ignored) {} + } catch (InterruptedException ignored) { + } } catch (Exception e) { log.error("[Dispatcher]-{}.", e.getMessage(), e); } @@ -241,31 +242,33 @@ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.Met // 当前级别指标组执行完成,开始执行下一级别的指标组 // use pre collect metrics data to replace next metrics config params List> configmapList = getConfigmapFromPreCollectData(metricsData); - metricsSet.forEach(metricItem -> { - if (configmapList != null && !configmapList.isEmpty() && CollectUtil.containCryPlaceholder(GSON.toJsonTree(metricItem))) { - int subTaskNum = Math.min(configmapList.size(), MAX_SUB_TASK_NUM); - AtomicInteger subTaskNumAtomic = new AtomicInteger(subTaskNum); - AtomicReference metricsDataReference = new AtomicReference<>(); - for (int index = 0; index < subTaskNum; index ++) { - Map configmap = configmapList.get(index); - JsonElement metricJson = GSON.toJsonTree(metricItem); - CollectUtil.replaceCryPlaceholder(metricJson, configmap); - Metrics metric = GSON.fromJson(metricJson, Metrics.class); - metric.setSubTaskNum(subTaskNumAtomic); - metric.setSubTaskId(index); - metric.setSubTaskDataRef(metricsDataReference); - MetricsCollect metricsCollect = new MetricsCollect(metric, timeout, this, unitConvertList); - jobRequestQueue.addJob(metricsCollect); - metricsTimeoutMonitorMap.put(job.getId() + "-" + metric.getName() + "-sub-" + index, - new MetricsTime(System.currentTimeMillis(), metric, timeout)); - } - } else { + for (Metrics metricItem : metricsSet) { + if (CollectionUtils.isEmpty(configmapList) || CollectUtil.notContainCryPlaceholder(GSON.toJsonTree(metricItem))) { MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this, unitConvertList); jobRequestQueue.addJob(metricsCollect); metricsTimeoutMonitorMap.put(job.getId() + "-" + metricItem.getName(), new MetricsTime(System.currentTimeMillis(), metricItem, timeout)); + continue; } - }); + + int subTaskNum = Math.min(configmapList.size(), MAX_SUB_TASK_NUM); + AtomicInteger subTaskNumAtomic = new AtomicInteger(subTaskNum); + AtomicReference metricsDataReference = new AtomicReference<>(); + for (int index = 0; index < subTaskNum; index++) { + Map configmap = configmapList.get(index); + JsonElement metricJson = GSON.toJsonTree(metricItem); + CollectUtil.replaceCryPlaceholder(metricJson, configmap); + Metrics metric = GSON.fromJson(metricJson, Metrics.class); + metric.setSubTaskNum(subTaskNumAtomic); + metric.setSubTaskId(index); + metric.setSubTaskDataRef(metricsDataReference); + MetricsCollect metricsCollect = new MetricsCollect(metric, timeout, this, unitConvertList); + jobRequestQueue.addJob(metricsCollect); + metricsTimeoutMonitorMap.put(job.getId() + "-" + metric.getName() + "-sub-" + index, + new MetricsTime(System.currentTimeMillis(), metric, timeout)); + } + + } } else { // The list of indicator groups at the current execution level has not been fully executed. // It needs to wait for the execution of other indicator groups of the same level to complete the execution and enter the next level for execution. @@ -314,22 +317,17 @@ private List> getConfigmapFromPreCollectData(CollectRep.M if (metricsData.getValuesCount() <= 0 || metricsData.getFieldsCount() <= 0) { return null; } - List> mapList = new LinkedList<>(); - for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) { - if (valueRow.getColumnsCount() != metricsData.getFieldsCount()) { - continue; - } - Map configmapMap = new HashMap<>(valueRow.getColumnsCount()); - int index = 0; - for (CollectRep.Field field : metricsData.getFieldsList()) { - String value = valueRow.getColumns(index); - index++; - Configmap configmap = new Configmap(field.getName(), value, Integer.valueOf(field.getType()).byteValue()); - configmapMap.put(field.getName(), configmap); - } - mapList.add(configmapMap); - } - return mapList; + return CollectionUtils.emptyIfNull(metricsData.getValuesList()) + .stream() + .filter(ele -> ele.getColumnsCount() == metricsData.getFieldsCount()) + .map(ele -> IntStream.range(0, metricsData.getFieldsList().size()) + .boxed() + .map(index -> { + String value = ele.getColumns(index); + CollectRep.Field field = metricsData.getFieldsList().get(index); + return new Configmap(field.getName(), value, Integer.valueOf(field.getType()).byteValue()); + }).collect(Collectors.toMap(Configmap::getKey, item -> item, (a, b) -> b))) + .collect(Collectors.toList()); } @Data diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/util/CollectUtil.java b/collector/src/main/java/org/dromara/hertzbeat/collector/util/CollectUtil.java index d61de73de1b..85daf2340cd 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/util/CollectUtil.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/util/CollectUtil.java @@ -182,6 +182,10 @@ public static boolean containCryPlaceholder(JsonElement jsonElement) { return CRYING_PLACEHOLDER_REGEX_PATTERN.matcher(jsonStr).find(); } + public static boolean notContainCryPlaceholder(JsonElement jsonElement) { + return !containCryPlaceholder(jsonElement); + } + /** * json parameter replacement * json 参数替换 diff --git a/common/pom.xml b/common/pom.xml index 2bed925cc40..7217120f7dd 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -107,5 +107,10 @@ org.apache.kafka kafka-clients + + + org.apache.commons + commons-collections4 + diff --git a/pom.xml b/pom.xml index 46dae7c3bcb..9d751c669c2 100644 --- a/pom.xml +++ b/pom.xml @@ -65,7 +65,7 @@ 2.1.214 3.0.0 0.13.3 - + 4.4 @@ -274,6 +274,13 @@ ${caffeine.version} + + + org.apache.commons + commons-collections4 + ${commons-collections4.version} + +