diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index e8de561bd..5530526b8 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -292,22 +292,26 @@ func (n *NSQd) GetExistingTopic(topicName string) (*Topic, error) { // DeleteExistingTopic removes a topic only if it exists func (n *NSQd) DeleteExistingTopic(topicName string) error { - n.Lock() + n.RLock() topic, ok := n.topicMap[topicName] if !ok { - n.Unlock() + n.RUnlock() return errors.New("topic does not exist") } - delete(n.topicMap, topicName) - // not defered so that we can continue while the topic async closes - n.Unlock() - - log.Printf("TOPIC(%s): deleting", topic.name) + n.RUnlock() // delete empties all channels and the topic itself before closing // (so that we dont leave any messages around) + // + // we do this before removing the topic from map below (with no lock) + // so that any incoming writes will error and not create a new topic + // to enforce ordering topic.Delete() + n.Lock() + delete(n.topicMap, topicName) + n.Unlock() + return nil } diff --git a/nsqd/topic.go b/nsqd/topic.go index 646579600..c22030535 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -219,13 +219,10 @@ func (t *Topic) router() { // Delete empties the topic and all its channels and closes func (t *Topic) Delete() error { - err := t.exit(true) - // since we are explicitly deleting a topic (not just at system exit time) - // de-register this from the lookupd - go t.notifier.Notify(t) - return err + return t.exit(true) } +// Close persists all outstanding topic data and closes all its channels func (t *Topic) Close() error { return t.exit(false) } @@ -235,7 +232,15 @@ func (t *Topic) exit(deleted bool) error { return errors.New("exiting") } - log.Printf("TOPIC(%s): closing", t.name) + if deleted { + log.Printf("TOPIC(%s): deleting", t.name) + + // since we are explicitly deleting a topic (not just at system exit time) + // de-register this from the lookupd + go t.notifier.Notify(t) + } else { + log.Printf("TOPIC(%s): closing", t.name) + } close(t.exitChan)