Skip to content

Commit

Permalink
Merge pull request #160 from dmarkham/empty_channel_testing
Browse files Browse the repository at this point in the history
nsqd: Empty Queue should reset the InFlight count of consumers
  • Loading branch information
mreiferson committed Mar 19, 2013
2 parents 2d1ee53 + cba0072 commit dc0af0d
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 0 deletions.
4 changes: 4 additions & 0 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Consumer interface {
Close() error
TimedOutMessage()
Stats() ClientStats
Empty()
}

// Channel represents the concrete type for a NSQ channel (and also
Expand Down Expand Up @@ -202,6 +203,9 @@ func (c *Channel) Empty() error {
defer c.Unlock()

c.initPQ()
for _, client := range c.clients {
client.Empty()
}

for {
select {
Expand Down
35 changes: 35 additions & 0 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,38 @@ func TestChannelEmpty(t *testing.T) {
assert.Equal(t, len(channel.deferredPQ), 0)
assert.Equal(t, channel.Depth(), int64(0))
}

func TestChannelEmptyConsumer(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)

tcpAddr, _ := mustStartNSQd(NewNsqdOptions())
defer nsqd.Exit()
conn, _ := mustConnectNSQd(tcpAddr)

topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel")
client := NewClientV2(conn)
client.SetReadyCount(25)
channel.AddClient(client)

for i := 0; i < 25; i++ {
msg := nsq.NewMessage(<-nsqd.idChan, []byte("test"))
channel.StartInFlightTimeout(msg, client)
client.SendingMessage()
}

for _, cl := range channel.clients {
stats := cl.Stats()
assert.Equal(t, stats.InFlightCount, int64(25))
}

channel.Empty()

for _, cl := range channel.clients {
stats := cl.Stats()
assert.Equal(t, stats.InFlightCount, int64(0))
}

}
5 changes: 5 additions & 0 deletions nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ func (c *ClientV2) FinishedMessage() {
c.tryUpdateReadyState()
}

func (c *ClientV2) Empty() {
atomic.StoreInt64(&c.InFlightCount, 0)
c.tryUpdateReadyState()
}

func (c *ClientV2) SendingMessage() {
atomic.AddInt64(&c.ReadyCount, -1)
atomic.AddInt64(&c.InFlightCount, 1)
Expand Down

0 comments on commit dc0af0d

Please sign in to comment.