From af6312986ab9947fd4a72b25e4c8ec65416fa52b Mon Sep 17 00:00:00 2001 From: Stephen Searles Date: Thu, 22 Dec 2016 15:49:05 -0800 Subject: [PATCH] nsqd: fixing race with sending messages and topic/channel associations changing --- nsqd/topic.go | 93 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 57 insertions(+), 36 deletions(-) diff --git a/nsqd/topic.go b/nsqd/topic.go index 62f604e28..90e36f1f6 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -226,8 +226,28 @@ func (t *Topic) messagePump() { backendChan = t.backend.ReadChan() } + doUpdate := func() { + chans = chans[:0] + for _, c := range t.channelMap { + chans = append(chans, c) + } + if len(chans) == 0 || t.IsPaused() { + memoryMsgChan = nil + backendChan = nil + } else { + memoryMsgChan = t.memoryMsgChan + backendChan = t.backend.ReadChan() + } + } + for { select { + case <-t.channelUpdateChan: + t.RLock() + doUpdate() + t.RUnlock() + continue + case msg = <-memoryMsgChan: case buf = <-backendChan: msg, err = decodeMessage(buf) @@ -235,21 +255,7 @@ func (t *Topic) messagePump() { t.ctx.nsqd.logf("ERROR: failed to decode message - %s", err) continue } - case <-t.channelUpdateChan: - chans = chans[:0] - t.RLock() - for _, c := range t.channelMap { - chans = append(chans, c) - } - t.RUnlock() - if len(chans) == 0 || t.IsPaused() { - memoryMsgChan = nil - backendChan = nil - } else { - memoryMsgChan = t.memoryMsgChan - backendChan = t.backend.ReadChan() - } - continue + case pause := <-t.pauseChan: if pause || len(chans) == 0 { memoryMsgChan = nil @@ -263,34 +269,49 @@ func (t *Topic) messagePump() { goto exit } - for i, channel := range chans { - chanMsg := msg - // copy the message because each channel - // needs a unique instance but... - // fastpath to avoid copy if its the first channel - // (the topic already created the first copy) - if i > 0 { - chanMsg = NewMessage(msg.ID, msg.Body) - chanMsg.Timestamp = msg.Timestamp - chanMsg.deferred = msg.deferred - } - if chanMsg.deferred != 0 { - channel.StartDeferredTimeout(chanMsg, chanMsg.deferred) - continue - } - err := channel.PutMessage(chanMsg) - if err != nil { - t.ctx.nsqd.logf( - "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s", - t.name, msg.ID, channel.name, err) - } + // check for channel changes one last time before + // sending out a message + + t.RLock() + select { + case <-t.channelUpdateChan: + doUpdate() + default: } + + t.pushMessage(chans, msg) + t.RUnlock() } exit: t.ctx.nsqd.logf("TOPIC(%s): closing ... messagePump", t.name) } +func (t *Topic) pushMessage(chans []*Channel, msg *Message) { + for i, channel := range chans { + chanMsg := msg + // copy the message because each channel + // needs a unique instance but... + // fastpath to avoid copy if its the first channel + // (the topic already created the first copy) + if i > 0 { + chanMsg = NewMessage(msg.ID, msg.Body) + chanMsg.Timestamp = msg.Timestamp + chanMsg.deferred = msg.deferred + } + if chanMsg.deferred != 0 { + channel.StartDeferredTimeout(chanMsg, chanMsg.deferred) + return + } + err := channel.PutMessage(chanMsg) + if err != nil { + t.ctx.nsqd.logf( + "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s", + t.name, msg.ID, channel.name, err) + } + } +} + // Delete empties the topic and all its channels and closes func (t *Topic) Delete() error { return t.exit(true)