Skip to content

Commit

Permalink
[ospp] add push style collector (#1222)
Browse files Browse the repository at this point in the history
  • Loading branch information
leo-934 authored and tomsun28 committed Jan 16, 2024
1 parent 2dd304c commit 624e5d5
Show file tree
Hide file tree
Showing 44 changed files with 1,049 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,37 +54,37 @@ public class CommonHttpClient {
/**
* 此连接池所能提供的最大连接数
*/
private final static int MAX_TOTAL_CONNECTIONS = 50000;
private static final int MAX_TOTAL_CONNECTIONS = 50000;

/**
* 每个路由所能分配的最大连接数
*/
private final static int MAX_PER_ROUTE_CONNECTIONS = 80;
private static final int MAX_PER_ROUTE_CONNECTIONS = 80;

/**
* 从连接池中获取连接的默认超时时间 4秒
*/
private final static int REQUIRE_CONNECT_TIMEOUT = 4000;
private static final int REQUIRE_CONNECT_TIMEOUT = 4000;

/**
* 双端建立连接超时时间 4秒
*/
private final static int CONNECT_TIMEOUT = 4000;
private static final int CONNECT_TIMEOUT = 4000;

/**
* socketReadTimeout 响应tcp报文的最大间隔超时时间
*/
private final static int SOCKET_TIMEOUT = 60000;
private static final int SOCKET_TIMEOUT = 60000;

/**
* 空闲连接免检的有效时间,被重用的空闲连接若超过此时间,需检查此连接的可用性
*/
private final static int INACTIVITY_VALIDATED_TIME = 10000;
private static final int INACTIVITY_VALIDATED_TIME = 10000;

/**
* ssl版本
*/
private final static String[] SUPPORTED_SSL = {"TLSv1","TLSv1.1","TLSv1.2","SSLv3"};
private static final String[] SUPPORTED_SSL = {"TLSv1","TLSv1.1","TLSv1.2","SSLv3"};

static {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package org.dromara.hertzbeat.collector.collect.push;

import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.methods.RequestBuilder;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.dromara.hertzbeat.collector.collect.AbstractCollect;
import org.dromara.hertzbeat.collector.collect.common.http.CommonHttpClient;
import org.dromara.hertzbeat.collector.dispatch.DispatchConstants;
import org.dromara.hertzbeat.collector.util.CollectUtil;
import org.dromara.hertzbeat.common.constants.CollectorConstants;
import org.dromara.hertzbeat.common.entity.dto.Message;
import org.dromara.hertzbeat.common.entity.job.Metrics;
import org.dromara.hertzbeat.common.entity.job.protocol.PushProtocol;
import org.dromara.hertzbeat.common.entity.message.CollectRep;
import org.dromara.hertzbeat.common.entity.push.PushMetricsDto;
import org.dromara.hertzbeat.common.util.CommonUtil;
import org.dromara.hertzbeat.common.util.IpDomainUtil;
import org.dromara.hertzbeat.common.util.JsonUtil;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* push style collect
*
* @author vinci
*/
@Slf4j
public class PushCollectImpl extends AbstractCollect {

private static Map<Long, Long> timeMap = new ConcurrentHashMap<>();

// ms
private static final Integer timeout = 3000;

private static final Integer SUCCESS_CODE = 200;

// 第一次采集多久之前的数据,其实没有办法确定,因为无法确定上次何时采集,难以避免重启后重复采集的现象,默认30s
private static final Integer firstCollectInterval = 30000;

public PushCollectImpl() {
}

@Override
public void collect(CollectRep.MetricsData.Builder builder,
long appId, String app, Metrics metrics) {
long curTime = System.currentTimeMillis();

PushProtocol pushProtocol = metrics.getPush();

Long time = timeMap.getOrDefault(appId, curTime - firstCollectInterval);
timeMap.put(appId, curTime);

HttpContext httpContext = createHttpContext(pushProtocol);
HttpUriRequest request = createHttpRequest(pushProtocol, appId, time);

try {
CloseableHttpResponse response = CommonHttpClient.getHttpClient().execute(request, httpContext);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != SUCCESS_CODE) {
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg("StatusCode " + statusCode);
return;
}
String resp = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);

parseResponse(builder, resp, metrics);

} catch (Exception e) {
String errorMsg = CommonUtil.getMessageFromThrowable(e);
log.error(errorMsg, e);
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg(errorMsg);
}

}

@Override
public String supportProtocol() {
return DispatchConstants.PROTOCOL_PUSH;
}

private HttpContext createHttpContext(PushProtocol pushProtocol) {
HttpHost host = new HttpHost(pushProtocol.getHost(), Integer.parseInt(pushProtocol.getPort()));
HttpClientContext httpClientContext = new HttpClientContext();
httpClientContext.setTargetHost(host);
return httpClientContext;
}

private HttpUriRequest createHttpRequest(PushProtocol pushProtocol, Long monitorId, Long startTime) {
RequestBuilder requestBuilder = RequestBuilder.get();


// uri
String uri = CollectUtil.replaceUriSpecialChar(pushProtocol.getUri());
if (IpDomainUtil.isHasSchema(pushProtocol.getHost())) {
requestBuilder.setUri(pushProtocol.getHost() + ":" + pushProtocol.getPort() + uri);
} else {
String ipAddressType = IpDomainUtil.checkIpAddressType(pushProtocol.getHost());
String baseUri = CollectorConstants.IPV6.equals(ipAddressType)
? String.format("[%s]:%s", pushProtocol.getHost(), pushProtocol.getPort() + uri)
: String.format("%s:%s", pushProtocol.getHost(), pushProtocol.getPort() + uri);

requestBuilder.setUri(CollectorConstants.HTTP_HEADER + baseUri);
}

requestBuilder.addHeader(HttpHeaders.CONNECTION, "keep-alive");
requestBuilder.addHeader(HttpHeaders.USER_AGENT, "Mozilla/5.0 (Windows NT 6.1; WOW64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.76 Safari/537.36");

requestBuilder.addParameter("id", String.valueOf(monitorId));
requestBuilder.addParameter("time", String.valueOf(startTime));
requestBuilder.addHeader(HttpHeaders.ACCEPT, "application/json");


//requestBuilder.setUri(pushProtocol.getUri());

if (timeout > 0) {
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(timeout)
.setSocketTimeout(timeout)
.setRedirectsEnabled(true)
.build();
requestBuilder.setConfig(requestConfig);
}

return requestBuilder.build();
}

private void parseResponse(CollectRep.MetricsData.Builder builder, String resp, Metrics metric) {
// Map<String, Object> jsonMap = JsonUtil.fromJson(resp, new TypeReference<Map<String, Object>>() {
// });
// if (jsonMap == null) {
// throw new NullPointerException("parse result is null");
// }
Message<PushMetricsDto> msg = JsonUtil.fromJson(resp, new TypeReference<Message<PushMetricsDto>>() {
});
if (msg == null) {
throw new NullPointerException("parse result is null");
}
PushMetricsDto pushMetricsDto = msg.getData();
if (pushMetricsDto == null || pushMetricsDto.getMetricsList() == null) {
throw new NullPointerException("parse result is null");
}
for (PushMetricsDto.Metrics pushMetrics : pushMetricsDto.getMetricsList()) {
List<String> metricColumn = new ArrayList<>();
for (Metrics.Field field : metric.getFields()) {
metricColumn.add(pushMetrics.getMetrics().get(field.getField()));
}
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder()
.addAllColumns(metricColumn);
builder.addValues(valueRowBuilder.build());
}
builder.setTime(System.currentTimeMillis());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public interface DispatchConstants {
* protocol rocketmq
*/
String PROTOCOL_ROCKETMQ = "rocketmq";
/**
* protocol push
*/
String PROTOCOL_PUSH = "push";
// Protocol type related - end
// 协议类型相关 - end //

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@

import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.Expression;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.dromara.hertzbeat.collector.collect.AbstractCollect;
import org.dromara.hertzbeat.collector.collect.strategy.CollectStrategyFactory;
import org.dromara.hertzbeat.collector.dispatch.timer.Timeout;
import org.dromara.hertzbeat.collector.dispatch.timer.WheelTimerTask;
import org.dromara.hertzbeat.collector.dispatch.unit.UnitConvert;
import org.dromara.hertzbeat.collector.util.CollectUtil;
import org.dromara.hertzbeat.common.constants.CommonConstants;
import org.dromara.hertzbeat.common.entity.job.Job;
import org.dromara.hertzbeat.common.entity.job.Metrics;
import org.dromara.hertzbeat.common.entity.message.CollectRep;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import lombok.extern.slf4j.Slf4j;
import org.dromara.hertzbeat.collector.dispatch.DispatchConstants;
import org.dromara.hertzbeat.collector.dispatch.MetricsTaskDispatch;
import org.dromara.hertzbeat.collector.util.CollectUtil;
import org.dromara.hertzbeat.common.constants.CommonConstants;
import org.dromara.hertzbeat.common.entity.job.Configmap;
import org.dromara.hertzbeat.common.entity.job.Job;
import org.dromara.hertzbeat.common.entity.job.Metrics;
import org.dromara.hertzbeat.common.entity.job.protocol.PushProtocol;
import org.dromara.hertzbeat.common.support.SpringContextHolder;
import org.dromara.hertzbeat.common.util.AesUtil;
import org.dromara.hertzbeat.common.constants.CommonConstants;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -39,7 +41,6 @@
* TimerTask实现
*
* @author tomsun28
*
*/
@Slf4j
public class WheelTimerTask implements TimerTask {
Expand Down Expand Up @@ -85,6 +86,9 @@ private void initJobMetrics(Job job) {
JsonElement jsonElement = GSON.toJsonTree(metric);
CollectUtil.replaceSmilingPlaceholder(jsonElement, configmap);
metric = GSON.fromJson(jsonElement, Metrics.class);
if (job.getApp().equals(DispatchConstants.PROTOCOL_PUSH)) {
CollectUtil.replaceFieldsForPushStyleMonitor(metric, configmap);
}
metricsTmp.add(metric);
}
job.setMetrics(metricsTmp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.gson.*;
import org.dromara.hertzbeat.common.entity.job.Configmap;
import lombok.extern.slf4j.Slf4j;
import org.dromara.hertzbeat.common.constants.CommonConstants;
import org.dromara.hertzbeat.common.entity.job.Configmap;
import org.dromara.hertzbeat.common.entity.job.Metrics;
import org.dromara.hertzbeat.common.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.Iterator;
Expand All @@ -33,8 +34,8 @@

/**
* 采集器工具类
* @author tom
*
* @author tom
*/
@Slf4j
public class CollectUtil {
Expand All @@ -47,10 +48,11 @@ public class CollectUtil {
private static final String CRYING_PLACEHOLDER_REX = "\\^o\\^";
private static final String CRYING_PLACEHOLDER_REGEX = "(\\^o\\^)(\\w|-|$|\\.)+(\\^o\\^)";
private static final Pattern CRYING_PLACEHOLDER_REGEX_PATTERN = Pattern.compile(CRYING_PLACEHOLDER_REGEX);
private static final List<String> UNIT_SYMBOLS = Arrays.asList("%","G", "g", "M", "m", "K", "k", "B", "b");
private static final List<String> UNIT_SYMBOLS = Arrays.asList("%", "G", "g", "M", "m", "K", "k", "B", "b");

/**
* 关键字匹配计数
*
* @param content 内容
* @param keyword 关键字
* @return 匹配次数
Expand Down Expand Up @@ -137,6 +139,7 @@ public void setUnit(String unit) {

/**
* get timeout integer
*
* @param timeout timeout str
* @return timeout
*/
Expand All @@ -146,7 +149,8 @@ public static int getTimeout(String timeout) {

/**
* get timeout integer or default value
* @param timeout timeout str
*
* @param timeout timeout str
* @param defaultTimeout default timeout
* @return timeout
*/
Expand All @@ -164,8 +168,8 @@ public static int getTimeout(String timeout, int defaultTimeout) {
/**
* assert prom field
*/
public static Boolean assertPromRequireField(String aliasField){
if (CommonConstants.PROM_TIME.equals(aliasField) || CommonConstants.PROM_VALUE.equals(aliasField)){
public static Boolean assertPromRequireField(String aliasField) {
if (CommonConstants.PROM_TIME.equals(aliasField) || CommonConstants.PROM_VALUE.equals(aliasField)) {
return true;
}
return false;
Expand All @@ -174,6 +178,7 @@ public static Boolean assertPromRequireField(String aliasField){

/**
* is contains cryPlaceholder -_-
*
* @param jsonElement json element
* @return return true when contains
*/
Expand Down Expand Up @@ -294,7 +299,8 @@ public static JsonElement replaceSmilingPlaceholder(JsonElement jsonElement, Map
Configmap param = configmap.get(key);
if (param != null && param.getType() == CommonConstants.PARAM_TYPE_MAP) {
String jsonValue = (String) param.getValue();
TypeReference<Map<String, String>> typeReference = new TypeReference<>() {};
TypeReference<Map<String, String>> typeReference = new TypeReference<>() {
};
Map<String, String> map = JsonUtil.fromJson(jsonValue, typeReference);
if (map != null) {
map.forEach((name, value) -> {
Expand Down Expand Up @@ -382,7 +388,7 @@ public static JsonElement replaceSmilingPlaceholder(JsonElement jsonElement, Map
index++;
}
} else {
jsonArray.set(index, value == null ? JsonNull.INSTANCE : new JsonPrimitive(value));
jsonArray.set(index, value == null ? JsonNull.INSTANCE : new JsonPrimitive(value));
}
}
} else {
Expand All @@ -399,4 +405,12 @@ public static String replaceUriSpecialChar(String uri) {
// todo more special
return uri;
}


public static void replaceFieldsForPushStyleMonitor(Metrics metrics, Map<String, Configmap> configmap) {

List<Metrics.Field> pushFieldList = JsonUtil.fromJson((String) configmap.get("fields").getValue(), new TypeReference<List<Metrics.Field>>() {
});
metrics.setFields(pushFieldList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ org.dromara.hertzbeat.collector.collect.snmp.SnmpCollectImpl
org.dromara.hertzbeat.collector.collect.ssh.SshCollectImpl
org.dromara.hertzbeat.collector.collect.telnet.TelnetCollectImpl
org.dromara.hertzbeat.collector.collect.ftp.FtpCollectImpl
org.dromara.hertzbeat.collector.collect.mq.RocketmqSingleCollectImpl
org.dromara.hertzbeat.collector.collect.mq.RocketmqSingleCollectImpl
org.dromara.hertzbeat.collector.collect.push.PushCollectImpl
Loading

0 comments on commit 624e5d5

Please sign in to comment.