Skip to content

Commit

Permalink
[collector] support trigger and grading multiple subtasks through -_-…
Browse files Browse the repository at this point in the history
… placeholder expression (#418)

  [collector] support more powerful replace placeholder ^_^ and -_-

  [collector] support trigger and grading multiple subtasks through -_- placeholder expression
  • Loading branch information
tomsun28 committed Nov 6, 2022
1 parent 17807d8 commit d4cb2c3
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Indicator group collection task and response data scheduler
Expand Down Expand Up @@ -184,7 +185,17 @@ public void dispatchMetricsTask(Timeout timeout) {
public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.MetricsData metricsData) {
WheelTimerTask timerJob = (WheelTimerTask) timeout.task();
Job job = timerJob.getJob();
metricsTimeoutMonitorMap.remove(job.getId() + "-" + metrics.getName());
if (metrics.isHasSubTask()) {
metricsTimeoutMonitorMap.remove(job.getId() + "-" + metrics.getName() + "-sub-" + metrics.getSubTaskId());
boolean isLastTask = metrics.consumeSubTaskResponse(metricsData);
if (isLastTask) {
metricsData = metrics.getSubTaskDataTmp();
} else {
return;
}
} else {
metricsTimeoutMonitorMap.remove(job.getId() + "-" + metrics.getName());
}
Set<Metrics> metricsSet = job.getNextCollectMetrics(metrics, false);
if (job.isCyclic()) {
// If it is an asynchronous periodic cyclic task, directly send the collected data of the indicator group to the message middleware
Expand Down Expand Up @@ -223,17 +234,29 @@ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.Met
// The execution of the current level indicator group is completed, and the execution of the next level indicator group starts
// 当前级别指标组执行完成,开始执行下一级别的指标组
// use pre collect metrics data to replace next metrics config params
Map<String, Configmap> configmap = getConfigmapFromPreCollectData(metricsData);
List<Map<String, Configmap>> configmapList = getConfigmapFromPreCollectData(metricsData);
metricsSet.forEach(metricItem -> {
if (configmap != null && !configmap.isEmpty()) {
JsonElement jsonElement = GSON.toJsonTree(metricItem);
CollectUtil.replaceCryPlaceholder(jsonElement, configmap);
metricItem = GSON.fromJson(jsonElement, Metrics.class);
JsonElement jsonElement = GSON.toJsonTree(metricItem);
if (configmapList != null && !configmapList.isEmpty() && CollectUtil.containCryPlaceholder(jsonElement)) {
AtomicInteger subTaskNum = new AtomicInteger(configmapList.size());
for (int index = 0; index < configmapList.size(); index ++) {
Map<String, Configmap> configmap = configmapList.get(index);
jsonElement = GSON.toJsonTree(metricItem);
CollectUtil.replaceCryPlaceholder(jsonElement, configmap);
metricItem = GSON.fromJson(jsonElement, Metrics.class);
metricItem.setSubTaskNum(subTaskNum);
metricItem.setSubTaskId(index);
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this, unitConvertList);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + "-" + metricItem.getName() + "-sub-" + index,
new MetricsTime(System.currentTimeMillis(), metricItem, timeout));
}
} else {
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this, unitConvertList);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + "-" + metricItem.getName(),
new MetricsTime(System.currentTimeMillis(), metricItem, timeout));
}
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this, unitConvertList);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + "-" + metricItem.getName(),
new MetricsTime(System.currentTimeMillis(), metricItem, timeout));
});
} else {
// The list of indicator groups at the current execution level has not been fully executed.
Expand Down Expand Up @@ -279,23 +302,26 @@ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.Met
}
}

private Map<String, Configmap> getConfigmapFromPreCollectData(CollectRep.MetricsData metricsData) {
private List<Map<String, Configmap>> getConfigmapFromPreCollectData(CollectRep.MetricsData metricsData) {
if (metricsData.getValuesCount() <= 0 || metricsData.getFieldsCount() <= 0) {
return null;
}
CollectRep.ValueRow valueRow = metricsData.getValues(0);
if (valueRow.getColumnsCount() != metricsData.getFieldsCount()) {
return null;
}
Map<String, Configmap> configmapMap = new HashMap<>(valueRow.getColumnsCount());
int index = 0;
for (CollectRep.Field field : metricsData.getFieldsList()) {
String value = valueRow.getColumns(index);
index++;
Configmap configmap = new Configmap(field.getName(), value, Integer.valueOf(field.getType()).byteValue());
configmapMap.put(field.getName(), configmap);
List<Map<String, Configmap>> mapList = new LinkedList<>();
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
if (valueRow.getColumnsCount() != metricsData.getFieldsCount()) {
continue;
}
Map<String, Configmap> configmapMap = new HashMap<>(valueRow.getColumnsCount());
int index = 0;
for (CollectRep.Field field : metricsData.getFieldsList()) {
String value = valueRow.getColumns(index);
index++;
Configmap configmap = new Configmap(field.getName(), value, Integer.valueOf(field.getType()).byteValue());
configmapMap.put(field.getName(), configmap);
}
mapList.add(configmapMap);
}
return configmapMap;
return mapList;
}

@Data
Expand Down
18 changes: 14 additions & 4 deletions collector/src/main/java/com/usthe/collector/util/CollectUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,16 @@ public static Boolean assertPromRequireField(String aliasField){
}


/**
* is contains cryPlaceholder -_-
* @param jsonElement json element
* @return return true when contains
*/
public static boolean containCryPlaceholder(JsonElement jsonElement) {
String jsonStr = jsonElement.toString();
return CRYING_PLACEHOLDER_REGEX_PATTERN.matcher(jsonStr).find();
}

/**
* json parameter replacement
* json 参数替换
Expand All @@ -197,7 +207,7 @@ public static JsonElement replaceCryPlaceholder(JsonElement jsonElement, Map<Str
// Check if there are special characters Replace
String value = element.getAsString();
Matcher cryingMatcher = CRYING_PLACEHOLDER_REGEX_PATTERN.matcher(value);
if (cryingMatcher.matches()) {
if (cryingMatcher.find()) {
cryingMatcher.reset();
while (cryingMatcher.find()) {
String group = cryingMatcher.group();
Expand Down Expand Up @@ -232,7 +242,7 @@ public static JsonElement replaceCryPlaceholder(JsonElement jsonElement, Map<Str
// Check if there are special characters Replace
String value = element.getAsString();
Matcher cryingMatcher = CRYING_PLACEHOLDER_REGEX_PATTERN.matcher(value);
if (cryingMatcher.matches()) {
if (cryingMatcher.find()) {
cryingMatcher.reset();
while (cryingMatcher.find()) {
String group = cryingMatcher.group();
Expand Down Expand Up @@ -305,7 +315,7 @@ public static JsonElement replaceSmilingPlaceholder(JsonElement jsonElement, Map
// 判断是否含有特殊字符 替换
String value = element.getAsString();
Matcher smilingMatcher = SMILING_PLACEHOLDER_REGEX_PATTERN.matcher(value);
if (smilingMatcher.matches()) {
if (smilingMatcher.find()) {
smilingMatcher.reset();
while (smilingMatcher.find()) {
String group = smilingMatcher.group();
Expand Down Expand Up @@ -341,7 +351,7 @@ public static JsonElement replaceSmilingPlaceholder(JsonElement jsonElement, Map
// 判断是否含有特殊字符 替换
String value = element.getAsString();
Matcher smilingMatcher = SMILING_PLACEHOLDER_REGEX_PATTERN.matcher(value);
if (smilingMatcher.matches()) {
if (smilingMatcher.find()) {
smilingMatcher.reset();
while (smilingMatcher.find()) {
String group = smilingMatcher.group();
Expand Down
64 changes: 64 additions & 0 deletions common/src/main/java/com/usthe/common/entity/job/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@

package com.usthe.common.entity.job;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.usthe.common.entity.job.protocol.*;
import com.usthe.common.entity.message.CollectRep;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Details of the collection of indicators collected by monitoring
Expand All @@ -38,6 +42,7 @@
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Slf4j
public class Metrics {

/**
Expand Down Expand Up @@ -133,6 +138,65 @@ public class Metrics {
*/
private SnmpProtocol snmp;

/**
* collector use - Temporarily store subTask indicator group response data
* collector使用 - 临时存储分级任务指标响应数据
*/
@JsonIgnore
private transient CollectRep.MetricsData subTaskDataTmp;

/**
* collector use - Temporarily store subTask running num
* collector使用 - 分级任务正在运行中的数量
*/
@JsonIgnore
private transient AtomicInteger subTaskNum;

/**
* collector use - Temporarily store subTask id
* collector使用 - 分级任务ID
*/
@JsonIgnore
private transient Integer subTaskId;

/**
* is has subTask
* @return true - has
*/
public boolean isHasSubTask() {
return subTaskNum != null;
}

/**
* consume subTask
* @param metricsData response data
* @return is last task?
*/
public boolean consumeSubTaskResponse(CollectRep.MetricsData metricsData) {
if (subTaskNum == null) {
return true;
}
synchronized (subTaskNum) {
int index = subTaskNum.decrementAndGet();
if (subTaskDataTmp == null) {
subTaskDataTmp = metricsData;
} else {
if (metricsData.getValuesCount() > 1) {
CollectRep.MetricsData.Builder dataBuilder = CollectRep.MetricsData.newBuilder(subTaskDataTmp);
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
if (valueRow.getColumnsCount() == dataBuilder.getFieldsCount()) {
dataBuilder.addValues(valueRow);
} else {
log.error("consume subTask data value not mapping filed");
}
}
subTaskDataTmp = dataBuilder.build();
}
}
return index == 0;
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down

0 comments on commit d4cb2c3

Please sign in to comment.