From 58354b648fb178fae486cb8ba2bfa04ae8a12867 Mon Sep 17 00:00:00 2001 From: cuipiheqiuqiu <76642201+cuipiheqiuqiu@users.noreply.github.com> Date: Mon, 7 Nov 2022 23:48:19 +0800 Subject: [PATCH] WIP:feature support k8s monitor, http monitor nacos, service&http_micro monitor msa (#421) [collector] bugfix: Solve imprecise cyclicJob execution interval problem. 1.wheelTimer tickDuration from 10s changed to 1s,ticksPerWheel remain unchanged. 2.now support second level job interval. [manager] elasticsearch support ssl as an option [monitor] 1.K8sClient monitor k8s 2.http monitor nacos 3.service&http_micro monitor msa [collector]fix pmd problem --- collector/pom.xml | 11 + .../collect/AbstractParseResponse.java | 80 ++++ .../collect/http/HttpCollectImpl.java | 6 +- .../MicroServiceActuatorHttpCollectImpl.java | 353 ++++++++++++++ .../http/micro/AbstractMicroParse.java | 109 +++++ .../collect/http/micro/MicroCommonParse.java | 91 ++++ .../collect/http/micro/MicroLastParser.java | 78 +++ .../collect/http/micro/MicroParseCreater.java | 28 ++ .../http/micro/MicroRequestsParse.java | 57 +++ .../collector/collect/k8s/K8sCollectImpl.java | 240 ++++++++++ .../collect/k8s/K8sMetricsModel.java | 35 ++ .../collector/collect/k8s/K8sMetricsUtil.java | 305 ++++++++++++ .../microservice/JsonPathParseResponse.java | 134 ++++++ .../MicroServiceParentCollectImpl.java | 113 +++++ .../parse/DefaultParseResponse.java | 75 +++ .../parse/MicroParseResponse.java | 38 ++ .../parse/PrometheusParseResponse.java | 32 ++ .../parse/SiteMapParseResponse.java | 133 ++++++ .../parse/WebsiteParseResponse.java | 44 ++ .../parse/XmlPathParseResponse.java | 29 ++ .../strategy/CollectStrategyFactory.java | 44 ++ .../strategy/ParseStrategyFactory.java | 46 ++ .../collector/dispatch/DispatchConstants.java | 37 ++ .../collector/dispatch/MetricsCollect.java | 13 + .../usthe/collector/util/JsonPathParser.java | 46 +- .../com/usthe/collector/util/K8sClient.java | 119 +++++ .../com/usthe/common/entity/job/Metrics.java | 108 ++++- .../entity/job/protocol/HttpProtocol.java | 3 + .../entity/job/protocol/K8sProtocol.java | 36 ++ .../entity/job/protocol/ServiceProtocol.java | 45 ++ .../usthe/common/model/ServicePodModel.java | 52 ++ .../java/com/usthe/common/util/GsonUtil.java | 9 + .../define/app/app-elasticsearch.yml | 8 + .../main/resources/define/app/appmanager.yml | 448 ++++++++++++++++++ manager/src/main/resources/define/app/k8s.yml | 93 ++++ .../src/main/resources/define/app/nacos.yml | 57 +++ .../resources/define/param/appmanager.yml | 24 + .../src/main/resources/define/param/k8s.yml | 20 + .../src/main/resources/define/param/nacos.yml | 14 + .../define/param/param-elasticsearch.yml | 7 + 40 files changed, 3208 insertions(+), 12 deletions(-) create mode 100644 collector/src/main/java/com/usthe/collector/collect/AbstractParseResponse.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/http/MicroServiceActuatorHttpCollectImpl.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/http/micro/AbstractMicroParse.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/http/micro/MicroCommonParse.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/http/micro/MicroLastParser.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/http/micro/MicroParseCreater.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/http/micro/MicroRequestsParse.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/k8s/K8sCollectImpl.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/k8s/K8sMetricsModel.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/k8s/K8sMetricsUtil.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/microservice/JsonPathParseResponse.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/microservice/MicroServiceParentCollectImpl.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/microservice/parse/DefaultParseResponse.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/microservice/parse/MicroParseResponse.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/microservice/parse/PrometheusParseResponse.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/microservice/parse/SiteMapParseResponse.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/microservice/parse/WebsiteParseResponse.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/microservice/parse/XmlPathParseResponse.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/strategy/CollectStrategyFactory.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/strategy/ParseStrategyFactory.java create mode 100644 collector/src/main/java/com/usthe/collector/util/K8sClient.java create mode 100644 common/src/main/java/com/usthe/common/entity/job/protocol/K8sProtocol.java create mode 100644 common/src/main/java/com/usthe/common/entity/job/protocol/ServiceProtocol.java create mode 100644 common/src/main/java/com/usthe/common/model/ServicePodModel.java create mode 100644 manager/src/main/resources/define/app/appmanager.yml create mode 100644 manager/src/main/resources/define/app/k8s.yml create mode 100644 manager/src/main/resources/define/app/nacos.yml create mode 100644 manager/src/main/resources/define/param/appmanager.yml create mode 100644 manager/src/main/resources/define/param/k8s.yml create mode 100644 manager/src/main/resources/define/param/nacos.yml diff --git a/collector/pom.xml b/collector/pom.xml index 665e549d46d..b90785b57a2 100644 --- a/collector/pom.xml +++ b/collector/pom.xml @@ -49,6 +49,11 @@ 1.0 provided + + + org.springframework.boot + spring-boot-starter-validation + io.etcd @@ -148,6 +153,12 @@ snmp4j 3.6.7 + + + io.kubernetes + client-java + 10.0.1 + \ No newline at end of file diff --git a/collector/src/main/java/com/usthe/collector/collect/AbstractParseResponse.java b/collector/src/main/java/com/usthe/collector/collect/AbstractParseResponse.java new file mode 100644 index 00000000000..c2e76adb10e --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/AbstractParseResponse.java @@ -0,0 +1,80 @@ +package com.usthe.collector.collect; + +import com.usthe.common.entity.job.protocol.ServiceProtocol; +import com.usthe.common.model.ServicePodModel; +import com.usthe.common.entity.job.Metrics; +import com.usthe.common.entity.job.protocol.HttpProtocol; +import com.usthe.common.entity.message.CollectRep; +import org.springframework.beans.factory.InitializingBean; + +import java.util.List; +import java.util.Map; + + +/** + * 不同数据格式解析抽象类 + * + * + */ + +public interface AbstractParseResponse extends InitializingBean { + /** + * 通用解析抽象方法 + * + * @param resp + * @param aliasFields + * @param http + * @param builder + * @param responseTime + */ + public default void parseResponse(String resp, List aliasFields, HttpProtocol http, + CollectRep.MetricsData.Builder builder, Long responseTime) { + + } + + /** + * k8s解析方式 + * + * @param metrics + * @param resp + * @param podMap + * @param aliasFields + * @param service + * @param builder + * @param responseTime + */ + public default void parseK8sApi(Metrics metrics, Object resp, Map podMap, List aliasFields, ServiceProtocol service, + CollectRep.MetricsData.Builder builder, Long responseTime) { + + } + + /** + * 微服务响应体解析方法 + * @param resp + * @param fields + * @param aliasFields + * @param jsonScript + * @param http + * @param tempcloums + * @param kv + */ + public default void parseResponse(String resp,List fields, List aliasFields ,List jsonScript, HttpProtocol http, + Map> tempcloums,String kv){ + + } + + /** + * 微服务响应体解析方法 + * @param resp + * @param field + * @param aliasField + * @param jsonScript + * @param http + * @param tempcloums + * @param kv + */ + public default void parseResponse(String resp,String field, String aliasField ,String jsonScript, HttpProtocol http, + Map> tempcloums,String kv){ + + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/http/HttpCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/http/HttpCollectImpl.java index f69ed02fc2e..97615b6916b 100644 --- a/collector/src/main/java/com/usthe/collector/collect/http/HttpCollectImpl.java +++ b/collector/src/main/java/com/usthe/collector/collect/http/HttpCollectImpl.java @@ -86,7 +86,7 @@ @Slf4j public class HttpCollectImpl extends AbstractCollect { - private HttpCollectImpl() {} + public HttpCollectImpl() {} public static HttpCollectImpl getInstance() { return Singleton.INSTANCE; @@ -397,7 +397,7 @@ private void parseResponseByDefault(String resp, List aliasFields, HttpP * @param httpProtocol http protocol * @return context */ - private HttpContext createHttpContext(HttpProtocol httpProtocol) { + public HttpContext createHttpContext(HttpProtocol httpProtocol) { HttpProtocol.Authorization auth = httpProtocol.getAuthorization(); if (auth != null && DispatchConstants.DIGEST_AUTH.equals(auth.getType())) { HttpClientContext clientContext = new HttpClientContext(); @@ -422,7 +422,7 @@ private HttpContext createHttpContext(HttpProtocol httpProtocol) { * @param httpProtocol http参数配置 * @return 请求体 */ - private HttpUriRequest createHttpRequest(HttpProtocol httpProtocol) { + public HttpUriRequest createHttpRequest(HttpProtocol httpProtocol) { RequestBuilder requestBuilder; // method String httpMethod = httpProtocol.getMethod().toUpperCase(); diff --git a/collector/src/main/java/com/usthe/collector/collect/http/MicroServiceActuatorHttpCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/http/MicroServiceActuatorHttpCollectImpl.java new file mode 100644 index 00000000000..40a5bd07b37 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/http/MicroServiceActuatorHttpCollectImpl.java @@ -0,0 +1,353 @@ +package com.usthe.collector.collect.http; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import com.usthe.collector.collect.AbstractParseResponse; +import com.usthe.collector.collect.common.http.CommonHttpClient; + +import com.usthe.collector.collect.strategy.CollectStrategyFactory; +import com.usthe.collector.collect.strategy.ParseStrategyFactory; +import com.usthe.collector.dispatch.DispatchConstants; +import com.usthe.collector.util.JsonPathParser; +import com.usthe.common.entity.job.Configmap; +import com.usthe.common.entity.job.Metrics; +import com.usthe.common.entity.job.protocol.HttpProtocol; +import com.usthe.common.entity.message.CollectRep; +import com.usthe.common.util.AesUtil; +import com.usthe.common.util.CommonConstants; +import com.usthe.common.util.GsonUtil; +import com.jayway.jsonpath.TypeRef; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpStatus; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.protocol.HttpContext; +import org.apache.http.util.EntityUtils; + +import javax.net.ssl.SSLException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.stream.Collectors; + + + +/** + * msa http https collect + * + * + */ +@Slf4j +public class MicroServiceActuatorHttpCollectImpl extends HttpCollectImpl { + public static final int SIZE = 50; + + + private MicroServiceActuatorHttpCollectImpl() { + super(); + } + + public static MicroServiceActuatorHttpCollectImpl getInstance() { + return Singleton.INSTANCE; + } + + + @Override + public void collect(CollectRep.MetricsData.Builder builder,long appId, String app, Metrics metrics) { + // 校验参数 + try { + if (metrics == null || metrics.getHttp() == null) { + throw new Exception("Http/Https collect must has http params"); + } + } catch (Exception e) { + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg(e.getMessage()); + return; + } + List row = new ArrayList<>(); + try{ + HttpProtocol tempHttp = metrics.getHttp(); + List param = metrics.getChildParam(); + Configmap configmap = param.stream().filter(c -> DispatchConstants.CHILD_REQUESTS.equals(c.getKey())).findFirst().orElse(null); + List> requestValue = (List>)configmap.getValue(); + Map parentMetrics = metrics.getParentMetrics(); + List fieldList = metrics.getAliasFields(); + Map> tempColums = new HashMap<>(16); + //遍历不同请求的相关指标采集 + for (Map map : requestValue) { + //当前请求采集那些指标名称 + List fields = (List) map.get("fields"); + //当前请求需采集指标的具体名称 + List aliasFields = (List) map.get("aliasFields"); + String url = String.valueOf(map.get("url")); + String method = String.valueOf(map.get("method")); + //选择那个链路解析数据 + String chain = String.valueOf(map.get("chain")); + List metaData = (List) map.get("metaData"); + HttpProtocol http = GsonUtil.fromJson(GsonUtil.toJson(tempHttp), HttpProtocol.class); + JsonElement jsonElement = GsonUtil.toJsonTree(http); + jsonElement = replaceSpecialValue(jsonElement, param); + http = GsonUtil.fromJson(jsonElement, HttpProtocol.class); + http.setMethod(method); + http.setUrl(url); + http.setChain(chain); + //根据情况判断类型走不同的采集方式 + if(map.get("params") instanceof List){ + List> params =(List>)map.get("params"); + for (int i = 0; i < fields.size(); i++) { + if(params!=null && i pMap = params.get(i); + String field = pMap.get("field"); + String value = pMap.get("value"); + if (StringUtils.isNotEmpty(field)) { + Map paramMap = new HashMap(16); + paramMap.put(field, value); + http.setParams(paramMap); + } + } + String resp = request(http, builder); + String parseType = metrics.getHttp().getParseType(); + AbstractParseResponse abstractParseResponse = ParseStrategyFactory.invoke(parseType); + abstractParseResponse.parseResponse(resp, fields.get(i),aliasFields!=null?aliasFields.get(i):fields.get(i), metaData.get(i), http, tempColums,null); + } + }else{ + Map params =(Map)map.get("params"); + if(params!=null&&!params.isEmpty()){ + String script = params.get("script"); + String field = params.get("field"); + String value = params.get("value"); + if(StringUtils.isEmpty(script)){ + if(StringUtils.isNotEmpty(field)){ + Map paramMap = new HashMap(16); + paramMap.put(field,value); + http.setParams(paramMap); + } + String resp = request(http, builder); + String parseType = metrics.getHttp().getParseType(); + AbstractParseResponse abstractParseResponse = ParseStrategyFactory.invoke(parseType); + abstractParseResponse.parseResponse(resp,fields,aliasFields!=null?aliasFields:fields,metaData,http,tempColums,null); + }else{ + String resp = request(http, builder); + List results = JsonPathParser.parseContentWithJsonPath(resp, script, new TypeRef>() {}); + if(results!=null&&!results.isEmpty()) { + for (String p : results.subList(0,results.size()>SIZE?SIZE:results.size())) { + http.setMethod(method); + http.setUrl(url); + if (StringUtils.isNotEmpty(field)) { + Map paramMap = new HashMap(16); + paramMap.put(field, String.format(value, p)); + http.setParams(paramMap); + } + String var3 = request(http, builder); + String parseType = metrics.getHttp().getParseType(); + AbstractParseResponse abstractParseResponse = ParseStrategyFactory.invoke(parseType); + abstractParseResponse.parseResponse(var3, fields,aliasFields!=null?aliasFields:fields, metaData, http, tempColums,String.format(value, p)); + } + } + } + }else{ + String resp = request(http, builder); + String parseType = metrics.getHttp().getParseType(); + AbstractParseResponse abstractParseResponse = ParseStrategyFactory.invoke(parseType); + abstractParseResponse.parseResponse(resp,fields,aliasFields!=null?aliasFields:fields,metaData,http,tempColums,null); + } + } + } + //获取其中行数最大的值 + int bigcloums = 0 ; + Iterator>> iterator = tempColums.entrySet().stream().iterator(); + while (iterator.hasNext()){ + List value = iterator.next().getValue(); + if(value!=null && value.size()>bigcloums){ + bigcloums=value.size(); + } + } + //通过采集到的数据创建来创建列 + for (int i=0;i list = tempColums.get(field); + for (int i=0;i= HttpStatus.SC_BAD_REQUEST) { + // 1XX 4XX 5XX 状态码 失败 + throw new Exception("StatusCode " + statusCode); + } else { + // 2xx 3xx 状态码 成功 + return EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + } + } + + private static class Singleton { + private static final MicroServiceActuatorHttpCollectImpl INSTANCE = new MicroServiceActuatorHttpCollectImpl(); + } + + + /** + * json parameter replacement json参数替换 + * + * @param jsonElement json + * @param config parameter list 参数list + * @return json + */ + private JsonElement replaceSpecialValue(JsonElement jsonElement, List config) { + Map configmap = config.stream() + .peek(item -> { + // 对加密串进行解密 + if (item.getType() == CommonConstants.PARAM_TYPE_PASSWORD && item.getValue() != null) { + String decodeValue = AesUtil.aesDecode(String.valueOf(item.getValue())); + if (decodeValue == null) { + log.error("Aes Decode value {} error.", item.getValue()); + } + item.setValue(decodeValue); + } else if (item.getValue() != null && item.getValue() instanceof String) { + item.setValue(((String) item.getValue()).trim()); + } + }) + .collect(Collectors.toMap(Configmap::getKey, item -> item)); + + if (jsonElement.isJsonObject()) { + JsonObject jsonObject = jsonElement.getAsJsonObject(); + Iterator> iterator = jsonObject.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + JsonElement element = entry.getValue(); + String key = entry.getKey(); + // Replace the attributes of the KEY-VALUE case such as http headers params + // 替换KEY-VALUE情况的属性 比如http headers params + if (key != null && key.startsWith("^o^") && key.endsWith("^o^")) { + key = key.replaceAll("\\^o\\^", ""); + Configmap param = configmap.get(key); + if (param != null && param.getType() == (byte) 3) { + String jsonValue = (String) param.getValue(); + Map map = GsonUtil.fromJson(jsonValue, Map.class); + if (map != null) { + map.forEach((name, value) -> { + if (name != null && !"".equals(name.trim())) { + jsonObject.addProperty(name, value); + } + }); + } + } + iterator.remove(); + continue; + } + // Replace normal VALUE value + // 替换正常的VALUE值 + if (element.isJsonPrimitive()) { + // Check if there are special characters Replace + // 判断是否含有特殊字符 替换 + String value = element.getAsString(); + if (value.startsWith("^o^") && value.endsWith("^o^")) { + value = value.replaceAll("\\^o\\^", ""); + Configmap param = configmap.get(value); + if (param != null) { + value = (String) param.getValue(); + jsonObject.addProperty(entry.getKey(), value); + } else { + iterator.remove(); + } + } + } else { + jsonObject.add(entry.getKey(), replaceSpecialValue(entry.getValue(), config)); + } + } + } else if (jsonElement.isJsonArray()) { + JsonArray jsonArray = jsonElement.getAsJsonArray(); + Iterator iterator = jsonArray.iterator(); + int index = 0; + while (iterator.hasNext()) { + JsonElement element = iterator.next(); + if (element.isJsonPrimitive()) { + // Check if there are special characters Replace + // 判断是否含有特殊字符 替换 + String value = element.getAsString(); + if (value.startsWith("^o^") && value.endsWith("^o^")) { + value = value.replaceAll("\\^o\\^", ""); + Configmap param = configmap.get(value); + if (param != null) { + value = (String) param.getValue(); + jsonArray.set(index, new JsonPrimitive(value)); + } else { + iterator.remove(); + } + } + } else { + jsonArray.set(index, replaceSpecialValue(element, config)); + } + index++; + } + } + return jsonElement; + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/http/micro/AbstractMicroParse.java b/collector/src/main/java/com/usthe/collector/collect/http/micro/AbstractMicroParse.java new file mode 100644 index 00000000000..aa19d7510f5 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/http/micro/AbstractMicroParse.java @@ -0,0 +1,109 @@ +package com.usthe.collector.collect.http.micro; + +import com.usthe.common.entity.job.protocol.HttpProtocol; + +import java.util.List; +import java.util.Map; + +/** + * 微服务解析链路 + * + * + */ +public abstract class AbstractMicroParse { + + /** + * 下游节点 + */ + private AbstractMicroParse microParse; + + AbstractMicroParse() { + } + + /** + * 设置实例 + * @param microParse + * @return + */ + public AbstractMicroParse setInstance(AbstractMicroParse microParse) { + this.microParse = microParse; + return this; + } + + /** + * 检查符合那个链路对象 + * @param http + * @return + */ + abstract Boolean checkType(HttpProtocol http); + + /** + * 微服务响应-通用解析方法 + * @param resp + * @param fields + * @param aliasFields + * @param jsonScript + * @param http + * @param tempcloums + * @param kv + */ + abstract void parse(String resp, List fields,List aliasFields,List jsonScript, HttpProtocol http, + Map> tempcloums,String kv); + + /** + * 微服务响应-获取请求次数解析方法 + * @param resp + * @param field + * @param aliasField + * @param jsonScript + * @param http + * @param tempcloums + * @param kv + */ + abstract void parse(String resp,String field, String aliasField ,String jsonScript, HttpProtocol http, + Map> tempcloums,String kv); + + /** + * 微服务响应-默认解析方法 + * @param resp + * @param fields + * @param aliasFields + * @param jsonScript + * @param http + * @param tempcloums + * @param kv + */ + public void handle(String resp, List fields,List aliasFields,List jsonScript, HttpProtocol http, + Map> tempcloums,String kv) { + if (checkType(http)) { + parse(resp,fields, aliasFields,jsonScript, http, + tempcloums,kv); + } else { + microParse.handle(resp, fields,aliasFields, jsonScript,http, + tempcloums,kv); + } + } + + /** + * 获取链路对象处理方法 + * @param resp + * @param field + * @param aliasField + * @param jsonScript + * @param http + * @param tempcloums + * @param kv + */ + public void handle(String resp, String field,String aliasField,String jsonScript, HttpProtocol http, + Map> tempcloums,String kv) { + if (checkType(http)) { + parse(resp,field, aliasField,jsonScript, http, + tempcloums,kv); + } else { + microParse.handle(resp,field, aliasField, jsonScript,http, + tempcloums,kv); + } + } + + +} diff --git a/collector/src/main/java/com/usthe/collector/collect/http/micro/MicroCommonParse.java b/collector/src/main/java/com/usthe/collector/collect/http/micro/MicroCommonParse.java new file mode 100644 index 00000000000..47456752c41 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/http/micro/MicroCommonParse.java @@ -0,0 +1,91 @@ +package com.usthe.collector.collect.http.micro; + +import com.usthe.collector.dispatch.DispatchConstants; +import com.usthe.collector.util.JsonPathParser; +import com.usthe.common.entity.job.protocol.HttpProtocol; +import com.usthe.common.util.CommonConstants; +import com.jayway.jsonpath.TypeRef; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * 微服务响应通用解析 + * + * + */ + +public class MicroCommonParse extends AbstractMicroParse { + @Override + Boolean checkType(HttpProtocol http) { + if(DispatchConstants.PARSE_CHAIN_COMMON.equals(http.getChain())){ + return true; + } + return false; + } + + @Override + void parse(String resp, List fields,List aliasFields, List jsonScript, HttpProtocol http, Map> tempcloums,String kv) { + for (int i = 0; i < aliasFields.size(); i++) { + TypeRef>> typeRef = new TypeRef>>() {}; + List> results = JsonPathParser.parseContentWithJsonPath(resp,jsonScript.get(i),typeRef); + if(results==null||results.isEmpty()){ + if(tempcloums.containsKey(fields.get(i))){ + continue; + }else { + tempcloums.put(fields.get(i), null); + } + }else{ + Map m = new HashMap<>(16); + results.stream().forEach(map -> { + String key = ""; + Object value = null; + if (map.containsKey("statistic")) { + key = String.valueOf(map.get("statistic")).toLowerCase(); + value = String.valueOf(map.get("value")); + } else if (map.containsKey("tag")) { + key = String.valueOf(map.get("tag")).toLowerCase(); + value = String.valueOf(map.get("values")); + } + m.put(key,value); + + }); + if(kv!=null&&kv.contains(aliasFields.get(i))){ + if(tempcloums.containsKey(fields.get(i))){ + List list = tempcloums.get(fields.get(i)); + list.add(String.valueOf(kv.split(":")[1])); + }else { + ArrayList objects = new ArrayList<>(); + objects.add(String.valueOf(kv.split(":")[1])); + tempcloums.put(fields.get(i),objects); + } + }else if(m.containsKey(aliasFields.get(i))){ + if(tempcloums.containsKey(fields.get(i))){ + List list = tempcloums.get(fields.get(i)); + list.add(String.valueOf(m.get(aliasFields.get(i)))); + }else { + ArrayList objects = new ArrayList<>(); + objects.add(String.valueOf(m.get(aliasFields.get(i)))); + tempcloums.put(fields.get(i),objects); + } + }else{ + if(tempcloums.containsKey(fields.get(i))){ + List list = tempcloums.get(fields.get(i)); + list.add(CommonConstants.NULL_VALUE); + }else { + ArrayList objects = new ArrayList<>(); + objects.add(CommonConstants.NULL_VALUE); + tempcloums.put(fields.get(i),objects); + } + } + } + } + } + + @Override + void parse(String resp,String field, String aliasField, String jsonScript, HttpProtocol http, Map> tempcloums,String kv) { + + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/http/micro/MicroLastParser.java b/collector/src/main/java/com/usthe/collector/collect/http/micro/MicroLastParser.java new file mode 100644 index 00000000000..5b394b023e7 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/http/micro/MicroLastParser.java @@ -0,0 +1,78 @@ +package com.usthe.collector.collect.http.micro; + +import com.usthe.collector.util.JsonPathParser; +import com.usthe.common.entity.job.protocol.HttpProtocol; +import com.usthe.common.util.CommonConstants; +import com.jayway.jsonpath.TypeRef; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +/** + * + * + * @description: 微服务默认解析 + * + */ +@Slf4j +@NoArgsConstructor +public class MicroLastParser extends AbstractMicroParse { + @Override + public Boolean checkType(HttpProtocol http) { + return true; + } + + @Override + public void parse(String resp,List fields, List aliasFields ,List jsonScript, HttpProtocol http, + Map> tempcloums,String kv) { + for (int i = 0; i < aliasFields.size(); i++) { + TypeRef>> typeRef = new TypeRef>>() {}; + List> results = JsonPathParser.parseContentWithJsonPath(resp,jsonScript.get(i),typeRef); + if(results==null||results.isEmpty()){ + if(tempcloums.containsKey(fields.get(i))){ + continue; + }else { + tempcloums.put(fields.get(i), null); + } + }else{ + for (Map result : results) { + if(kv!=null&&kv.contains(aliasFields.get(i))){ + if(tempcloums.containsKey(fields.get(i))){ + List list = tempcloums.get(fields.get(i)); + list.add(String.valueOf(kv.split(":")[1])); + }else { + ArrayList objects = new ArrayList<>(); + objects.add(String.valueOf(result.get(aliasFields.get(i)))); + tempcloums.put(fields.get(i),objects); + } + }else if(result.containsKey(aliasFields.get(i))){ + if(tempcloums.containsKey(fields.get(i))){ + List list = tempcloums.get(fields.get(i)); + list.add(String.valueOf(result.get(aliasFields.get(i)))); + }else { + ArrayList objects = new ArrayList<>(); + objects.add(String.valueOf(result.get(aliasFields.get(i)))); + tempcloums.put(fields.get(i),objects); + } + }else{ + if(tempcloums.containsKey(fields.get(i))){ + List list = tempcloums.get(fields.get(i)); + list.add(CommonConstants.NULL_VALUE); + }else { + ArrayList objects = new ArrayList<>(); + objects.add(CommonConstants.NULL_VALUE); + tempcloums.put(fields.get(i),objects); + } + } + } + } + } + } + + @Override + void parse(String resp,String field, String aliasField, String jsonScript, HttpProtocol http, Map> tempcloums,String kv) { + + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/http/micro/MicroParseCreater.java b/collector/src/main/java/com/usthe/collector/collect/http/micro/MicroParseCreater.java new file mode 100644 index 00000000000..a273a28c063 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/http/micro/MicroParseCreater.java @@ -0,0 +1,28 @@ +package com.usthe.collector.collect.http.micro; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Component; + +/** + * + * + * @description: 微服务链路拼装 + */ +@Slf4j +@Component +public class MicroParseCreater implements InitializingBean { + private static AbstractMicroParse microParse = new MicroRequestsParse(); + + private static void create() { + microParse.setInstance(new MicroCommonParse().setInstance(new MicroLastParser())); + } + public static AbstractMicroParse getMicroParse(){ + return microParse; + } + + @Override + public void afterPropertiesSet() throws Exception { + create(); + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/http/micro/MicroRequestsParse.java b/collector/src/main/java/com/usthe/collector/collect/http/micro/MicroRequestsParse.java new file mode 100644 index 00000000000..36b4ba080d7 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/http/micro/MicroRequestsParse.java @@ -0,0 +1,57 @@ +package com.usthe.collector.collect.http.micro; + +import com.usthe.collector.dispatch.DispatchConstants; +import com.usthe.collector.util.JsonPathParser; +import com.usthe.common.entity.job.protocol.HttpProtocol; +import com.jayway.jsonpath.TypeRef; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +/** + * + * + * @description:微服务响应体请求次数解析 + */ +public class MicroRequestsParse extends AbstractMicroParse { + + public static final String DEFAULT_SCRIPT = "$"; + @Override + Boolean checkType(HttpProtocol http) { + if(DispatchConstants.PARSE_CHAIN_REQUESTS.equals(http.getChain())){ + return true; + } + return false; + } + + @Override + void parse(String resp, List fields,List aliasFields, List jsonScript, HttpProtocol http, Map> tempcloums, String kv) { + + } + + + @Override + void parse(String resp,String field, String aliasField ,String jsonScript, HttpProtocol http, + Map> tempcloums,String kv) { + if(DEFAULT_SCRIPT.equals(jsonScript)){ + TypeRef>> typeRef = new TypeRef>>(){}; + List> result = JsonPathParser.parseContentWithJsonPath(resp, jsonScript,typeRef); + if(result!=null&&!result.isEmpty()&&result.get(0).containsKey(aliasField)){ + tempcloums.put(field, Arrays.asList(String.valueOf(result.get(0).get(aliasField)))); + }else{ + tempcloums.put(field,null); + } + }else{ + TypeRef> typeRef = new TypeRef>(){}; + List result = JsonPathParser.parseContentWithJsonPath(resp, jsonScript,typeRef); + if(result!=null&&!result.isEmpty()){ + Double d = result.get(0); + tempcloums.put(field, Arrays.asList(String.valueOf(d))); + }else{ + tempcloums.put(field,null); + } + } + + + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/k8s/K8sCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/k8s/K8sCollectImpl.java new file mode 100644 index 00000000000..a5bb21388c5 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/k8s/K8sCollectImpl.java @@ -0,0 +1,240 @@ +package com.usthe.collector.collect.k8s; + +import com.usthe.collector.collect.AbstractCollect; +import com.usthe.collector.util.K8sClient; +import com.usthe.common.entity.job.Metrics; +import com.usthe.common.entity.job.protocol.K8sProtocol; +import com.usthe.common.entity.message.CollectRep; +import com.usthe.common.entity.message.CollectRep.MetricsData.Builder; +import io.kubernetes.client.openapi.models.V1NamespaceList; +import io.kubernetes.client.openapi.models.V1NodeList; +import io.kubernetes.client.openapi.models.V1PodList; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + + +/** + * k8s客户端查询的采集实现 - 1 + * + * + */ +@Slf4j +public class K8sCollectImpl extends AbstractCollect { + //todo 这部分代码 cv来的 配置文件不支持自定义没实现的指标,不够灵活,比如k8s service类型缺失,还是考虑用http api方式 + + public static K8sCollectImpl getInstance() { + return K8sCollectImpl.Singleton.INSTANCE; + } + + private K8sCollectImpl(){} + + @Override + public void collect(Builder builder, long appId, String app, Metrics metrics) { + long startTime = System.currentTimeMillis(); + //1. 获取k8s协议配置信息,初始化k8s客户端 + K8sClient k8sClient = initK8sClient(builder, metrics); + if(null==k8sClient){ + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("kubernetes collect create k8sClient failed!"); + return; + } + //2. 根据配置信息携带的type,选择不同的k8s客户端方法获取数据 + if(null==metrics.getK8s() || StringUtils.EMPTY.equals(metrics.getK8s().getType())){ + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("kubernetes collect must has k8s params:type!"); + return; + } + K8sMetricsModel k8sMetricsModel = getK8sMetricsByType(k8sClient, metrics.getK8s().getType()); + //3. 组装采集数据到builder中 + if(null==k8sMetricsModel || k8sMetricsModel.getMetricsMap().isEmpty()){ + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("kubernetes collect has no data!"); + return; + } + fillK8sMetricsToBuilder(builder, metrics, k8sMetricsModel); + log.info("kubernetes collect finish in {}ms.", System.currentTimeMillis()-startTime); + } + + /** + * 填充获取到的k8s指标数据 + * @param builder 被填充对象 + * @param metrics 指标信息 + * @param k8sMetricsModel 填充内容 + * */ + private void fillK8sMetricsToBuilder(Builder builder, Metrics metrics, @NotNull K8sMetricsModel k8sMetricsModel) { + try{ + Map> metricsMap = k8sMetricsModel.getMetricsMap(); + List metricsList = metrics.getAliasFields(); + int num = k8sMetricsModel.getNumber(); + for(int i=0; i columnData; + for(String column: metricsList){ + columnData = metricsMap.get(column); + if(columnData==null){ + valueRowBuilder.addColumns(StringUtils.EMPTY); + }else{ + valueRowBuilder.addColumns(columnData.get(i)); + } + } + builder.addValues(valueRowBuilder.build()); + } + }catch (Exception e){ + log.error("填充k8s指标数据发生错误:{}", e.getMessage()); + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("kubernetes collect failed from fillK8sMetricsToBuilder!"); + } + } + + /** + * 初始化k8s客户端 + * @param builder builder + * @param metrics 采集指标信息,包含 K8sProtocol + * @return 初始化成功返回K8sClient实例,否则返回Null + * */ + @Nullable + private K8sClient initK8sClient(Builder builder, Metrics metrics){ + if(null==metrics || null==metrics.getK8s()){ + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("kubernetes collect must has k8s params"); + }else{ + K8sProtocol k8sProtocol = metrics.getK8s(); + try{ + K8sClient k8sClient = new K8sClient(k8sProtocol.getHost(), k8sProtocol.getPort(), + k8sProtocol.getToken()); + log.info("初始化k8s客户端成功,客户端连接k8s集群host:{},port:{}!",k8sProtocol.getHost(), k8sProtocol.getPort()); + return k8sClient; + }catch (RuntimeException e){ + log.error("初始化k8s客户端出错,错误详情:{}", e.getMessage()); + } + } + return null; + } + + /** + * k8sClient采集数据 + * 根据不同k8s指标类型,采用不同的采集方式获取指标,类型包括:节点node、命名空间namespace、pod + * @param k8sClient k8s客户端 + * @param type k8s指标类型 + * @return 返回k8s指标数据 + * */ + @Nullable + private K8sMetricsModel getK8sMetricsByType(@NotNull K8sClient k8sClient, @NotEmpty String type){ + K8sMetricsModel k8sMetricsModel = null; + switch (type){ + case K8sMetricsUtil.K8S_TYPE_NODE: + k8sMetricsModel = getNodeMetrics(k8sClient); + break; + case K8sMetricsUtil.K8S_TYPE_NAMESPACE: + k8sMetricsModel = getNamespaceMetrics(k8sClient); + break; + case K8sMetricsUtil.K8S_TYPE_POD: + k8sMetricsModel = getPodMetrics(k8sClient); + break; + default: + break; + } + return k8sMetricsModel; + } + + /** + * 采集k8s节点指标数据 + * @param k8sClient k8s客户端 + * @return 返回k8s指标模型 + * */ + @Nullable + private K8sMetricsModel getNodeMetrics(@NotNull K8sClient k8sClient) { + K8sMetricsModel k8sMetricsModel = null; + V1NodeList allNodes = k8sClient.getAllNodes(); + if(null!=allNodes && CollectionUtils.isNotEmpty(allNodes.getItems())){ + List metricsList = K8sMetricsUtil.retrieveNodeMetricsList(); + Map> metricsMap = K8sMetricsUtil.retrieveNodeMetricsData(allNodes); + if(CollectionUtils.isNotEmpty(metricsList) + && null!=metricsMap + && CollectionUtils.isNotEmpty(metricsMap.keySet())){ + int instanceNum = 0; + for(Entry> ele : metricsMap.entrySet()){ + instanceNum = ele.getValue().size(); + break; + } + k8sMetricsModel = new K8sMetricsModel(instanceNum, metricsList, metricsMap); + }else{ + log.error("获取节点指标信息失败,节点指标列表不存在!"); + } + }else{ + log.error("获取节点指标信息失败,节点不存在!"); + } + return k8sMetricsModel; + } + + /** + * 采集k8s命名空间指标数据 + * @param k8sClient k8s客户端 + * @return 返回k8s指标模型 + * */ + private K8sMetricsModel getNamespaceMetrics(@NotNull K8sClient k8sClient) { + K8sMetricsModel k8sMetricsModel = null; + V1NamespaceList allNamespaces = k8sClient.getAllNamespaces(); + if(null!=allNamespaces && CollectionUtils.isNotEmpty(allNamespaces.getItems())){ + List metricsList = K8sMetricsUtil.retrieveNamespaceMetricsList(); + Map> metricsMap = K8sMetricsUtil.retrieveNamespaceMetricsData(allNamespaces); + if(CollectionUtils.isNotEmpty(metricsList) + && null!=metricsMap + && CollectionUtils.isNotEmpty(metricsMap.keySet())){ + int instanceNum = 0; + for(Entry> ele : metricsMap.entrySet()){ + instanceNum = ele.getValue().size(); + break; + } + k8sMetricsModel = new K8sMetricsModel(instanceNum, metricsList, metricsMap); + }else{ + log.error("获取k8s命名空间指标信息失败,命名空间指标列表不存在!"); + } + }else{ + log.error("获取k8s命名空间指标信息失败,命名空间不存在!"); + } + return k8sMetricsModel; + } + + private K8sMetricsModel getPodMetrics(@NotNull K8sClient k8sClient) { + K8sMetricsModel k8sMetricsModel = null; + V1PodList allPods = k8sClient.getAllPodList(); + if(null!=allPods && CollectionUtils.isNotEmpty(allPods.getItems())){ + List metricsList = K8sMetricsUtil.retrievePodMetricsList(); + Map> metricsMap = K8sMetricsUtil.retrievePodMetricsData(allPods); + if(CollectionUtils.isNotEmpty(metricsList) + && null!=metricsMap + && CollectionUtils.isNotEmpty(metricsMap.keySet())){ + int instanceNum = 0; + for(Entry> ele : metricsMap.entrySet()){ + instanceNum = ele.getValue().size(); + break; + } + k8sMetricsModel = new K8sMetricsModel(instanceNum, metricsList, metricsMap); + }else{ + log.error("获取k8s命名空间指标信息失败,命名空间指标列表不存在!"); + } + }else{ + log.error("获取k8s命名空间指标信息失败,命名空间不存在!"); + } + return k8sMetricsModel; + } + + private static class Singleton { + private static final K8sCollectImpl INSTANCE = new K8sCollectImpl(); + } + +// @Override +// public void afterPropertiesSet() throws Exception { +// CollectStrategyFactory.register(DispatchConstants.PROTOCOL_K8S, this); +// } + +} diff --git a/collector/src/main/java/com/usthe/collector/collect/k8s/K8sMetricsModel.java b/collector/src/main/java/com/usthe/collector/collect/k8s/K8sMetricsModel.java new file mode 100644 index 00000000000..6658d2f0bfd --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/k8s/K8sMetricsModel.java @@ -0,0 +1,35 @@ +package com.usthe.collector.collect.k8s; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; +import java.util.Map; + + +/** + * K8sMetricsModel k8s指标模型,node/namespace/pod三种类型指标通用 + * + * + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class K8sMetricsModel { + //todo 定义不合理 + /** + * k8s对象个数 + * */ + Integer number; + /** + * 指标名集合 + * */ + List metricsList; + /** + * 指标值 + * */ + Map> metricsMap; +} diff --git a/collector/src/main/java/com/usthe/collector/collect/k8s/K8sMetricsUtil.java b/collector/src/main/java/com/usthe/collector/collect/k8s/K8sMetricsUtil.java new file mode 100644 index 00000000000..ab635c45a51 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/k8s/K8sMetricsUtil.java @@ -0,0 +1,305 @@ +package com.usthe.collector.collect.k8s; + +import com.usthe.common.util.GsonUtil; +import io.kubernetes.client.custom.Quantity; +import io.kubernetes.client.openapi.models.*; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import javax.annotation.Nullable; +import java.util.*; + + +/** + * k8s指标采集工具类:常量定义、参数解析 + * + * + */ +@Slf4j +public class K8sMetricsUtil { + /** + * k8s指标类型 + * */ + public static final String K8S_TYPE_NODE = "node"; + public static final String K8S_TYPE_NAMESPACE = "namespace"; + public static final String K8S_TYPE_POD = "pod"; + + /** + * node指标类型 指标 + * */ + public static final int NODE_METRICS_COUNT = 8; + public static final String NODE_METRICS_NODE_NAME = "node_name"; + public static final String NODE_METRICS_LABELS = "labels"; + public static final String NODE_METRICS_IS_READY = "is_ready"; + public static final String NODE_METRICS_CAPACITY_CPU = "capacity_cpu"; + public static final String NODE_METRICS_ALLOCATABLE_CPU = "allocatable_cpu"; + public static final String NODE_METRICS_CAPACITY_MEMORY = "capacity_memory"; + public static final String NODE_METRICS_ALLOCATABLE_MEMORY = "allocatable_memory"; + public static final String NODE_METRICS_CREATION_TIME = "creation_time"; + + /** + * namespace指标类型 指标 + * */ + public static final int NAMESPACE_METRICS_COUNT = 3; + public static final String NAMESPACE_METRICS_NAMESPACE_NAME = "namespace_name"; + public static final String NAMESPACE_METRICS_LABELS = "labels"; + public static final String NAMESPACE_METRICS_CREATION_TIME = "creation_time"; + + /** + * pod指标类型 指标 + * */ + public static final int POD_METRICS_COUNT = 7; + public static final String POD_METRICS_POD_NAME = "pod_name"; + public static final String POD_METRICS_LABELS = "labels"; + public static final String POD_METRICS_NAMESPACE_NAME = "namespace_name"; + public static final String POD_METRICS_POD_STATUS = "pod_status"; + public static final String POD_METRICS_IMAGES = "images"; + public static final String POD_METRICS_RESTART = "restart"; + public static final String POD_METRICS_CREATION_TIME = "creation_time"; + + /** + * 获取k8s节点指标列表,预置数据 + * @return List 节点指标列表 + * */ + public static List retrieveNodeMetricsList() { + //todo Arrays.asList() && 不应该写成方法 + List nodeMetricsList = new ArrayList<>(NODE_METRICS_COUNT); + nodeMetricsList.add(NODE_METRICS_NODE_NAME); + nodeMetricsList.add(NODE_METRICS_LABELS); + nodeMetricsList.add(NODE_METRICS_IS_READY); + nodeMetricsList.add(NODE_METRICS_CAPACITY_CPU); + nodeMetricsList.add(NODE_METRICS_ALLOCATABLE_CPU); + nodeMetricsList.add(NODE_METRICS_CAPACITY_MEMORY); + nodeMetricsList.add(NODE_METRICS_ALLOCATABLE_MEMORY); + nodeMetricsList.add(NODE_METRICS_CREATION_TIME); + return nodeMetricsList; + } + + /** + * 获取k8s命名空间指标列表,预置数据 + * @return List 命名空间指标列表 + * */ + public static List retrieveNamespaceMetricsList() { + List namespaceMetricsList = new ArrayList<>(NAMESPACE_METRICS_COUNT); + namespaceMetricsList.add(NAMESPACE_METRICS_NAMESPACE_NAME); + namespaceMetricsList.add(NAMESPACE_METRICS_LABELS); + namespaceMetricsList.add(NAMESPACE_METRICS_CREATION_TIME); + return namespaceMetricsList; + } + + /** + * 获取k8s pod指标列表,预置数据 + * @return List pod指标列表 + * */ + public static List retrievePodMetricsList() { + List podMetricsList = new ArrayList<>(POD_METRICS_COUNT); + podMetricsList.add(POD_METRICS_POD_NAME); + podMetricsList.add(POD_METRICS_LABELS); + podMetricsList.add(POD_METRICS_POD_STATUS); + podMetricsList.add(POD_METRICS_IMAGES); + podMetricsList.add(POD_METRICS_RESTART); + podMetricsList.add(POD_METRICS_CREATION_TIME); + podMetricsList.add(POD_METRICS_NAMESPACE_NAME); + return podMetricsList; + } + + /** + * 提取k8s节点指标数据 + * @param allNodes 节点对象 + * @return Map> 节点指标数据 + * */ + @Nullable + public static Map> retrieveNodeMetricsData(V1NodeList allNodes) { + //参数校验 + if(null==allNodes || CollectionUtils.isEmpty(allNodes.getItems())){ + log.error("k8s客户端获取V1NodeList allNodes节点信息为空!"); + return null; + } + //初始化metricsMap + Map> nodeMetricsMap = new HashMap<>(NODE_METRICS_COUNT); + retrieveNodeMetricsList().forEach(nodeMetrics -> nodeMetricsMap.put(nodeMetrics, new ArrayList<>())); + //填充指标 拓展点(可以添加其他指标,填充到nodeMetricsMap中) + for(V1Node v1Node : allNodes.getItems()){ + //提取 node_name + if(v1Node.getMetadata()!=null){ + nodeMetricsMap.get(NODE_METRICS_NODE_NAME).add(v1Node.getMetadata().getName()); + }else{ + nodeMetricsMap.get(NODE_METRICS_NODE_NAME).add(StringUtils.EMPTY); + } + //提取 labels + if(v1Node.getMetadata()!=null){ + nodeMetricsMap.get(NODE_METRICS_LABELS).add(GsonUtil.toJson(v1Node.getMetadata().getLabels())); + }else{ + nodeMetricsMap.get(NODE_METRICS_LABELS).add(StringUtils.EMPTY); + } + //提取 is_ready + if(v1Node.getStatus()!=null && v1Node.getStatus().getConditions()!=null){ + v1Node.getStatus().getConditions().forEach(ele->{ + if("Ready".equals(ele.getType())){ + nodeMetricsMap.get(NODE_METRICS_IS_READY).add(ele.getStatus()); + } + }); + }else{ + nodeMetricsMap.get(NODE_METRICS_IS_READY).add(StringUtils.EMPTY); + } + //提取 capacity_cpu capacity_memory + if(v1Node.getStatus()!=null && v1Node.getStatus().getCapacity()!=null){ + Quantity quantity = v1Node.getStatus().getCapacity().get("cpu"); + if(quantity!=null){ + nodeMetricsMap.get(NODE_METRICS_CAPACITY_CPU).add(quantity.toString()); + }else{ + nodeMetricsMap.get(NODE_METRICS_CAPACITY_CPU).add(StringUtils.EMPTY); + } + quantity = v1Node.getStatus().getCapacity().get("memory"); + if(quantity!=null){ + nodeMetricsMap.get(NODE_METRICS_CAPACITY_MEMORY).add(quantity.toString()); + }else{ + nodeMetricsMap.get(NODE_METRICS_CAPACITY_MEMORY).add(StringUtils.EMPTY); + } + }else{ + nodeMetricsMap.get(NODE_METRICS_CAPACITY_CPU).add(StringUtils.EMPTY); + nodeMetricsMap.get(NODE_METRICS_CAPACITY_MEMORY).add(StringUtils.EMPTY); + } + + //提取 allocatable_cpu allocatable_memory + if(v1Node.getStatus()!=null && v1Node.getStatus().getAllocatable()!=null){ + Quantity quantity = v1Node.getStatus().getAllocatable().get("cpu"); + if(quantity!=null){ + nodeMetricsMap.get(NODE_METRICS_ALLOCATABLE_CPU).add(quantity.toString()); + }else{ + nodeMetricsMap.get(NODE_METRICS_ALLOCATABLE_MEMORY).add(StringUtils.EMPTY); + } + quantity = v1Node.getStatus().getAllocatable().get("memory"); + if(quantity!=null){ + nodeMetricsMap.get(NODE_METRICS_CAPACITY_MEMORY).add(quantity.toString()); + }else{ + nodeMetricsMap.get(NODE_METRICS_ALLOCATABLE_MEMORY).add(StringUtils.EMPTY); + } + }else{ + nodeMetricsMap.get(NODE_METRICS_ALLOCATABLE_CPU).add(StringUtils.EMPTY); + nodeMetricsMap.get(NODE_METRICS_ALLOCATABLE_MEMORY).add(StringUtils.EMPTY); + } + + //提取 creation_time + if(v1Node.getMetadata()!=null && v1Node.getMetadata().getCreationTimestamp()!=null){ + nodeMetricsMap.get(NODE_METRICS_CREATION_TIME) + .add(v1Node.getMetadata().getCreationTimestamp().toString()); + }else{ + nodeMetricsMap.get(NODE_METRICS_CREATION_TIME).add(StringUtils.EMPTY); + } + //todo:可以添加其他指标获取,注意,需要定义新的静态变量并添加到 retrieveNodeMetricsList 方法中 + + } + return nodeMetricsMap; + } + + /** + * 提取k8s命名空间指标数据 + * @param allNamespaces 命名空间对象 + * @return Map> 节点指标数据 + * */ + public static Map> retrieveNamespaceMetricsData( + V1NamespaceList allNamespaces) { + //参数校验 + if(null==allNamespaces || CollectionUtils.isEmpty(allNamespaces.getItems())){ + log.error("k8s客户端获取V1NamespaceList allNamespaces节点信息为空!"); + return null; + } + Map> namespaceMetricsMap = new HashMap<>(NODE_METRICS_COUNT); + retrieveNamespaceMetricsList().forEach(namespaceMetrics -> namespaceMetricsMap.put(namespaceMetrics, new ArrayList<>())); + for(V1Namespace v1Namespace : allNamespaces.getItems()){ + //提取 node_name + if(v1Namespace.getMetadata()!=null){ + namespaceMetricsMap.get(NAMESPACE_METRICS_NAMESPACE_NAME) + .add(v1Namespace.getMetadata().getName()); + }else{ + namespaceMetricsMap.get(NAMESPACE_METRICS_NAMESPACE_NAME).add(StringUtils.EMPTY); + } + //提取 labels + if(v1Namespace.getMetadata()!=null){ + namespaceMetricsMap.get(NAMESPACE_METRICS_LABELS).add(GsonUtil.toJson(v1Namespace.getMetadata().getLabels())); + }else{ + namespaceMetricsMap.get(NAMESPACE_METRICS_LABELS).add(StringUtils.EMPTY); + } + //提取 creation_time + if(v1Namespace.getMetadata()!=null && v1Namespace.getMetadata().getCreationTimestamp()!=null){ + namespaceMetricsMap.get(NAMESPACE_METRICS_CREATION_TIME) + .add(v1Namespace.getMetadata().getCreationTimestamp().toString()); + }else{ + namespaceMetricsMap.get(NAMESPACE_METRICS_CREATION_TIME).add(StringUtils.EMPTY); + } + //todo:可以添加其他指标获取,注意,需要定义新的静态变量并添加到 retrieveNamespaceMetricsList 方法中 + + } + return namespaceMetricsMap; + } + + /** + * 提取k8s pods指标数据 + * @param allPods pods对象 + * @return Map> pods指标数据 + * */ + public static Map> retrievePodMetricsData(V1PodList allPods) { + //参数校验 + if(null==allPods || CollectionUtils.isEmpty(allPods.getItems())){ + log.error("k8s客户端获取V1PodList allPods节点信息为空!"); + return null; + } + Map> podMetricsMap = new HashMap<>(NODE_METRICS_COUNT); + retrievePodMetricsList().forEach(podMetrics -> podMetricsMap.put(podMetrics, new ArrayList<>())); + for(V1Pod v1Pod : allPods.getItems()){ + //提取 pod_name + if(v1Pod.getMetadata()!=null){ + podMetricsMap.get(POD_METRICS_POD_NAME).add(v1Pod.getMetadata().getName()); + }else{ + podMetricsMap.get(POD_METRICS_POD_NAME).add(StringUtils.EMPTY); + } + //提取 labels + if(v1Pod.getMetadata()!=null){ + podMetricsMap.get(POD_METRICS_LABELS) + .add(GsonUtil.toJson(v1Pod.getMetadata().getLabels())); + }else{ + podMetricsMap.get(POD_METRICS_LABELS).add(StringUtils.EMPTY); + } + //提取 namespace_name + if(v1Pod.getMetadata()!=null){ + podMetricsMap.get(POD_METRICS_NAMESPACE_NAME).add(v1Pod.getMetadata().getNamespace()); + }else{ + podMetricsMap.get(POD_METRICS_NAMESPACE_NAME).add(StringUtils.EMPTY); + } + //提取 pod_status + if(v1Pod.getStatus()!=null){ + podMetricsMap.get(POD_METRICS_POD_STATUS).add(v1Pod.getStatus().getPhase()); + }else{ + podMetricsMap.get(POD_METRICS_POD_STATUS).add(StringUtils.EMPTY); + } + //提取 images + if(v1Pod.getSpec()!=null && v1Pod.getSpec().getContainers()!=null){ + StringBuilder stringBuilder = new StringBuilder(); + v1Pod.getSpec().getContainers().forEach(v1Container -> stringBuilder.append(v1Container.getImage()).append(";")); + podMetricsMap.get(POD_METRICS_IMAGES).add(stringBuilder.toString()); + }else{ + podMetricsMap.get(POD_METRICS_IMAGES).add(StringUtils.EMPTY); + } + //提取 restart + if(v1Pod.getStatus()!=null && v1Pod.getStatus().getContainerStatuses()!=null){ + int restartCount = v1Pod.getStatus().getContainerStatuses().stream() + .mapToInt(V1ContainerStatus::getRestartCount).sum(); + podMetricsMap.get(POD_METRICS_RESTART).add(String.valueOf(restartCount)); + }else{ + podMetricsMap.get(POD_METRICS_RESTART).add(StringUtils.EMPTY); + } + //提取 creation_time + if(v1Pod.getMetadata()!=null && v1Pod.getMetadata().getCreationTimestamp()!=null){ + podMetricsMap.get(POD_METRICS_CREATION_TIME) + .add(v1Pod.getMetadata().getCreationTimestamp().toString()); + }else{ + podMetricsMap.get(POD_METRICS_CREATION_TIME).add(StringUtils.EMPTY); + } + //todo:可以添加其他指标获取,注意,需要定义新的静态变量,然后添加到 retrievePodMetricsList 方法中 + + } + return podMetricsMap; + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/microservice/JsonPathParseResponse.java b/collector/src/main/java/com/usthe/collector/collect/microservice/JsonPathParseResponse.java new file mode 100644 index 00000000000..781d0fdaaf3 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/microservice/JsonPathParseResponse.java @@ -0,0 +1,134 @@ +package com.usthe.collector.collect.microservice; + +import com.usthe.collector.collect.AbstractParseResponse; +import com.usthe.collector.collect.strategy.ParseStrategyFactory; +import com.usthe.collector.dispatch.DispatchConstants; +import com.usthe.collector.util.CollectUtil; +import com.usthe.collector.util.CollectorConstants; +import com.usthe.collector.util.JsonPathParser; +import com.usthe.common.entity.job.Configmap; +import com.usthe.common.entity.job.Metrics; +import com.usthe.common.entity.job.protocol.HttpProtocol; +import com.usthe.common.entity.job.protocol.ServiceProtocol; +import com.usthe.common.entity.message.CollectRep; +import com.usthe.common.model.ServicePodModel; +import com.usthe.common.util.CommonConstants; +import com.usthe.common.util.GsonUtil; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.TypeRef; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.Map; + + +/** + * + * + * + */ +@Slf4j +public class JsonPathParseResponse implements AbstractParseResponse { + + public static JsonPathParseResponse getInstance() { + return JsonPathParseResponse.Singleton.INSTANCE; + } + + @Override + public void parseResponse(String resp, List aliasFields, HttpProtocol http, + CollectRep.MetricsData.Builder builder, Long responseTime) { + TypeRef>> typeRef = new TypeRef>>() {}; + List> results = JsonPathParser.parseContentWithJsonPath(resp, http.getParseScript(), typeRef); + int keywordNum = CollectUtil.countMatchKeyword(resp, http.getKeyword()); + for (Map stringMap : results) { + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + for (String alias : aliasFields) { + Object value = stringMap.get(alias); + if (value != null) { + valueRowBuilder.addColumns(String.valueOf(value)); + } else { + if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(alias)) { + valueRowBuilder.addColumns(responseTime.toString()); + } else if (CollectorConstants.KEYWORD.equalsIgnoreCase(alias)) { + valueRowBuilder.addColumns(Integer.toString(keywordNum)); + } else { + valueRowBuilder.addColumns(CommonConstants.NULL_VALUE); + } + } + } + builder.addValues(valueRowBuilder.build()); + } + } + + @Override + public void parseK8sApi(Metrics metrics, Object resp, Map podMap, List aliasFields, ServiceProtocol service, + CollectRep.MetricsData.Builder builder, Long responseTime) { + //解决乱序问题 + Configmap configmap = metrics.getChildParam().stream().filter(childParam -> { + return "parseScript".equals(childParam.getKey()); + }).findFirst().get(); + List parseScript = (List)configmap.getValue(); + List> results = JsonPathParser.parseContentWithJsonPath(GsonUtil.toJson(resp), service.getMetaData()); + for (Object stringMap : results) { + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + ServicePodModel model = new ServicePodModel(); + for (int i = 0; i < aliasFields.size(); i++) { + String script = parseScript.get(i); + String field = aliasFields.get(i); + List> list = null; + Object value = null; + String[] split = script.split("-->"); + switch (split[0]) { + case DispatchConstants.PARSE_SINGLE: + try { + list = JsonPathParser.parseContentWithJsonPath(GsonUtil.toJson(stringMap), split[1]); + } catch (Exception e) { + value = JsonPath.read(results, split[1]); + } + break; + case DispatchConstants.PARSE_GROUP: + try { + List> maps = JsonPathParser.parseContentWithJsonPath(GsonUtil.toJson(results), split[1]); + value = maps.size(); + } catch (Exception e) { + log.error("parse error!error detail:{}", e.getMessage()); + } + break; + } + if (list != null && list.size() == 1) { + switch (field) { + case "podName": + model.setPodName(String.valueOf(list.get(0))); + break; + case "podHost": + model.setPodHost(String.valueOf(list.get(0))); + break; + case "podPort": + model.setPodPort(String.valueOf(list.get(0))); + break; + case "status": + model.setStatus(String.valueOf(list.get(0))); + break; + } + valueRowBuilder.addColumns(String.valueOf(list.get(0) == null ? CommonConstants.NULL_VALUE : list.get(0))); + model.getMetricsMap().put(field, String.valueOf(list.get(0) == null ? CommonConstants.NULL_VALUE : list.get(0))); + } else { + valueRowBuilder.addColumns(String.valueOf(value == null ? CommonConstants.NULL_VALUE : value)); + model.getMetricsMap().put(field, String.valueOf(value == null ? CommonConstants.NULL_VALUE : value)); + } + } + podMap.put(model.getPodName(), model); + builder.addValues(valueRowBuilder.build()); + } + } + + @Override + public void afterPropertiesSet() throws Exception { + ParseStrategyFactory.register(DispatchConstants.PARSE_JSON_PATH, this); + } + + private static class Singleton { + private static final JsonPathParseResponse INSTANCE = new JsonPathParseResponse(); + } + +} diff --git a/collector/src/main/java/com/usthe/collector/collect/microservice/MicroServiceParentCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/microservice/MicroServiceParentCollectImpl.java new file mode 100644 index 00000000000..cf51f895c84 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/microservice/MicroServiceParentCollectImpl.java @@ -0,0 +1,113 @@ +package com.usthe.collector.collect.microservice; + +import com.usthe.collector.collect.AbstractCollect; +import com.usthe.collector.collect.AbstractParseResponse; +import com.usthe.collector.collect.http.HttpCollectImpl; +import com.usthe.collector.dispatch.entrance.internal.CollectJobService; +import com.usthe.collector.dispatch.entrance.internal.CollectResponseEventListener; +import com.usthe.collector.dispatch.export.MetricsDataExporter; +import com.usthe.collector.util.K8sClient; +import com.usthe.collector.dispatch.DispatchConstants; +import com.usthe.collector.dispatch.MetricsTaskDispatch; +import com.usthe.common.entity.job.Configmap; +import com.usthe.common.entity.job.Job; +import com.usthe.common.entity.job.Metrics; +import com.usthe.common.entity.job.protocol.HttpProtocol; +import com.usthe.common.entity.job.protocol.ServiceProtocol; +import com.usthe.common.entity.message.CollectRep; +import com.usthe.common.entity.message.CollectRep.MetricsData.Builder; +import com.usthe.common.model.ServicePodModel; +import com.usthe.common.support.SpringContextHolder; +import com.usthe.common.util.GsonUtil; +import io.kubernetes.client.openapi.models.V1PodList; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +/** + * + * @Data: 2022/7/14 16:01 + * @Description: 物联微服务指标具体采集实现 + */ +@Slf4j +public class MicroServiceParentCollectImpl extends AbstractCollect { + + @Autowired + private MetricsTaskDispatch metricsTaskDispatch; + + @Autowired + private CollectJobService collectJobService; + + public static MicroServiceParentCollectImpl getInstance() { + return MicroServiceParentCollectImpl.Singleton.INSTANCE; + } + + @Override + public void collect(Builder builder, long appId, String app, Metrics metrics) { + //1. 获取当前k8s环境下,特定mosquitto集群下的mosquitto实例所在pod信息 + // 准备连接k8s集群的配置信息 + ServiceProtocol serviceProtocol = metrics.getService(); + List childNode = metrics.getChildNode(); + String k8sAPIServHost = serviceProtocol.getK8sHost(); + String k8sAPIServPort = serviceProtocol.getK8sAPIServPort(); + String k8sAPIToken = serviceProtocol.getK8sAPIToken(); + // 初始化k8s客户端,获取特定pod信息 + K8sClient k8sClient = new K8sClient(k8sAPIServHost, k8sAPIServPort, k8sAPIToken); + if(k8sClient.getApiClient()==null){ + log.info("Failed to get k8sAPIClient, k8sAPIServHost:{}, k8sAPIServPort:{}", + k8sAPIServHost, k8sAPIServPort); + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("Failed to get k8sAPIClient"); + return; + } + //key:pod名称 value:host + Map podMap = new HashMap<>(16); + //检查k8sAPI服务器是否连接成功,若不成功直接返回Null + if(k8sClient!=null && k8sClient.getApiClient()!=null) { + V1PodList podList = k8sClient.getAllPodList(); + AbstractParseResponse invoke = new JsonPathParseResponse(); + try { + invoke.parseK8sApi(metrics,podList,podMap, metrics.getAliasFields(), metrics.getService(), builder, null); + }catch (Exception e){ + log.error("Error in parseK8sApi, error details:{}", e.getMessage()); + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("Failed to get k8sAPIClient"); + } + } + if(podMap.isEmpty()){ + log.warn("warn in K8s, warn details: {} pod running is empty",app); + return; + } + for(Map.Entry mapEntry : podMap.entrySet()){ + Job job = new Job(); + job.setApp(app); + job.setMonitorId(appId); + job.setInterval(0); + job.setCyclic(false); + job.setTimestamp(System.currentTimeMillis()); + ServicePodModel model = mapEntry.getValue(); + List configmap = new ArrayList<>(); + job.setConfigmap(configmap); + childNode.forEach(metric -> { + HttpProtocol http = GsonUtil.fromJson(GsonUtil.toJson(serviceProtocol.getHttp()), HttpProtocol.class); + http.setHost(model.getPodHost()); + http.setPort(model.getPodPort()); + metric.setHttp(http); + metric.setParentMetrics(model.getMetricsMap()); + }); + job.setMetrics(childNode); + //下发一次性微服务采集任务 + collectJobService.collectSyncJobData(job); + + } + } + + private static class Singleton { + private static final MicroServiceParentCollectImpl INSTANCE = new MicroServiceParentCollectImpl(); + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/microservice/parse/DefaultParseResponse.java b/collector/src/main/java/com/usthe/collector/collect/microservice/parse/DefaultParseResponse.java new file mode 100644 index 00000000000..f418d617f94 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/microservice/parse/DefaultParseResponse.java @@ -0,0 +1,75 @@ +package com.usthe.collector.collect.microservice.parse; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.usthe.collector.collect.AbstractParseResponse; +import com.usthe.collector.collect.strategy.ParseStrategyFactory; +import com.usthe.collector.dispatch.DispatchConstants; +import com.usthe.collector.util.CollectUtil; +import com.usthe.collector.util.CollectorConstants; +import com.usthe.common.entity.job.protocol.HttpProtocol; +import com.usthe.common.entity.message.CollectRep; +import com.usthe.common.util.CommonConstants; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +/** + * http https collect + * + * + */ +@Slf4j +public class DefaultParseResponse implements AbstractParseResponse { + @Override + public void parseResponse(String resp, List aliasFields, HttpProtocol http, + CollectRep.MetricsData.Builder builder, Long responseTime) { + JsonElement element = JsonParser.parseString(resp); + int keywordNum = CollectUtil.countMatchKeyword(resp, http.getKeyword()); + if (element.isJsonArray()) { + JsonArray array = element.getAsJsonArray(); + for (JsonElement jsonElement : array) { + if (jsonElement.isJsonObject()) { + JsonObject object = jsonElement.getAsJsonObject(); + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + for (String alias : aliasFields) { + JsonElement valueElement = object.get(alias); + if (valueElement != null) { + String value = valueElement.toString(); + valueRowBuilder.addColumns(value); + } else { + if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(alias)) { + valueRowBuilder.addColumns(responseTime.toString()); + } else if (CollectorConstants.KEYWORD.equalsIgnoreCase(alias)) { + valueRowBuilder.addColumns(Integer.toString(keywordNum)); + } else { + valueRowBuilder.addColumns(CommonConstants.NULL_VALUE); + } + } + } + builder.addValues(valueRowBuilder.build()); + } + } + } else if (element.isJsonObject()) { + JsonObject object = element.getAsJsonObject(); + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + for (String alias : aliasFields) { + JsonElement valueElement = object.get(alias); + if (valueElement != null) { + String value = valueElement.toString(); + valueRowBuilder.addColumns(value); + } else { + valueRowBuilder.addColumns(CommonConstants.NULL_VALUE); + } + } + builder.addValues(valueRowBuilder.build()); + } + } + + @Override + public void afterPropertiesSet() throws Exception { + ParseStrategyFactory.register(DispatchConstants.PARSE_DEFAULT,this); + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/microservice/parse/MicroParseResponse.java b/collector/src/main/java/com/usthe/collector/collect/microservice/parse/MicroParseResponse.java new file mode 100644 index 00000000000..5cc48892fce --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/microservice/parse/MicroParseResponse.java @@ -0,0 +1,38 @@ +package com.usthe.collector.collect.microservice.parse; + +import com.usthe.collector.collect.AbstractParseResponse; +import com.usthe.collector.collect.http.micro.AbstractMicroParse; +import com.usthe.collector.collect.http.micro.MicroParseCreater; +import com.usthe.collector.collect.strategy.ParseStrategyFactory; +import com.usthe.collector.dispatch.DispatchConstants; +import com.usthe.common.entity.job.protocol.HttpProtocol; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.Map; + +/** + * + * + * @description:微服务解析链路选择 + */ +@Slf4j +public class MicroParseResponse implements AbstractParseResponse { + @Override + public void parseResponse(String resp,List fields, List aliasFields ,List jsonScript, HttpProtocol http, + Map> tempcloums,String kv) { + AbstractMicroParse microParse = MicroParseCreater.getMicroParse(); + microParse.handle(resp,fields,aliasFields,jsonScript,http,tempcloums,kv); + } + + @Override + public void parseResponse(String resp,String field, String aliasField ,String jsonScript, HttpProtocol http, + Map> tempcloums,String kv) { + AbstractMicroParse microParse = MicroParseCreater.getMicroParse(); + microParse.handle(resp,field,aliasField,jsonScript,http,tempcloums, kv); + } + @Override + public void afterPropertiesSet() throws Exception { + ParseStrategyFactory.register(DispatchConstants.PARSE_MICRO,this); + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/microservice/parse/PrometheusParseResponse.java b/collector/src/main/java/com/usthe/collector/collect/microservice/parse/PrometheusParseResponse.java new file mode 100644 index 00000000000..9ad2e2a3b46 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/microservice/parse/PrometheusParseResponse.java @@ -0,0 +1,32 @@ +package com.usthe.collector.collect.microservice.parse; + +import com.usthe.collector.collect.AbstractParseResponse; +import com.usthe.collector.collect.http.promethus.AbstractPrometheusParse; +import com.usthe.collector.collect.http.promethus.PrometheusParseCreater; +import com.usthe.collector.collect.strategy.ParseStrategyFactory; +import com.usthe.collector.dispatch.DispatchConstants; +import com.usthe.common.entity.job.protocol.HttpProtocol; +import com.usthe.common.entity.message.CollectRep; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +/** + * + * + * @description: + */ +@Slf4j +public class PrometheusParseResponse implements AbstractParseResponse { + @Override + public void parseResponse(String resp, List aliasFields, HttpProtocol http, + CollectRep.MetricsData.Builder builder, Long responseTime) { + AbstractPrometheusParse prometheusParser = PrometheusParseCreater.getPrometheusParse(); + prometheusParser.handle(resp, aliasFields, http, builder); + } + + @Override + public void afterPropertiesSet() throws Exception { + ParseStrategyFactory.register(DispatchConstants.PARSE_PROMETHEUS, this); + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/microservice/parse/SiteMapParseResponse.java b/collector/src/main/java/com/usthe/collector/collect/microservice/parse/SiteMapParseResponse.java new file mode 100644 index 00000000000..08e6f0ea3df --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/microservice/parse/SiteMapParseResponse.java @@ -0,0 +1,133 @@ +package com.usthe.collector.collect.microservice.parse; + +import com.usthe.collector.collect.AbstractParseResponse; +import com.usthe.collector.collect.common.http.CommonHttpClient; +import com.usthe.collector.collect.strategy.ParseStrategyFactory; +import com.usthe.collector.dispatch.DispatchConstants; +import com.usthe.collector.util.CollectorConstants; +import com.usthe.common.entity.job.protocol.HttpProtocol; +import com.usthe.common.entity.message.CollectRep; +import com.usthe.common.util.CommonConstants; +import com.usthe.common.util.IpDomainUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.util.EntityUtils; +import org.w3c.dom.Document; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +import javax.net.ssl.SSLException; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +/** + * + * + * @description: + */ +@Slf4j +public class SiteMapParseResponse implements AbstractParseResponse { + @Override + public void parseResponse(String resp, List aliasFields, HttpProtocol http, + CollectRep.MetricsData.Builder builder, Long responseTime) { + List siteUrls = new LinkedList<>(); + // 使用xml解析 + boolean isXmlFormat = true; + try { + DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); + DocumentBuilder db = dbf.newDocumentBuilder(); + Document document = db.parse(new ByteArrayInputStream(resp.getBytes(StandardCharsets.UTF_8))); + NodeList urlList = document.getElementsByTagName("url"); + for (int i = 0; i < urlList.getLength(); i++) { + Node urlNode = urlList.item(i); + NodeList childNodes = urlNode.getChildNodes(); + for (int k = 0; k < childNodes.getLength(); k++) { + Node currentNode = childNodes.item(k); + // 区分出text类型的node以及element类型的node + if (currentNode.getNodeType() == Node.ELEMENT_NODE && "loc".equals(currentNode.getNodeName())) { + //获取了loc节点的值 + siteUrls.add(currentNode.getFirstChild().getNodeValue()); + break; + } + } + } + } catch (Exception e) { + log.warn(e.getMessage()); + isXmlFormat = false; + } + // 若xml解析失败 用txt格式解析 + if (!isXmlFormat) { + try { + String[] urls = resp.split("\n"); + // 校验是否是URL + if (IpDomainUtil.isHasSchema(urls[0])) { + siteUrls.addAll(Arrays.asList(urls)); + } + } catch (Exception e) { + log.warn(e.getMessage(), e); + } + } + // 开始循环访问每个site url 采集其 http status code, resTime, 异常信息 + for (String siteUrl : siteUrls) { + String errorMsg = ""; + Integer statusCode = null; + long startTime = System.currentTimeMillis(); + try { + HttpGet httpGet = new HttpGet(siteUrl); + CloseableHttpResponse response = CommonHttpClient.getHttpClient().execute(httpGet); + statusCode = response.getStatusLine().getStatusCode(); + EntityUtils.consume(response.getEntity()); + } catch (ClientProtocolException e1) { + if (e1.getCause() != null) { + errorMsg = e1.getCause().getMessage(); + } else { + errorMsg = e1.getMessage(); + } + } catch (UnknownHostException e2) { + // 对端不可达 + errorMsg = "unknown host"; + } catch (InterruptedIOException | ConnectException | SSLException e3) { + // 对端连接失败 + errorMsg = "connect error: " + e3.getMessage(); + } catch (IOException e4) { + // 其它IO异常 + errorMsg = "io error: " + e4.getMessage(); + } catch (Exception e) { + errorMsg = "error: " + e.getMessage(); + } + long resTime = System.currentTimeMillis() - startTime; + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + for (String alias : aliasFields) { + if (CollectorConstants.URL.equalsIgnoreCase(alias)) { + valueRowBuilder.addColumns(siteUrl); + } else if (CollectorConstants.STATUS_CODE.equalsIgnoreCase(alias)) { + valueRowBuilder.addColumns(statusCode == null ? + CommonConstants.NULL_VALUE : String.valueOf(statusCode)); + } else if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(alias)) { + valueRowBuilder.addColumns(String.valueOf(resTime)); + } else if (CollectorConstants.ERROR_MSG.equalsIgnoreCase(alias)) { + valueRowBuilder.addColumns(errorMsg); + } else { + valueRowBuilder.addColumns(CommonConstants.NULL_VALUE); + } + } + builder.addValues(valueRowBuilder.build()); + } + } + + @Override + public void afterPropertiesSet() throws Exception { + ParseStrategyFactory.register(DispatchConstants.PARSE_SITE_MAP, this); + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/microservice/parse/WebsiteParseResponse.java b/collector/src/main/java/com/usthe/collector/collect/microservice/parse/WebsiteParseResponse.java new file mode 100644 index 00000000000..2886fade238 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/microservice/parse/WebsiteParseResponse.java @@ -0,0 +1,44 @@ +package com.usthe.collector.collect.microservice.parse; + +import com.usthe.collector.collect.AbstractParseResponse; +import com.usthe.collector.collect.strategy.ParseStrategyFactory; +import com.usthe.collector.dispatch.DispatchConstants; +import com.usthe.collector.util.CollectUtil; +import com.usthe.collector.util.CollectorConstants; +import com.usthe.common.entity.job.protocol.HttpProtocol; +import com.usthe.common.entity.message.CollectRep; +import com.usthe.common.util.CommonConstants; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +/** + * + * + * @description: + */ +@Slf4j +public class WebsiteParseResponse implements AbstractParseResponse { + @Override + public void parseResponse(String resp, List aliasFields, HttpProtocol http, + CollectRep.MetricsData.Builder builder, Long responseTime) { + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + // 网站关键词数量监测 + int keywordNum = CollectUtil.countMatchKeyword(resp, http.getKeyword()); + for (String alias : aliasFields) { + if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(alias)) { + valueRowBuilder.addColumns(responseTime.toString()); + } else if (CollectorConstants.KEYWORD.equalsIgnoreCase(alias)) { + valueRowBuilder.addColumns(Integer.toString(keywordNum)); + } else { + valueRowBuilder.addColumns(CommonConstants.NULL_VALUE); + } + } + builder.addValues(valueRowBuilder.build()); + } + + @Override + public void afterPropertiesSet() throws Exception { + ParseStrategyFactory.register(DispatchConstants.PARSE_WEBSITE, this); + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/microservice/parse/XmlPathParseResponse.java b/collector/src/main/java/com/usthe/collector/collect/microservice/parse/XmlPathParseResponse.java new file mode 100644 index 00000000000..3abf32d5809 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/microservice/parse/XmlPathParseResponse.java @@ -0,0 +1,29 @@ +package com.usthe.collector.collect.microservice.parse; + +import com.usthe.collector.collect.AbstractParseResponse; +import com.usthe.collector.collect.strategy.ParseStrategyFactory; +import com.usthe.collector.dispatch.DispatchConstants; +import com.usthe.common.entity.job.protocol.HttpProtocol; +import com.usthe.common.entity.message.CollectRep; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +/** + * + * + * @description: + */ +@Slf4j +public class XmlPathParseResponse implements AbstractParseResponse { + @Override + public void parseResponse(String resp, List aliasFields, HttpProtocol http, + CollectRep.MetricsData.Builder builder, Long responseTime) { + + } + + @Override + public void afterPropertiesSet() throws Exception { + ParseStrategyFactory.register(DispatchConstants.PARSE_XML_PATH, this); + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/strategy/CollectStrategyFactory.java b/collector/src/main/java/com/usthe/collector/collect/strategy/CollectStrategyFactory.java new file mode 100644 index 00000000000..2cc05f8a302 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/strategy/CollectStrategyFactory.java @@ -0,0 +1,44 @@ +package com.usthe.collector.collect.strategy; + +import com.usthe.collector.collect.AbstractCollect; +import org.springframework.util.StringUtils; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * 数据收集策略工厂 + * + * + */ +public class CollectStrategyFactory { + /** + * 策略容器 + */ + private static ConcurrentHashMap strategy = new ConcurrentHashMap<>(); + + /** + * 获取注册的收集实现类 + * + * @param name + * @return + */ + public static AbstractCollect invoke(String name) { + AbstractCollect abstractCollect = strategy.get(name); + return abstractCollect; + + } + + /** + * 注册的收集实现类 + * + * @param name + * @param abstractCollect + */ + public static void register(String name, AbstractCollect abstractCollect) { + if (StringUtils.isEmpty(name) || abstractCollect == null) { + return; + } + strategy.put(name, abstractCollect); + } + +} diff --git a/collector/src/main/java/com/usthe/collector/collect/strategy/ParseStrategyFactory.java b/collector/src/main/java/com/usthe/collector/collect/strategy/ParseStrategyFactory.java new file mode 100644 index 00000000000..bde549f457b --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/strategy/ParseStrategyFactory.java @@ -0,0 +1,46 @@ +package com.usthe.collector.collect.strategy; + +import com.usthe.collector.collect.AbstractParseResponse; +import com.usthe.collector.dispatch.DispatchConstants; +import org.springframework.util.StringUtils; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * + * + * @description: 数据收集策略工厂 + */ +public class ParseStrategyFactory { + /** + * 策略容器 + */ + private static ConcurrentHashMap strategy = new ConcurrentHashMap<>(); + + /** + * 获取注册的收集实现类 + * + * @param name + * @return + */ + public static AbstractParseResponse invoke(String name) { + AbstractParseResponse abstractCollect = strategy.get(name); + if (abstractCollect == null) strategy.get(DispatchConstants.PARSE_DEFAULT); + return abstractCollect; + + } + + /** + * 注册的收集实现类 + * + * @param name + * @param abstractParseResponse + */ + public static void register(String name, AbstractParseResponse abstractParseResponse) { + if (StringUtils.isEmpty(name) || abstractParseResponse == null) { + return; + } + strategy.put(name, abstractParseResponse); + } + +} diff --git a/collector/src/main/java/com/usthe/collector/dispatch/DispatchConstants.java b/collector/src/main/java/com/usthe/collector/dispatch/DispatchConstants.java index 84f6be2efb2..9c10bb59983 100644 --- a/collector/src/main/java/com/usthe/collector/dispatch/DispatchConstants.java +++ b/collector/src/main/java/com/usthe/collector/dispatch/DispatchConstants.java @@ -30,6 +30,12 @@ public interface DispatchConstants { * protocol http */ String PROTOCOL_HTTP = "http"; + + /** + * protocol 微服务http + */ + String PROTOCOL_HTTP_MICRO = "http_micro"; + /** * protocol icmp */ @@ -66,6 +72,15 @@ public interface DispatchConstants { * protocol ssl Certificate - custom */ String PROTOCOL_SSL_CERT = "ssl_cert"; + /** + * protocol 协议 k8s + */ + String PROTOCOL_K8S = "k8s"; + + /** + * protocol 协议 microService + */ + String PROTOCOL_SERVICE = "service"; // Protocol type related - end // 协议类型相关 - end // @@ -119,9 +134,31 @@ public interface DispatchConstants { * 解析方式 prometheus规则 */ String PARSE_PROMETHEUS = "prometheus"; + String PARSE_MICRO = "micro"; + + String PARSE_CHAIN_REQUESTS = "requests"; String PARSE_PROMETHEUS_ACCEPT = "application/openmetrics-text; version=0.0.1,text/plain;version=0.0.4;q=0.5,*/*;q=0.1"; String PARSE_PROMETHEUS_VECTOR = "vector"; String PARSE_PROMETHEUS_MATRIX = "matrix"; // http协议相关 - end // + // k8s相关 -- start// + /** + * k8s 解析数据按照获取值的方式 + */ + String PARSE_SINGLE = "single"; + /** + * k8s 解析数据算数组的大小 + */ + String PARSE_GROUP = "group"; + // k8s相关 -- end// + // 微服务相关 -- start// + /** + * 参数configParam的key + */ + String CHILD_REQUESTS = "requests"; + /** + * 通用解析链路 + */ + String PARSE_CHAIN_COMMON = "common"; } diff --git a/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java b/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java index 85a6e93e13f..b67a15fa02b 100644 --- a/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java +++ b/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java @@ -22,9 +22,12 @@ import com.usthe.collector.collect.AbstractCollect; import com.usthe.collector.collect.database.JdbcCommonCollect; import com.usthe.collector.collect.http.HttpCollectImpl; +import com.usthe.collector.collect.http.MicroServiceActuatorHttpCollectImpl; import com.usthe.collector.collect.http.SslCertificateCollectImpl; import com.usthe.collector.collect.icmp.IcmpCollectImpl; import com.usthe.collector.collect.jmx.JmxCollectImpl; +import com.usthe.collector.collect.k8s.K8sCollectImpl; +import com.usthe.collector.collect.microservice.MicroServiceParentCollectImpl; import com.usthe.collector.collect.redis.RedisSingleCollectImpl; import com.usthe.collector.collect.snmp.SnmpCollectImpl; import com.usthe.collector.collect.ssh.SshCollectImpl; @@ -146,6 +149,7 @@ public void run() { // According to the indicator group collection protocol, application type, etc., dispatch to the real application indicator group collection implementation class // 根据指标组采集协议,应用类型等来调度到真正的应用指标组采集实现类 + //todo 是否需要用策略工厂 AbstractCollect abstractCollect = null; switch (metrics.getProtocol()) { case DispatchConstants.PROTOCOL_HTTP: @@ -175,6 +179,15 @@ public void run() { case DispatchConstants.PROTOCOL_SSL_CERT: abstractCollect = SslCertificateCollectImpl.getInstance(); break; + case DispatchConstants.PROTOCOL_K8S: + abstractCollect = K8sCollectImpl.getInstance(); + break; + case DispatchConstants.PROTOCOL_SERVICE: + abstractCollect = MicroServiceParentCollectImpl.getInstance(); + break; + case DispatchConstants.PROTOCOL_HTTP_MICRO: + abstractCollect = MicroServiceActuatorHttpCollectImpl.getInstance(); + break; default: break; } diff --git a/collector/src/main/java/com/usthe/collector/util/JsonPathParser.java b/collector/src/main/java/com/usthe/collector/util/JsonPathParser.java index eb50caaca4f..b3397b1a03d 100644 --- a/collector/src/main/java/com/usthe/collector/util/JsonPathParser.java +++ b/collector/src/main/java/com/usthe/collector/util/JsonPathParser.java @@ -17,16 +17,15 @@ package com.usthe.collector.util; -import com.jayway.jsonpath.Configuration; -import com.jayway.jsonpath.JsonPath; -import com.jayway.jsonpath.Option; -import com.jayway.jsonpath.ParseContext; +import com.jayway.jsonpath.*; import com.jayway.jsonpath.spi.cache.CacheProvider; import com.jayway.jsonpath.spi.cache.LRUCache; +import com.jayway.jsonpath.spi.json.GsonJsonProvider; +import com.jayway.jsonpath.spi.json.JsonProvider; +import com.jayway.jsonpath.spi.mapper.GsonMappingProvider; +import com.jayway.jsonpath.spi.mapper.MappingProvider; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; /** * json path parser @@ -38,6 +37,26 @@ public class JsonPathParser { private static final ParseContext PARSER; static { + Configuration.setDefaults(new Configuration.Defaults() { + + private final GsonJsonProvider jsonProvider = new GsonJsonProvider(); + private final MappingProvider mappingProvider = new GsonMappingProvider(); + + @Override + public JsonProvider jsonProvider() { + return jsonProvider; + } + + @Override + public MappingProvider mappingProvider() { + return mappingProvider; + } + + @Override + public Set