Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: empty queue does not empty deferred/inflight #144

Merged
merged 1 commit into from
Feb 5, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 88 additions & 37 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,30 +85,30 @@ func NewChannel(topicName string, channelName string, options *nsqdOptions,
notifier Notifier, deleteCallback func(*Channel)) *Channel {
// backend names, for uniqueness, automatically include the topic... <topic>:<channel>
backendName := topicName + ":" + channelName
pqSize := int(math.Max(1, float64(options.memQueueSize)/10))
c := &Channel{
topicName: topicName,
name: channelName,
incomingMsgChan: make(chan *nsq.Message, 1),
memoryMsgChan: make(chan *nsq.Message, options.memQueueSize),
clientMsgChan: make(chan *nsq.Message),
exitChan: make(chan int),
clients: make([]Consumer, 0, 5),
inFlightMessages: make(map[nsq.MessageID]*pqueue.Item),
inFlightPQ: pqueue.New(pqSize),
deferredMessages: make(map[nsq.MessageID]*pqueue.Item),
deferredPQ: pqueue.New(pqSize),
deleteCallback: deleteCallback,
notifier: notifier,
options: options,
topicName: topicName,
name: channelName,
incomingMsgChan: make(chan *nsq.Message, 1),
memoryMsgChan: make(chan *nsq.Message, options.memQueueSize),
clientMsgChan: make(chan *nsq.Message),
exitChan: make(chan int),
clients: make([]Consumer, 0, 5),
deleteCallback: deleteCallback,
notifier: notifier,
options: options,
}

c.initPQ()

if strings.HasSuffix(channelName, "#ephemeral") {
c.ephemeralChannel = true
c.backend = NewDummyBackendQueue()
} else {
c.backend = NewDiskQueue(backendName, options.dataPath, options.maxBytesPerFile, options.syncEvery)
}

go c.messagePump()

c.waitGroup.Wrap(func() { c.router() })
c.waitGroup.Wrap(func() { c.deferredWorker() })
c.waitGroup.Wrap(func() { c.inFlightWorker() })
Expand All @@ -118,6 +118,21 @@ func NewChannel(topicName string, channelName string, options *nsqdOptions,
return c
}

func (c *Channel) initPQ() {
pqSize := int(math.Max(1, float64(c.options.memQueueSize)/10))

c.inFlightMessages = make(map[nsq.MessageID]*pqueue.Item)
c.deferredMessages = make(map[nsq.MessageID]*pqueue.Item)

c.inFlightMutex.Lock()
c.inFlightPQ = pqueue.New(pqSize)
c.inFlightMutex.Unlock()

c.deferredMutex.Lock()
c.deferredPQ = pqueue.New(pqSize)
c.deferredMutex.Unlock()
}

// Exiting returns a boolean indicating if this channel is closed/exiting
func (c *Channel) Exiting() bool {
return atomic.LoadInt32(&c.exitFlag) == 1
Expand Down Expand Up @@ -166,44 +181,80 @@ func (c *Channel) exit(deleted bool) error {

if deleted {
// empty the queue (deletes the backend files, too)
EmptyQueue(c)
c.Empty()
} else {
// messagePump is responsible for closing the channel it writes to
// this will read until its closed (exited)
for msg := range c.clientMsgChan {
log.Printf("CHANNEL(%s): recovered buffered message from clientMsgChan", c.name)
WriteMessageToBackend(&msgBuf, msg, c)
WriteMessageToBackend(&msgBuf, msg, c.backend)
}

// write anything leftover to disk
if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 {
log.Printf("CHANNEL(%s): flushing %d memory %d in-flight %d deferred messages to backend",
c.name, len(c.memoryMsgChan), len(c.inFlightMessages), len(c.deferredMessages))
}
FlushQueue(c)
c.flush()
}

return c.backend.Close()
}

// MemoryChan implements the Queue interface
func (c *Channel) MemoryChan() chan *nsq.Message {
return c.memoryMsgChan
}
func (c *Channel) Empty() error {
c.Lock()
defer c.Unlock()

// BackendQueue implements the Queue interface
func (c *Channel) BackendQueue() BackendQueue {
return c.backend
}
c.initPQ()

// InFlight implements the Queue interface
func (c *Channel) InFlight() map[nsq.MessageID]*pqueue.Item {
return c.inFlightMessages
for {
select {
case <-c.memoryMsgChan:
default:
goto finish
}
}

finish:
return c.backend.Empty()
}

// Deferred implements the Queue interface
func (c *Channel) Deferred() map[nsq.MessageID]*pqueue.Item {
return c.deferredMessages
// flush persists all the messages in internal memory buffers to the backend
// it does not drain inflight/deferred because it is only called in Close()
func (c *Channel) flush() error {
var msgBuf bytes.Buffer

if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 {
log.Printf("CHANNEL(%s): flushing %d memory %d in-flight %d deferred messages to backend",
c.name, len(c.memoryMsgChan), len(c.inFlightMessages), len(c.deferredMessages))
}

for {
select {
case msg := <-c.memoryMsgChan:
err := WriteMessageToBackend(&msgBuf, msg, c.backend)
if err != nil {
log.Printf("ERROR: failed to write message to backend - %s", err.Error())
}
default:
goto finish
}
}

finish:
for _, item := range c.inFlightMessages {
msg := item.Value.(*inFlightMessage).msg
err := WriteMessageToBackend(&msgBuf, msg, c.backend)
if err != nil {
log.Printf("ERROR: failed to write message to backend - %s", err.Error())
}
}

for _, item := range c.deferredMessages {
msg := item.Value.(*nsq.Message)
err := WriteMessageToBackend(&msgBuf, msg, c.backend)
if err != nil {
log.Printf("ERROR: failed to write message to backend - %s", err.Error())
}
}

return nil
}

func (c *Channel) Depth() int64 {
Expand Down Expand Up @@ -476,7 +527,7 @@ func (c *Channel) router() {
select {
case c.memoryMsgChan <- msg:
default:
err := WriteMessageToBackend(&msgBuf, msg, c)
err := WriteMessageToBackend(&msgBuf, msg, c.backend)
if err != nil {
log.Printf("CHANNEL(%s) ERROR: failed to write message to backend - %s", c.name, err.Error())
// theres not really much we can do at this point, you're certainly
Expand Down
37 changes: 36 additions & 1 deletion nsqd/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func TestInFlightWorker(t *testing.T) {
nsqd := NewNSQd(1, options)
defer nsqd.Exit()

topic := nsqd.GetTopic("topic")
topicName := "test_in_flight_worker" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel")

for i := 0; i < 1000; i++ {
Expand All @@ -83,3 +84,37 @@ func TestInFlightWorker(t *testing.T) {
assert.Equal(t, len(channel.inFlightMessages), 0)
assert.Equal(t, len(channel.inFlightPQ), 0)
}

func TestChannelEmpty(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)

nsqd := NewNSQd(1, NewNsqdOptions())
defer nsqd.Exit()

topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel")
client := NewClientV2(nil)

msgs := make([]*nsq.Message, 0, 25)
for i := 0; i < 25; i++ {
msg := nsq.NewMessage(<-nsqd.idChan, []byte("test"))
channel.StartInFlightTimeout(msg, client)
msgs = append(msgs, msg)
}

channel.RequeueMessage(client, msgs[len(msgs)-1].Id, 100*time.Millisecond)
assert.Equal(t, len(channel.inFlightMessages), 24)
assert.Equal(t, len(channel.inFlightPQ), 24)
assert.Equal(t, len(channel.deferredMessages), 1)
assert.Equal(t, len(channel.deferredPQ), 1)

channel.Empty()

assert.Equal(t, len(channel.inFlightMessages), 0)
assert.Equal(t, len(channel.inFlightPQ), 0)
assert.Equal(t, len(channel.deferredMessages), 0)
assert.Equal(t, len(channel.deferredPQ), 0)
assert.Equal(t, channel.Depth(), int64(0))
}
2 changes: 1 addition & 1 deletion nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func emptyChannelHandler(w http.ResponseWriter, req *http.Request) {
return
}

err = EmptyQueue(channel)
err = channel.Empty()
if err != nil {
util.ApiResponse(w, 500, "INTERNAL_ERROR", nil)
return
Expand Down
61 changes: 2 additions & 59 deletions nsqd/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package main

import (
"../nsq"
"../util/pqueue"
"bytes"
"log"
)

// BackendQueue represents the behavior for the secondary message
Expand All @@ -17,13 +15,6 @@ type BackendQueue interface {
Empty() error
}

type Queue interface {
MemoryChan() chan *nsq.Message
BackendQueue() BackendQueue
InFlight() map[nsq.MessageID]*pqueue.Item
Deferred() map[nsq.MessageID]*pqueue.Item
}

type DummyBackendQueue struct {
readChan chan []byte
}
Expand Down Expand Up @@ -52,61 +43,13 @@ func (d *DummyBackendQueue) Empty() error {
return nil
}

func EmptyQueue(q Queue) error {
for {
select {
case <-q.MemoryChan():
default:
goto disk
}
}

disk:
return q.BackendQueue().Empty()
}

func FlushQueue(q Queue) error {
var msgBuf bytes.Buffer

for {
select {
case msg := <-q.MemoryChan():
err := WriteMessageToBackend(&msgBuf, msg, q)
if err != nil {
log.Printf("ERROR: failed to write message to backend - %s", err.Error())
}
default:
goto finish
}
}

finish:
for _, item := range q.InFlight() {
msg := item.Value.(*inFlightMessage).msg
err := WriteMessageToBackend(&msgBuf, msg, q)
if err != nil {
log.Printf("ERROR: failed to write message to backend - %s", err.Error())
}
}

for _, item := range q.Deferred() {
msg := item.Value.(*nsq.Message)
err := WriteMessageToBackend(&msgBuf, msg, q)
if err != nil {
log.Printf("ERROR: failed to write message to backend - %s", err.Error())
}
}

return nil
}

func WriteMessageToBackend(buf *bytes.Buffer, msg *nsq.Message, q Queue) error {
func WriteMessageToBackend(buf *bytes.Buffer, msg *nsq.Message, bq BackendQueue) error {
buf.Reset()
err := msg.Write(buf)
if err != nil {
return err
}
err = q.BackendQueue().Put(buf.Bytes())
err = bq.Put(buf.Bytes())
if err != nil {
return err
}
Expand Down
Loading