Skip to content

Commit

Permalink
nsqd: pause topics while loading metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelyou committed May 22, 2018
1 parent ef7fc2e commit 07f8e2f
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,6 @@ func (n *NSQD) LoadMetadata() error {
continue
}
topic := n.GetTopic(t.Name)
if t.Paused {
topic.Pause()
}

for _, c := range t.Channels {
if !protocol.IsValidChannelName(c.Name) {
Expand All @@ -372,6 +369,12 @@ func (n *NSQD) LoadMetadata() error {
channel.Pause()
}
}

// this logic is reversed, and done _after_ channel creation to ensure that all channels
// are created before messages begin flowing
if !t.Paused {
topic.UnPause()
}
}
return nil
}
Expand Down Expand Up @@ -515,6 +518,13 @@ func (n *NSQD) GetTopic(topicName string) *Topic {
t = NewTopic(topicName, &context{n}, deleteCallback)
n.topicMap[topicName] = t

// if we're creating a new topic in this process while we're loading from metadata, the topic may already
// have message data persisted on disk. To ensure that all channels are created before message flow
// begins, we pause the topic.
if atomic.LoadInt32(&n.isLoading) == 1 {
t.Pause()
}

n.logf(LOG_INFO, "TOPIC(%s): created", t.name)

// release our global nsqd lock, and switch to a more granular topic lock while we init our
Expand Down

0 comments on commit 07f8e2f

Please sign in to comment.