From 8998c0f26632c34787a38ebacbc7563b82669449 Mon Sep 17 00:00:00 2001
From: ceilzcx <1758619238@qq.com>
Date: Wed, 7 Jun 2023 15:59:21 +0800
Subject: [PATCH 1/6] implement rocketmq metrics data collector
---
collector/pom.xml | 7 +
.../collect/mq/RocketMQCollectData.java | 57 +++
.../collect/mq/RocketMQSingleCollectImpl.java | 332 ++++++++++++++++++
.../collector/dispatch/DispatchConstants.java | 4 +
...ertzbeat.collector.collect.AbstractCollect | 1 +
.../hertzbeat/common/entity/job/Metrics.java | 4 +
.../entity/job/protocol/RocketMQProtocol.java | 44 +++
.../main/resources/define/app-rocketmq.yml | 144 ++++++++
8 files changed, 593 insertions(+)
create mode 100644 collector/src/main/java/org/dromara/hertzbeat/collector/collect/mq/RocketMQCollectData.java
create mode 100644 collector/src/main/java/org/dromara/hertzbeat/collector/collect/mq/RocketMQSingleCollectImpl.java
create mode 100644 common/src/main/java/org/dromara/hertzbeat/common/entity/job/protocol/RocketMQProtocol.java
create mode 100644 manager/src/main/resources/define/app-rocketmq.yml
diff --git a/collector/pom.xml b/collector/pom.xml
index 7cb2ac6d33b..5f6384ea97d 100644
--- a/collector/pom.xml
+++ b/collector/pom.xml
@@ -163,6 +163,13 @@
snmp4j
3.6.7
+
+
+
+ org.apache.rocketmq
+ rocketmq-tools
+ 4.9.4
+
diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/mq/RocketMQCollectData.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/mq/RocketMQCollectData.java
new file mode 100644
index 00000000000..2c661915de4
--- /dev/null
+++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/mq/RocketMQCollectData.java
@@ -0,0 +1,57 @@
+package org.dromara.hertzbeat.collector.collect.mq;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * rocketmq采集数据实体类
+ *
+ * @author ceilzcx
+ * @since 5/6/2023
+ */
+@Data
+public class RocketMQCollectData {
+
+ private List clusterBrokerDataList;
+
+ private List consumerInfoList;
+
+ @Data
+ public static class ClusterBrokerData {
+
+ private Long brokerId;
+
+ private String address;
+
+ private String version;
+
+ private double producerMessageTPS;
+
+ private double consumerMessageTPS;
+
+ private long yesterdayProduceCount;
+
+ private long todayProduceCount;
+
+ private long yesterdayConsumeCount;
+
+ private long todayConsumeCount;
+ }
+
+ @Data
+ public static class ConsumerInfo {
+
+ private String consumerGroup;
+
+ private int clientQuantity;
+
+ private String messageModel;
+
+ private String consumeType;
+
+ private double consumeTps;
+
+ private long diffTotal;
+ }
+}
diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/mq/RocketMQSingleCollectImpl.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/mq/RocketMQSingleCollectImpl.java
new file mode 100644
index 00000000000..efe24cacff6
--- /dev/null
+++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/mq/RocketMQSingleCollectImpl.java
@@ -0,0 +1,332 @@
+package org.dromara.hertzbeat.collector.collect.mq;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.dromara.hertzbeat.collector.collect.AbstractCollect;
+import org.dromara.hertzbeat.collector.dispatch.DispatchConstants;
+import org.dromara.hertzbeat.collector.util.JsonPathParser;
+import org.dromara.hertzbeat.common.constants.CommonConstants;
+import org.dromara.hertzbeat.common.entity.job.Metrics;
+import org.dromara.hertzbeat.common.entity.job.protocol.RocketMQProtocol;
+import org.dromara.hertzbeat.common.entity.message.CollectRep;
+import org.dromara.hertzbeat.common.util.CommonUtil;
+import org.jetbrains.annotations.NotNull;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.util.Assert;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * rocketmq采集实现类
+ *
+ * @author ceilzcx
+ * @since 5/6/2023
+ */
+@Slf4j
+public class RocketMQSingleCollectImpl extends AbstractCollect implements DisposableBean {
+
+ private static final Set SYSTEM_GROUP_SET = new HashSet<>();
+
+ private final ExecutorService executorService;
+
+ static {
+ // system consumer group
+ SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.SELF_TEST_CONSUMER_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.ONS_HTTP_PROXY_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_PULL_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_PERMISSION_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_OWNER_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.CID_SYS_RMQ_TRANS);
+ }
+
+ public RocketMQSingleCollectImpl() {
+ Runtime runtime = Runtime.getRuntime();
+ int corePoolSize = Math.max(8, runtime.availableProcessors());
+ int maximumPoolSize = Math.max(16, runtime.availableProcessors());
+ ThreadFactory threadFactory = new ThreadFactory() {
+ private final AtomicLong threadIndex = new AtomicLong(0);
+
+ @Override
+ public Thread newThread(@NotNull Runnable r) {
+ return new Thread(r, "RocketMQCollectGroup_" + this.threadIndex.incrementAndGet());
+ }
+ };
+ this.executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(5000), threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());
+ }
+
+ @Override
+ public void destroy() {
+ ThreadUtils.shutdownGracefully(this.executorService, 10L, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics) {
+ try {
+ preCheck(metrics);
+ } catch (Exception e) {
+ builder.setCode(CollectRep.Code.FAIL);
+ builder.setMsg(e.getMessage());
+ return;
+ }
+ DefaultMQAdminExt mqAdminExt = null;
+ try {
+ mqAdminExt = this.createMQAdminExt(metrics);
+ mqAdminExt.start();
+
+ RocketMQCollectData rocketMQCollectData = new RocketMQCollectData();
+ this.collectData(mqAdminExt, rocketMQCollectData);
+
+ this.fillBuilder(rocketMQCollectData, builder, metrics.getAliasFields(), metrics.getRocketmq().getParseScript());
+
+ } catch (Exception e) {
+ builder.setCode(CollectRep.Code.FAIL);
+ String message = CommonUtil.getMessageFromThrowable(e);
+ builder.setMsg(message);
+ } finally {
+ if (mqAdminExt != null) {
+ mqAdminExt.shutdown();
+ }
+ }
+ }
+
+ @Override
+ public String supportProtocol() {
+ return DispatchConstants.PROTOCOL_ROCKETMQ;
+ }
+
+ /**
+ * 采集前置条件, 入参判断
+ * @param metrics 数据指标
+ */
+ private void preCheck(Metrics metrics) {
+ if (metrics == null || metrics.getRocketmq() == null) {
+ throw new IllegalArgumentException("Mongodb collect must has rocketmq params");
+ }
+ RocketMQProtocol rocketmq = metrics.getRocketmq();
+ Assert.hasText(rocketmq.getNamesrvHost(), "Rocketmq Protocol namesrvHost is required.");
+ Assert.hasText(rocketmq.getNamesrvPort(), "Rocketmq Protocol namesrvPort is required.");
+ }
+
+ /**
+ * 创建DefaultMQAdminExt实体类; 这里有个小问题, 是否需要每次都重新创建
+ * @param metrics 数据指标
+ * @return DefaultMQAdminExt
+ */
+ private DefaultMQAdminExt createMQAdminExt(Metrics metrics) {
+ RocketMQProtocol rocketMQProtocol = metrics.getRocketmq();
+ assert rocketMQProtocol != null;
+ RPCHook rpcHook = null;
+ if (StringUtils.isNotBlank(rocketMQProtocol.getAccessKey()) && StringUtils.isNotBlank(rocketMQProtocol.getSecretKey())) {
+ rpcHook = new AclClientRPCHook(new SessionCredentials(rocketMQProtocol.getAccessKey(), rocketMQProtocol.getSecretKey()));
+ }
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook, 5000L);
+ defaultMQAdminExt.setNamesrvAddr(rocketMQProtocol.getNamesrvHost() + ":" + rocketMQProtocol.getNamesrvPort());
+ defaultMQAdminExt.setInstanceName("admin-" + System.currentTimeMillis());
+ return defaultMQAdminExt;
+ }
+
+ /**
+ * 采集rocketmq数据
+ * @param mqAdminExt rocketmq提供的远程调用类
+ * @param rocketMQCollectData rocketmq数据采集类
+ * @throws Exception 远程调用异常
+ */
+ private void collectData(DefaultMQAdminExt mqAdminExt, RocketMQCollectData rocketMQCollectData) throws Exception {
+ this.collectClusterData(mqAdminExt, rocketMQCollectData);
+ this.collectConsumerData(mqAdminExt, rocketMQCollectData);
+ }
+
+ /**
+ * 采集rocketmq的集群数据
+ * @param mqAdminExt rocketmq提供的远程调用类
+ * @param rocketMQCollectData rocketmq数据采集类
+ * @throws Exception 远程调用异常
+ */
+ private void collectClusterData(DefaultMQAdminExt mqAdminExt, RocketMQCollectData rocketMQCollectData) throws Exception {
+ try {
+ List clusterBrokerDataList = new ArrayList<>();
+ rocketMQCollectData.setClusterBrokerDataList(clusterBrokerDataList);
+
+ ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+ for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
+
+ for (Map.Entry entry : brokerData.getBrokerAddrs().entrySet()) {
+ RocketMQCollectData.ClusterBrokerData clusterBrokerData = new RocketMQCollectData.ClusterBrokerData();
+ clusterBrokerDataList.add(clusterBrokerData);
+
+ clusterBrokerData.setBrokerId(entry.getKey());
+ clusterBrokerData.setAddress(entry.getValue());
+
+ KVTable kvTable = mqAdminExt.fetchBrokerRuntimeStats(entry.getValue());
+ clusterBrokerData.setVersion(kvTable.getTable().get("brokerVersionDesc"));
+
+ String putTps = kvTable.getTable().get("putTps");
+ if (StringUtils.isNotEmpty(putTps)) {
+ String[] putTpsArr = putTps.split(" ");
+ clusterBrokerData.setProducerMessageTPS(Double.parseDouble(putTpsArr[0]));
+ }
+
+ String getTransferredTps = kvTable.getTable().get("getTransferedTps");
+ if (StringUtils.isNotEmpty(getTransferredTps)) {
+ String[] getTransferredTpsArr = getTransferredTps.split(" ");
+ clusterBrokerData.setConsumerMessageTPS(Double.parseDouble(getTransferredTpsArr[0]));
+ }
+
+ String msgPutTotalTodayMorning = kvTable.getTable().get("msgPutTotalTodayMorning");
+ String msgPutTotalYesterdayMorning = kvTable.getTable().get("msgPutTotalYesterdayMorning");
+ if (StringUtils.isNotEmpty(msgPutTotalTodayMorning) && StringUtils.isNotEmpty(msgPutTotalYesterdayMorning)) {
+ long yesterdayProduceCount = Long.parseLong(msgPutTotalTodayMorning) - Long.parseLong(msgPutTotalYesterdayMorning);
+ clusterBrokerData.setYesterdayProduceCount(yesterdayProduceCount);
+ }
+
+ String msgGetTotalTodayMorning = kvTable.getTable().get("msgGetTotalTodayMorning");
+ String msgGetTotalYesterdayMorning = kvTable.getTable().get("msgGetTotalYesterdayMorning");
+ if (StringUtils.isNotEmpty(msgGetTotalTodayMorning) && StringUtils.isNotEmpty(msgGetTotalYesterdayMorning)) {
+ long yesterdayConsumerCount = Long.parseLong(msgGetTotalTodayMorning) - Long.parseLong(msgGetTotalYesterdayMorning);
+ clusterBrokerData.setYesterdayConsumeCount(yesterdayConsumerCount);
+ }
+
+ String msgPutTotalTodayNow = kvTable.getTable().get("msgPutTotalTodayNow");
+ if (StringUtils.isNotEmpty(msgPutTotalTodayNow) && StringUtils.isNotEmpty(msgPutTotalTodayMorning)) {
+ long todayProduceCount = Long.parseLong(msgPutTotalTodayNow) - Long.parseLong(msgPutTotalTodayMorning);
+ clusterBrokerData.setTodayProduceCount(todayProduceCount);
+ }
+
+ String msgGetTotalTodayNow = kvTable.getTable().get("msgGetTotalTodayNow");
+ if (StringUtils.isNotEmpty(msgGetTotalTodayNow) && StringUtils.isNotEmpty(msgGetTotalTodayMorning)) {
+ long todayConsumerCount = Long.parseLong(msgGetTotalTodayNow) - Long.parseLong(msgGetTotalTodayMorning);
+ clusterBrokerData.setTodayConsumeCount(todayConsumerCount);
+ }
+ }
+
+ }
+ } catch (Exception e) {
+ log.warn("collect rocketmq cluster data error", e);
+ throw e;
+ }
+ }
+
+ /**
+ * 采集rocketmq的消费者数据
+ * @param mqAdminExt rocketmq提供的远程调用类
+ * @param rocketMQCollectData rocketmq数据采集类
+ * @throws Exception 远程调用异常
+ */
+ private void collectConsumerData(DefaultMQAdminExt mqAdminExt, RocketMQCollectData rocketMQCollectData) throws Exception {
+ Set consumerGroupSet = new HashSet<>();
+ try {
+ // 获取consumerGroup集合
+ ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+ for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
+ SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
+ consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
+ }
+
+ List consumerInfoList = Collections.synchronizedList(Lists.newArrayList());
+ rocketMQCollectData.setConsumerInfoList(consumerInfoList);
+ CountDownLatch countDownLatch = new CountDownLatch(consumerGroupSet.size());
+ for (String consumerGroup : consumerGroupSet) {
+ if (SYSTEM_GROUP_SET.contains(consumerGroup)) {
+ continue;
+ }
+ executorService.submit(() -> {
+ RocketMQCollectData.ConsumerInfo consumerInfo = new RocketMQCollectData.ConsumerInfo();
+ consumerInfoList.add(consumerInfo);
+ consumerInfo.setConsumerGroup(consumerGroup);
+ try {
+ ConsumeStats consumeStats = null;
+ try {
+ consumeStats = mqAdminExt.examineConsumeStats(consumerGroup);
+ }
+ catch (Exception e) {
+ log.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage());
+ }
+ if (consumeStats != null) {
+ consumerInfo.setConsumeTps(consumeStats.getConsumeTps());
+ consumerInfo.setDiffTotal(consumeStats.computeTotalDiff());
+ }
+
+ ConsumerConnection consumerConnection = null;
+ try {
+ consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
+ }
+ catch (Exception e) {
+ log.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage());
+ }
+ if (consumerConnection != null) {
+ consumerInfo.setClientQuantity(consumerConnection.getConnectionSet().size());
+ consumerInfo.setMessageModel(consumerConnection.getMessageModel().getModeCN());
+ consumerInfo.setConsumeType(consumerConnection.getConsumeType().getTypeCN());
+ }
+ } catch (Exception e) {
+ log.warn("examineConsumeStats or examineConsumerConnectionInfo error, {}", consumerGroup, e);
+ } finally {
+ countDownLatch.countDown();
+ }
+ });
+ }
+
+ if (!countDownLatch.await(10, TimeUnit.SECONDS)) {
+ log.warn("examineConsumeStats or examineConsumerConnectionInfo timeout");
+ }
+ } catch (Exception e) {
+ log.warn("collect rocketmq consume data error", e);
+ throw e;
+ }
+ }
+
+ /**
+ * 采集数据填充到builder
+ * @param rocketMQCollectData rocketmq数据采集类
+ * @param builder metrics data builder
+ * @param aliasFields 字段别名
+ * @param parseScript JSON的base path
+ */
+ private void fillBuilder(RocketMQCollectData rocketMQCollectData, CollectRep.MetricsData.Builder builder, List aliasFields, String parseScript) {
+ String dataJson = JSONObject.toJSONString(rocketMQCollectData);
+ List