nsqd: message loss during topic/channel bootstrap from metadata #1032
nsqd/nsqd.go
Outdated
@@ -372,6 +372,7 @@ func (n *NSQD) LoadMetadata() error {
channel.Pause()
}
}
topic.UnPause()
Above, you'll see that paused state is stored in the topic metadata, so it does:
if t.Paused {
topic.Pause()
}
So you can remove that now, but here only un-pause if the topic was not originally paused.
Thanks, I will correct it.
nsqd/nsqd.go
Outdated
@@ -515,6 +516,10 @@ func (n *NSQD) GetTopic(topicName string) *Topic {
t = NewTopic(topicName, &context{n}, deleteCallback)
n.topicMap[topicName] = t

if atomic.LoadInt32(&n.isLoading) == 0 {
t.UnPause()
I wonder if we can move this below the channels block, as an alternative to taking the topic lock to block PutMessages() while getting channels.
The channelMap access in getOrCreateChannel needs the topic's lock. I am not sure if we can do this:
// this expects the caller to handle locking
func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) {
channel, ok := t.channelMap[channelName]
Right - you'd call t.GetChannel() instead. But this is still a tentative supposition :)
I wonder if we should just create a |
90a4f63 to 6505605
@mreiferson Creating a
It will be a slightly bigger change, but I feel like this code is a little too clever and surprising. We could comment why the ordering is what it is, but I feel like I'd rather have slightly more code that's clear in its intent? What do you think @ploxiln?
I think the
It looks like this changed since I last viewed it - instead of initializing a Topic with Anyway, I think this is a fine minimal fix-up, and further refactoring can be proposed in a separate PR. @mreiferson?
LGTM after adding these comments, thanks!
nsqd/nsqd.go
Outdated
@@ -515,6 +515,10 @@ func (n *NSQD) GetTopic(topicName string) *Topic {
t = NewTopic(topicName, &context{n}, deleteCallback)
n.topicMap[topicName] = t

if atomic.LoadInt32(&n.isLoading) == 1 {
Let's add a comment above this line that reads:
// if we're creating a new topic in this process while we're loading from metadata, the topic may already
// have message data persisted on disk. To ensure that all channels are created before message flow
// begins, we pause the topic.
nsqd/nsqd.go
Outdated
@@ -372,6 +369,9 @@ func (n *NSQD) LoadMetadata() error {
channel.Pause()
}
}
if !t.Paused {
And a comment above this line:
// this logic is reversed, and done _after_ channel creation to ensure that all channels
// are created before messages begin flowing
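The ordering that the two requested comments describe can be sketched in isolation. This is a simplified model under stated assumptions, not the nsqd code: `topicMeta`, `channelMeta`, `topic`, and `restore` are hypothetical names, and the topic is modeled as starting paused while metadata is loading.

```go
package main

import "fmt"

// channelMeta and topicMeta model the persisted metadata
// (hypothetical shapes for illustration only).
type channelMeta struct {
	Name   string
	Paused bool
}

type topicMeta struct {
	Name     string
	Paused   bool
	Channels []channelMeta
}

// topic is a toy stand-in: paused gates message flow,
// channels maps channel name -> paused state.
type topic struct {
	paused   bool
	channels map[string]bool
}

// restore rebuilds a topic from metadata. The topic starts paused,
// all channels are created first, and only then is the topic
// un-paused, and only if it was not persisted as paused.
func restore(m topicMeta) *topic {
	t := &topic{paused: true, channels: make(map[string]bool)}
	for _, c := range m.Channels {
		t.channels[c.Name] = c.Paused
	}
	// reversed logic, done after channel creation
	if !m.Paused {
		t.paused = false
	}
	return t
}

func main() {
	chans := []channelMeta{{Name: "c1"}, {Name: "c2", Paused: true}}
	a := restore(topicMeta{Name: "t1", Paused: false, Channels: chans})
	b := restore(topicMeta{Name: "t2", Paused: true, Channels: chans})
	fmt.Println(a.paused, len(a.channels)) // prints: false 2
	fmt.Println(b.paused, len(b.channels)) // prints: true 2
}
```

In this model a topic that was persisted as paused simply stays paused, so no separate `Pause()` call is needed during loading.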
6505605 to 1055073
@mreiferson done
Thanks for squashing down to one commit. The final little issue is that the commit title is not descriptive of the overall change. I suggest:
|
1055073 to 07f8e2f
@ploxiln OK, done
Thanks!
#1031
I am not sure whether this will introduce other bugs.
This is what the PR does:
Initialize the topic with pause=1, so that when the t.messagePump goroutine starts, it does not read any messages out.
In GetTopic, after NewTopic returns, check n.isLoading to see whether nsqd is loading from the metadata file (i.e. restarting). If it is not, unpause the topic immediately; otherwise, unpause it once all channels have been created in LoadMetadata.
NewTopic is only called by GetTopic, so the change should not affect other code.