diff --git a/nsqd/channel.go b/nsqd/channel.go index 1e0675d6a..72699e5fd 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -305,6 +305,11 @@ func (c *Channel) put(m *Message) error { return nil } +func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration) { + atomic.AddUint64(&c.messageCount, 1) + c.StartDeferredTimeout(msg, timeout) +} + // TouchMessage resets the timeout for an in-flight message func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout time.Duration) error { msg, err := c.popInFlightMessage(clientID, id) @@ -355,6 +360,7 @@ func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Dura return err } c.removeFromInFlightPQ(msg) + atomic.AddUint64(&c.requeueCount, 1) if timeout == 0 { c.exitMutex.RLock() @@ -427,7 +433,6 @@ func (c *Channel) doRequeue(m *Message) error { if err != nil { return err } - atomic.AddUint64(&c.requeueCount, 1) return nil } diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index 20da61104..1f847cc1a 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -20,6 +20,7 @@ import ( "runtime" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -587,6 +588,7 @@ func TestDPUB(t *testing.T) { numDef := len(ch.deferredMessages) ch.deferredMutex.Unlock() test.Equal(t, 1, numDef) + test.Equal(t, 1, int(atomic.LoadUint64(&ch.messageCount))) // duration out of range nsq.DeferredPublish(topicName, opts.MaxReqTimeout+100*time.Millisecond, make([]byte, 100)).WriteTo(conn) @@ -1328,6 +1330,7 @@ func TestClientMsgTimeout(t *testing.T) { topicName := "test_cmsg_timeout" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) + ch := topic.GetChannel("ch") msg := NewMessage(<-nsqd.idChan, make([]byte, 100)) topic.PutMessage(msg) @@ -1345,6 +1348,9 @@ func TestClientMsgTimeout(t *testing.T) { }, frameTypeResponse) sub(t, conn, topicName, "ch") + test.Equal(t, 0, int(atomic.LoadUint64(&ch.timeoutCount))) + test.Equal(t, 0, int(atomic.LoadUint64(&ch.requeueCount))) + _, err = nsq.Ready(1).WriteTo(conn) test.Nil(t, err) @@ -1359,6 +1365,9 @@ func TestClientMsgTimeout(t *testing.T) { time.Sleep(1100 * time.Millisecond) + test.Equal(t, 1, int(atomic.LoadUint64(&ch.timeoutCount))) + test.Equal(t, 0, int(atomic.LoadUint64(&ch.requeueCount))) + _, err = nsq.Finish(nsq.MessageID(msgOut.ID)).WriteTo(conn) test.Nil(t, err) diff --git a/nsqd/topic.go b/nsqd/topic.go index 62f604e28..7d0e886e5 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -275,7 +275,7 @@ func (t *Topic) messagePump() { chanMsg.deferred = msg.deferred } if chanMsg.deferred != 0 { - channel.StartDeferredTimeout(chanMsg, chanMsg.deferred) + channel.PutMessageDeferred(chanMsg, chanMsg.deferred) continue } err := channel.PutMessage(chanMsg)