From 538ab6b7571280972d377aada259e55e1e17c3e5 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sun, 1 Jan 2017 13:10:25 -0800 Subject: [PATCH] nsqd: replace missing requeue exit check; cleanup doRequeue --- nsqd/channel.go | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/nsqd/channel.go b/nsqd/channel.go index 72699e5fd..3bb5693a7 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -277,7 +277,7 @@ func (c *Channel) IsPaused() bool { func (c *Channel) PutMessage(m *Message) error { c.RLock() defer c.RUnlock() - if atomic.LoadInt32(&c.exitFlag) == 1 { + if c.Exiting() { return errors.New("exiting") } err := c.put(m) @@ -364,7 +364,10 @@ func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Dura if timeout == 0 { c.exitMutex.RLock() - err := c.doRequeue(msg) + if c.Exiting() { + return errors.New("exiting") + } + err := c.put(msg) c.exitMutex.RUnlock() return err } @@ -425,17 +428,6 @@ func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) erro return nil } -// doRequeue performs the low level operations to requeue a message -// -// Callers of this method need to ensure that a simultaneous exit will not occur -func (c *Channel) doRequeue(m *Message) error { - err := c.put(m) - if err != nil { - return err - } - return nil -} - // pushInFlightMessage atomically adds a message to the in-flight dictionary func (c *Channel) pushInFlightMessage(msg *Message) error { c.inFlightMutex.Lock() @@ -540,7 +532,7 @@ func (c *Channel) processDeferredQueue(t int64) bool { if err != nil { goto exit } - c.doRequeue(msg) + c.put(msg) } exit: @@ -577,7 +569,7 @@ func (c *Channel) processInFlightQueue(t int64) bool { if ok { client.TimedOutMessage() } - c.doRequeue(msg) + c.put(msg) } exit: