diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index e4267a292..0a161b88d 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -430,9 +430,17 @@ func (n *NSQD) Exit() { // GetTopic performs a thread safe operation // to return a pointer to a Topic object (potentially new) func (n *NSQD) GetTopic(topicName string) *Topic { + // most likely, we already have this topic, so try read lock first. + n.RLock() + t, ok := n.topicMap[topicName] + n.RUnlock() + if ok { + return t + } + n.Lock() - t, ok := n.topicMap[topicName] + t, ok = n.topicMap[topicName] if ok { n.Unlock() return t diff --git a/nsqd/topic.go b/nsqd/topic.go index 65d0b71f0..62f604e28 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -389,7 +389,13 @@ finish: func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile { var latencyStream *quantile.Quantile + t.RLock() + realChannels := make([]*Channel, 0, len(t.channelMap)) for _, c := range t.channelMap { + realChannels = append(realChannels, c) + } + t.RUnlock() + for _, c := range realChannels { if c.e2eProcessingLatencyStream == nil { continue }