Skip to content

Commit

Permalink
update MPUB to conform to protocol spec
Browse files Browse the repository at this point in the history
  • Loading branch information
dustismo committed Jan 23, 2013
1 parent 6680504 commit 26baf6e
Showing 1 changed file with 7 additions and 16 deletions.
23 changes: 7 additions & 16 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net"
"sync/atomic"
"time"
"strconv"
)

const maxTimeout = time.Hour
Expand Down Expand Up @@ -472,11 +473,11 @@ func (p *ProtocolV2) PUB(client *ClientV2, params [][]byte) ([]byte, error) {

func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) {
var err error
var bodyLen int32
var numMessages int32

var numMessages int
var messageSize int32

if len(params) < 2 {
if len(params) < 3 {
return nil, nsq.NewFatalClientErr("E_MISSING_PARAMS", "insufficient number of parameters")
}

Expand All @@ -486,23 +487,13 @@ func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) {
fmt.Sprintf("topic name '%s' is not valid", topicName))
}

err = binary.Read(client.Reader, binary.BigEndian, &bodyLen)
numMessages, err = strconv.Atoi(string(params[2]))
if err != nil {
return nil, nsq.NewFatalClientErr("E_BAD_BODY", err.Error())
}

if int64(bodyLen) > nsqd.options.maxBodySize {
return nil, nsq.NewFatalClientErr("E_BODY_TOO_BIG",
fmt.Sprintf("body too big %d > %d", bodyLen, nsqd.options.maxBodySize))
}

err = binary.Read(client.Reader, binary.BigEndian, &numMessages)
if err != nil {
return nil, nsq.NewFatalClientErr("E_BAD_BODY", err.Error())
return nil, nsq.NewFatalClientErr("E_MISSING_PARAMS", "invalid number of messages")
}

messages := make([]*nsq.Message, 0, numMessages)
for i := int32(0); i < numMessages; i++ {
for i := 0; i < numMessages; i++ {
err = binary.Read(client.Reader, binary.BigEndian, &messageSize)
if err != nil {
return nil, nsq.NewFatalClientErr("E_BAD_BODY", err.Error())
Expand Down

0 comments on commit 26baf6e

Please sign in to comment.