From d9ebdda306ebc9f8390a06269f8982d56d4d7cf3 Mon Sep 17 00:00:00 2001 From: tomsun28 Date: Sun, 6 Nov 2022 22:51:26 +0800 Subject: [PATCH] [collector] support trigger and grading multiple subtasks through -_- placeholder expression (#418) [collector] support more powerful replace placeholder ^_^ and -_- [collector] support trigger and grading multiple subtasks through -_- placeholder expression --- .../collector/dispatch/CommonDispatcher.java | 72 +++++++++++++------ .../com/usthe/collector/util/CollectUtil.java | 18 +++-- .../com/usthe/common/entity/job/Metrics.java | 64 +++++++++++++++++ 3 files changed, 127 insertions(+), 27 deletions(-) diff --git a/collector/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java b/collector/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java index 56c079b2a4f..f636043ca46 100644 --- a/collector/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java +++ b/collector/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java @@ -40,6 +40,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * Indicator group collection task and response data scheduler @@ -184,7 +185,17 @@ public void dispatchMetricsTask(Timeout timeout) { public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.MetricsData metricsData) { WheelTimerTask timerJob = (WheelTimerTask) timeout.task(); Job job = timerJob.getJob(); - metricsTimeoutMonitorMap.remove(job.getId() + "-" + metrics.getName()); + if (metrics.isHasSubTask()) { + metricsTimeoutMonitorMap.remove(job.getId() + "-" + metrics.getName() + "-sub-" + metrics.getSubTaskId()); + boolean isLastTask = metrics.consumeSubTaskResponse(metricsData); + if (isLastTask) { + metricsData = metrics.getSubTaskDataTmp(); + } else { + return; + } + } else { + metricsTimeoutMonitorMap.remove(job.getId() + "-" + metrics.getName()); + } Set metricsSet = job.getNextCollectMetrics(metrics, false); if (job.isCyclic()) { // If it is an asynchronous periodic cyclic task, directly send the collected data of the indicator group to the message middleware @@ -223,17 +234,29 @@ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.Met // The execution of the current level indicator group is completed, and the execution of the next level indicator group starts // 当前级别指标组执行完成,开始执行下一级别的指标组 // use pre collect metrics data to replace next metrics config params - Map configmap = getConfigmapFromPreCollectData(metricsData); + List> configmapList = getConfigmapFromPreCollectData(metricsData); metricsSet.forEach(metricItem -> { - if (configmap != null && !configmap.isEmpty()) { - JsonElement jsonElement = GSON.toJsonTree(metricItem); - CollectUtil.replaceCryPlaceholder(jsonElement, configmap); - metricItem = GSON.fromJson(jsonElement, Metrics.class); + JsonElement jsonElement = GSON.toJsonTree(metricItem); + if (configmapList != null && !configmapList.isEmpty() && CollectUtil.containCryPlaceholder(jsonElement)) { + AtomicInteger subTaskNum = new AtomicInteger(configmapList.size()); + for (int index = 0; index < configmapList.size(); index ++) { + Map configmap = configmapList.get(index); + jsonElement = GSON.toJsonTree(metricItem); + CollectUtil.replaceCryPlaceholder(jsonElement, configmap); + metricItem = GSON.fromJson(jsonElement, Metrics.class); + metricItem.setSubTaskNum(subTaskNum); + metricItem.setSubTaskId(index); + MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this, unitConvertList); + jobRequestQueue.addJob(metricsCollect); + metricsTimeoutMonitorMap.put(job.getId() + "-" + metricItem.getName() + "-sub-" + index, + new MetricsTime(System.currentTimeMillis(), metricItem, timeout)); + } + } else { + MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this, unitConvertList); + jobRequestQueue.addJob(metricsCollect); + metricsTimeoutMonitorMap.put(job.getId() + "-" + metricItem.getName(), + new MetricsTime(System.currentTimeMillis(), metricItem, timeout)); } - MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this, unitConvertList); - jobRequestQueue.addJob(metricsCollect); - metricsTimeoutMonitorMap.put(job.getId() + "-" + metricItem.getName(), - new MetricsTime(System.currentTimeMillis(), metricItem, timeout)); }); } else { // The list of indicator groups at the current execution level has not been fully executed. @@ -279,23 +302,26 @@ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.Met } } - private Map getConfigmapFromPreCollectData(CollectRep.MetricsData metricsData) { + private List> getConfigmapFromPreCollectData(CollectRep.MetricsData metricsData) { if (metricsData.getValuesCount() <= 0 || metricsData.getFieldsCount() <= 0) { return null; } - CollectRep.ValueRow valueRow = metricsData.getValues(0); - if (valueRow.getColumnsCount() != metricsData.getFieldsCount()) { - return null; - } - Map configmapMap = new HashMap<>(valueRow.getColumnsCount()); - int index = 0; - for (CollectRep.Field field : metricsData.getFieldsList()) { - String value = valueRow.getColumns(index); - index++; - Configmap configmap = new Configmap(field.getName(), value, Integer.valueOf(field.getType()).byteValue()); - configmapMap.put(field.getName(), configmap); + List> mapList = new LinkedList<>(); + for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) { + if (valueRow.getColumnsCount() != metricsData.getFieldsCount()) { + continue; + } + Map configmapMap = new HashMap<>(valueRow.getColumnsCount()); + int index = 0; + for (CollectRep.Field field : metricsData.getFieldsList()) { + String value = valueRow.getColumns(index); + index++; + Configmap configmap = new Configmap(field.getName(), value, Integer.valueOf(field.getType()).byteValue()); + configmapMap.put(field.getName(), configmap); + } + mapList.add(configmapMap); } - return configmapMap; + return mapList; } @Data diff --git a/collector/src/main/java/com/usthe/collector/util/CollectUtil.java b/collector/src/main/java/com/usthe/collector/util/CollectUtil.java index 3bdc9483140..e852c29b59b 100644 --- a/collector/src/main/java/com/usthe/collector/util/CollectUtil.java +++ b/collector/src/main/java/com/usthe/collector/util/CollectUtil.java @@ -177,6 +177,16 @@ public static Boolean assertPromRequireField(String aliasField){ } + /** + * is contains cryPlaceholder -_- + * @param jsonElement json element + * @return return true when contains + */ + public static boolean containCryPlaceholder(JsonElement jsonElement) { + String jsonStr = jsonElement.toString(); + return CRYING_PLACEHOLDER_REGEX_PATTERN.matcher(jsonStr).find(); + } + /** * json parameter replacement * json 参数替换 @@ -197,7 +207,7 @@ public static JsonElement replaceCryPlaceholder(JsonElement jsonElement, Map 1) { + CollectRep.MetricsData.Builder dataBuilder = CollectRep.MetricsData.newBuilder(subTaskDataTmp); + for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) { + if (valueRow.getColumnsCount() == dataBuilder.getFieldsCount()) { + dataBuilder.addValues(valueRow); + } else { + log.error("consume subTask data value not mapping filed"); + } + } + subTaskDataTmp = dataBuilder.build(); + } + } + return index == 0; + } + } + @Override public boolean equals(Object o) { if (this == o) {