Skip to content

Commit

Permalink
Merge pull request #617 from mreiferson/bench_fixes_617
Browse files Browse the repository at this point in the history
nsqd: bench=BenchmarkProtocolV2Sub256 panics
  • Loading branch information
mreiferson committed Aug 7, 2015
2 parents 133292c + 2327e28 commit 2460a4f
Showing 1 changed file with 5 additions and 9 deletions.
14 changes: 5 additions & 9 deletions nsqd/protocol_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ func readValidate(t *testing.T, conn io.Reader, f int32, d string) []byte {
resp, err := nsq.ReadResponse(conn)
equal(t, err, nil)
frameType, data, err := nsq.UnpackResponse(resp)
t.Logf("%v %s %s", frameType, data, err)
equal(t, err, nil)
equal(t, frameType, f)
equal(t, string(data), d)
Expand Down Expand Up @@ -1582,7 +1581,7 @@ func subWorker(n int, workers int, tcpAddr *net.TCPAddr, topicName string, rdyCh
if err != nil {
panic(err.Error())
}
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriterSize(conn, 65536))

identify(nil, conn, nil, frameTypeResponse)
sub(nil, conn, topicName, "ch")
Expand All @@ -1593,8 +1592,6 @@ func subWorker(n int, workers int, tcpAddr *net.TCPAddr, topicName string, rdyCh
nsq.Ready(rdyCount).WriteTo(rw)
rw.Flush()
num := n / workers
numRdy := num/rdyCount - 1
rdy := rdyCount
for i := 0; i < num; i++ {
resp, err := nsq.ReadResponse(rw)
if err != nil {
Expand All @@ -1612,11 +1609,10 @@ func subWorker(n int, workers int, tcpAddr *net.TCPAddr, topicName string, rdyCh
panic(err.Error())
}
nsq.Finish(nsq.MessageID(msg.ID)).WriteTo(rw)
rdy--
if rdy == 0 && numRdy > 0 {
nsq.Ready(rdyCount).WriteTo(rw)
rdy = rdyCount
numRdy--
if (i+1)%rdyCount == 0 || i+1 == num {
if i+1 == num {
nsq.Ready(0).WriteTo(conn)
}
rw.Flush()
}
}
Expand Down

0 comments on commit 2460a4f

Please sign in to comment.