diff --git a/README.md b/README.md index 4cd4b16d..f9c8c2bc 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ ---------- ## 目前运维的规模 -1. 服务器:30台+ +1. 服务器:40台+ 2. 集群:5个+ 3. topic:370个+ 4. 生产消费消息量/日:10亿条+ @@ -77,4 +77,4 @@ MQCloud QQ交流群:474960759 -使用方式请参考[wiki](https://github.com/sohutv/sohu-tv-mq/wiki)。 +使用方式请参考[wiki](https://github.com/sohutv/sohu-tv-mq/wiki)。 \ No newline at end of file diff --git a/mq-client-common-open/pom.xml b/mq-client-common-open/pom.xml index 09fcfc08..11d8100a 100644 --- a/mq-client-common-open/pom.xml +++ b/mq-client-common-open/pom.xml @@ -6,7 +6,7 @@ com.sohu.tv mq - 1.6 + 1.7 mq-client-common-open diff --git a/mq-client-common-open/src/main/java/com/sohu/tv/mq/common/AbstractConfig.java b/mq-client-common-open/src/main/java/com/sohu/tv/mq/common/AbstractConfig.java index 79a60246..a1c1f4d3 100644 --- a/mq-client-common-open/src/main/java/com/sohu/tv/mq/common/AbstractConfig.java +++ b/mq-client-common-open/src/main/java/com/sohu/tv/mq/common/AbstractConfig.java @@ -4,7 +4,7 @@ import java.util.ArrayList; import java.util.List; -import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.lang3.NotImplementedException; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; @@ -225,7 +225,7 @@ protected void initTrace() { * @param traceDispatcher */ protected void registerTraceDispatcher(AsyncTraceDispatcher traceDispatcher) { - throw new NotImplementedException(); + throw new NotImplementedException("not impl!"); } public boolean isSampleEnabled() { diff --git a/mq-client-common-open/src/main/java/com/sohu/tv/mq/stats/InvokeStats.java b/mq-client-common-open/src/main/java/com/sohu/tv/mq/stats/InvokeStats.java index 8bb8337c..f681000f 100644 --- a/mq-client-common-open/src/main/java/com/sohu/tv/mq/stats/InvokeStats.java +++ b/mq-client-common-open/src/main/java/com/sohu/tv/mq/stats/InvokeStats.java @@ -7,7 +7,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; /** * 调用统计 @@ -161,7 +160,7 @@ public Map getMap(){ */ public class TimeStats { // 最大耗时 - private AtomicReference maxTimeReference = new AtomicReference(0L); + private AtomicLong maxTimeReference = new AtomicLong(0L); // 调用次数统计 private AtomicLong count = new AtomicLong(); // 调用时间统计 @@ -182,18 +181,18 @@ public void increment(long timeInMillis) { // 记录耗时 time.addAndGet(timeInMillis); // 记录最大耗时 - while (true) { + for(int i = 0; i < 10; ++i) { long maxTime = maxTimeReference.get(); if (maxTime >= timeInMillis) { return; } if (maxTimeReference.compareAndSet(maxTime, timeInMillis)) { - break; + return; } } } - public AtomicReference getMaxTimeReference() { + public AtomicLong getMaxTimeReference() { return maxTimeReference; } @@ -240,7 +239,7 @@ public static class InvokeStatsResult { private Map exceptionMap; public void init(TimeStats timeStats) { - setMaxTime(timeStats.getMaxTimeReference().get().intValue()); + setMaxTime((int)timeStats.getMaxTimeReference().get()); double time = timeStats.getTime().get(); long count = timeStats.getCount().get(); // 保留一位小数 diff --git a/mq-client-common-open/src/main/java/com/sohu/tv/mq/util/Version.java b/mq-client-common-open/src/main/java/com/sohu/tv/mq/util/Version.java index 7a092363..4ffeacbc 100644 --- a/mq-client-common-open/src/main/java/com/sohu/tv/mq/util/Version.java +++ b/mq-client-common-open/src/main/java/com/sohu/tv/mq/util/Version.java @@ -7,6 +7,6 @@ public class Version { public static String get() { - return "1.6"; + return "1.7"; } } diff --git a/mq-client-open/pom.xml b/mq-client-open/pom.xml index 80d57c64..6a9f56d3 100644 --- a/mq-client-open/pom.xml +++ b/mq-client-open/pom.xml @@ -6,7 +6,7 @@ com.sohu.tv mq - 1.6 + 1.7 mq-client-open diff --git a/mq-cloud-common/pom.xml b/mq-cloud-common/pom.xml index 8a1ca62a..c8b0f57c 100644 --- a/mq-cloud-common/pom.xml +++ b/mq-cloud-common/pom.xml @@ -6,7 +6,7 @@ com.sohu.tv mq - 1.6 + 1.7 mq-cloud-common diff --git a/mq-cloud/pom.xml b/mq-cloud/pom.xml index c952a6f5..9ebc696a 100644 --- a/mq-cloud/pom.xml +++ b/mq-cloud/pom.xml @@ -5,7 +5,7 @@ com.sohu.tv mq - 1.6 + 1.7 mq-cloud diff --git a/mq-cloud/sql/1.6-1.7.sql b/mq-cloud/sql/1.6-1.7.sql new file mode 100644 index 00000000..4b4d9a75 --- /dev/null +++ b/mq-cloud/sql/1.6-1.7.sql @@ -0,0 +1,7 @@ +alter table `audit_topic` add column `test_enabled` int(4) NOT NULL DEFAULT '0' COMMENT '0:非测试topic,1:测试topic'; + +alter table `topic` add column `info` varchar(360) DEFAULT NULL COMMENT 'topic描述'; + +alter table `topic` add column `delay_enabled` int(4) NOT NULL DEFAULT '0' COMMENT '0:不发送延迟消息,1:发送延迟消息。注:此字段不强制该topic的消息类型,仅为流量展示'; + +alter table `audit_topic` add column `delay_enabled` int(4) NOT NULL DEFAULT '0' COMMENT '0:不发送延迟消息,1:发送延迟消息。注:此字段不强制该topic的消息类型,仅为流量展示'; \ No newline at end of file diff --git a/mq-cloud/sql/init.sql b/mq-cloud/sql/init.sql index 43df5061..77ccaa6a 100644 --- a/mq-cloud/sql/init.sql +++ b/mq-cloud/sql/init.sql @@ -105,6 +105,9 @@ CREATE TABLE `audit_topic` ( `ordered` int(4) NOT NULL DEFAULT '0' COMMENT '0:无序,1:有序', `trace_enabled` int(4) NOT NULL DEFAULT '0' COMMENT '0:不开启trace,1:开启trace', `transaction_enabled` int(4) NOT NULL DEFAULT '0' COMMENT '0:不开启事务,1:开启事务', + `delay_enabled` int(4) NOT NULL DEFAULT '0' COMMENT '0:不发送延迟消息,1:发送延迟消息。注:此字段不强制该topic的消息类型', + `test_enabled` int(4) NOT NULL DEFAULT '0' COMMENT '0:非测试topic,1:测试topic', + `info` varchar(360) DEFAULT NULL COMMENT 'topic描述', `qps` int(11) DEFAULT NULL COMMENT '消息量qps预估', `qpd` int(11) DEFAULT NULL COMMENT '一天消息量预估', UNIQUE KEY `name` (`name`) @@ -424,6 +427,7 @@ CREATE TABLE `topic` ( `ordered` int(4) NOT NULL DEFAULT '0' COMMENT '0:无序,1:有序', `count` int(11) DEFAULT NULL COMMENT 'topic put times', `trace_enabled` int(4) NOT NULL DEFAULT '0' COMMENT '0:不开启trace,1:开启trace', + `delay_enabled` int(4) NOT NULL DEFAULT '0' COMMENT '0:不发送延迟消息,1:发送延迟消息。注:此字段不强制该topic的消息类型', `create_date` date NOT NULL, `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`id`), diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/AuditTopic.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/AuditTopic.java index c636a10e..c59e0288 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/AuditTopic.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/AuditTopic.java @@ -23,6 +23,10 @@ public class AuditTopic { private int traceEnabled; // 是否开启事务 private int transactionEnabled; + + private int testEnabled; + + private int delayEnabled; public long getAid() { return aid; @@ -104,11 +108,31 @@ public void setTransactionEnabled(int transactionEnabled) { this.transactionEnabled = transactionEnabled; } + public int getTestEnabled() { + return testEnabled; + } + + public void setTestEnabled(int testEnabled) { + this.testEnabled = testEnabled; + } + + public boolean testEnabled() { + return testEnabled == 1; + } + + public int getDelayEnabled() { + return delayEnabled; + } + + public void setDelayEnabled(int delayEnabled) { + this.delayEnabled = delayEnabled; + } + @Override public String toString() { return "AuditTopic [aid=" + aid + ", name=" + name + ", queueNum=" + queueNum + ", ordered=" + ordered + ", producer=" + producer + ", qpd=" + qpd + ", qps=" + qps + ", traceEnabled=" + traceEnabled - + ", transactionEnabled=" + transactionEnabled + "]"; + + ", transactionEnabled=" + transactionEnabled + ", testEnabled=" + testEnabled + "]"; } } diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/Cluster.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/Cluster.java index 22f22e7b..77e62bec 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/Cluster.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/Cluster.java @@ -89,6 +89,10 @@ public boolean isEnableTransaction() { return YES == transactionEnabled; } + public boolean test() { + return !online(); + } + @Override public int hashCode() { final int prime = 31; diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/Topic.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/Topic.java index 18427f31..6b0e310a 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/Topic.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/Topic.java @@ -38,6 +38,12 @@ public class Topic { // 是否开启trace private int traceEnabled; + + // topic描述 + private String info; + + // 是否延迟消息 + private int delayEnabled; public long getId() { return id; @@ -123,6 +129,26 @@ public void setTraceEnabled(int traceEnabled) { this.traceEnabled = traceEnabled; } + public String getInfo() { + return info; + } + + public void setInfo(String info) { + this.info = info; + } + + public int getDelayEnabled() { + return delayEnabled; + } + + public void setDelayEnabled(int delayEnabled) { + this.delayEnabled = delayEnabled; + } + + public boolean delayEnabled() { + return delayEnabled == 1; + } + @Override public String toString() { return "Topic [id=" + id + ", clusterId=" + clusterId + ", name=" + name + ", queueNum=" + queueNum diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/TraceMessageDetail.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/TraceMessageDetail.java index 69e820f4..28240b2d 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/TraceMessageDetail.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/TraceMessageDetail.java @@ -30,6 +30,9 @@ public class TraceMessageDetail { // 唯一id private String requestId; + // broker address + private String broker; + public TraceType getTraceType() { return traceType; } @@ -73,12 +76,8 @@ public Integer getCostTime() { return costTime; } - public void setCostTime(int costTime) { - if (costTime == 0) { - this.costTime = null; - } else { - this.costTime = costTime; - } + public void setCostTime(Integer costTime) { + this.costTime = costTime; } public Boolean isSuccess() { @@ -148,5 +147,18 @@ public String getRequestId() { public void setRequestId(String requestId) { this.requestId = requestId; } + + public String getBroker() { + return broker; + } + + public void setBroker(String broker) { + this.broker = broker; + } + + public void setStoreHost(String storeHost) { + // 只取ip + setBroker(storeHost.split(":")[0]); + } } diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/User.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/User.java index 9ede0545..b08b33c8 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/User.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/User.java @@ -2,6 +2,7 @@ import java.util.Date; +import org.apache.commons.lang3.StringUtils; import org.hibernate.validator.constraints.Email; import org.hibernate.validator.constraints.Range; @@ -121,6 +122,13 @@ public void setPassword(String password) { this.password = password; } + public String notBlankName(){ + if (StringUtils.isBlank(getName())) { + return getEmailName(); + } + return getName(); + } + @Override public String toString() { return "User [id=" + id + ", name=" + name + ", email=" + email + ", mobile=" + mobile + ", type=" + type diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/conf/CommonConfiguration.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/conf/CommonConfiguration.java index 6fbefa60..4f9278d3 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/conf/CommonConfiguration.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/conf/CommonConfiguration.java @@ -224,7 +224,7 @@ public CipherHelper cipherHelper() throws UnsupportedEncodingException { * @throws Exception */ @Bean - @Profile({"local-sohu", "test-sohu", "online-sohu"}) + @Profile({"local-sohu", "test-sohu", "test-online-sohu", "online-sohu"}) public LoginService sohuLoginService() throws Exception { Class clz = Class.forName("com.sohu.tv.mq.cloud.common.service.impl.SohuLoginService"); AbstractLoginService loginService = (AbstractLoginService) clz.newInstance(); diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/dao/AuditTopicDao.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/dao/AuditTopicDao.java index c908bfd9..ee62a6ab 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/dao/AuditTopicDao.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/dao/AuditTopicDao.java @@ -11,9 +11,10 @@ public interface AuditTopicDao { * 保存记录 * @param auditTopic */ - @Insert("insert into audit_topic(aid,name,queue_num,producer,ordered,qpd,qps,trace_enabled,transaction_enabled) values(" + - "#{auditTopic.aid},#{auditTopic.name},#{auditTopic.queueNum},#{auditTopic.producer}," - + "#{auditTopic.ordered},#{auditTopic.qpd},#{auditTopic.qps},#{auditTopic.traceEnabled},#{auditTopic.transactionEnabled})") + @Insert("insert into audit_topic(aid,name,queue_num,producer,ordered,qpd,qps,trace_enabled," + + "transaction_enabled,test_enabled, delay_enabled) values(#{auditTopic.aid},#{auditTopic.name},#{auditTopic.queueNum}," + + "#{auditTopic.producer},#{auditTopic.ordered},#{auditTopic.qpd},#{auditTopic.qps},#{auditTopic.traceEnabled}," + + "#{auditTopic.transactionEnabled},#{auditTopic.testEnabled},#{auditTopic.delayEnabled})") public void insert(@Param("auditTopic") AuditTopic auditTopic); /** diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/dao/DelayMessageDao.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/dao/DelayMessageDao.java new file mode 100644 index 00000000..76e919bb --- /dev/null +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/dao/DelayMessageDao.java @@ -0,0 +1,48 @@ +package com.sohu.tv.mq.cloud.dao; + +import java.util.List; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; +import com.sohu.tv.mq.cloud.bo.TopicTraffic; +/** + * 延迟消息 + * + * @author zhehongyuan + * @date 2019年4月23日 + */ +public interface DelayMessageDao { + + /** + * 延迟消息的TopicTraffic记录 + */ + @Select("select create_time ,sum(count) as count from producer_total_stat where" + + " producer in(select distinct producer from user_producer where tid=#{tid}) " + + "and create_date = #{createDate} group by create_time") + public List selectTopicTraffic(@Param("tid")long tid, @Param("createDate")int createDate); + + + /** + * 获取topic流量 + * + * @param tid + * @param createDate + * @return + */ + @Select("select #{tid} as tid, create_time, sum(count) as count from producer_total_stat where" + + " producer in(select distinct producer from user_producer where tid=#{tid}) " + + "and create_date = #{createDate} and create_time = #{createTime} group by create_time") + public TopicTraffic selectByIdListDateTime(@Param("tid") long tid, + @Param("createDate") int createDate, @Param("createTime") String createTime); + + /** + * 获取topic日流量 + * @param tid + * @param createDate + * @return + */ + @Select("select #{tid} as tid, sum(count) as count from producer_total_stat where" + + " producer in(select distinct producer from user_producer where tid=#{tid}) " + + "and create_date = #{createDate}") + public TopicTraffic selectTotalTraffic(@Param("tid") long tid, @Param("createDate") int createDate); + +} diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/dao/TopicDao.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/dao/TopicDao.java index 90116187..e87af453 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/dao/TopicDao.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/dao/TopicDao.java @@ -28,8 +28,9 @@ public interface TopicDao { * @param topic */ @Options(useGeneratedKeys = true, keyProperty = "topic.id") - @Insert("insert into topic(cluster_id, name, queue_num, ordered, create_date, trace_enabled) values(" - + "#{topic.clusterId},#{topic.name},#{topic.queueNum},#{topic.ordered},now(),#{topic.traceEnabled})") + @Insert("insert into topic(cluster_id, name, queue_num, ordered, create_date, trace_enabled, info, delay_enabled) values(" + + "#{topic.clusterId},#{topic.name},#{topic.queueNum},#{topic.ordered},now(),#{topic.traceEnabled}," + + "#{topic.info}, #{topic.delayEnabled})") public Integer insert(@Param("topic") Topic topic); /** @@ -132,4 +133,13 @@ public interface TopicDao { @Select("SELECT c.id cid,c.name consumer,t.id tid,t.name topic,t.cluster_id FROM consumer c, topic t " + "where c.tid = t.id and c.consume_way = 0") public List selectTopicConsumer(); + + /** + * 更新记录 + * + * @param tid + * @param info + */ + @Update("update topic set info=#{info} where id=#{tid}") + public Integer updateTopicInfo(@Param("tid") long tid, @Param("info") String info); } diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/dao/UserProducerDao.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/dao/UserProducerDao.java index 07e7abfe..3fb18236 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/dao/UserProducerDao.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/dao/UserProducerDao.java @@ -90,4 +90,12 @@ public interface UserProducerDao { */ @Select("select * from user_producer where producer = #{producer} and uid = #{uid}") public List selectByProducerAndUid(@Param("producer") String producer, @Param("uid") long uid); + + /** + * 查询记录 + * @param tid + * @param uid + */ + @Select("select * from user_producer where tid = #{tid} and uid = #{uid} limit 1") + public UserProducer selectByTidAndUid(@Param("tid") long tid, @Param("uid") long uid); } diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/AlertService.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/AlertService.java index 14469fcc..4c5ea666 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/AlertService.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/AlertService.java @@ -65,6 +65,20 @@ public boolean sendMail(String title, String content) { return alertMessageSender.sendMail(title, content, getDevelopers()); } + /** + * 发送邮件(同时抄送管理员) + * @param title 标题 + * @param content 内容 + * @param email 收件人 + * @return 成功返回true,否则返回false + */ + public boolean sendMail(String title, String content, String email) { + if(alertMessageSender == null) { + return false; + } + return alertMessageSender.sendMail(title, content, email, getDevelopers(), 0); + } + /** * 发送手机报警 * diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/DelayMessageService.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/DelayMessageService.java new file mode 100644 index 00000000..17db66bb --- /dev/null +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/DelayMessageService.java @@ -0,0 +1,105 @@ +package com.sohu.tv.mq.cloud.service; + +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.sohu.tv.mq.cloud.bo.TopicTraffic; +import com.sohu.tv.mq.cloud.dao.DelayMessageDao; +import com.sohu.tv.mq.cloud.util.Result; + +/** + * 延迟消息服务 + * + * @Description: + * @author zhehongyuan + * @date 2019年4月23日 + */ +@Service +public class DelayMessageService { + + private Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private DelayMessageDao delayMessageDao; + + /** + * 延迟消息的流量使用客户端上报的数据 + * + * @param tid + * @param createDate + */ + public Result> selectDelayMessageTraffic(long tid, int createDate) { + List topicTrafficList = null; + try { + topicTrafficList = delayMessageDao.selectTopicTraffic(tid, createDate); + } catch (Exception e) { + logger.error("selectDelayMessageTraffic err, tid:{}, createDate:{}", tid, createDate, e); + return Result.getDBErrorResult(e); + } + return Result.getResult(topicTrafficList); + } + + /** + * 延迟消息的流量使用客户端上报的数据 + * + * @param tid + * @param date + * @param time + * @return + */ + public Result query(long tid, int date, String time) { + TopicTraffic topicTraffic = null; + try { + topicTraffic = delayMessageDao.selectByIdListDateTime(tid, date, time); + } catch (Exception e) { + logger.error("query traffic err, tid:{},date:{},time:{}", tid, date, time, e); + return Result.getDBErrorResult(e); + } + return Result.getResult(topicTraffic); + } + + /** + * idList中的id需要全部属于延迟消息的topic,外部做过滤 + * + * @param idList + * @param date + * @param time + * @return + */ + public Result> query(List idList, int date, String time) { + if (idList == null || idList.isEmpty()) { + return Result.getResult(null); + } + List list = new ArrayList(idList.size()); + for (Long tid : idList) { + Result topicTraffic = query(tid, date, time); + if (topicTraffic.isOK()) { + list.add(topicTraffic.getResult()); + } + } + return Result.getResult(list); + } + + /** + * 查询延迟消息日流量 + * + * @param tid + * @param date + * @return + */ + public Result query(long tid, int date) { + TopicTraffic topicTraffic = null; + try { + topicTraffic = delayMessageDao.selectTotalTraffic(tid, date); + } catch (Exception e) { + logger.error("query traffic err, tid:{},date:{}", tid, date, e); + return Result.getDBErrorResult(e); + } + return Result.getResult(topicTraffic); + } +} diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/MessageService.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/MessageService.java index 56489d8b..b805e78e 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/MessageService.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/MessageService.java @@ -812,22 +812,23 @@ public Map groupTraceMessage(List decodedMe // 发送消息部分 if (TraceType.Pub == traceContext.getTraceType()) { traceViewVO.buildProducer(decodedMessage.getBornHostString(), traceContext.isSuccess(), - traceContext.getTimeStamp(), traceMessageDetail, traceContext.getGroupName()); - // broker上的信息根据生产者构建, - traceViewVO.setBroker(traceBean.getStoreHost().split(":")[0]); + traceContext.getTimeStamp(), traceMessageDetail, traceContext.getGroupName(), traceContext.getCostTime()); } else { // 消费消息部分 // 不显示空字符串 if (traceMessageDetail.getTopic().isEmpty()) { traceMessageDetail.setTopic(null); } + // 消费者不显示broker信息 + traceMessageDetail.setBroker(null); if (TraceType.SubBefore == traceContext.getTraceType()) { // 实际消费动作前 traceMessageDetail.setSuccess(null); + traceMessageDetail.setCostTime(null); traceViewVO.buildConsumer(decodedMessage.getBornHostString(), null, traceContext.getTimeStamp(), - traceMessageDetail, traceContext.getRequestId(), traceContext.getGroupName()); + traceMessageDetail, traceContext.getRequestId(), traceContext.getGroupName(), null); } else if (TraceType.SubAfter == traceContext.getTraceType()) { // 消费结束 traceMessageDetail.setTimeStamp(0); traceViewVO.buildConsumer(decodedMessage.getBornHostString(), traceContext.isSuccess(), 0L, - traceMessageDetail, traceContext.getRequestId(), traceContext.getGroupName()); + traceMessageDetail, traceContext.getRequestId(), traceContext.getGroupName(), traceContext.getCostTime()); } } } diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/TopicService.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/TopicService.java index 33dfd695..697d735a 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/TopicService.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/TopicService.java @@ -326,6 +326,7 @@ public Result createTopic(Cluster mqCluster, Audit audit, AuditTopic auditTop Topic topic = new Topic(); BeanUtils.copyProperties(auditTopic, topic); topic.setClusterId(mqCluster.getId()); + topic.setInfo(audit.getInfo()); Integer count = save(topic); if(count == null) { return Result.getResult(Status.DB_ERROR); @@ -601,5 +602,21 @@ public Result updateCount(List topicTrafficList) { } return Result.getResult(result); } + + /** + * 更新topic描述 + * @param topicTrafficList + * @return + */ + public Result updateTopicInfo(long tid, String info) { + Integer result = null; + try { + result = topicDao.updateTopicInfo(tid, info); + } catch (Exception e) { + logger.error("updateTopicInfo err, tid:{}, info:{}", tid, info, e); + return Result.getDBErrorResult(e); + } + return Result.getResult(result); + } } diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/UserProducerService.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/UserProducerService.java index 435fcd09..aed2b663 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/UserProducerService.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/UserProducerService.java @@ -228,4 +228,18 @@ public Result> findUserProducer(long uid, String producer) { } return Result.getResult(userProducerList); } + + /** + * 按照uid和tid查询UserProducer + */ + public Result findUserProducer(long uid, long tid) { + UserProducer userProducer = null; + try { + userProducer = userProducerDao.selectByTidAndUid(uid, tid); + } catch (Exception e) { + logger.error("findUserProducer err, pid:{}, tid:{}", uid, tid, e); + return Result.getDBErrorResult(e); + } + return Result.getResult(userProducer); + } } diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/util/Status.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/util/Status.java index 3935d7e2..8594396e 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/util/Status.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/util/Status.java @@ -27,6 +27,7 @@ public enum Status { REPEAT_ERROR(305, "关联关系已存在"), LONGIN_ERROR(306, "用户名或密码错误"), OLD_PASSWORD_ERROR(307, "原始密码输入有误"), + NOT_ALLOWED(308, "无权限执行该操作"), // 4xx代表请求问题 NOT_FOUND_ERROR(404, "请求不存在"), diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/ConsumerController.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/ConsumerController.java index 9108ab30..2965cca5 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/ConsumerController.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/ConsumerController.java @@ -493,9 +493,7 @@ private Result associateUserConsumer(UserInfo userInfo, long uid, long tid, l if (userResult.isNotOK()) { return Result.getResult(Status.EMAIL_SEND_ERR); } - tip = tip + " user:" + (userResult.getResult().getName() == null - ? userResult.getResult().getEmailName() - : userResult.getResult().getName()) + ""; + tip = tip + " user:" + userResult.getResult().notBlankName() + ""; } alertService.sendAuditMail(userInfo.getUser(), TypeEnum.ASSOCIATE_CONSUMER, tip); } diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/FeedbackController.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/FeedbackController.java index cd8d6aad..5cb2fc45 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/FeedbackController.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/FeedbackController.java @@ -6,6 +6,7 @@ import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.util.HtmlUtils; import com.sohu.tv.mq.cloud.bo.Feedback; import com.sohu.tv.mq.cloud.service.AlertService; @@ -22,6 +23,8 @@ @RequestMapping("/feedback") public class FeedbackController { + private static String FEED_BACK_TITLE = "MQCloud用户反馈"; + @Autowired private FeedbackService feedbackService; @@ -37,13 +40,13 @@ public class FeedbackController { @ResponseBody @RequestMapping(value = "/add", method = RequestMethod.POST) public Result add(UserInfo userInfo, @RequestParam("content") String content) throws Exception { + content = HtmlUtils.htmlEscape(content, "UTF-8"); Feedback feedback = new Feedback(); feedback.setContent(content); feedback.setUid(userInfo.getUser().getId()); Result result = feedbackService.save(feedback); - alertService.sendMail("MQCloud用户反馈", - "用户: " + userInfo.getUser().getEmail() + "
" + - "反馈: " + feedback); + alertService.sendMail(FEED_BACK_TITLE, "您好!
我们已收到您的反馈,感谢您为MQCloud做出的贡献 !
反馈内容如下:
" + content, + userInfo.getUser().getEmail()); return Result.getWebResult(result); } } diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/TopicController.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/TopicController.java index 38cee6d6..dd5f9fdf 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/TopicController.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/TopicController.java @@ -11,6 +11,7 @@ import javax.validation.Valid; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicStatsTable; @@ -28,6 +29,7 @@ import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.util.HtmlUtils; import com.sohu.tv.mq.cloud.bo.Audit; import com.sohu.tv.mq.cloud.bo.Audit.TypeEnum; @@ -225,7 +227,7 @@ public Result add(UserInfo userInfo, @Valid TopicParam topicParam) throws Exc audit.setType(TypeEnum.NEW_TOPIC.getType()); audit.setStatus(Audit.StatusEnum.INIT.getStatus()); audit.setUid(userInfo.getUser().getId()); - audit.setInfo(topicParam.getInfo()); + audit.setInfo(HtmlUtils.htmlEscape(topicParam.getInfo(), "UTF-8")); // 构造topic审核记录 AuditTopic auditTopic = new AuditTopic(); BeanUtils.copyProperties(topicParam, auditTopic); @@ -319,9 +321,7 @@ private Result associateUserProducer(UserInfo userInfo, long uid, long tid, S Result userResult = userService.query(uid); if (userResult.isOK()) { tip = getTopicTip(tid) + " producer:" + producer + " user:" - + (userResult.getResult().getName() == null ? userResult.getResult().getEmailName() - : userResult.getResult().getName()) - + ""; + + userResult.getResult().notBlankName()+ ""; } } if (tip == null) { @@ -460,6 +460,33 @@ private String getTopicTip(long tid) { return sb.toString(); } + /** + * 更新Topic描述 + * + * @return + * @throws Exception + */ + @ResponseBody + @RequestMapping(value = "/update/info", method = RequestMethod.POST) + public Result updateTopicInfo(UserInfo userInfo, @RequestParam("tid") int tid, + @RequestParam("info") String info) throws Exception { + // 校验当前用户是否拥有权限 + Result userProducerResult = userProducerService.findUserProducer(userInfo.getUser().getId(), tid); + if (userProducerResult.isNotOK() && !userInfo.getUser().isAdmin()) { + return Result.getResult(Status.NOT_ALLOWED); + } + Result topicResult = topicService.queryTopic(tid); + if (topicResult.isNotOK()) { + return topicResult; + } + if (StringUtils.isBlank(info)) { + return Result.getResult(Status.PARAM_ERROR); + } + Result result = topicService.updateTopicInfo(tid, HtmlUtils.htmlEscape(info.trim(), "UTF-8")); + logger.info(userInfo.getUser().getName() + " update topic info , tid:{}, info:{}, status:{}", tid, info, result.isOK()); + return Result.getWebResult(result); + } + @Override public String viewModule() { return "topic"; diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/UserConsumerController.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/UserConsumerController.java index a33e5c81..f2d19b7c 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/UserConsumerController.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/UserConsumerController.java @@ -97,7 +97,7 @@ public Result delete(UserInfo userInfo, @RequestParam(value = "uid", defaultV topicResult.getResult().getName(), uid); if (result.isOK()) { alertService.sendAuditMail(userInfo.getUser(), TypeEnum.DELETE_USERCONSUMER, - userResult.getResult().getName() + "与" + consumerResult.getResult().getName() + "的关联关系"); + userResult.getResult().notBlankName() + "与" + consumerResult.getResult().getName() + "的关联关系"); } return Result.getWebResult(result); diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/UserController.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/UserController.java index cf9cda13..cf3632d0 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/UserController.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/UserController.java @@ -38,6 +38,7 @@ import com.sohu.tv.mq.cloud.service.ClusterService; import com.sohu.tv.mq.cloud.service.ConsumerService; import com.sohu.tv.mq.cloud.service.ConsumerTrafficService; +import com.sohu.tv.mq.cloud.service.DelayMessageService; import com.sohu.tv.mq.cloud.service.ProducerTotalStatService; import com.sohu.tv.mq.cloud.service.TopicService; import com.sohu.tv.mq.cloud.service.TopicTrafficService; @@ -91,6 +92,9 @@ public class UserController extends ViewController { @Autowired private ClusterService clusterService; + + @Autowired + private DelayMessageService delayMessageService; /** * 退出登录 @@ -219,15 +223,32 @@ public String topic(UserInfo userInfo, @Valid PaginationParam paginationParam, List topicList = result.getResult(); // 组装topic id 列表 List tidList = new ArrayList(topicList.size()); + List delayTidList = new ArrayList(); for (Topic topic : topicList) { - tidList.add(topic.getId()); + if (topic.delayEnabled()) { + delayTidList.add(topic.getId()); + } else { + tidList.add(topic.getId()); + } } // 获取一分钟之前的topic流量数据 Date oneMinuteAgo = new Date(System.currentTimeMillis() - 60000); String time = DateUtil.getFormat(DateUtil.HHMM).format(oneMinuteAgo); String date = DateUtil.formatYMD(oneMinuteAgo); - Result> topicTrafficListResult = topicTrafficService.query(tidList, date, time); - + Result> topicTrafficListResult = Result.getResult(new ArrayList(topicList.size())); + if (!tidList.isEmpty()) { + Result> trafficListResult = topicTrafficService.query(tidList, date, time); + if (trafficListResult.isNotEmpty()) { + topicTrafficListResult.getResult().addAll(trafficListResult.getResult()); + } + } + if (!delayTidList.isEmpty()) { + Result> delayTrafficListResult = delayMessageService.query(delayTidList, Integer.parseInt(date), time); + if (delayTrafficListResult.isNotEmpty()) { + topicTrafficListResult.getResult().addAll(delayTrafficListResult.getResult()); + } + } + tidList.addAll(delayTidList); // 查询consumer列表 Result> consumerListResult = consumerService.queryByTidList(tidList); Map> consumerMap = null; @@ -369,7 +390,13 @@ public String topology(UserInfo userInfo, @PathVariable long tid, Map tidList = new ArrayList(1); tidList.add(topic.getId()); - Result> topicTrafficListResult = topicTrafficService.query(tidList, date, time); + Result> topicTrafficListResult = null; + if (topic.delayEnabled()) { + topicTrafficListResult = delayMessageService.query(tidList, Integer.parseInt(date), time); + } else { + topicTrafficListResult = topicTrafficService.query(tidList, date, time); + } + if (topicTrafficListResult.isNotEmpty()) { result.getResult().setTopicTraffic(topicTrafficListResult.getResult().get(0)); } @@ -432,7 +459,12 @@ public Cluster mqCluster() { } // 获取总流量 - Result topicTrafficResult = topicTrafficService.queryTotalTraffic(topic.getId(), date); + Result topicTrafficResult = null; + if (topic.delayEnabled()) { + topicTrafficResult = delayMessageService.query(topic.getId(), Integer.parseInt(date)); + } else { + topicTrafficResult = topicTrafficService.queryTotalTraffic(topic.getId(), date); + } if (topicTrafficResult.isOK()) { result.getResult().setTotalTopicTraffic(topicTrafficResult.getResult()); } diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/UserProducerController.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/UserProducerController.java index 31a9b257..17be1047 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/UserProducerController.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/UserProducerController.java @@ -83,7 +83,7 @@ public Result delete(UserInfo userInfo, @RequestParam(value = "pid", defaultV currentUserProducer.getResult().getUid()); if (result.isOK()) { alertService.sendAuditMail(userInfo.getUser(), TypeEnum.DELETE_USERPRODUCER, - userResult.getResult().getName() + "与" + currentUserProducer.getResult().getProducer() + "的关联关系"); + userResult.getResult().notBlankName() + "与" + currentUserProducer.getResult().getProducer() + "的关联关系"); } return Result.getWebResult(result); diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/admin/AuditController.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/admin/AuditController.java index ebccd18b..175f4b05 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/admin/AuditController.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/admin/AuditController.java @@ -38,6 +38,7 @@ import com.sohu.tv.mq.cloud.bo.UserConsumer; import com.sohu.tv.mq.cloud.bo.UserMessage; import com.sohu.tv.mq.cloud.bo.UserProducer; +import com.sohu.tv.mq.cloud.service.AlertService; import com.sohu.tv.mq.cloud.service.AssociateConsumerService; import com.sohu.tv.mq.cloud.service.AssociateProducerService; import com.sohu.tv.mq.cloud.service.AuditConsumerDeleteService; @@ -142,6 +143,9 @@ public class AuditController extends AdminViewController { @Autowired private AuditResendMessageService auditResendMessageService; + + @Autowired + private AlertService alertService; /** * 审核主列表 @@ -511,6 +515,7 @@ public Result refuse(UserInfo userInfo, userMessage.setMessage(sb.toString()); userMessage.setUid(audit.getUid()); userMessageService.save(userMessage); + sendEmailMessage(audit.getUid(), sb.toString()); return updateResult; } return Result.getResult(Status.PARAM_ERROR); @@ -1304,6 +1309,7 @@ private boolean agreeAndTip(Audit audit, String auditMail, String tip) { userMessage.setMessage(sb.toString()); userMessage.setUid(audit.getUid()); userMessageService.save(userMessage); + sendEmailMessage(audit.getUid(), sb.toString()); return true; } return false; @@ -1775,6 +1781,33 @@ public Result createTraceTopic(Audit audit, AuditTopic auditTopic, Integer tr return Result.getOKResult(); } + /** + * 为用户发送审核结果 + * + * @param uid + * @param content + */ + private void sendEmailMessage(long uid, String content) { + Result userResult = userService.query(uid); + if (userResult.isNotOK()) { + logger.warn("select user is not ok! uid:{}", uid); + return; + } + sendEmailMessage(userResult.getResult().getEmail(), content); + } + + /** + * 为用户发送审核结果 + * + * @param email + * @param content + */ + private void sendEmailMessage(String email, String content) { + if (!alertService.sendMail("MQCloud:审核结果", content, email)) { + logger.warn("send audit result fail! email:{}, content:{}", email, content); + } + } + @Override public String viewModule() { return "audit"; diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/param/BrokerParam.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/param/BrokerParam.java index 71910a7b..f3fb969c 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/param/BrokerParam.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/param/BrokerParam.java @@ -104,6 +104,7 @@ public String toConfig(String nameServerDomain, Cluster cluster) { + "\nautoCreateTopicEnable=false" + "\nclusterTopicEnable=false" + "\nautoCreateSubscriptionGroup=true" + + "\nslaveReadEnable=true" + "\nstorePathRootDir=" + MQDeployer.MQ_CLOUD_DIR + getDir() + "/data" + "\nstorePathCommitLog=" + MQDeployer.MQ_CLOUD_DIR + getDir() + "/data/commitlog" + "\nstorePathIndex=" + MQDeployer.MQ_CLOUD_DIR + getDir() + "/data/index" diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/param/TopicParam.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/param/TopicParam.java index a0228d04..179172fc 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/param/TopicParam.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/controller/param/TopicParam.java @@ -35,6 +35,14 @@ public class TopicParam { private long qpd; private long qps; + // 是否测试topic + @Range(min = 0, max = 1) + private int testEnabled; + + // 是否消息延迟 + @Range(min = 0, max = 1) + private int delayEnabled; + public String getProducer() { return producer; } @@ -91,10 +99,23 @@ public int getTransactionEnabled() { public void setTransactionEnabled(int transactionEnabled) { this.transactionEnabled = transactionEnabled; } + public int getTestEnabled() { + return testEnabled; + } + public void setTestEnabled(int testEnabled) { + this.testEnabled = testEnabled; + } + public int getDelayEnabled() { + return delayEnabled; + } + public void setDelayEnabled(int delayEnabled) { + this.delayEnabled = delayEnabled; + } @Override public String toString() { return "TopicParam [name=" + name + ", queueNum=" + queueNum + ", ordered=" + ordered + ", info=" + info + ", producer=" + producer + ", traceEnabled=" + traceEnabled + ", transactionEnabled=" - + transactionEnabled + ", qpd=" + qpd + ", qps=" + qps + "]"; + + transactionEnabled + ", qpd=" + qpd + ", qps=" + qps + ", testEnabled=" + testEnabled + + ", delayEnabled=" + delayEnabled + "]"; } } diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/view/data/ConsumeTrafficLineChartData.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/view/data/ConsumeTrafficLineChartData.java index d6004ac7..a75d6965 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/view/data/ConsumeTrafficLineChartData.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/view/data/ConsumeTrafficLineChartData.java @@ -16,10 +16,12 @@ import com.sohu.tv.mq.cloud.bo.Consumer; import com.sohu.tv.mq.cloud.bo.ConsumerTraffic; +import com.sohu.tv.mq.cloud.bo.Topic; import com.sohu.tv.mq.cloud.bo.TopicTopology; import com.sohu.tv.mq.cloud.bo.TopicTraffic; import com.sohu.tv.mq.cloud.bo.Traffic; import com.sohu.tv.mq.cloud.service.ConsumerTrafficService; +import com.sohu.tv.mq.cloud.service.DelayMessageService; import com.sohu.tv.mq.cloud.service.TopicTrafficService; import com.sohu.tv.mq.cloud.service.UserService; import com.sohu.tv.mq.cloud.util.DateUtil; @@ -61,6 +63,9 @@ public class ConsumeTrafficLineChartData implements LineChartData { @Autowired private UserService userService; + + @Autowired + private DelayMessageService delayMessageService; public ConsumeTrafficLineChartData() { initSearchHeader(); @@ -119,9 +124,9 @@ public List getLineChartData(Map searchMap) { if (tid == null || tid <= 0) { return lineChartList; - } + } //获取topic流量 - Result> result = topicTrafficService.query(tid, dateStr); + Result> result = getTopicTraffic(topicTopology.getTopic(), dateStr); if (!result.isOK()) { return lineChartList; } @@ -288,6 +293,23 @@ protected Date getDate(Map searchMap, String key) { return new Date(); } + /** + * 获取topic流量 + * + * @param topic + * @param dateStr + * @return + */ + private Result> getTopicTraffic(Topic topic, String dateStr) { + Result> result = null; + if (topic.delayEnabled()) { + result = delayMessageService.selectDelayMessageTraffic(topic.getId(), Integer.parseInt(dateStr)); + } else { + result = topicTrafficService.query(topic.getId(), dateStr); + } + return result; + } + @Override public SearchHeader getSearchHeader() { return searchHeader; diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/view/data/ProduceTrafficLineChartData.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/view/data/ProduceTrafficLineChartData.java index ac63ee09..182c0353 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/view/data/ProduceTrafficLineChartData.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/view/data/ProduceTrafficLineChartData.java @@ -11,8 +11,11 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import com.sohu.tv.mq.cloud.bo.Topic; import com.sohu.tv.mq.cloud.bo.TopicTraffic; import com.sohu.tv.mq.cloud.bo.Traffic; +import com.sohu.tv.mq.cloud.service.DelayMessageService; +import com.sohu.tv.mq.cloud.service.TopicService; import com.sohu.tv.mq.cloud.service.TopicTrafficService; import com.sohu.tv.mq.cloud.util.DateUtil; import com.sohu.tv.mq.cloud.util.Result; @@ -51,7 +54,13 @@ public class ProduceTrafficLineChartData implements LineChartData { @Autowired private TopicTrafficService topicTrafficService; + + @Autowired + private DelayMessageService delayMessageService; + @Autowired + private TopicService topicService; + public ProduceTrafficLineChartData() { initSearchHeader(); } @@ -112,15 +121,19 @@ public List getLineChartData(Map searchMap) { if (tid == null || tid <= 0) { return lineChartList; } + Result topicResult = topicService.queryTopic(tid); + if (topicResult.isNotOK()) { + return lineChartList; + } //获取topic流量 - Result> result = topicTrafficService.query(tid, dateStr); + Result> result = getTopicTraffic(topicResult.getResult(), dateStr); if (!result.isOK()) { return lineChartList; } Date dayBefore = new Date(date.getTime() - 24*60*60*1000); //获取前一天topic流量 - Result> resultDayBefore = topicTrafficService.query(tid, DateUtil.formatYMD(dayBefore)); + Result> resultDayBefore = getTopicTraffic(topicResult.getResult(), DateUtil.formatYMD(dayBefore)); // 构造曲线图对象 LineChart lineChart = new LineChart(); @@ -308,6 +321,23 @@ private Map list2Map(List list) { return map; } + /** + * 获取topic流量 + * + * @param topic + * @param dateStr + * @return + */ + private Result> getTopicTraffic(Topic topic, String dateStr) { + Result> result = null; + if (topic.delayEnabled()) { + result = delayMessageService.selectDelayMessageTraffic(topic.getId(), Integer.parseInt(dateStr)); + } else { + result = topicTrafficService.query(topic.getId(), dateStr); + } + return result; + } + /** * 获取长整型数据 * diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/vo/TopicTrafficVO.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/vo/TopicTrafficVO.java index 055e524b..150d85fc 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/vo/TopicTrafficVO.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/vo/TopicTrafficVO.java @@ -21,6 +21,9 @@ public class TopicTrafficVO { private boolean own; + // topic描述 + private String info; + public long getId() { return id; } @@ -61,9 +64,17 @@ public void setOwn(boolean own) { this.own = own; } + public String getInfo() { + return info; + } + + public void setInfo(String info) { + this.info = info; + } + @Override public String toString() { return "TopicTrafficVO [id=" + id + ", name=" + name + ", topicTraffic=" + topicTraffic + ", consumerTraffic=" - + consumerTraffic + "]"; + + consumerTraffic + ", info=" + info + "]"; } } diff --git a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/vo/TraceViewVO.java b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/vo/TraceViewVO.java index a1058a79..28717098 100644 --- a/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/vo/TraceViewVO.java +++ b/mq-cloud/src/main/java/com/sohu/tv/mq/cloud/web/vo/TraceViewVO.java @@ -1,5 +1,6 @@ package com.sohu.tv.mq.cloud.web.vo; +import java.text.DecimalFormat; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; @@ -19,11 +20,17 @@ public class TraceViewVO { // 表示肯定 public static int YES = 1; + // 耗时计算保留两位 + public static DecimalFormat df = new DecimalFormat("0.00"); + // 一秒 + public static int ONE_SECOND = 1000; + // 一分钟 + public static int ONE_MINUTE = 60 * ONE_SECOND; + // 一小时 + public static int ONE_HOUR = 60 * ONE_MINUTE; private ViewVO producer; - private String broker; - // consumer 可能会有多个, 同一个consumer的requestId相同 private Map consumer; @@ -38,6 +45,9 @@ public static class ViewVO { private List detail; private String group; + + // 耗时 ms + private Integer costTime; public String getAddr() { return addr; @@ -79,13 +89,38 @@ public void setGroup(String group) { this.group = group; } + public Integer getCostTime() { + return costTime; + } + + public void setCostTime(Integer costTime) { + this.costTime = costTime; + } + public String status() { if (success == null) { return "未知"; } return success ? "成功" : "失败"; } - + /** + * 简单计算耗时,只保留两位小数 + */ + public String costTimes() { + if (costTime == null) { + return "未知"; + } + if (costTime < ONE_SECOND) { + return costTime + "ms"; + } else if (costTime < ONE_MINUTE) { + return df.format((float)costTime/ONE_SECOND) + "s"; + } else if (costTime < ONE_HOUR) { + return df.format((float)costTime/ONE_MINUTE) + "m"; + } else { + return df.format((float)costTime/ONE_HOUR) + "h"; + } + } + @Override public String toString() { return "ViewVO [addr=" + addr + ", success=" + success + ", time=" + time + ", group=" + group + "]"; @@ -100,14 +135,6 @@ public void setProducer(ViewVO producer) { this.producer = producer; } - public String getBroker() { - return broker; - } - - public void setBroker(String broker) { - this.broker = broker; - } - public Map getConsumer() { return consumer; } @@ -147,9 +174,9 @@ public String consumerToJsonString() { } public void buildProducer(String addr, Boolean isSuccess, long time, TraceMessageDetail detail, - String producerGroup) { + String producerGroup, int costTime) { ViewVO viewVo = new ViewVO(); - buildViewVo(viewVo, addr, isSuccess, time, detail, producerGroup); + buildViewVo(viewVo, addr, isSuccess, time, detail, producerGroup, costTime); setProducer(viewVo); } @@ -162,8 +189,8 @@ public void buildProducer(String addr, Boolean isSuccess, long time, TraceMessag * @param consumerStatus * @param detail */ - public void buildConsumer(String addr, Boolean isSuccess, long time, - TraceMessageDetail detail, String requestId, String consumerGroup) { + public void buildConsumer(String addr, Boolean isSuccess, long time, TraceMessageDetail detail, + String requestId, String consumerGroup, Integer costTime) { Map consumer = getConsumer(); if (consumer == null) { consumer = new HashMap(); @@ -174,13 +201,13 @@ public void buildConsumer(String addr, Boolean isSuccess, long time, viewVo = new ViewVO(); consumer.put(requestId, viewVo); } - buildViewVo(viewVo, addr, isSuccess, detail, consumerGroup); + buildViewVo(viewVo, addr, isSuccess, detail, consumerGroup, costTime); if (time > 0) { viewVo.setTime(DateUtil.getFormat(DateUtil.YMD_DASH_HMS_COLON_DOT_SSS).format(new Date(time))); } } - private void buildViewVo(ViewVO viewVo, String addr, Boolean isSuccess, TraceMessageDetail detail, String group) { + private void buildViewVo(ViewVO viewVo, String addr, Boolean isSuccess, TraceMessageDetail detail, String group, Integer costTime) { if (viewVo.getAddr() == null) { viewVo.setAddr(addr); } @@ -196,11 +223,15 @@ private void buildViewVo(ViewVO viewVo, String addr, Boolean isSuccess, TraceMes } traceMessageDetailList.add(detail); viewVo.setDetail(traceMessageDetailList); + + if (viewVo.getCostTime() == null) { + viewVo.setCostTime(costTime); + } } private void buildViewVo(ViewVO viewVo, String addr, Boolean isSuccess, long time, TraceMessageDetail detail, - String group) { - buildViewVo(viewVo, addr, isSuccess, detail, group); + String group, int costTime) { + buildViewVo(viewVo, addr, isSuccess, detail, group, costTime); if (viewVo.getTime() == null) { viewVo.setTime(DateUtil.getFormat(DateUtil.YMD_DASH_HMS_COLON_DOT_SSS).format(new Date(time))); } @@ -208,6 +239,6 @@ private void buildViewVo(ViewVO viewVo, String addr, Boolean isSuccess, long tim @Override public String toString() { - return "TraceViewVO [producer=" + producer + ", broker=" + broker + ", consumer=" + consumer + "]"; + return "TraceViewVO [producer=" + producer + ", consumer=" + consumer + "]"; } } diff --git a/mq-cloud/src/main/resources/logback-spring.xml b/mq-cloud/src/main/resources/logback-spring.xml index 47423540..7c8a8953 100644 --- a/mq-cloud/src/main/resources/logback-spring.xml +++ b/mq-cloud/src/main/resources/logback-spring.xml @@ -40,7 +40,7 @@ - + %d{yyyy-MM-dd HH:mm:ss.SSS} {%thread} %-5level %logger{50}-%L - %msg%n diff --git a/mq-cloud/src/main/resources/templates/admin/audit/addTopic.html b/mq-cloud/src/main/resources/templates/admin/audit/addTopic.html index 245bf515..6abc2172 100644 --- a/mq-cloud/src/main/resources/templates/admin/audit/addTopic.html +++ b/mq-cloud/src/main/resources/templates/admin/audit/addTopic.html @@ -50,6 +50,13 @@ checked>是 +
+ +
+ checked>线上 + checked>测试 +
+
@@ -79,16 +86,33 @@
+ <#elseif response.result.transactionEnabled()> <#--事务--> <#list clusters as cluster> <#if cluster.isEnableTransaction()> - <#else> + + <#elseif response.result.testEnabled()> <#--测试--> + <#list clusters as cluster> + <#if cluster.test()> + + + + + <#else> <#--all--> <#list clusters as cluster> +
@@ -122,12 +146,18 @@ \ No newline at end of file diff --git a/mq-cloud/src/main/resources/templates/user/topicTopology.html b/mq-cloud/src/main/resources/templates/user/topicTopology.html index 353eb961..2910ca2e 100644 --- a/mq-cloud/src/main/resources/templates/user/topicTopology.html +++ b/mq-cloud/src/main/resources/templates/user/topicTopology.html @@ -8,7 +8,7 @@

topic信息

- +
@@ -46,6 +46,21 @@

topic信息

+ + + + @@ -637,7 +652,37 @@ - +<#--修改topic描述--> +
用途: + ${response.result.topic.info!} + <#if response.result.own> + + +
所属用户 生产者(producer group)