diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 65218456a..95672f57a 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -358,9 +358,6 @@ func (n *NSQD) LoadMetadata() error { continue } topic := n.GetTopic(t.Name) - if t.Paused { - topic.Pause() - } for _, c := range t.Channels { if !protocol.IsValidChannelName(c.Name) { @@ -372,6 +369,12 @@ func (n *NSQD) LoadMetadata() error { channel.Pause() } } + + // this logic is reversed, and done _after_ channel creation to ensure that all channels + // are created before messages begin flowing + if !t.Paused { + topic.UnPause() + } } return nil } @@ -515,6 +518,13 @@ func (n *NSQD) GetTopic(topicName string) *Topic { t = NewTopic(topicName, &context{n}, deleteCallback) n.topicMap[topicName] = t + // if we're creating a new topic in this process while we're loading from metadata, the topic may already + // have message data persisted on disk. To ensure that all channels are created before message flow + // begins, we pause the topic. + if atomic.LoadInt32(&n.isLoading) == 1 { + t.Pause() + } + n.logf(LOG_INFO, "TOPIC(%s): created", t.name) // release our global nsqd lock, and switch to a more granular topic lock while we init our