From cba007225d31b494b75b6a774cfc010969d2c71b Mon Sep 17 00:00:00 2001 From: Dan Markham Date: Sat, 16 Mar 2013 12:28:33 -0700 Subject: [PATCH] Empty Queue should reset the InFlight count of consumers This will keep them insync after they try and fail to FIN messages that were InFlight when the Queue was Emptied. --- nsqd/channel.go | 4 ++++ nsqd/channel_test.go | 35 +++++++++++++++++++++++++++++++++++ nsqd/client_v2.go | 5 +++++ 3 files changed, 44 insertions(+) diff --git a/nsqd/channel.go b/nsqd/channel.go index 11a8af492..f74620aab 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -24,6 +24,7 @@ type Consumer interface { Close() error TimedOutMessage() Stats() ClientStats + Empty() } // Channel represents the concrete type for a NSQ channel (and also @@ -202,6 +203,9 @@ func (c *Channel) Empty() error { defer c.Unlock() c.initPQ() + for _, client := range c.clients { + client.Empty() + } for { select { diff --git a/nsqd/channel_test.go b/nsqd/channel_test.go index 559d07788..8e8af4d52 100644 --- a/nsqd/channel_test.go +++ b/nsqd/channel_test.go @@ -118,3 +118,38 @@ func TestChannelEmpty(t *testing.T) { assert.Equal(t, len(channel.deferredPQ), 0) assert.Equal(t, channel.Depth(), int64(0)) } + +func TestChannelEmptyConsumer(t *testing.T) { + log.SetOutput(ioutil.Discard) + defer log.SetOutput(os.Stdout) + + tcpAddr, _ := mustStartNSQd(NewNsqdOptions()) + defer nsqd.Exit() + conn, _ := mustConnectNSQd(tcpAddr) + + topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix())) + topic := nsqd.GetTopic(topicName) + channel := topic.GetChannel("channel") + client := NewClientV2(conn) + client.SetReadyCount(25) + channel.AddClient(client) + + for i := 0; i < 25; i++ { + msg := nsq.NewMessage(<-nsqd.idChan, []byte("test")) + channel.StartInFlightTimeout(msg, client) + client.SendingMessage() + } + + for _, cl := range channel.clients { + stats := cl.Stats() + assert.Equal(t, stats.InFlightCount, int64(25)) + } + + channel.Empty() + + for _, cl := range channel.clients { + stats := cl.Stats() + assert.Equal(t, stats.InFlightCount, int64(0)) + } + +} diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index c2abb299b..166a68a8c 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -123,6 +123,11 @@ func (c *ClientV2) FinishedMessage() { c.tryUpdateReadyState() } +func (c *ClientV2) Empty() { + atomic.StoreInt64(&c.InFlightCount, 0) + c.tryUpdateReadyState() +} + func (c *ClientV2) SendingMessage() { atomic.AddInt64(&c.ReadyCount, -1) atomic.AddInt64(&c.InFlightCount, 1)