diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/http/CommonHttpClient.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/http/CommonHttpClient.java index 1c782c8ba2c..55bdf8f2c40 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/http/CommonHttpClient.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/http/CommonHttpClient.java @@ -54,37 +54,37 @@ public class CommonHttpClient { /** * 此连接池所能提供的最大连接数 */ - private final static int MAX_TOTAL_CONNECTIONS = 50000; + private static final int MAX_TOTAL_CONNECTIONS = 50000; /** * 每个路由所能分配的最大连接数 */ - private final static int MAX_PER_ROUTE_CONNECTIONS = 80; + private static final int MAX_PER_ROUTE_CONNECTIONS = 80; /** * 从连接池中获取连接的默认超时时间 4秒 */ - private final static int REQUIRE_CONNECT_TIMEOUT = 4000; + private static final int REQUIRE_CONNECT_TIMEOUT = 4000; /** * 双端建立连接超时时间 4秒 */ - private final static int CONNECT_TIMEOUT = 4000; + private static final int CONNECT_TIMEOUT = 4000; /** * socketReadTimeout 响应tcp报文的最大间隔超时时间 */ - private final static int SOCKET_TIMEOUT = 60000; + private static final int SOCKET_TIMEOUT = 60000; /** * 空闲连接免检的有效时间,被重用的空闲连接若超过此时间,需检查此连接的可用性 */ - private final static int INACTIVITY_VALIDATED_TIME = 10000; + private static final int INACTIVITY_VALIDATED_TIME = 10000; /** * ssl版本 */ - private final static String[] SUPPORTED_SSL = {"TLSv1","TLSv1.1","TLSv1.2","SSLv3"}; + private static final String[] SUPPORTED_SSL = {"TLSv1","TLSv1.1","TLSv1.2","SSLv3"}; static { try { diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/push/PushCollectImpl.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/push/PushCollectImpl.java new file mode 100644 index 00000000000..db574bd0eeb --- /dev/null +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/push/PushCollectImpl.java @@ -0,0 +1,166 @@ +package org.dromara.hertzbeat.collector.collect.push; + +import com.fasterxml.jackson.core.type.TypeReference; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpHeaders; +import org.apache.http.HttpHost; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.client.methods.RequestBuilder; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.protocol.HttpContext; +import org.apache.http.util.EntityUtils; +import org.dromara.hertzbeat.collector.collect.AbstractCollect; +import org.dromara.hertzbeat.collector.collect.common.http.CommonHttpClient; +import org.dromara.hertzbeat.collector.dispatch.DispatchConstants; +import org.dromara.hertzbeat.collector.util.CollectUtil; +import org.dromara.hertzbeat.common.constants.CollectorConstants; +import org.dromara.hertzbeat.common.entity.dto.Message; +import org.dromara.hertzbeat.common.entity.job.Metrics; +import org.dromara.hertzbeat.common.entity.job.protocol.PushProtocol; +import org.dromara.hertzbeat.common.entity.message.CollectRep; +import org.dromara.hertzbeat.common.entity.push.PushMetricsDto; +import org.dromara.hertzbeat.common.util.CommonUtil; +import org.dromara.hertzbeat.common.util.IpDomainUtil; +import org.dromara.hertzbeat.common.util.JsonUtil; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * push style collect + * + * + */ +@Slf4j +public class PushCollectImpl extends AbstractCollect { + + private static Map timeMap = new ConcurrentHashMap<>(); + + // ms + private static final Integer timeout = 3000; + + private static final Integer SUCCESS_CODE = 200; + + // 第一次采集多久之前的数据,其实没有办法确定,因为无法确定上次何时采集,难以避免重启后重复采集的现象,默认30s + private static final Integer firstCollectInterval = 30000; + + public PushCollectImpl() { + } + + @Override + public void collect(CollectRep.MetricsData.Builder builder, + long appId, String app, Metrics metrics) { + long curTime = System.currentTimeMillis(); + + PushProtocol pushProtocol = metrics.getPush(); + + Long time = timeMap.getOrDefault(appId, curTime - firstCollectInterval); + timeMap.put(appId, curTime); + + HttpContext httpContext = createHttpContext(pushProtocol); + HttpUriRequest request = createHttpRequest(pushProtocol, appId, time); + + try { + CloseableHttpResponse response = CommonHttpClient.getHttpClient().execute(request, httpContext); + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode != SUCCESS_CODE) { + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("StatusCode " + statusCode); + return; + } + String resp = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + + parseResponse(builder, resp, metrics); + + } catch (Exception e) { + String errorMsg = CommonUtil.getMessageFromThrowable(e); + log.error(errorMsg, e); + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg(errorMsg); + } + + } + + @Override + public String supportProtocol() { + return DispatchConstants.PROTOCOL_PUSH; + } + + private HttpContext createHttpContext(PushProtocol pushProtocol) { + HttpHost host = new HttpHost(pushProtocol.getHost(), Integer.parseInt(pushProtocol.getPort())); + HttpClientContext httpClientContext = new HttpClientContext(); + httpClientContext.setTargetHost(host); + return httpClientContext; + } + + private HttpUriRequest createHttpRequest(PushProtocol pushProtocol, Long monitorId, Long startTime) { + RequestBuilder requestBuilder = RequestBuilder.get(); + + + // uri + String uri = CollectUtil.replaceUriSpecialChar(pushProtocol.getUri()); + if (IpDomainUtil.isHasSchema(pushProtocol.getHost())) { + requestBuilder.setUri(pushProtocol.getHost() + ":" + pushProtocol.getPort() + uri); + } else { + String ipAddressType = IpDomainUtil.checkIpAddressType(pushProtocol.getHost()); + String baseUri = CollectorConstants.IPV6.equals(ipAddressType) + ? String.format("[%s]:%s", pushProtocol.getHost(), pushProtocol.getPort() + uri) + : String.format("%s:%s", pushProtocol.getHost(), pushProtocol.getPort() + uri); + + requestBuilder.setUri(CollectorConstants.HTTP_HEADER + baseUri); + } + + requestBuilder.addHeader(HttpHeaders.CONNECTION, "keep-alive"); + requestBuilder.addHeader(HttpHeaders.USER_AGENT, "Mozilla/5.0 (Windows NT 6.1; WOW64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.76 Safari/537.36"); + + requestBuilder.addParameter("id", String.valueOf(monitorId)); + requestBuilder.addParameter("time", String.valueOf(startTime)); + requestBuilder.addHeader(HttpHeaders.ACCEPT, "application/json"); + + + //requestBuilder.setUri(pushProtocol.getUri()); + + if (timeout > 0) { + RequestConfig requestConfig = RequestConfig.custom() + .setConnectTimeout(timeout) + .setSocketTimeout(timeout) + .setRedirectsEnabled(true) + .build(); + requestBuilder.setConfig(requestConfig); + } + + return requestBuilder.build(); + } + + private void parseResponse(CollectRep.MetricsData.Builder builder, String resp, Metrics metric) { +// Map jsonMap = JsonUtil.fromJson(resp, new TypeReference>() { +// }); +// if (jsonMap == null) { +// throw new NullPointerException("parse result is null"); +// } + Message msg = JsonUtil.fromJson(resp, new TypeReference>() { + }); + if (msg == null) { + throw new NullPointerException("parse result is null"); + } + PushMetricsDto pushMetricsDto = msg.getData(); + if (pushMetricsDto == null || pushMetricsDto.getMetricsList() == null) { + throw new NullPointerException("parse result is null"); + } + for (PushMetricsDto.Metrics pushMetrics : pushMetricsDto.getMetricsList()) { + List metricColumn = new ArrayList<>(); + for (Metrics.Field field : metric.getFields()) { + metricColumn.add(pushMetrics.getMetrics().get(field.getField())); + } + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder() + .addAllColumns(metricColumn); + builder.addValues(valueRowBuilder.build()); + } + builder.setTime(System.currentTimeMillis()); + } +} diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/DispatchConstants.java b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/DispatchConstants.java index 57358c308b1..14020fa82e4 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/DispatchConstants.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/DispatchConstants.java @@ -76,6 +76,10 @@ public interface DispatchConstants { * protocol rocketmq */ String PROTOCOL_ROCKETMQ = "rocketmq"; + /** + * protocol push + */ + String PROTOCOL_PUSH = "push"; // Protocol type related - end // 协议类型相关 - end // diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/MetricsCollect.java b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/MetricsCollect.java index f03eb80a24f..087adf30152 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/MetricsCollect.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/MetricsCollect.java @@ -19,12 +19,15 @@ import com.googlecode.aviator.AviatorEvaluator; import com.googlecode.aviator.Expression; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; import org.dromara.hertzbeat.collector.collect.AbstractCollect; import org.dromara.hertzbeat.collector.collect.strategy.CollectStrategyFactory; import org.dromara.hertzbeat.collector.dispatch.timer.Timeout; import org.dromara.hertzbeat.collector.dispatch.timer.WheelTimerTask; import org.dromara.hertzbeat.collector.dispatch.unit.UnitConvert; import org.dromara.hertzbeat.collector.util.CollectUtil; +import org.dromara.hertzbeat.common.constants.CommonConstants; import org.dromara.hertzbeat.common.entity.job.Job; import org.dromara.hertzbeat.common.entity.job.Metrics; import org.dromara.hertzbeat.common.entity.message.CollectRep; diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/timer/WheelTimerTask.java b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/timer/WheelTimerTask.java index 1c57ea02f50..d4e6fa1cd4f 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/timer/WheelTimerTask.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/timer/WheelTimerTask.java @@ -19,15 +19,17 @@ import com.google.gson.Gson; import com.google.gson.JsonElement; +import lombok.extern.slf4j.Slf4j; +import org.dromara.hertzbeat.collector.dispatch.DispatchConstants; import org.dromara.hertzbeat.collector.dispatch.MetricsTaskDispatch; import org.dromara.hertzbeat.collector.util.CollectUtil; +import org.dromara.hertzbeat.common.constants.CommonConstants; import org.dromara.hertzbeat.common.entity.job.Configmap; import org.dromara.hertzbeat.common.entity.job.Job; import org.dromara.hertzbeat.common.entity.job.Metrics; +import org.dromara.hertzbeat.common.entity.job.protocol.PushProtocol; import org.dromara.hertzbeat.common.support.SpringContextHolder; import org.dromara.hertzbeat.common.util.AesUtil; -import org.dromara.hertzbeat.common.constants.CommonConstants; -import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; @@ -39,7 +41,6 @@ * TimerTask实现 * * - * */ @Slf4j public class WheelTimerTask implements TimerTask { @@ -85,6 +86,9 @@ private void initJobMetrics(Job job) { JsonElement jsonElement = GSON.toJsonTree(metric); CollectUtil.replaceSmilingPlaceholder(jsonElement, configmap); metric = GSON.fromJson(jsonElement, Metrics.class); + if (job.getApp().equals(DispatchConstants.PROTOCOL_PUSH)) { + CollectUtil.replaceFieldsForPushStyleMonitor(metric, configmap); + } metricsTmp.add(metric); } job.setMetrics(metricsTmp); diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/util/CollectUtil.java b/collector/src/main/java/org/dromara/hertzbeat/collector/util/CollectUtil.java index 6cfff5a54ce..9a8da9a2acc 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/util/CollectUtil.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/util/CollectUtil.java @@ -19,10 +19,11 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.gson.*; -import org.dromara.hertzbeat.common.entity.job.Configmap; +import lombok.extern.slf4j.Slf4j; import org.dromara.hertzbeat.common.constants.CommonConstants; +import org.dromara.hertzbeat.common.entity.job.Configmap; +import org.dromara.hertzbeat.common.entity.job.Metrics; import org.dromara.hertzbeat.common.util.JsonUtil; -import lombok.extern.slf4j.Slf4j; import java.util.Arrays; import java.util.Iterator; @@ -47,10 +48,11 @@ public class CollectUtil { private static final String CRYING_PLACEHOLDER_REX = "\\^o\\^"; private static final String CRYING_PLACEHOLDER_REGEX = "(\\^o\\^)(\\w|-|$|\\.)+(\\^o\\^)"; private static final Pattern CRYING_PLACEHOLDER_REGEX_PATTERN = Pattern.compile(CRYING_PLACEHOLDER_REGEX); - private static final List UNIT_SYMBOLS = Arrays.asList("%","G", "g", "M", "m", "K", "k", "B", "b"); + private static final List UNIT_SYMBOLS = Arrays.asList("%", "G", "g", "M", "m", "K", "k", "B", "b"); /** * 关键字匹配计数 + * * @param content 内容 * @param keyword 关键字 * @return 匹配次数 @@ -137,6 +139,7 @@ public void setUnit(String unit) { /** * get timeout integer + * * @param timeout timeout str * @return timeout */ @@ -146,7 +149,8 @@ public static int getTimeout(String timeout) { /** * get timeout integer or default value - * @param timeout timeout str + * + * @param timeout timeout str * @param defaultTimeout default timeout * @return timeout */ @@ -164,8 +168,8 @@ public static int getTimeout(String timeout, int defaultTimeout) { /** * assert prom field */ - public static Boolean assertPromRequireField(String aliasField){ - if (CommonConstants.PROM_TIME.equals(aliasField) || CommonConstants.PROM_VALUE.equals(aliasField)){ + public static Boolean assertPromRequireField(String aliasField) { + if (CommonConstants.PROM_TIME.equals(aliasField) || CommonConstants.PROM_VALUE.equals(aliasField)) { return true; } return false; @@ -174,6 +178,7 @@ public static Boolean assertPromRequireField(String aliasField){ /** * is contains cryPlaceholder -_- + * * @param jsonElement json element * @return return true when contains */ @@ -294,7 +299,8 @@ public static JsonElement replaceSmilingPlaceholder(JsonElement jsonElement, Map Configmap param = configmap.get(key); if (param != null && param.getType() == CommonConstants.PARAM_TYPE_MAP) { String jsonValue = (String) param.getValue(); - TypeReference> typeReference = new TypeReference<>() {}; + TypeReference> typeReference = new TypeReference<>() { + }; Map map = JsonUtil.fromJson(jsonValue, typeReference); if (map != null) { map.forEach((name, value) -> { @@ -382,7 +388,7 @@ public static JsonElement replaceSmilingPlaceholder(JsonElement jsonElement, Map index++; } } else { - jsonArray.set(index, value == null ? JsonNull.INSTANCE : new JsonPrimitive(value)); + jsonArray.set(index, value == null ? JsonNull.INSTANCE : new JsonPrimitive(value)); } } } else { @@ -399,4 +405,12 @@ public static String replaceUriSpecialChar(String uri) { // todo more special return uri; } + + + public static void replaceFieldsForPushStyleMonitor(Metrics metrics, Map configmap) { + + List pushFieldList = JsonUtil.fromJson((String) configmap.get("fields").getValue(), new TypeReference>() { + }); + metrics.setFields(pushFieldList); + } } diff --git a/collector/src/main/resources/META-INF/services/org.dromara.hertzbeat.collector.collect.AbstractCollect b/collector/src/main/resources/META-INF/services/org.dromara.hertzbeat.collector.collect.AbstractCollect index 294550b1eab..99981b7a192 100644 --- a/collector/src/main/resources/META-INF/services/org.dromara.hertzbeat.collector.collect.AbstractCollect +++ b/collector/src/main/resources/META-INF/services/org.dromara.hertzbeat.collector.collect.AbstractCollect @@ -9,4 +9,5 @@ org.dromara.hertzbeat.collector.collect.snmp.SnmpCollectImpl org.dromara.hertzbeat.collector.collect.ssh.SshCollectImpl org.dromara.hertzbeat.collector.collect.telnet.TelnetCollectImpl org.dromara.hertzbeat.collector.collect.ftp.FtpCollectImpl -org.dromara.hertzbeat.collector.collect.mq.RocketmqSingleCollectImpl \ No newline at end of file +org.dromara.hertzbeat.collector.collect.mq.RocketmqSingleCollectImpl +org.dromara.hertzbeat.collector.collect.push.PushCollectImpl \ No newline at end of file diff --git a/collector/src/main/resources/application.yml b/collector/src/main/resources/application.yml index e73b795b55e..d572631abcc 100644 --- a/collector/src/main/resources/application.yml +++ b/collector/src/main/resources/application.yml @@ -33,7 +33,7 @@ spring: config: activate: on-profile: cluster - + collector: dispatch: entrance: @@ -46,6 +46,10 @@ collector: manager-host: ${MANAGER_HOST:}${MANAGER_IP:} manager-port: ${MANAGER_PORT:1158} +push: + uri: "127.0.0.1:1157" + + common: queue: # memory or kafka diff --git a/collector/src/test/java/org/dromara/hertzbeat/collector/util/JsonPathParserTest.java b/collector/src/test/java/org/dromara/hertzbeat/collector/util/JsonPathParserTest.java index 6c2a5038eea..6397495aa78 100644 --- a/collector/src/test/java/org/dromara/hertzbeat/collector/util/JsonPathParserTest.java +++ b/collector/src/test/java/org/dromara/hertzbeat/collector/util/JsonPathParserTest.java @@ -16,7 +16,7 @@ class JsonPathParserTest { private static final String JSON_ARRAY = "[{'name': 'tom', 'speed': '433'},{'name': 'lili', 'speed': '543'}]"; - public final static String JSON_OBJECT = + public static final String JSON_OBJECT = "{ \"store\": {\n" + " \"book\": [ \n" + " { \"category\": \"reference\",\n" + diff --git a/common/src/main/java/org/dromara/hertzbeat/common/entity/job/Job.java b/common/src/main/java/org/dromara/hertzbeat/common/entity/job/Job.java index 915cecb1dbd..2b756e24542 100644 --- a/common/src/main/java/org/dromara/hertzbeat/common/entity/job/Job.java +++ b/common/src/main/java/org/dromara/hertzbeat/common/entity/job/Job.java @@ -19,23 +19,16 @@ import com.fasterxml.jackson.annotation.JsonIgnore; -import org.dromara.hertzbeat.common.entity.manager.ParamDefine; -import org.dromara.hertzbeat.common.entity.message.CollectRep; -import org.dromara.hertzbeat.common.util.JsonUtil; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.dromara.hertzbeat.common.entity.manager.ParamDefine; +import org.dromara.hertzbeat.common.entity.message.CollectRep; +import org.dromara.hertzbeat.common.util.JsonUtil; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; /** @@ -43,7 +36,6 @@ * 采集任务详情 * * - * */ @Data @AllArgsConstructor @@ -89,7 +81,7 @@ public class Job { */ private Map help; /** - * The monitor help link + * The monitor help link */ private Map helpLink; /** @@ -165,7 +157,7 @@ public synchronized void constructPriorMetrics() { .peek(metric -> { // Determine whether to configure aliasFields If not, configure the default // 判断是否配置aliasFields 没有则配置默认 - if (metric.getAliasFields() == null || metric.getAliasFields().isEmpty()) { + if ((metric.getAliasFields() == null || metric.getAliasFields().isEmpty()) && metric.getFields() != null) { metric.setAliasFields(metric.getFields().stream().map(Metrics.Field::getField).collect(Collectors.toList())); } // Set the default indicator group execution priority, if not filled, the default last priority @@ -248,6 +240,6 @@ public void addCollectMetricsData(CollectRep.MetricsData metricsData) { @Override public Job clone() { // deep clone 深度克隆 - return JsonUtil.fromJson(JsonUtil.toJson(this), Job.class); + return JsonUtil.fromJson(JsonUtil.toJson(this), getClass()); } } diff --git a/common/src/main/java/org/dromara/hertzbeat/common/entity/job/Metrics.java b/common/src/main/java/org/dromara/hertzbeat/common/entity/job/Metrics.java index 05dba78be98..36e91c46f19 100644 --- a/common/src/main/java/org/dromara/hertzbeat/common/entity/job/Metrics.java +++ b/common/src/main/java/org/dromara/hertzbeat/common/entity/job/Metrics.java @@ -18,13 +18,13 @@ package org.dromara.hertzbeat.common.entity.job; import com.fasterxml.jackson.annotation.JsonIgnore; -import org.dromara.hertzbeat.common.entity.message.CollectRep; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.dromara.hertzbeat.common.entity.job.protocol.*; +import org.dromara.hertzbeat.common.entity.message.CollectRep; import java.util.List; import java.util.Map; @@ -38,7 +38,6 @@ * 监控采集的指标集合详情 eg: cpu | memory | health * * - * */ @Data @AllArgsConstructor @@ -163,6 +162,10 @@ public class Metrics { * Monitoring configuration information using the public rocketmq protocol 使用公共的rocketmq协议的监控配置信息 */ private RocketmqProtocol rocketmq; + /** + * Monitoring configuration information using push style 使用push方式推送的监控配置信息 + */ + private PushProtocol push; /** * collector use - Temporarily store subTask indicator group response data @@ -187,6 +190,7 @@ public class Metrics { /** * is has subTask + * * @return true - has */ public boolean isHasSubTask() { @@ -195,6 +199,7 @@ public boolean isHasSubTask() { /** * consume subTask + * * @param metricsData response data * @return is last task? */ diff --git a/common/src/main/java/org/dromara/hertzbeat/common/entity/job/protocol/PushProtocol.java b/common/src/main/java/org/dromara/hertzbeat/common/entity/job/protocol/PushProtocol.java new file mode 100644 index 00000000000..98f55cf9606 --- /dev/null +++ b/common/src/main/java/org/dromara/hertzbeat/common/entity/job/protocol/PushProtocol.java @@ -0,0 +1,25 @@ +package org.dromara.hertzbeat.common.entity.job.protocol; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.dromara.hertzbeat.common.entity.dto.Field; + +import java.util.List; + +/** + * push protocol definition + * + * + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class PushProtocol { + private String host; + private String port; + private String uri = "/api/push"; + private List fields; +} diff --git a/common/src/main/java/org/dromara/hertzbeat/common/entity/push/PushMetrics.java b/common/src/main/java/org/dromara/hertzbeat/common/entity/push/PushMetrics.java new file mode 100644 index 00000000000..6fcec111af2 --- /dev/null +++ b/common/src/main/java/org/dromara/hertzbeat/common/entity/push/PushMetrics.java @@ -0,0 +1,33 @@ +package org.dromara.hertzbeat.common.entity.push; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.data.jpa.domain.support.AuditingEntityListener; + +import javax.persistence.*; + +/** + * push metrics entity + * + * + */ +@Entity +@Table(name = "hzb_push_metrics", indexes = { + @Index(name = "push_query_index", columnList = "monitorId"), + @Index(name = "push_query_index", columnList = "time") +}) +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +@EntityListeners(AuditingEntityListener.class) +public class PushMetrics { + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + private Long monitorId; + private Long time; + private String metrics; +} diff --git a/common/src/main/java/org/dromara/hertzbeat/common/entity/push/PushMetricsDto.java b/common/src/main/java/org/dromara/hertzbeat/common/entity/push/PushMetricsDto.java new file mode 100644 index 00000000000..4381c80f474 --- /dev/null +++ b/common/src/main/java/org/dromara/hertzbeat/common/entity/push/PushMetricsDto.java @@ -0,0 +1,37 @@ +package org.dromara.hertzbeat.common.entity.push; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * push metrics dto + * + * + */ +@Data +@Builder +@AllArgsConstructor +public class PushMetricsDto { + + List metricsList; + + public PushMetricsDto() { + metricsList = new ArrayList<>(); + } + + @Data + @Builder + @AllArgsConstructor + @NoArgsConstructor + public static class Metrics { + private long monitorId; + private Long time; + private Map metrics; + } +} diff --git a/common/src/main/java/org/dromara/hertzbeat/common/util/SnowFlakeIdGenerator.java b/common/src/main/java/org/dromara/hertzbeat/common/util/SnowFlakeIdGenerator.java index 525a4d5e13b..8d8189c1871 100644 --- a/common/src/main/java/org/dromara/hertzbeat/common/util/SnowFlakeIdGenerator.java +++ b/common/src/main/java/org/dromara/hertzbeat/common/util/SnowFlakeIdGenerator.java @@ -26,7 +26,7 @@ */ public class SnowFlakeIdGenerator { - private final static SnowFlakeIdWorker ID_WORKER; + private static final SnowFlakeIdWorker ID_WORKER; static { ID_WORKER = new SnowFlakeIdWorker(); diff --git a/manager/pom.xml b/manager/pom.xml index bf0a6be8840..da198e25081 100644 --- a/manager/pom.xml +++ b/manager/pom.xml @@ -65,6 +65,11 @@ org.dromara.hertzbeat hertzbeat-remoting + + + org.dromara.hertzbeat + hertzbeat-push + org.springframework.boot diff --git a/manager/src/main/java/org/dromara/hertzbeat/manager/controller/AppController.java b/manager/src/main/java/org/dromara/hertzbeat/manager/controller/AppController.java index 191611c1057..33e0ad8c277 100644 --- a/manager/src/main/java/org/dromara/hertzbeat/manager/controller/AppController.java +++ b/manager/src/main/java/org/dromara/hertzbeat/manager/controller/AppController.java @@ -49,7 +49,7 @@ public class AppController { private static final String[] RISKY_STR_ARR = {"ScriptEngineManager", "URLClassLoader"}; - + @Autowired private AppService appService; @@ -61,6 +61,14 @@ public ResponseEntity>> queryAppParamDefines( return ResponseEntity.ok(Message.success(paramDefines)); } + @GetMapping(path = "/{monitorId}/pushdefine") + @Operation(summary = "The definition structure of the specified monitoring type according to the push query", description = "根据monitorId查询push类型的定义结构") + public ResponseEntity> queryPushDefine( + @Parameter(description = "en: Monitoring type name,zh: 监控类型名称", example = "api") @PathVariable("monitorId") final Long monitorId) { + Job define = appService.getPushDefine(monitorId); + return ResponseEntity.ok(Message.success(define)); + } + @GetMapping(path = "/{app}/define") @Operation(summary = "The definition structure of the specified monitoring type according to the app query", description = "根据app查询指定监控类型的定义结构") public ResponseEntity> queryAppDefine( @@ -96,7 +104,7 @@ public ResponseEntity> newAppDefineYml(@Valid @RequestBody Monitor for (String riskyToken : RISKY_STR_ARR) { if (defineDto.getDefine().contains(riskyToken)) { return ResponseEntity.ok(Message.fail(FAIL_CODE, "can not has malicious remote script")); - } + } } appService.applyMonitorDefineYml(defineDto.getDefine(), false); } catch (Exception e) { diff --git a/manager/src/main/java/org/dromara/hertzbeat/manager/controller/MonitorController.java b/manager/src/main/java/org/dromara/hertzbeat/manager/controller/MonitorController.java index 3ea5ad3ea42..8b91b67fc60 100644 --- a/manager/src/main/java/org/dromara/hertzbeat/manager/controller/MonitorController.java +++ b/manager/src/main/java/org/dromara/hertzbeat/manager/controller/MonitorController.java @@ -17,6 +17,10 @@ package org.dromara.hertzbeat.manager.controller; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.dromara.hertzbeat.common.constants.CommonConstants; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.tags.Tag; @@ -24,6 +28,10 @@ import org.dromara.hertzbeat.common.entity.manager.Monitor; import org.dromara.hertzbeat.manager.pojo.dto.MonitorDto; import org.dromara.hertzbeat.manager.service.MonitorService; +import org.dromara.hertzbeat.common.constants.CommonConstants; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; diff --git a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/SchedulerInit.java b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/SchedulerInit.java index 09d9b77e2d9..d0226fa4a07 100644 --- a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/SchedulerInit.java +++ b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/SchedulerInit.java @@ -1,15 +1,13 @@ package org.dromara.hertzbeat.manager.scheduler; import lombok.extern.slf4j.Slf4j; +import org.dromara.hertzbeat.collector.util.CollectUtil; import org.dromara.hertzbeat.common.constants.CommonConstants; import org.dromara.hertzbeat.common.entity.dto.CollectorInfo; import org.dromara.hertzbeat.common.entity.job.Configmap; import org.dromara.hertzbeat.common.entity.job.Job; -import org.dromara.hertzbeat.common.entity.manager.Collector; -import org.dromara.hertzbeat.common.entity.manager.CollectorMonitorBind; -import org.dromara.hertzbeat.common.entity.manager.Monitor; -import org.dromara.hertzbeat.common.entity.manager.Param; -import org.dromara.hertzbeat.common.entity.manager.ParamDefine; +import org.dromara.hertzbeat.common.entity.job.Metrics; +import org.dromara.hertzbeat.common.entity.manager.*; import org.dromara.hertzbeat.manager.dao.CollectorDao; import org.dromara.hertzbeat.manager.dao.CollectorMonitorBindDao; import org.dromara.hertzbeat.manager.dao.MonitorDao; @@ -22,6 +20,7 @@ import org.springframework.core.annotation.Order; import org.springframework.util.StringUtils; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -36,30 +35,30 @@ @Order(value = Ordered.LOWEST_PRECEDENCE - 1) @Slf4j public class SchedulerInit implements CommandLineRunner { - + @Autowired private CollectorScheduling collectorScheduling; - + @Autowired private CollectJobScheduling collectJobScheduling; - + private static final String MAIN_COLLECTOR_NODE_IP = "127.0.0.1"; - + @Autowired private AppService appService; - + @Autowired private MonitorDao monitorDao; - + @Autowired private ParamDao paramDao; - + @Autowired private CollectorDao collectorDao; - + @Autowired private CollectorMonitorBindDao collectorMonitorBindDao; - + @Override public void run(String... args) throws Exception { // init pre collector status @@ -69,10 +68,9 @@ public void run(String... args) throws Exception { collectorDao.saveAll(collectors); // insert default consistent node CollectorInfo collectorInfo = CollectorInfo.builder() - .name(CommonConstants.MAIN_COLLECTOR_NODE) - .ip(MAIN_COLLECTOR_NODE_IP) - .mode(CommonConstants.MODE_PUBLIC) - .build(); + .name(CommonConstants.MAIN_COLLECTOR_NODE) + .ip(MAIN_COLLECTOR_NODE_IP) + .build(); collectorScheduling.collectorGoOnline(CommonConstants.MAIN_COLLECTOR_NODE, collectorInfo); // init jobs List monitors = monitorDao.findMonitorsByStatusNotInAndAndJobIdNotNull(Arrays.asList((byte) 0, (byte) 4)); diff --git a/manager/src/main/java/org/dromara/hertzbeat/manager/service/AppService.java b/manager/src/main/java/org/dromara/hertzbeat/manager/service/AppService.java index 981cb98031d..a687078a05e 100644 --- a/manager/src/main/java/org/dromara/hertzbeat/manager/service/AppService.java +++ b/manager/src/main/java/org/dromara/hertzbeat/manager/service/AppService.java @@ -18,8 +18,8 @@ package org.dromara.hertzbeat.manager.service; import org.dromara.hertzbeat.common.entity.job.Job; -import org.dromara.hertzbeat.manager.pojo.dto.Hierarchy; import org.dromara.hertzbeat.common.entity.manager.ParamDefine; +import org.dromara.hertzbeat.manager.pojo.dto.Hierarchy; import java.util.List; import java.util.Map; @@ -29,7 +29,6 @@ * 监控类型管理接口 * * - * */ public interface AppService { @@ -42,6 +41,8 @@ public interface AppService { */ List getAppParamDefines(String app); + Job getPushDefine(Long monitorId); + /** * Get monitor structure definition based on monitor type name * 根据监控类型名称获取监控结构定义 @@ -55,6 +56,7 @@ public interface AppService { /** * 获取app定义的指标 + * * @param app 监控类型 * @return 指标 */ @@ -81,12 +83,14 @@ public interface AppService { /** * Get all app define + * * @return defines */ Map getAllAppDefines(); /** * app define file content str + * * @param app app * @return file content */ @@ -96,12 +100,13 @@ public interface AppService { * update and apply app define yml * * @param ymlContent yml content - * @param isModify is modified? + * @param isModify is modified? */ void applyMonitorDefineYml(String ymlContent, boolean isModify); /** * delete monitor define yml + * * @param app app */ void deleteMonitorDefine(String app); diff --git a/manager/src/main/java/org/dromara/hertzbeat/manager/service/MonitorService.java b/manager/src/main/java/org/dromara/hertzbeat/manager/service/MonitorService.java index 8344bc96dde..e11e2422f92 100644 --- a/manager/src/main/java/org/dromara/hertzbeat/manager/service/MonitorService.java +++ b/manager/src/main/java/org/dromara/hertzbeat/manager/service/MonitorService.java @@ -29,7 +29,6 @@ import org.springframework.web.multipart.MultipartFile; import javax.servlet.http.HttpServletResponse; -import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -38,7 +37,6 @@ * 监控管理服务 * * - * */ public interface MonitorService { @@ -47,8 +45,8 @@ public interface MonitorService { * Monitoring Availability Probes * 监控可用性探测 * - * @param monitor Monitoring entity information 监控实体信息 - * @param params Parameter information 参数信息 + * @param monitor Monitoring entity information 监控实体信息 + * @param params Parameter information 参数信息 * @param collector collector pinned * @throws MonitorDetectException Probe failure throws 探测失败抛出 */ @@ -57,8 +55,8 @@ public interface MonitorService { /** * Add monitoring 新增监控 * - * @param monitor Monitoring Entity 监控实体 - * @param params Parameter information 参数信息 + * @param monitor Monitoring Entity 监控实体 + * @param params Parameter information 参数信息 * @param collector collector pinned * @throws RuntimeException Add process exception throw 新增过程异常抛出 */ @@ -78,8 +76,8 @@ public interface MonitorService { * Modify update monitoring * 修改更新监控 * - * @param monitor Monitor Entity 监控实体 - * @param params Parameter information 参数信息 + * @param monitor Monitor Entity 监控实体 + * @param params Parameter information 参数信息 * @param collector collector pinned * @throws RuntimeException Exception thrown during modification 修改过程中异常抛出 */ @@ -217,10 +215,11 @@ public interface MonitorService { * @param ids monitor id */ void copyMonitors(List ids); - - + + /** * update app collect job by app + * * @param job job content */ void updateAppCollectJob(Job job); diff --git a/manager/src/main/java/org/dromara/hertzbeat/manager/service/impl/AppServiceImpl.java b/manager/src/main/java/org/dromara/hertzbeat/manager/service/impl/AppServiceImpl.java index d37944ebfaa..22b4286d65d 100644 --- a/manager/src/main/java/org/dromara/hertzbeat/manager/service/impl/AppServiceImpl.java +++ b/manager/src/main/java/org/dromara/hertzbeat/manager/service/impl/AppServiceImpl.java @@ -17,17 +17,22 @@ package org.dromara.hertzbeat.manager.service.impl; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; +import org.dromara.hertzbeat.collector.dispatch.DispatchConstants; +import org.dromara.hertzbeat.collector.util.CollectUtil; +import org.dromara.hertzbeat.common.entity.job.Configmap; import org.dromara.hertzbeat.common.entity.job.Job; import org.dromara.hertzbeat.common.entity.job.Metrics; import org.dromara.hertzbeat.common.entity.manager.Monitor; +import org.dromara.hertzbeat.common.entity.manager.Param; +import org.dromara.hertzbeat.common.entity.manager.ParamDefine; import org.dromara.hertzbeat.common.support.SpringContextHolder; import org.dromara.hertzbeat.common.util.CommonUtil; import org.dromara.hertzbeat.manager.dao.MonitorDao; +import org.dromara.hertzbeat.manager.dao.ParamDao; import org.dromara.hertzbeat.manager.pojo.dto.Hierarchy; -import org.dromara.hertzbeat.common.entity.manager.ParamDefine; import org.dromara.hertzbeat.manager.service.AppService; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.io.FileUtils; import org.dromara.hertzbeat.manager.service.MonitorService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; @@ -59,7 +64,6 @@ * 暂时将监控配置和参数配置存放内存 之后存入数据库 * * - * */ @Service @Order(value = Ordered.HIGHEST_PRECEDENCE) @@ -67,10 +71,15 @@ public class AppServiceImpl implements AppService, CommandLineRunner { private static final String JAVA_PATH_SEPARATOR = "/"; - + + private static final String PUSH_PROTOCOL_METRICS_NAME = "metrics"; + @Autowired private MonitorDao monitorDao; + @Autowired + private ParamDao paramDao; + private final Map appDefines = new ConcurrentHashMap<>(); @Override @@ -86,6 +95,37 @@ public List getAppParamDefines(String app) { } } + @Override + public Job getPushDefine(Long monitorId) throws IllegalArgumentException { +// if (!StringUtils.hasText(app)) { +// throw new IllegalArgumentException("The app can not null."); +// } +// Job appDefine = appDefines.get(app.toLowerCase()); +// if (appDefine == null) { +// throw new IllegalArgumentException("The app " + app + " not support."); +// } +// return appDefine.clone(); + Job appDefine = appDefines.get(DispatchConstants.PROTOCOL_PUSH); + if (appDefine == null) { + throw new IllegalArgumentException("The push collector not support."); + } + List metrics = appDefine.getMetrics(); + List metricsTmp = new ArrayList<>(); + for (Metrics metric : metrics) { + if (PUSH_PROTOCOL_METRICS_NAME.equals(metric.getName())) { + List params = paramDao.findParamsByMonitorId(monitorId); + List configmaps = params.stream() + .map(param -> new Configmap(param.getField(), param.getValue(), + param.getType())).collect(Collectors.toList()); + Map configmap = configmaps.stream().collect(Collectors.toMap(Configmap::getKey, item -> item, (key1, key2) -> key1)); + CollectUtil.replaceFieldsForPushStyleMonitor(metric, configmap); + metricsTmp.add(metric); + } + } + appDefine.setMetrics(metricsTmp); + return appDefine; + } + @Override public Job getAppDefine(String app) throws IllegalArgumentException { if (!StringUtils.hasText(app)) { @@ -108,7 +148,7 @@ public List getAppDefineMetricNames(String app) { } metricNames.addAll(appDefine.getMetrics().stream().map(Metrics::getName).collect(Collectors.toList())); } else { - appDefines.forEach((k,v)->{ + appDefines.forEach((k, v) -> { metricNames.addAll(v.getMetrics().stream().map(Metrics::getName).collect(Collectors.toList())); }); } @@ -150,6 +190,9 @@ public Map getI18nResources(String lang) { if (i18nMetricsName != null) { i18nMap.put("monitor.app." + job.getApp() + ".metrics." + metrics.getName(), i18nMetricsName); } + if (metrics.getFields() == null) { + continue; + } for (Metrics.Field field : metrics.getFields()) { Map fieldI18nName = field.getI18n(); String i18nMetricName = CommonUtil.getLangMappingValueFromI18nMap(lang, fieldI18nName); @@ -237,7 +280,7 @@ public String getMonitorDefineFileContent(String app) { } catch (Exception e) { log.error(e.getMessage()); } - } + } } } log.info("load {} define app yml in file: {}", app, defineAppPath); diff --git a/manager/src/main/java/org/dromara/hertzbeat/manager/service/impl/MonitorServiceImpl.java b/manager/src/main/java/org/dromara/hertzbeat/manager/service/impl/MonitorServiceImpl.java index 92893028590..8855dd84fc5 100644 --- a/manager/src/main/java/org/dromara/hertzbeat/manager/service/impl/MonitorServiceImpl.java +++ b/manager/src/main/java/org/dromara/hertzbeat/manager/service/impl/MonitorServiceImpl.java @@ -448,6 +448,7 @@ public void validate(MonitorDto monitorDto, Boolean isModify) throws IllegalArgu + param.getValue() + " is invalid checkbox value"); } break; + case "metrics-field": case "key-value": if (JsonUtil.fromJson(param.getValue(), new TypeReference<>() { }) == null) { diff --git a/manager/src/main/resources/define/app-push.yml b/manager/src/main/resources/define/app-push.yml new file mode 100644 index 00000000000..11d8fdaa28a --- /dev/null +++ b/manager/src/main/resources/define/app-push.yml @@ -0,0 +1,115 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# 此监控类型所属类别:service-应用服务监控 db-数据库监控 custom-自定义监控 os-操作系统监控 +category: service +# 监控应用类型名称(与文件名保持一致) eg: linux windows tomcat mysql aws... +app: push +# The app api i18n name +# app api国际化名称 +name: + zh-CN: 推送方式监控 + en-US: Push Style Monitor +# app api所需输入参数定义(根据定义渲染页面UI) +# Input params define for app api(render web ui by the definition) +params: + # field-param field key + # field-变量字段标识符 + - field: host + # name-param field display i18n name + # name-参数字段显示名称 + name: + zh-CN: 推送模块Host + en-US: Push Module Host + # type-param field type(most mapping the html input type) + # type-字段类型,样式(大部分映射input标签type属性) + type: host + # required-true or false + # 是否是必输项 true-必填 false-可选 + required: true + # field-param field key + # field-变量字段标识符 + defaultValue: 127.0.0.1 + - field: port + # name-param field display i18n name + # name-参数字段显示名称 + name: + zh-CN: 端口 + en-US: Port + # type-param field type(most mapping the html input type) + # type-字段类型,样式(大部分映射input标签type属性) + type: number + # when type is number, range is required + # 当type为number时,用range表示范围 + range: '[0,65535]' + # required-true or false + # required-是否是必输项 true-必填 false-可选 + required: true + # default value + # 默认值 + defaultValue: 1157 +# - field: uri +# # name-param field display i18n name +# # name-参数字段显示名称 +# name: +# zh-CN: 相对路径 +# en-US: URI +# # type-param field type(most mapping the html input type) +# # type-字段类型,样式(大部分映射input标签type属性) +# type: text +# # when type is text, use limit to limit string length +# # 当type为text时,用limit表示字符串限制大小 +# limit: 200 +# # required-true or false +# # required-是否是必输项 true-必填 false-可选 +# required: false +# # 参数输入框提示信息 +# # param field input placeholder +# placeholder: 'Website uri path(no ip port) EG:/v2/book/bar' + # field-param field key + - field: fields + # name-param field display i18n name + # name-参数字段显示名称 + name: + zh-CN: 监控数据字段 + en-US: Metrics fields + # type-param field type(most mapping the html input type) + # type-字段类型,样式(大部分映射input标签type属性) + type: metrics-field + # required-true or false + # required-是否是必输项 true-必填 false-可选 + required: true + # field-param field key +# collect metrics config list +# 采集指标组配置列表 +metrics: + # metrics - all + # 监控指标组 - all + - name: metrics + # metrics group scheduling priority(0->127)->(high->low), metrics with the same priority will be scheduled in parallel + # priority 0's metrics group is availability metrics, it will be scheduled first, only availability metrics collect success will the scheduling continue + # 指标组调度优先级(0->127)->(优先级高->低) 优先级低的指标组会等优先级高的指标组采集完成后才会被调度, 相同优先级的指标组会并行调度采集 + # 优先级为0的指标组为可用性指标组,即它会被首先调度,采集成功才会继续调度其它指标组,采集失败则中断调度 + priority: 0 + # the protocol used for monitoring, eg: sql, ssh, http, telnet, wmi, snmp, sdk + protocol: push + # the config content when protocol is http + push: + # http host: ipv4 ipv6 domain + host: ^_^host^_^ + # http port + port: ^_^port^_^ + # http uri + uri: /api/push diff --git a/manager/src/main/resources/sureness.yml b/manager/src/main/resources/sureness.yml index 6f653fe4dfb..ddcbc8e1426 100644 --- a/manager/src/main/resources/sureness.yml +++ b/manager/src/main/resources/sureness.yml @@ -61,6 +61,7 @@ excludedResource: - /api/metrics===get - /api/apps/hierarchy===get - /actuator/**===get + - /api/push/**===* # web ui 前端静态资源 - /===get - /dashboard/**===get diff --git a/pom.xml b/pom.xml index dc30ea3dfac..5037c7b4f3e 100644 --- a/pom.xml +++ b/pom.xml @@ -15,6 +15,7 @@ collector warehouse remoting + push @@ -101,6 +102,12 @@ hertzbeat-remoting ${hertzbeat.version} + + + org.dromara.hertzbeat + hertzbeat-push + ${hertzbeat.version} + diff --git a/push/pom.xml b/push/pom.xml new file mode 100644 index 00000000000..9d256a22ebc --- /dev/null +++ b/push/pom.xml @@ -0,0 +1,60 @@ + + + + hertzbeat + org.dromara.hertzbeat + 1.0 + + 4.0.0 + + hertzbeat-push + ${project.artifactId} + + 3.2.0 + 3.3.0 + + + + + + org.dromara.hertzbeat + hertzbeat-common + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-autoconfigure + + + org.springframework.boot + spring-boot-configuration-processor + true + + + + org.springframework.boot + spring-boot-starter-data-jpa + + + + org.springdoc + springdoc-openapi-ui + + + + com.h2database + h2 + runtime + + + + + + + \ No newline at end of file diff --git a/push/src/main/java/org/dromara/hertzbeat/push/config/PushAutoConfiguration.java b/push/src/main/java/org/dromara/hertzbeat/push/config/PushAutoConfiguration.java new file mode 100644 index 00000000000..d46178cfc0d --- /dev/null +++ b/push/src/main/java/org/dromara/hertzbeat/push/config/PushAutoConfiguration.java @@ -0,0 +1,12 @@ +package org.dromara.hertzbeat.push.config; + +import org.springframework.context.annotation.ComponentScan; + +/** + * push configuration + * + * + */ +@ComponentScan(basePackages = "org.dromara.hertzbeat.push") +public class PushAutoConfiguration { +} diff --git a/push/src/main/java/org/dromara/hertzbeat/push/controller/PushController.java b/push/src/main/java/org/dromara/hertzbeat/push/controller/PushController.java new file mode 100644 index 00000000000..09c8f53a113 --- /dev/null +++ b/push/src/main/java/org/dromara/hertzbeat/push/controller/PushController.java @@ -0,0 +1,43 @@ +package org.dromara.hertzbeat.push.controller; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.dromara.hertzbeat.common.entity.dto.Message; +import org.dromara.hertzbeat.common.entity.push.PushMetricsDto; +import org.dromara.hertzbeat.push.service.PushService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; + +/** + * push controller + * + * + */ +@Tag(name = "Metrics Push API | 监控数据推送API") +@RestController +@RequestMapping(value = "/api/push", produces = {APPLICATION_JSON_VALUE}) +public class PushController { + + @Autowired + private PushService pushService; + + @PostMapping + @Operation(summary = "Push metric data to hertzbeat", description = "推送监控数据到hertzbeat") + public ResponseEntity> pushMetrics(@RequestBody PushMetricsDto pushMetricsDto) { + pushService.pushMetricsData(pushMetricsDto); + return ResponseEntity.ok(Message.success("Push success")); + } + + @GetMapping() + @Operation(summary = "Get metric data for hertzbeat", description = "获取监控数据") + public ResponseEntity> getMetrics( + @Parameter(description = "监控id", example = "6565463543") @RequestParam("id") final Long id, + @Parameter(description = "上一次拉取的时间", example = "6565463543") @RequestParam("time") final Long time) { + PushMetricsDto pushMetricsDto = pushService.getPushMetricData(id, time); + return ResponseEntity.ok(Message.success(pushMetricsDto)); + } +} diff --git a/push/src/main/java/org/dromara/hertzbeat/push/dao/PushMetricsDao.java b/push/src/main/java/org/dromara/hertzbeat/push/dao/PushMetricsDao.java new file mode 100644 index 00000000000..8cd9cfb14fe --- /dev/null +++ b/push/src/main/java/org/dromara/hertzbeat/push/dao/PushMetricsDao.java @@ -0,0 +1,18 @@ +package org.dromara.hertzbeat.push.dao; + +import org.dromara.hertzbeat.common.entity.push.PushMetrics; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.transaction.annotation.Transactional; + +import java.util.List; + +/** + * push metrics dao + * + * + */ +public interface PushMetricsDao extends JpaRepository { + PushMetrics findFirstByMonitorIdOrderByTimeDesc(Long monitorId); + @Transactional + void deleteAllByTimeBefore(Long time); +} diff --git a/push/src/main/java/org/dromara/hertzbeat/push/dao/PushMonitorDao.java b/push/src/main/java/org/dromara/hertzbeat/push/dao/PushMonitorDao.java new file mode 100644 index 00000000000..d1d7466ecc2 --- /dev/null +++ b/push/src/main/java/org/dromara/hertzbeat/push/dao/PushMonitorDao.java @@ -0,0 +1,12 @@ +package org.dromara.hertzbeat.push.dao; + +import org.dromara.hertzbeat.common.entity.manager.Monitor; +import org.springframework.data.jpa.repository.JpaRepository; + +/** + * push monitor dao + * + * + */ +public interface PushMonitorDao extends JpaRepository { +} diff --git a/push/src/main/java/org/dromara/hertzbeat/push/service/PushService.java b/push/src/main/java/org/dromara/hertzbeat/push/service/PushService.java new file mode 100644 index 00000000000..3654333e17a --- /dev/null +++ b/push/src/main/java/org/dromara/hertzbeat/push/service/PushService.java @@ -0,0 +1,16 @@ +package org.dromara.hertzbeat.push.service; + +import org.dromara.hertzbeat.common.entity.push.PushMetricsDto; +import org.springframework.stereotype.Service; + +/** + * push metrics + * + * + */ +@Service +public interface PushService { + void pushMetricsData(PushMetricsDto pushMetricsData); + + PushMetricsDto getPushMetricData(final Long monitorId, final Long time); +} diff --git a/push/src/main/java/org/dromara/hertzbeat/push/service/impl/PushServiceImpl.java b/push/src/main/java/org/dromara/hertzbeat/push/service/impl/PushServiceImpl.java new file mode 100644 index 00000000000..b404cfdc518 --- /dev/null +++ b/push/src/main/java/org/dromara/hertzbeat/push/service/impl/PushServiceImpl.java @@ -0,0 +1,125 @@ +package org.dromara.hertzbeat.push.service.impl; + + +import com.fasterxml.jackson.core.type.TypeReference; +import lombok.extern.slf4j.Slf4j; +import org.dromara.hertzbeat.common.entity.manager.Monitor; +import org.dromara.hertzbeat.common.entity.push.PushMetrics; +import org.dromara.hertzbeat.common.entity.push.PushMetricsDto; +import org.dromara.hertzbeat.common.util.JsonUtil; +import org.dromara.hertzbeat.push.dao.PushMetricsDao; +import org.dromara.hertzbeat.push.dao.PushMonitorDao; +import org.dromara.hertzbeat.push.service.PushService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.*; + +/** + * push service impl + * + * + */ +@Slf4j +@Service +public class PushServiceImpl implements PushService { + + @Autowired + private PushMonitorDao monitorDao; + + @Autowired + private PushMetricsDao metricsDao; + + private final Map monitorIdCache; // key: monitorId, value: time stamp of last query + + private static final long cacheTimeout = 5000; // ms + + private final Map lastPushMetrics; + + private static final long deleteMetricsPeriod = 1000 * 60 * 60 * 12; + + private static final long deleteBeforeTime = deleteMetricsPeriod / 2; + + PushServiceImpl(){ + monitorIdCache = new HashMap<>(); + lastPushMetrics = new HashMap<>(); + + new Timer().schedule(new TimerTask() { + @Override + public void run() { + try{ + deletePeriodically(); + }catch (Exception e) { + log.error("periodical deletion failed. {}", e.getMessage()); + } + } + }, 1000, deleteMetricsPeriod); + } + + public void deletePeriodically(){ + metricsDao.deleteAllByTimeBefore(System.currentTimeMillis() - deleteBeforeTime); + } + + @Override + public void pushMetricsData(PushMetricsDto pushMetricsDto) throws RuntimeException { + List pushMetricsList = new ArrayList<>(); + long curTime = System.currentTimeMillis(); + for (PushMetricsDto.Metrics metrics : pushMetricsDto.getMetricsList()) { + long monitorId = metrics.getMonitorId(); + metrics.setTime(curTime); + + if (!monitorIdCache.containsKey(monitorId) && (monitorIdCache.containsKey(monitorId) && curTime > monitorIdCache.get(monitorId) + cacheTimeout)) { + Optional queryOption = monitorDao.findById(monitorId); + if (queryOption.isEmpty()) { + monitorIdCache.remove(monitorId); + continue; + } + monitorIdCache.put(monitorId, curTime); + } + + PushMetrics pushMetrics = PushMetrics.builder() + .monitorId(metrics.getMonitorId()) + .time(curTime) + .metrics(JsonUtil.toJson(metrics.getMetrics())).build(); + lastPushMetrics.put(monitorId, metrics); + pushMetricsList.add(pushMetrics); + } + + metricsDao.saveAll(pushMetricsList); + + } + + @Override + public PushMetricsDto getPushMetricData(final Long monitorId, final Long time) { + PushMetricsDto.Metrics metrics; + PushMetricsDto pushMetricsDto = new PushMetricsDto(); + if (lastPushMetrics.containsKey(monitorId)) { + metrics = lastPushMetrics.get(monitorId); + } + else { + try { + PushMetrics pushMetrics = metricsDao.findFirstByMonitorIdOrderByTimeDesc(monitorId); + if (pushMetrics == null || pushMetrics.getMetrics() == null) { + return pushMetricsDto; + } + Map jsonMap = JsonUtil.fromJson(pushMetrics.getMetrics(), new TypeReference>() { + }); + metrics = PushMetricsDto.Metrics.builder().monitorId(monitorId).metrics(jsonMap).time(pushMetrics.getTime()).build(); + lastPushMetrics.put(monitorId, metrics); + } + catch (Exception e) { + log.error("no metrics found, monitor id: {}, {}, {}", monitorId, e.getMessage(), e); + return pushMetricsDto; + } + } + if (time > metrics.getTime()) { + return pushMetricsDto; + } + pushMetricsDto.getMetricsList().add(metrics); + // 目前先不删除 + // metricsDao.deleteAllById(toBeDelMetricsId); + return pushMetricsDto; + } + +} diff --git a/push/src/main/resources/META-INF/spring.factories b/push/src/main/resources/META-INF/spring.factories new file mode 100644 index 00000000000..b4ea0b6db63 --- /dev/null +++ b/push/src/main/resources/META-INF/spring.factories @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ +org.dromara.hertzbeat.push.config.PushAutoConfiguration \ No newline at end of file diff --git a/remoting/pom.xml b/remoting/pom.xml index 8ca8016affc..60e932c9a4a 100644 --- a/remoting/pom.xml +++ b/remoting/pom.xml @@ -11,6 +11,10 @@ hertzbeat-remoting ${project.artifactId} + + 3.2.0 + 3.3.0 + diff --git a/web-app/src/app/routes/monitor/monitor-detail/monitor-detail.component.ts b/web-app/src/app/routes/monitor/monitor-detail/monitor-detail.component.ts index 4a1dd3189ff..28999c1741f 100644 --- a/web-app/src/app/routes/monitor/monitor-detail/monitor-detail.component.ts +++ b/web-app/src/app/routes/monitor/monitor-detail/monitor-detail.component.ts @@ -26,6 +26,7 @@ export class MonitorDetailComponent implements OnInit, OnDestroy { private cdr: ChangeDetectorRef, @Inject(ALAIN_I18N_TOKEN) private i18nSvc: I18NService ) {} + isSpinning: boolean = false; monitorId!: number; app!: string; @@ -57,7 +58,11 @@ export class MonitorDetailComponent implements OnInit, OnDestroy { switchMap((message: Message) => { if (message.code == 0) { // 查询过滤出此监控下可计算聚合的数字指标 - return this.appDefineSvc.getAppDefine(this.app); + if (this.app == 'push') { + return this.appDefineSvc.getPushDefine(this.monitorId); + } else { + return this.appDefineSvc.getAppDefine(this.app); + } } else { // 不提供历史图表服务 return throwError(message.msg); diff --git a/web-app/src/app/routes/monitor/monitor-edit/monitor-edit.component.html b/web-app/src/app/routes/monitor/monitor-edit/monitor-edit.component.html index ca01f2f64a6..a225df53066 100644 --- a/web-app/src/app/routes/monitor/monitor-edit/monitor-edit.component.html +++ b/web-app/src/app/routes/monitor/monitor-edit/monitor-edit.component.html @@ -170,6 +170,23 @@ valueAlias="Header Value" > + + {{ paramDefine.name }} + + + + @@ -349,7 +366,14 @@ {{ 'monitor.intervals' | i18n }} - + diff --git a/web-app/src/app/routes/monitor/monitor-new/monitor-new.component.html b/web-app/src/app/routes/monitor/monitor-new/monitor-new.component.html index 988ecfe779f..90098636b3b 100644 --- a/web-app/src/app/routes/monitor/monitor-new/monitor-new.component.html +++ b/web-app/src/app/routes/monitor/monitor-new/monitor-new.component.html @@ -179,6 +179,23 @@ [valueAlias]="paramDefine.valueAlias ? paramDefine.valueAlias : 'Value'" > + + {{ paramDefine.name }} + + + + @@ -358,7 +375,14 @@ {{ 'monitor.intervals' | i18n }} - + diff --git a/web-app/src/app/service/app-define.service.ts b/web-app/src/app/service/app-define.service.ts index 3b67c34d779..c1475e189dd 100644 --- a/web-app/src/app/service/app-define.service.ts +++ b/web-app/src/app/service/app-define.service.ts @@ -21,6 +21,13 @@ export class AppDefineService { return this.http.get>(paramDefineUri); } + public getPushDefine(monitorId: number | undefined | null): Observable> { + if (monitorId === null || monitorId === undefined) { + console.log('getPushDefine monitorIdcan not null'); + } + return this.http.get>(`/apps/${monitorId}/pushdefine`); + } + public getAppDefine(app: string | undefined | null): Observable> { if (app === null || app === undefined) { console.log('getAppDefine app can not null'); diff --git a/web-app/src/app/shared/components/metrics-field-input/metrics-field-input.component.html b/web-app/src/app/shared/components/metrics-field-input/metrics-field-input.component.html new file mode 100644 index 00000000000..1c85125ca83 --- /dev/null +++ b/web-app/src/app/shared/components/metrics-field-input/metrics-field-input.component.html @@ -0,0 +1,15 @@ +
+
+ +
+
+ +
+
+ +
+
+ + +
+
diff --git a/web-app/src/app/shared/components/metrics-field-input/metrics-field-input.component.less b/web-app/src/app/shared/components/metrics-field-input/metrics-field-input.component.less new file mode 100644 index 00000000000..6d29b59afb4 --- /dev/null +++ b/web-app/src/app/shared/components/metrics-field-input/metrics-field-input.component.less @@ -0,0 +1,12 @@ +.dynamic-button { + position: relative; + top: 20%; + margin-left: 12%; + font-size: 15px; + cursor: pointer; + transition: all 0.3s; +} + +.dynamic-button:hover { + font-size: 26px; +} diff --git a/web-app/src/app/shared/components/metrics-field-input/metrics-field-input.component.spec.ts b/web-app/src/app/shared/components/metrics-field-input/metrics-field-input.component.spec.ts new file mode 100644 index 00000000000..b77b8b7005c --- /dev/null +++ b/web-app/src/app/shared/components/metrics-field-input/metrics-field-input.component.spec.ts @@ -0,0 +1,24 @@ +import { ComponentFixture, TestBed } from '@angular/core/testing'; + +import { MetricsFieldInputComponent } from './metrics-field-input.component'; + +describe('MetricsFieldInputComponent', () => { + let component: MetricsFieldInputComponent; + let fixture: ComponentFixture; + + beforeEach(async () => { + await TestBed.configureTestingModule({ + declarations: [MetricsFieldInputComponent] + }).compileComponents(); + }); + + beforeEach(() => { + fixture = TestBed.createComponent(MetricsFieldInputComponent); + component = fixture.componentInstance; + fixture.detectChanges(); + }); + + it('should create', () => { + expect(component).toBeTruthy(); + }); +}); diff --git a/web-app/src/app/shared/components/metrics-field-input/metrics-field-input.component.ts b/web-app/src/app/shared/components/metrics-field-input/metrics-field-input.component.ts new file mode 100644 index 00000000000..6816a087faa --- /dev/null +++ b/web-app/src/app/shared/components/metrics-field-input/metrics-field-input.component.ts @@ -0,0 +1,68 @@ +import { Component, EventEmitter, Input, OnInit, Output } from '@angular/core'; + +@Component({ + selector: 'app-metrics-field-input', + templateUrl: './metrics-field-input.component.html', + styleUrls: ['./metrics-field-input.component.less'] +}) +export class MetricsFieldInputComponent implements OnInit { + constructor() {} + + @Input() value!: any; + @Output() readonly valueChange = new EventEmitter(); + + @Input() + FieldAlias: string = 'field'; + @Input() + UnitAlias: string = 'unit'; + @Input() + TypeAlias: string = 'type'; + + fields: any[] = []; + + ngOnInit(): void { + if (this.value == undefined) { + this.fields.push({ + field: '', + unit: '', + type: '' + }); + } else { + this.value = JSON.parse(this.value); + } + if (this.value) { + for (let item of this.value) { + this.fields.push({ + field: item.field, + unit: item.unit, + type: item.type + }); + } + } + } + + addNew(e?: MouseEvent) { + if (e) { + e.preventDefault(); + } + this.fields.push({ + field: '', + unit: '', + type: '' + }); + } + + removeCurrent(index: number, e?: MouseEvent) { + if (e) { + e.preventDefault(); + } + if (this.fields.length > 1) { + this.fields.splice(index, 1); + } + } + + onChange() { + this.value = this.fields; + this.valueChange.emit(JSON.stringify(this.value)); + } +} diff --git a/web-app/src/app/shared/shared.module.ts b/web-app/src/app/shared/shared.module.ts index d8ed1288899..b2fd2e6c765 100644 --- a/web-app/src/app/shared/shared.module.ts +++ b/web-app/src/app/shared/shared.module.ts @@ -10,6 +10,7 @@ import { NzTagModule } from 'ng-zorro-antd/tag'; import { HelpMassageShowComponent } from './components/help-massage-show/help-massage-show.component'; import { KeyValueInputComponent } from './components/key-value-input/key-value-input.component'; +import { MetricsFieldInputComponent } from './components/metrics-field-input/metrics-field-input.component'; import { I18nElsePipe } from './pipe/i18n-else.pipe'; import { TimezonePipe } from './pipe/timezone.pipe'; import { SHARED_DELON_MODULES } from './shared-delon.module'; @@ -23,7 +24,7 @@ const ThirdModules: Array> = []; // #region your components & directives -const COMPONENTS: Array> = [KeyValueInputComponent, HelpMassageShowComponent]; +const COMPONENTS: Array> = [KeyValueInputComponent, HelpMassageShowComponent, MetricsFieldInputComponent]; const DIRECTIVES: Array> = [TimezonePipe, I18nElsePipe]; // #endregion