Skip to content

Commit

Permalink
写入阻塞假死bug
Browse files Browse the repository at this point in the history
新增ContinueCount=80,ContinueCountSleepTime=2 参数
  • Loading branch information
jc3wish committed Nov 3, 2018
1 parent 41e2248 commit 2b32309
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 27 deletions.
59 changes: 48 additions & 11 deletions AllQueueOp.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,21 @@ func AllQueueOp(key string,config map[string]string,resultDataChan chan *Result)

log.Println(key,"AllQueueOp start",AllStartTime)

var ContinueCount int
var ContinueCountSleepTime int
if _,ok:= config["ContinueCount"];ok{
ContinueCount = GetIntDefault(config["ContinueCount"],0)
}

if _,ok:= config["ContinueCountSleepTime"];ok{
ContinueCountSleepTime = GetIntDefault(config["ContinueCountSleepTime"],0)
}


for keyI,qInfo := range *qList{
if keyI >0 && ContinueCount > 0 && keyI % ContinueCount == 0{
time.Sleep(time.Duration(ContinueCountSleepTime) * time.Second)
}
m := make(map[string]string)
if qInfo.Vhost == "/"{
m["Uri"] = "amqp://"+AmqpAdmin+":"+AmqpPwd+"@"+AmqpUri+"/"
Expand Down Expand Up @@ -119,28 +133,51 @@ func AllQueueOp(key string,config map[string]string,resultDataChan chan *Result)
if NeedWaitCount == 0{
return
}
loop:
for{
data := <- ResultChan
ResultData.ConnectSuccess += data.ConnectSuccess
ResultData.ConnectFail += data.ConnectFail
ResultData.ChannelSuccess += data.ChannelSuccess
ResultData.ChanneFail += data.ChanneFail
ResultData.WriteSuccess += data.WriteSuccess
ResultData.WriteFail += data.WriteFail
ResultData.CosumeSuccess += data.CosumeSuccess
OverCount++
if OverCount >= NeedWaitCount{
select {
case data := <-ResultChan:
ResultData.ConnectSuccess += data.ConnectSuccess
ResultData.ConnectFail += data.ConnectFail
ResultData.ChannelSuccess += data.ChannelSuccess
ResultData.ChanneFail += data.ChanneFail
ResultData.WriteSuccess += data.WriteSuccess
ResultData.WriteFail += data.WriteFail
ResultData.CosumeSuccess += data.CosumeSuccess
OverCount++
if OverCount >= NeedWaitCount {
break loop
}
case <-time.After(100 * time.Second):
AllEndTime := time.Now().UnixNano() / 1e6
fmt.Println(" ")
UseTime := int(AllEndTime-AllStartTime)
log.Println(key,"AllQueueOp end",AllEndTime," had use time(ms):",UseTime)
fmt.Println("ConnectSuccess:",ResultData.ConnectSuccess)
fmt.Println("ConnectFail:",ResultData.ConnectFail)
fmt.Println("ChannelSuccess:",ResultData.ChannelSuccess)
fmt.Println("ChanneFail:",ResultData.ChanneFail)
fmt.Println("WriteSuccess:",ResultData.WriteSuccess)
fmt.Println("WriteFail:",ResultData.WriteFail)
fmt.Println("CosumeSuccess:",ResultData.CosumeSuccess)
fmt.Println("Write QPS:",ResultData.WriteSuccess/UseTime*1000)
fmt.Println("Consume QPS:",ResultData.CosumeSuccess/UseTime*1000)
break
}
}
AllEndTime := time.Now().UnixNano() / 1e6
fmt.Println(" ")
log.Println(key,"AllQueueOp end",AllEndTime," time(ms):",AllEndTime-AllStartTime)
UseTime := int(AllEndTime-AllStartTime)

log.Println(key,"AllQueueOp end",AllEndTime," time(ms):",UseTime)
fmt.Println("ConnectSuccess:",ResultData.ConnectSuccess)
fmt.Println("ConnectFail:",ResultData.ConnectFail)
fmt.Println("ChannelSuccess:",ResultData.ChannelSuccess)
fmt.Println("ChanneFail:",ResultData.ChanneFail)
fmt.Println("WriteSuccess:",ResultData.WriteSuccess)
fmt.Println("WriteFail:",ResultData.WriteFail)
fmt.Println("CosumeSuccess:",ResultData.CosumeSuccess)
fmt.Println("Write QPS:",ResultData.WriteSuccess/UseTime*1000)
fmt.Println("Consume QPS:",ResultData.CosumeSuccess/UseTime*1000)

}
5 changes: 5 additions & 0 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ AmqpPwd=admintest
ConnectCount=1
ConsumeTimeOut=20
#起启多少个队列消费后,sleep [ContinueCountSleepTime] s,0代表不sleep
ContinueCount=80
ContinueCountSleepTime=2
[allQueueWriteAndConsume]
#多队列同时写与消费操作,请参考all_write 和 all_consume 参数
#这里唯一不同的主是 CosumeConnectCount ,WriteConnectCount 分别表示每个队列的消费者数量和每个队列写入开启多少个连接,和 ConnectCount同义,但这里配置 ConnectCount 无效
Expand Down
18 changes: 10 additions & 8 deletions SingleConsume.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func SingleConsume(key string,config map[string]string,resultDataChan chan *Resu

OverCount := 0
NeedWaitCount := ConnectCount
ResultChan := make(chan int,NeedWaitCount)
ResultChan := make(chan int,NeedWaitCount+2)

SingleStartTime := time.Now().UnixNano() / 1e6
log.Println(key,"SingleConsume start",SingleStartTime)
Expand Down Expand Up @@ -78,7 +78,7 @@ func SingleConsume(key string,config map[string]string,resultDataChan chan *Resu
select {
case d :=<-msgs:
if AutoAck == 1{
d.Ack(true)
d.Ack(false)
}
HadCosumeCount++
if HadCosumeCount >= ConsumeCount && ConsumeCount > 0 {
Expand All @@ -100,12 +100,14 @@ func SingleConsume(key string,config map[string]string,resultDataChan chan *Resu
}(i)
}

for{
HadCosumeCount := <-ResultChan
ResultData.CosumeSuccess+=HadCosumeCount
OverCount++
if OverCount >= NeedWaitCount{
break
if NeedWaitCount >0{
for{
HadCosumeCount := <-ResultChan
ResultData.CosumeSuccess+=HadCosumeCount
OverCount++
if OverCount >= NeedWaitCount{
break
}
}
}
SingleEndTime := time.Now().UnixNano() / 1e6
Expand Down
40 changes: 33 additions & 7 deletions SingleSend.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,40 @@ func SingleSend(key string,config map[string]string,resultDataChan chan *Result)
}
}

for{
FailCount := <-ResultChan
ResultData.WriteFail += FailCount
ResultData.WriteSuccess += ChanneWriteCount-FailCount
OverCount++
if OverCount >= NeedWaitCount{
break
if NeedWaitCount > 0 {
for{
FailCount := <-ResultChan
ResultData.WriteFail += FailCount
ResultData.WriteSuccess += ChanneWriteCount - FailCount
OverCount++
if OverCount >= NeedWaitCount {
break
}
}
/*
loop:
for {
select {
case FailCount := <-ResultChan:
ResultData.WriteFail += FailCount
ResultData.WriteSuccess += ChanneWriteCount - FailCount
OverCount++
if OverCount >= NeedWaitCount {
break loop
}
case <-time.After(100 * time.Second):
log.Println(key, "single Send time after,had use time:", time.Now().UnixNano()/1e6-SendStartTime)
fmt.Println("ConnectSuccess:", ResultData.ConnectSuccess)
fmt.Println("ConnectFail:", ResultData.ConnectFail)
fmt.Println("ChannelSuccess:", ResultData.ChannelSuccess)
fmt.Println("ChanneFail:", ResultData.ChanneFail)
fmt.Println("WriteSuccess:", ResultData.WriteSuccess)
fmt.Println("WriteFail:", ResultData.WriteFail)
fmt.Println("CosumeSuccess:", ResultData.CosumeSuccess)
break
}
}
*/
}
SendEndTime := time.Now().UnixNano() / 1e6
log.Println(key,"SingleSend end",SendEndTime," time(ms):",SendEndTime-SendStartTime)
Expand Down
5 changes: 4 additions & 1 deletion Start.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,17 @@ func Start(){
if HacBackCount >= NeenBackCount{
TestEndTime := time.Now().UnixNano() / 1e6
fmt.Println(" ")
log.Println("Test Over:",TestEndTime,"Use Time(ms):",TestEndTime-TestStartTime)
UseTime := int(TestEndTime-TestStartTime)
log.Println("Test Over:",TestEndTime,"Use Time(ms):",UseTime)
fmt.Println("ConnectSuccess:",ResultData.ConnectSuccess)
fmt.Println("ConnectFail:",ResultData.ConnectFail)
fmt.Println("ChannelSuccess:",ResultData.ChannelSuccess)
fmt.Println("ChanneFail:",ResultData.ChanneFail)
fmt.Println("WriteSuccess:",ResultData.WriteSuccess)
fmt.Println("WriteFail:",ResultData.WriteFail)
fmt.Println("CosumeSuccess:",ResultData.CosumeSuccess)
fmt.Println("Write QPS:",ResultData.WriteSuccess/UseTime*1000)
fmt.Println("Consume QPS:",ResultData.CosumeSuccess/UseTime*1000)
os.Exit(0)
}
}
Expand Down

0 comments on commit 2b32309

Please sign in to comment.