Skip to content

Commit

Permalink
nsqd: replace missing requeue exit check; cleanup doRequeue
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Jan 1, 2017
1 parent d497852 commit 538ab6b
Showing 1 changed file with 7 additions and 15 deletions.
22 changes: 7 additions & 15 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (c *Channel) IsPaused() bool {
func (c *Channel) PutMessage(m *Message) error {
c.RLock()
defer c.RUnlock()
if atomic.LoadInt32(&c.exitFlag) == 1 {
if c.Exiting() {
return errors.New("exiting")
}
err := c.put(m)
Expand Down Expand Up @@ -364,7 +364,10 @@ func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Dura

if timeout == 0 {
c.exitMutex.RLock()
err := c.doRequeue(msg)
if c.Exiting() {
return errors.New("exiting")
}
err := c.put(msg)
c.exitMutex.RUnlock()
return err
}
Expand Down Expand Up @@ -425,17 +428,6 @@ func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) erro
return nil
}

// doRequeue performs the low level operations to requeue a message
//
// Callers of this method need to ensure that a simultaneous exit will not occur
func (c *Channel) doRequeue(m *Message) error {
err := c.put(m)
if err != nil {
return err
}
return nil
}

// pushInFlightMessage atomically adds a message to the in-flight dictionary
func (c *Channel) pushInFlightMessage(msg *Message) error {
c.inFlightMutex.Lock()
Expand Down Expand Up @@ -540,7 +532,7 @@ func (c *Channel) processDeferredQueue(t int64) bool {
if err != nil {
goto exit
}
c.doRequeue(msg)
c.put(msg)
}

exit:
Expand Down Expand Up @@ -577,7 +569,7 @@ func (c *Channel) processInFlightQueue(t int64) bool {
if ok {
client.TimedOutMessage()
}
c.doRequeue(msg)
c.put(msg)
}

exit:
Expand Down

0 comments on commit 538ab6b

Please sign in to comment.