Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: fixing race with sending messages and topic/channel associations changing #828

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 57 additions & 36 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,30 +226,36 @@ func (t *Topic) messagePump() {
backendChan = t.backend.ReadChan()
}

doUpdate := func() {
chans = chans[:0]
for _, c := range t.channelMap {
chans = append(chans, c)
}
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the crux of the problem was actually that the select statement here could possibly never choose this path, or more likely, choose one of the message chan paths before choosing this one.

backendChan = t.backend.ReadChan()
}
}

for {
select {
case <-t.channelUpdateChan:
t.RLock()
doUpdate()
t.RUnlock()
continue

case msg = <-memoryMsgChan:
case buf = <-backendChan:
msg, err = decodeMessage(buf)
if err != nil {
t.ctx.nsqd.logf("ERROR: failed to decode message - %s", err)
continue
}
case <-t.channelUpdateChan:
chans = chans[:0]
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue

case pause := <-t.pauseChan:
if pause || len(chans) == 0 {
memoryMsgChan = nil
Expand All @@ -263,34 +269,49 @@ func (t *Topic) messagePump() {
goto exit
}

for i, channel := range chans {
chanMsg := msg
// copy the message because each channel
// needs a unique instance but...
// fastpath to avoid copy if its the first channel
// (the topic already created the first copy)
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
if chanMsg.deferred != 0 {
channel.StartDeferredTimeout(chanMsg, chanMsg.deferred)
continue
}
err := channel.PutMessage(chanMsg)
if err != nil {
t.ctx.nsqd.logf(
"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
t.name, msg.ID, channel.name, err)
}
// check for channel changes one last time before
// sending out a message

t.RLock()
select {
case <-t.channelUpdateChan:
doUpdate()
default:
}

t.pushMessage(chans, msg)
t.RUnlock()
}

exit:
t.ctx.nsqd.logf("TOPIC(%s): closing ... messagePump", t.name)
}

// pushMessage fans msg out to every channel in chans.
//
// The first channel receives msg itself; every subsequent channel receives a
// fresh copy created with NewMessage that carries the same ID, Body,
// Timestamp and deferred values. A message whose deferred value is non-zero
// is handed to the channel's StartDeferredTimeout instead of PutMessage.
// A PutMessage failure is logged and delivery proceeds with the remaining
// channels.
func (t *Topic) pushMessage(chans []*Channel, msg *Message) {
	for i, channel := range chans {
		chanMsg := msg
		// copy the message because each channel
		// needs a unique instance but...
		// fastpath to avoid copy if its the first channel
		// (the topic already created the first copy)
		if i > 0 {
			chanMsg = NewMessage(msg.ID, msg.Body)
			chanMsg.Timestamp = msg.Timestamp
			chanMsg.deferred = msg.deferred
		}
		if chanMsg.deferred != 0 {
			channel.StartDeferredTimeout(chanMsg, chanMsg.deferred)
			// Move on to the next channel. Returning here would stop the
			// fan-out after the first channel, so the remaining channels
			// would never receive a deferred message.
			continue
		}
		if err := channel.PutMessage(chanMsg); err != nil {
			t.ctx.nsqd.logf(
				"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
				t.name, msg.ID, channel.name, err)
		}
	}
}

// Delete empties the topic and all its channels and closes
func (t *Topic) Delete() error {
return t.exit(true)
Expand Down