From 07f8e2f7136911703b550c69e518e0e1fdb44e11 Mon Sep 17 00:00:00 2001 From: michaelyou Date: Tue, 22 May 2018 10:50:35 +0800 Subject: [PATCH] nsqd: pause topics while loading metadata --- nsqd/nsqd.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 65218456a..95672f57a 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -358,9 +358,6 @@ func (n *NSQD) LoadMetadata() error { continue } topic := n.GetTopic(t.Name) - if t.Paused { - topic.Pause() - } for _, c := range t.Channels { if !protocol.IsValidChannelName(c.Name) { @@ -372,6 +369,12 @@ func (n *NSQD) LoadMetadata() error { channel.Pause() } } + + // this logic is reversed, and done _after_ channel creation to ensure that all channels + // are created before messages begin flowing + if !t.Paused { + topic.UnPause() + } } return nil } @@ -515,6 +518,13 @@ func (n *NSQD) GetTopic(topicName string) *Topic { t = NewTopic(topicName, &context{n}, deleteCallback) n.topicMap[topicName] = t + // if we're creating a new topic in this process while we're loading from metadata, the topic may already + // have message data persisted on disk. To ensure that all channels are created before message flow + // begins, we pause the topic. + if atomic.LoadInt32(&n.isLoading) == 1 { + t.Pause() + } + n.logf(LOG_INFO, "TOPIC(%s): created", t.name) // release our global nsqd lock, and switch to a more granular topic lock while we init our