Skip to content

Commit

Permalink
nsqd: more intuitive channel message accounting
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Dec 28, 2016
1 parent bdb2919 commit 48294b8
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 2 deletions.
7 changes: 6 additions & 1 deletion nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,11 @@ func (c *Channel) put(m *Message) error {
return nil
}

func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration) {
atomic.AddUint64(&c.messageCount, 1)
c.StartDeferredTimeout(msg, timeout)
}

// TouchMessage resets the timeout for an in-flight message
func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout time.Duration) error {
msg, err := c.popInFlightMessage(clientID, id)
Expand Down Expand Up @@ -355,6 +360,7 @@ func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Dura
return err
}
c.removeFromInFlightPQ(msg)
atomic.AddUint64(&c.requeueCount, 1)

if timeout == 0 {
c.exitMutex.RLock()
Expand Down Expand Up @@ -427,7 +433,6 @@ func (c *Channel) doRequeue(m *Message) error {
if err != nil {
return err
}
atomic.AddUint64(&c.requeueCount, 1)
return nil
}

Expand Down
9 changes: 9 additions & 0 deletions nsqd/protocol_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"runtime"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -587,6 +588,7 @@ func TestDPUB(t *testing.T) {
numDef := len(ch.deferredMessages)
ch.deferredMutex.Unlock()
test.Equal(t, 1, numDef)
test.Equal(t, 1, int(atomic.LoadUint64(&ch.messageCount)))

// duration out of range
nsq.DeferredPublish(topicName, opts.MaxReqTimeout+100*time.Millisecond, make([]byte, 100)).WriteTo(conn)
Expand Down Expand Up @@ -1328,6 +1330,7 @@ func TestClientMsgTimeout(t *testing.T) {

topicName := "test_cmsg_timeout" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
ch := topic.GetChannel("ch")
msg := NewMessage(<-nsqd.idChan, make([]byte, 100))
topic.PutMessage(msg)

Expand All @@ -1345,6 +1348,9 @@ func TestClientMsgTimeout(t *testing.T) {
}, frameTypeResponse)
sub(t, conn, topicName, "ch")

test.Equal(t, 0, int(atomic.LoadUint64(&ch.timeoutCount)))
test.Equal(t, 0, int(atomic.LoadUint64(&ch.requeueCount)))

_, err = nsq.Ready(1).WriteTo(conn)
test.Nil(t, err)

Expand All @@ -1359,6 +1365,9 @@ func TestClientMsgTimeout(t *testing.T) {

time.Sleep(1100 * time.Millisecond)

test.Equal(t, 1, int(atomic.LoadUint64(&ch.timeoutCount)))
test.Equal(t, 0, int(atomic.LoadUint64(&ch.requeueCount)))

_, err = nsq.Finish(nsq.MessageID(msgOut.ID)).WriteTo(conn)
test.Nil(t, err)

Expand Down
2 changes: 1 addition & 1 deletion nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (t *Topic) messagePump() {
chanMsg.deferred = msg.deferred
}
if chanMsg.deferred != 0 {
channel.StartDeferredTimeout(chanMsg, chanMsg.deferred)
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
continue
}
err := channel.PutMessage(chanMsg)
Expand Down

0 comments on commit 48294b8

Please sign in to comment.