diff --git a/nsqd/http.go b/nsqd/http.go index 4969f8f77..b5da451f4 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -243,19 +243,31 @@ func (s *httpServer) doPUB(req *http.Request) (interface{}, error) { return nil, http_api.Err{500, "INTERNAL_ERROR"} } if int64(len(body)) == readMax { - s.ctx.nsqd.logf("ERROR: /put hit max message size") return nil, http_api.Err{413, "MSG_TOO_BIG"} } if len(body) == 0 { return nil, http_api.Err{400, "MSG_EMPTY"} } - _, topic, err := s.getTopicFromQuery(req) + reqParams, topic, err := s.getTopicFromQuery(req) if err != nil { return nil, err } + var deferred time.Duration + if ds, ok := reqParams["defer"]; ok { + di, err := strconv.ParseInt(ds[0], 10, 64) + if err != nil { + return nil, http_api.Err{400, "INVALID_DEFER"} + } + deferred = time.Duration(di) * time.Millisecond + if deferred < 0 || deferred > s.ctx.nsqd.opts.MaxReqTimeout { + return nil, http_api.Err{400, "INVALID_DEFER"} + } + } + msg := NewMessage(<-s.ctx.nsqd.idChan, body) + msg.deferred = deferred err = topic.PutMessage(msg) if err != nil { return nil, http_api.Err{503, "EXITING"} diff --git a/nsqd/http_test.go b/nsqd/http_test.go index 3744ba44c..087a0cc54 100644 --- a/nsqd/http_test.go +++ b/nsqd/http_test.go @@ -154,6 +154,33 @@ func TestHTTPmputBinary(t *testing.T) { equal(t, topic.Depth(), int64(5)) } +func TestHTTPpubDefer(t *testing.T) { + opts := NewNSQDOptions() + opts.Logger = newTestLogger(t) + _, httpAddr, nsqd := mustStartNSQD(opts) + defer os.RemoveAll(opts.DataPath) + defer nsqd.Exit() + + topicName := "test_http_pub_defer" + strconv.Itoa(int(time.Now().Unix())) + topic := nsqd.GetTopic(topicName) + ch := topic.GetChannel("ch") + + buf := bytes.NewBuffer([]byte("test message")) + url := fmt.Sprintf("http://%s/pub?topic=%s&defer=%d", httpAddr, topicName, 1000) + resp, err := http.Post(url, "application/octet-stream", buf) + equal(t, err, nil) + defer resp.Body.Close() + body, _ := ioutil.ReadAll(resp.Body) + equal(t, string(body), "OK") + + time.Sleep(5 * time.Millisecond) + + ch.Lock() + numDef := len(ch.deferredMessages) + ch.Unlock() + equal(t, numDef, 1) +} + func TestHTTPSRequire(t *testing.T) { opts := NewNSQDOptions() opts.Logger = newTestLogger(t) diff --git a/nsqd/message.go b/nsqd/message.go index 2ce39a71c..67d8db86a 100644 --- a/nsqd/message.go +++ b/nsqd/message.go @@ -24,6 +24,7 @@ type Message struct { clientID int64 pri int64 index int + deferred time.Duration } func NewMessage(id MessageID, body []byte) *Message { diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 6c56c1f8f..768dbab9d 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -185,6 +185,8 @@ func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) { return p.PUB(client, params) case bytes.Equal(params[0], []byte("MPUB")): return p.MPUB(client, params) + case bytes.Equal(params[0], []byte("DPUB")): + return p.DPUB(client, params) case bytes.Equal(params[0], []byte("NOP")): return p.NOP(client, params) case bytes.Equal(params[0], []byte("TOUCH")): @@ -817,6 +819,68 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) { return okBytes, nil } +func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { + var err error + + if len(params) < 3 { + return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "DPUB insufficient number of parameters") + } + + topicName := string(params[1]) + if !protocol.IsValidTopicName(topicName) { + return nil, protocol.NewFatalClientErr(nil, "E_BAD_TOPIC", + fmt.Sprintf("DPUB topic name %q is not valid", topicName)) + } + + timeoutMs, err := protocol.ByteToBase10(params[2]) + if err != nil { + return nil, protocol.NewFatalClientErr(err, "E_INVALID", + fmt.Sprintf("DPUB could not parse timeout %s", params[2])) + } + timeoutDuration := time.Duration(timeoutMs) * time.Millisecond + + if timeoutDuration < 0 || timeoutDuration > p.ctx.nsqd.opts.MaxReqTimeout { + return nil, protocol.NewFatalClientErr(nil, "E_INVALID", + fmt.Sprintf("DPUB timeout %d out of range 0-%d", + timeoutMs, p.ctx.nsqd.opts.MaxReqTimeout/time.Millisecond)) + } + + bodyLen, err := readLen(client.Reader, client.lenSlice) + if err != nil { + return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "DPUB failed to read message body size") + } + + if bodyLen <= 0 { + return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE", + fmt.Sprintf("DPUB invalid message body size %d", bodyLen)) + } + + if int64(bodyLen) > p.ctx.nsqd.opts.MaxMsgSize { + return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE", + fmt.Sprintf("DPUB message too big %d > %d", bodyLen, p.ctx.nsqd.opts.MaxMsgSize)) + } + + messageBody := make([]byte, bodyLen) + _, err = io.ReadFull(client.Reader, messageBody) + if err != nil { + return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "DPUB failed to read message body") + } + + if err := p.CheckAuth(client, "DPUB", topicName, ""); err != nil { + return nil, err + } + + topic := p.ctx.nsqd.GetTopic(topicName) + msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody) + msg.deferred = timeoutDuration + err = topic.PutMessage(msg) + if err != nil { + return nil, protocol.NewFatalClientErr(err, "E_DPUB_FAILED", "DPUB failed "+err.Error()) + } + + return okBytes, nil +} + func (p *protocolV2) TOUCH(client *clientV2, params [][]byte) ([]byte, error) { state := atomic.LoadInt32(&client.State) if state != stateSubscribed && state != stateClosing { diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index 15b917410..331736b96 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -553,6 +553,42 @@ func TestSizeLimits(t *testing.T) { equal(t, string(data), fmt.Sprintf("E_BAD_MESSAGE MPUB message too big 101 > 100")) } +func TestDPUB(t *testing.T) { + opts := NewNSQDOptions() + opts.Logger = newTestLogger(t) + opts.Verbose = true + tcpAddr, _, nsqd := mustStartNSQD(opts) + defer os.RemoveAll(opts.DataPath) + defer nsqd.Exit() + + conn, err := mustConnectNSQD(tcpAddr) + equal(t, err, nil) + defer conn.Close() + + topicName := "test_dpub_v2" + strconv.Itoa(int(time.Now().Unix())) + + identify(t, conn, nil, frameTypeResponse) + sub(t, conn, topicName, "ch") + + // valid + nsq.DeferredPublish(topicName, time.Second, make([]byte, 100)).WriteTo(conn) + resp, _ := nsq.ReadResponse(conn) + frameType, data, _ := nsq.UnpackResponse(resp) + t.Logf("frameType: %d, data: %s", frameType, data) + equal(t, frameType, frameTypeResponse) + equal(t, data, []byte("OK")) + + equal(t, len(nsqd.GetTopic(topicName).GetChannel("ch").deferredMessages), 1) + + // duration out of range + nsq.DeferredPublish(topicName, opts.MaxReqTimeout+100*time.Millisecond, make([]byte, 100)).WriteTo(conn) + resp, _ = nsq.ReadResponse(conn) + frameType, data, _ = nsq.UnpackResponse(resp) + t.Logf("frameType: %d, data: %s", frameType, data) + equal(t, frameType, frameTypeError) + equal(t, string(data), fmt.Sprintf("E_INVALID DPUB timeout 3600100 out of range 0-3600000")) +} + func TestTouch(t *testing.T) { opts := NewNSQDOptions() opts.Logger = newTestLogger(t) diff --git a/nsqd/topic.go b/nsqd/topic.go index 84863e23d..da614ac84 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -270,6 +270,11 @@ func (t *Topic) messagePump() { if i > 0 { chanMsg = NewMessage(msg.ID, msg.Body) chanMsg.Timestamp = msg.Timestamp + chanMsg.deferred = msg.deferred + } + if chanMsg.deferred != 0 { + channel.StartDeferredTimeout(chanMsg, chanMsg.deferred) + continue } err := channel.PutMessage(chanMsg) if err != nil {