From 2b32309fbaf39e4cf5a700bb9545ce0f2c39c2c0 Mon Sep 17 00:00:00 2001 From: zero Date: Sat, 3 Nov 2018 18:59:04 +0800 Subject: [PATCH] =?UTF-8?q?=E5=86=99=E5=85=A5=E9=98=BB=E5=A1=9E=E5=81=87?= =?UTF-8?q?=E6=AD=BBbug=20=E6=96=B0=E5=A2=9EContinueCount=3D80,ContinueCou?= =?UTF-8?q?ntSleepTime=3D2=20=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AllQueueOp.go | 59 +++++++++++++++++++++++++++++++++++++++--------- README.MD | 5 ++++ SingleConsume.go | 18 ++++++++------- SingleSend.go | 40 ++++++++++++++++++++++++++------ Start.go | 5 +++- 5 files changed, 100 insertions(+), 27 deletions(-) diff --git a/AllQueueOp.go b/AllQueueOp.go index 4607715..5c061ae 100755 --- a/AllQueueOp.go +++ b/AllQueueOp.go @@ -33,7 +33,21 @@ func AllQueueOp(key string,config map[string]string,resultDataChan chan *Result) log.Println(key,"AllQueueOp start",AllStartTime) + var ContinueCount int + var ContinueCountSleepTime int + if _,ok:= config["ContinueCount"];ok{ + ContinueCount = GetIntDefault(config["ContinueCount"],0) + } + + if _,ok:= config["ContinueCountSleepTime"];ok{ + ContinueCountSleepTime = GetIntDefault(config["ContinueCountSleepTime"],0) + } + + for keyI,qInfo := range *qList{ + if keyI >0 && ContinueCount > 0 && keyI % ContinueCount == 0{ + time.Sleep(time.Duration(ContinueCountSleepTime) * time.Second) + } m := make(map[string]string) if qInfo.Vhost == "/"{ m["Uri"] = "amqp://"+AmqpAdmin+":"+AmqpPwd+"@"+AmqpUri+"/" @@ -119,23 +133,43 @@ func AllQueueOp(key string,config map[string]string,resultDataChan chan *Result) if NeedWaitCount == 0{ return } + loop: for{ - data := <- ResultChan - ResultData.ConnectSuccess += data.ConnectSuccess - ResultData.ConnectFail += data.ConnectFail - ResultData.ChannelSuccess += data.ChannelSuccess - ResultData.ChanneFail += data.ChanneFail - ResultData.WriteSuccess += data.WriteSuccess - ResultData.WriteFail += data.WriteFail - ResultData.CosumeSuccess += data.CosumeSuccess - OverCount++ - if OverCount >= NeedWaitCount{ + select { + case data := <-ResultChan: + ResultData.ConnectSuccess += data.ConnectSuccess + ResultData.ConnectFail += data.ConnectFail + ResultData.ChannelSuccess += data.ChannelSuccess + ResultData.ChanneFail += data.ChanneFail + ResultData.WriteSuccess += data.WriteSuccess + ResultData.WriteFail += data.WriteFail + ResultData.CosumeSuccess += data.CosumeSuccess + OverCount++ + if OverCount >= NeedWaitCount { + break loop + } + case <-time.After(100 * time.Second): + AllEndTime := time.Now().UnixNano() / 1e6 + fmt.Println(" ") + UseTime := int(AllEndTime-AllStartTime) + log.Println(key,"AllQueueOp end",AllEndTime," had use time(ms):",UseTime) + fmt.Println("ConnectSuccess:",ResultData.ConnectSuccess) + fmt.Println("ConnectFail:",ResultData.ConnectFail) + fmt.Println("ChannelSuccess:",ResultData.ChannelSuccess) + fmt.Println("ChanneFail:",ResultData.ChanneFail) + fmt.Println("WriteSuccess:",ResultData.WriteSuccess) + fmt.Println("WriteFail:",ResultData.WriteFail) + fmt.Println("CosumeSuccess:",ResultData.CosumeSuccess) + fmt.Println("Write QPS:",ResultData.WriteSuccess/UseTime*1000) + fmt.Println("Consume QPS:",ResultData.CosumeSuccess/UseTime*1000) break } } AllEndTime := time.Now().UnixNano() / 1e6 fmt.Println(" ") - log.Println(key,"AllQueueOp end",AllEndTime," time(ms):",AllEndTime-AllStartTime) + UseTime := int(AllEndTime-AllStartTime) + + log.Println(key,"AllQueueOp end",AllEndTime," time(ms):",UseTime) fmt.Println("ConnectSuccess:",ResultData.ConnectSuccess) fmt.Println("ConnectFail:",ResultData.ConnectFail) fmt.Println("ChannelSuccess:",ResultData.ChannelSuccess) @@ -143,4 +177,7 @@ func AllQueueOp(key string,config map[string]string,resultDataChan chan *Result) fmt.Println("WriteSuccess:",ResultData.WriteSuccess) fmt.Println("WriteFail:",ResultData.WriteFail) fmt.Println("CosumeSuccess:",ResultData.CosumeSuccess) + fmt.Println("Write QPS:",ResultData.WriteSuccess/UseTime*1000) + fmt.Println("Consume QPS:",ResultData.CosumeSuccess/UseTime*1000) + } diff --git a/README.MD b/README.MD index 9978978..55d3261 100644 --- a/README.MD +++ b/README.MD @@ -102,6 +102,11 @@ AmqpPwd=admintest ConnectCount=1 ConsumeTimeOut=20 +#起启多少个队列消费后,sleep [ContinueCountSleepTime] s,0代表不sleep +ContinueCount=80 +ContinueCountSleepTime=2 + + [allQueueWriteAndConsume] #多队列同时写与消费操作,请参考all_write 和 all_consume 参数 #这里唯一不同的主是 CosumeConnectCount ,WriteConnectCount 分别表示每个队列的消费者数量和每个队列写入开启多少个连接,和 ConnectCount同义,但这里配置 ConnectCount 无效 diff --git a/SingleConsume.go b/SingleConsume.go index eced342..8bf4b06 100755 --- a/SingleConsume.go +++ b/SingleConsume.go @@ -35,7 +35,7 @@ func SingleConsume(key string,config map[string]string,resultDataChan chan *Resu OverCount := 0 NeedWaitCount := ConnectCount - ResultChan := make(chan int,NeedWaitCount) + ResultChan := make(chan int,NeedWaitCount+2) SingleStartTime := time.Now().UnixNano() / 1e6 log.Println(key,"SingleConsume start",SingleStartTime) @@ -78,7 +78,7 @@ func SingleConsume(key string,config map[string]string,resultDataChan chan *Resu select { case d :=<-msgs: if AutoAck == 1{ - d.Ack(true) + d.Ack(false) } HadCosumeCount++ if HadCosumeCount >= ConsumeCount && ConsumeCount > 0 { @@ -100,12 +100,14 @@ func SingleConsume(key string,config map[string]string,resultDataChan chan *Resu }(i) } - for{ - HadCosumeCount := <-ResultChan - ResultData.CosumeSuccess+=HadCosumeCount - OverCount++ - if OverCount >= NeedWaitCount{ - break + if NeedWaitCount >0{ + for{ + HadCosumeCount := <-ResultChan + ResultData.CosumeSuccess+=HadCosumeCount + OverCount++ + if OverCount >= NeedWaitCount{ + break + } } } SingleEndTime := time.Now().UnixNano() / 1e6 diff --git a/SingleSend.go b/SingleSend.go index 822f3d0..ce8b6c6 100755 --- a/SingleSend.go +++ b/SingleSend.go @@ -84,14 +84,40 @@ func SingleSend(key string,config map[string]string,resultDataChan chan *Result) } } - for{ - FailCount := <-ResultChan - ResultData.WriteFail += FailCount - ResultData.WriteSuccess += ChanneWriteCount-FailCount - OverCount++ - if OverCount >= NeedWaitCount{ - break + if NeedWaitCount > 0 { + for{ + FailCount := <-ResultChan + ResultData.WriteFail += FailCount + ResultData.WriteSuccess += ChanneWriteCount - FailCount + OverCount++ + if OverCount >= NeedWaitCount { + break + } + } + /* + loop: + for { + select { + case FailCount := <-ResultChan: + ResultData.WriteFail += FailCount + ResultData.WriteSuccess += ChanneWriteCount - FailCount + OverCount++ + if OverCount >= NeedWaitCount { + break loop + } + case <-time.After(100 * time.Second): + log.Println(key, "single Send time after,had use time:", time.Now().UnixNano()/1e6-SendStartTime) + fmt.Println("ConnectSuccess:", ResultData.ConnectSuccess) + fmt.Println("ConnectFail:", ResultData.ConnectFail) + fmt.Println("ChannelSuccess:", ResultData.ChannelSuccess) + fmt.Println("ChanneFail:", ResultData.ChanneFail) + fmt.Println("WriteSuccess:", ResultData.WriteSuccess) + fmt.Println("WriteFail:", ResultData.WriteFail) + fmt.Println("CosumeSuccess:", ResultData.CosumeSuccess) + break + } } + */ } SendEndTime := time.Now().UnixNano() / 1e6 log.Println(key,"SingleSend end",SendEndTime," time(ms):",SendEndTime-SendStartTime) diff --git a/Start.go b/Start.go index 9f75d4e..8775737 100755 --- a/Start.go +++ b/Start.go @@ -107,7 +107,8 @@ func Start(){ if HacBackCount >= NeenBackCount{ TestEndTime := time.Now().UnixNano() / 1e6 fmt.Println(" ") - log.Println("Test Over:",TestEndTime,"Use Time(ms):",TestEndTime-TestStartTime) + UseTime := int(TestEndTime-TestStartTime) + log.Println("Test Over:",TestEndTime,"Use Time(ms):",UseTime) fmt.Println("ConnectSuccess:",ResultData.ConnectSuccess) fmt.Println("ConnectFail:",ResultData.ConnectFail) fmt.Println("ChannelSuccess:",ResultData.ChannelSuccess) @@ -115,6 +116,8 @@ func Start(){ fmt.Println("WriteSuccess:",ResultData.WriteSuccess) fmt.Println("WriteFail:",ResultData.WriteFail) fmt.Println("CosumeSuccess:",ResultData.CosumeSuccess) + fmt.Println("Write QPS:",ResultData.WriteSuccess/UseTime*1000) + fmt.Println("Consume QPS:",ResultData.CosumeSuccess/UseTime*1000) os.Exit(0) } }