Skip to content

Commit

Permalink
nsqd: add DPUB
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed May 3, 2015
1 parent 9e74e8c commit 8786215
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 2 deletions.
16 changes: 14 additions & 2 deletions nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,19 +243,31 @@ func (s *httpServer) doPUB(req *http.Request) (interface{}, error) {
return nil, http_api.Err{500, "INTERNAL_ERROR"}
}
if int64(len(body)) == readMax {
s.ctx.nsqd.logf("ERROR: /put hit max message size")
return nil, http_api.Err{413, "MSG_TOO_BIG"}
}
if len(body) == 0 {
return nil, http_api.Err{400, "MSG_EMPTY"}
}

_, topic, err := s.getTopicFromQuery(req)
reqParams, topic, err := s.getTopicFromQuery(req)
if err != nil {
return nil, err
}

var deferred time.Duration
if ds, ok := reqParams["defer"]; ok {
di, err := strconv.ParseInt(ds[0], 10, 64)
if err != nil {
return nil, http_api.Err{400, "INVALID_DEFER"}
}
deferred = time.Duration(di) * time.Millisecond
if deferred < 0 || deferred > s.ctx.nsqd.opts.MaxReqTimeout {
return nil, http_api.Err{400, "INVALID_DEFER"}
}
}

msg := NewMessage(<-s.ctx.nsqd.idChan, body)
msg.deferred = deferred
err = topic.PutMessage(msg)
if err != nil {
return nil, http_api.Err{503, "EXITING"}
Expand Down
27 changes: 27 additions & 0 deletions nsqd/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,33 @@ func TestHTTPmputBinary(t *testing.T) {
equal(t, topic.Depth(), int64(5))
}

func TestHTTPpubDefer(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_pub_defer" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
ch := topic.GetChannel("ch")

buf := bytes.NewBuffer([]byte("test message"))
url := fmt.Sprintf("http://%s/pub?topic=%s&defer=%d", httpAddr, topicName, 1000)
resp, err := http.Post(url, "application/octet-stream", buf)
equal(t, err, nil)
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
equal(t, string(body), "OK")

time.Sleep(5 * time.Millisecond)

ch.Lock()
numDef := len(ch.deferredMessages)
ch.Unlock()
equal(t, numDef, 1)
}

func TestHTTPSRequire(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
Expand Down
1 change: 1 addition & 0 deletions nsqd/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Message struct {
clientID int64
pri int64
index int
deferred time.Duration
}

func NewMessage(id MessageID, body []byte) *Message {
Expand Down
64 changes: 64 additions & 0 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
return p.PUB(client, params)
case bytes.Equal(params[0], []byte("MPUB")):
return p.MPUB(client, params)
case bytes.Equal(params[0], []byte("DPUB")):
return p.DPUB(client, params)
case bytes.Equal(params[0], []byte("NOP")):
return p.NOP(client, params)
case bytes.Equal(params[0], []byte("TOUCH")):
Expand Down Expand Up @@ -817,6 +819,68 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) {
return okBytes, nil
}

func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) {
var err error

if len(params) < 3 {
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "DPUB insufficient number of parameters")
}

topicName := string(params[1])
if !protocol.IsValidTopicName(topicName) {
return nil, protocol.NewFatalClientErr(nil, "E_BAD_TOPIC",
fmt.Sprintf("DPUB topic name %q is not valid", topicName))
}

timeoutMs, err := protocol.ByteToBase10(params[2])
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_INVALID",
fmt.Sprintf("DPUB could not parse timeout %s", params[2]))
}
timeoutDuration := time.Duration(timeoutMs) * time.Millisecond

if timeoutDuration < 0 || timeoutDuration > p.ctx.nsqd.opts.MaxReqTimeout {
return nil, protocol.NewFatalClientErr(nil, "E_INVALID",
fmt.Sprintf("DPUB timeout %d out of range 0-%d",
timeoutMs, p.ctx.nsqd.opts.MaxReqTimeout/time.Millisecond))
}

bodyLen, err := readLen(client.Reader, client.lenSlice)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "DPUB failed to read message body size")
}

if bodyLen <= 0 {
return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE",
fmt.Sprintf("DPUB invalid message body size %d", bodyLen))
}

if int64(bodyLen) > p.ctx.nsqd.opts.MaxMsgSize {
return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE",
fmt.Sprintf("DPUB message too big %d > %d", bodyLen, p.ctx.nsqd.opts.MaxMsgSize))
}

messageBody := make([]byte, bodyLen)
_, err = io.ReadFull(client.Reader, messageBody)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "DPUB failed to read message body")
}

if err := p.CheckAuth(client, "DPUB", topicName, ""); err != nil {
return nil, err
}

topic := p.ctx.nsqd.GetTopic(topicName)
msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody)
msg.deferred = timeoutDuration
err = topic.PutMessage(msg)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_DPUB_FAILED", "DPUB failed "+err.Error())
}

return okBytes, nil
}

func (p *protocolV2) TOUCH(client *clientV2, params [][]byte) ([]byte, error) {
state := atomic.LoadInt32(&client.State)
if state != stateSubscribed && state != stateClosing {
Expand Down
36 changes: 36 additions & 0 deletions nsqd/protocol_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,42 @@ func TestSizeLimits(t *testing.T) {
equal(t, string(data), fmt.Sprintf("E_BAD_MESSAGE MPUB message too big 101 > 100"))
}

func TestDPUB(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
opts.Verbose = true
tcpAddr, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

conn, err := mustConnectNSQD(tcpAddr)
equal(t, err, nil)
defer conn.Close()

topicName := "test_dpub_v2" + strconv.Itoa(int(time.Now().Unix()))

identify(t, conn, nil, frameTypeResponse)
sub(t, conn, topicName, "ch")

// valid
nsq.DeferredPublish(topicName, time.Second, make([]byte, 100)).WriteTo(conn)
resp, _ := nsq.ReadResponse(conn)
frameType, data, _ := nsq.UnpackResponse(resp)
t.Logf("frameType: %d, data: %s", frameType, data)
equal(t, frameType, frameTypeResponse)
equal(t, data, []byte("OK"))

equal(t, len(nsqd.GetTopic(topicName).GetChannel("ch").deferredMessages), 1)

// duration out of range
nsq.DeferredPublish(topicName, opts.MaxReqTimeout+100*time.Millisecond, make([]byte, 100)).WriteTo(conn)
resp, _ = nsq.ReadResponse(conn)
frameType, data, _ = nsq.UnpackResponse(resp)
t.Logf("frameType: %d, data: %s", frameType, data)
equal(t, frameType, frameTypeError)
equal(t, string(data), fmt.Sprintf("E_INVALID DPUB timeout 3600100 out of range 0-3600000"))
}

func TestTouch(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
Expand Down
5 changes: 5 additions & 0 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ func (t *Topic) messagePump() {
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
if chanMsg.deferred != 0 {
channel.StartDeferredTimeout(chanMsg, chanMsg.deferred)
continue
}
err := channel.PutMessage(chanMsg)
if err != nil {
Expand Down

0 comments on commit 8786215

Please sign in to comment.