Skip to content

Commit

Permalink
修改如下:
Browse files Browse the repository at this point in the history
1 新加特性
增加topic描述,方便识别topic用途。
增加topic环境区分,测试topic建立在测试集群,线上topic建立在线上集群,。
延迟消息topic流量统计采用客户端上报数据(broker端无数据)。
topic创建页面精简,只展示必要选项,其他隐藏。
trace搜索增加发送&消费耗时。
修改快速入门代码示例。
broker部署时开启slaveReadEnable机制。
审核完毕发送邮件通知。

2 bug修复
生产者名称过长页面错乱bug修复。
server监控小数位数过长bug修复。
关联用户名字为空导致提示为null bug修复。
修复客户端统计最大耗时死循环bug。
修复客户端引入apache common lang包导致类找不到。
  • Loading branch information
yongfeigao committed May 5, 2019
1 parent 826a991 commit d4f4aca
Show file tree
Hide file tree
Showing 53 changed files with 842 additions and 142 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
----------

## 目前运维的规模
1. 服务器:30台+
1. 服务器:40台+
2. 集群:5个+
3. topic:370个+
4. 生产消费消息量/日:10亿条+
Expand All @@ -77,4 +77,4 @@

MQCloud QQ交流群:474960759

使用方式请参考[wiki](https://github.com/sohutv/sohu-tv-mq/wiki)
使用方式请参考[wiki](https://github.com/sohutv/sohu-tv-mq/wiki)
2 changes: 1 addition & 1 deletion mq-client-common-open/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.sohu.tv</groupId>
<artifactId>mq</artifactId>
<version>1.6</version>
<version>1.7</version>
</parent>

<artifactId>mq-client-common-open</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
Expand Down Expand Up @@ -225,7 +225,7 @@ protected void initTrace() {
* @param traceDispatcher
*/
protected void registerTraceDispatcher(AsyncTraceDispatcher traceDispatcher) {
throw new NotImplementedException();
throw new NotImplementedException("not impl!");
}

public boolean isSampleEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
* 调用统计
Expand Down Expand Up @@ -161,7 +160,7 @@ public Map<String, Object> getMap(){
*/
public class TimeStats {
// 最大耗时
private AtomicReference<Long> maxTimeReference = new AtomicReference<Long>(0L);
private AtomicLong maxTimeReference = new AtomicLong(0L);
// 调用次数统计
private AtomicLong count = new AtomicLong();
// 调用时间统计
Expand All @@ -182,18 +181,18 @@ public void increment(long timeInMillis) {
// 记录耗时
time.addAndGet(timeInMillis);
// 记录最大耗时
while (true) {
for(int i = 0; i < 10; ++i) {
long maxTime = maxTimeReference.get();
if (maxTime >= timeInMillis) {
return;
}
if (maxTimeReference.compareAndSet(maxTime, timeInMillis)) {
break;
return;
}
}
}

public AtomicReference<Long> getMaxTimeReference() {
public AtomicLong getMaxTimeReference() {
return maxTimeReference;
}

Expand Down Expand Up @@ -240,7 +239,7 @@ public static class InvokeStatsResult {
private Map<String, Integer> exceptionMap;

public void init(TimeStats timeStats) {
setMaxTime(timeStats.getMaxTimeReference().get().intValue());
setMaxTime((int)timeStats.getMaxTimeReference().get());
double time = timeStats.getTime().get();
long count = timeStats.getCount().get();
// 保留一位小数
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
public class Version {

public static String get() {
return "1.6";
return "1.7";
}
}
2 changes: 1 addition & 1 deletion mq-client-open/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.sohu.tv</groupId>
<artifactId>mq</artifactId>
<version>1.6</version>
<version>1.7</version>
</parent>

<artifactId>mq-client-open</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion mq-cloud-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.sohu.tv</groupId>
<artifactId>mq</artifactId>
<version>1.6</version>
<version>1.7</version>
</parent>

<artifactId>mq-cloud-common</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion mq-cloud/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.sohu.tv</groupId>
<artifactId>mq</artifactId>
<version>1.6</version>
<version>1.7</version>
</parent>

<artifactId>mq-cloud</artifactId>
Expand Down
7 changes: 7 additions & 0 deletions mq-cloud/sql/1.6-1.7.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
alter table `audit_topic` add column `test_enabled` int(4) NOT NULL DEFAULT '0' COMMENT '0:非测试topic,1:测试topic';

alter table `topic` add column `info` varchar(360) DEFAULT NULL COMMENT 'topic描述';

alter table `topic` add column `delay_enabled` int(4) NOT NULL DEFAULT '0' COMMENT '0:不发送延迟消息,1:发送延迟消息。注:此字段不强制该topic的消息类型,仅为流量展示';

alter table `audit_topic` add column `delay_enabled` int(4) NOT NULL DEFAULT '0' COMMENT '0:不发送延迟消息,1:发送延迟消息。注:此字段不强制该topic的消息类型,仅为流量展示';
4 changes: 4 additions & 0 deletions mq-cloud/sql/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ CREATE TABLE `audit_topic` (
`ordered` int(4) NOT NULL DEFAULT '0' COMMENT '0:无序,1:有序',
`trace_enabled` int(4) NOT NULL DEFAULT '0' COMMENT '0:不开启trace,1:开启trace',
`transaction_enabled` int(4) NOT NULL DEFAULT '0' COMMENT '0:不开启事务,1:开启事务',
`delay_enabled` int(4) NOT NULL DEFAULT '0' COMMENT '0:不发送延迟消息,1:发送延迟消息。注:此字段不强制该topic的消息类型',
`test_enabled` int(4) NOT NULL DEFAULT '0' COMMENT '0:非测试topic,1:测试topic',
`info` varchar(360) DEFAULT NULL COMMENT 'topic描述',
`qps` int(11) DEFAULT NULL COMMENT '消息量qps预估',
`qpd` int(11) DEFAULT NULL COMMENT '一天消息量预估',
UNIQUE KEY `name` (`name`)
Expand Down Expand Up @@ -424,6 +427,7 @@ CREATE TABLE `topic` (
`ordered` int(4) NOT NULL DEFAULT '0' COMMENT '0:无序,1:有序',
`count` int(11) DEFAULT NULL COMMENT 'topic put times',
`trace_enabled` int(4) NOT NULL DEFAULT '0' COMMENT '0:不开启trace,1:开启trace',
`delay_enabled` int(4) NOT NULL DEFAULT '0' COMMENT '0:不发送延迟消息,1:发送延迟消息。注:此字段不强制该topic的消息类型',
`create_date` date NOT NULL,
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
Expand Down
26 changes: 25 additions & 1 deletion mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/AuditTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ public class AuditTopic {
private int traceEnabled;
// 是否开启事务
private int transactionEnabled;

private int testEnabled;

private int delayEnabled;

public long getAid() {
return aid;
Expand Down Expand Up @@ -104,11 +108,31 @@ public void setTransactionEnabled(int transactionEnabled) {
this.transactionEnabled = transactionEnabled;
}

public int getTestEnabled() {
return testEnabled;
}

public void setTestEnabled(int testEnabled) {
this.testEnabled = testEnabled;
}

public boolean testEnabled() {
return testEnabled == 1;
}

public int getDelayEnabled() {
return delayEnabled;
}

public void setDelayEnabled(int delayEnabled) {
this.delayEnabled = delayEnabled;
}

@Override
public String toString() {
return "AuditTopic [aid=" + aid + ", name=" + name + ", queueNum=" + queueNum + ", ordered=" + ordered
+ ", producer=" + producer + ", qpd=" + qpd + ", qps=" + qps + ", traceEnabled=" + traceEnabled
+ ", transactionEnabled=" + transactionEnabled + "]";
+ ", transactionEnabled=" + transactionEnabled + ", testEnabled=" + testEnabled + "]";
}

}
4 changes: 4 additions & 0 deletions mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ public boolean isEnableTransaction() {
return YES == transactionEnabled;
}

public boolean test() {
return !online();
}

@Override
public int hashCode() {
final int prime = 31;
Expand Down
26 changes: 26 additions & 0 deletions mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ public class Topic {

// 是否开启trace
private int traceEnabled;

// topic描述
private String info;

// 是否延迟消息
private int delayEnabled;

public long getId() {
return id;
Expand Down Expand Up @@ -123,6 +129,26 @@ public void setTraceEnabled(int traceEnabled) {
this.traceEnabled = traceEnabled;
}

public String getInfo() {
return info;
}

public void setInfo(String info) {
this.info = info;
}

public int getDelayEnabled() {
return delayEnabled;
}

public void setDelayEnabled(int delayEnabled) {
this.delayEnabled = delayEnabled;
}

public boolean delayEnabled() {
return delayEnabled == 1;
}

@Override
public String toString() {
return "Topic [id=" + id + ", clusterId=" + clusterId + ", name=" + name + ", queueNum=" + queueNum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public class TraceMessageDetail {
// 唯一id
private String requestId;

// broker address
private String broker;

public TraceType getTraceType() {
return traceType;
}
Expand Down Expand Up @@ -73,12 +76,8 @@ public Integer getCostTime() {
return costTime;
}

public void setCostTime(int costTime) {
if (costTime == 0) {
this.costTime = null;
} else {
this.costTime = costTime;
}
public void setCostTime(Integer costTime) {
this.costTime = costTime;
}

public Boolean isSuccess() {
Expand Down Expand Up @@ -148,5 +147,18 @@ public String getRequestId() {
public void setRequestId(String requestId) {
this.requestId = requestId;
}

public String getBroker() {
return broker;
}

public void setBroker(String broker) {
this.broker = broker;
}

public void setStoreHost(String storeHost) {
// 只取ip
setBroker(storeHost.split(":")[0]);
}

}
8 changes: 8 additions & 0 deletions mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/User.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.Date;

import org.apache.commons.lang3.StringUtils;
import org.hibernate.validator.constraints.Email;
import org.hibernate.validator.constraints.Range;

Expand Down Expand Up @@ -121,6 +122,13 @@ public void setPassword(String password) {
this.password = password;
}

public String notBlankName(){
if (StringUtils.isBlank(getName())) {
return getEmailName();
}
return getName();
}

@Override
public String toString() {
return "User [id=" + id + ", name=" + name + ", email=" + email + ", mobile=" + mobile + ", type=" + type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public CipherHelper cipherHelper() throws UnsupportedEncodingException {
* @throws Exception
*/
@Bean
@Profile({"local-sohu", "test-sohu", "online-sohu"})
@Profile({"local-sohu", "test-sohu", "test-online-sohu", "online-sohu"})
public LoginService sohuLoginService() throws Exception {
Class<?> clz = Class.forName("com.sohu.tv.mq.cloud.common.service.impl.SohuLoginService");
AbstractLoginService loginService = (AbstractLoginService) clz.newInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ public interface AuditTopicDao {
* 保存记录
* @param auditTopic
*/
@Insert("insert into audit_topic(aid,name,queue_num,producer,ordered,qpd,qps,trace_enabled,transaction_enabled) values(" +
"#{auditTopic.aid},#{auditTopic.name},#{auditTopic.queueNum},#{auditTopic.producer},"
+ "#{auditTopic.ordered},#{auditTopic.qpd},#{auditTopic.qps},#{auditTopic.traceEnabled},#{auditTopic.transactionEnabled})")
@Insert("insert into audit_topic(aid,name,queue_num,producer,ordered,qpd,qps,trace_enabled,"
+ "transaction_enabled,test_enabled, delay_enabled) values(#{auditTopic.aid},#{auditTopic.name},#{auditTopic.queueNum},"
+ "#{auditTopic.producer},#{auditTopic.ordered},#{auditTopic.qpd},#{auditTopic.qps},#{auditTopic.traceEnabled},"
+ "#{auditTopic.transactionEnabled},#{auditTopic.testEnabled},#{auditTopic.delayEnabled})")
public void insert(@Param("auditTopic") AuditTopic auditTopic);

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.sohu.tv.mq.cloud.dao;

import java.util.List;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import com.sohu.tv.mq.cloud.bo.TopicTraffic;
/**
* 延迟消息
*
* @author zhehongyuan
* @date 2019年4月23日
*/
public interface DelayMessageDao {

/**
* 延迟消息的TopicTraffic记录
*/
@Select("select create_time ,sum(count) as count from producer_total_stat where"
+ " producer in(select distinct producer from user_producer where tid=#{tid}) "
+ "and create_date = #{createDate} group by create_time")
public List<TopicTraffic> selectTopicTraffic(@Param("tid")long tid, @Param("createDate")int createDate);


/**
* 获取topic流量
*
* @param tid
* @param createDate
* @return
*/
@Select("select #{tid} as tid, create_time, sum(count) as count from producer_total_stat where"
+ " producer in(select distinct producer from user_producer where tid=#{tid}) "
+ "and create_date = #{createDate} and create_time = #{createTime} group by create_time")
public TopicTraffic selectByIdListDateTime(@Param("tid") long tid,
@Param("createDate") int createDate, @Param("createTime") String createTime);

/**
* 获取topic日流量
* @param tid
* @param createDate
* @return
*/
@Select("select #{tid} as tid, sum(count) as count from producer_total_stat where"
+ " producer in(select distinct producer from user_producer where tid=#{tid}) "
+ "and create_date = #{createDate}")
public TopicTraffic selectTotalTraffic(@Param("tid") long tid, @Param("createDate") int createDate);

}
Loading

0 comments on commit d4f4aca

Please sign in to comment.