nsqd: return errors from LoadMetadata; keep nsqd.dat and nsqd.<ID>.dat in parallel

apps/nsqd/nsqd.go
  Start() checks the error now returned by LoadMetadata() and exits through
  log.Fatalf when it is non-nil, before calling PersistMetadata().

nsqd/nsqd.go
  LoadMetadata() returns an error instead of logging and returning silently.
  It reads both nsqd.dat and the ID-suffixed nsqd.<ID>.dat:
    - a read failure other than "does not exist" on either file is returned;
    - if neither file could be read, nil is returned and nothing is loaded;
    - if both were read and their bytes differ, an error is returned;
    - if only the ID-suffixed file was read, its contents are used.
  JSON parse failures are returned and name the file that was parsed.

  PersistMetadata() no longer removes the ID-suffixed file. It writes the
  same data to a second temporary file and moves it over nsqd.<ID>.dat with
  atomicRename, so both files are kept (per the in-code comment, to enable
  roll-back).

diff --git a/apps/nsqd/nsqd.go b/apps/nsqd/nsqd.go
index 54ba531ec..b2606a2ca 100644
--- a/apps/nsqd/nsqd.go
+++ b/apps/nsqd/nsqd.go
@@ -218,8 +218,11 @@ func (p *program) Start() error {
 	options.Resolve(opts, flagSet, cfg)
 	nsqd := nsqd.New(opts)
 
-	nsqd.LoadMetadata()
-	err := nsqd.PersistMetadata()
+	err := nsqd.LoadMetadata()
+	if err != nil {
+		log.Fatalf("ERROR: %s", err.Error())
+	}
+	err = nsqd.PersistMetadata()
 	if err != nil {
 		log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
 	}
diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go
index 53eb34aea..65bfaf98f 100644
--- a/nsqd/nsqd.go
+++ b/nsqd/nsqd.go
@@ -1,6 +1,7 @@
 package nsqd
 
 import (
+	"bytes"
 	"crypto/tls"
 	"crypto/x509"
 	"encoding/json"
@@ -267,32 +268,45 @@ type meta struct {
 	} `json:"topics"`
 }
 
-func (n *NSQD) LoadMetadata() {
+func (n *NSQD) LoadMetadata() error {
 	atomic.StoreInt32(&n.isLoading, 1)
 	defer atomic.StoreInt32(&n.isLoading, 0)
 
 	fn := path.Join(n.getOpts().DataPath, "nsqd.dat")
-	oldFn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
+	// old metadata filename with ID, maintained in parallel to enable roll-back
+	fnID := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
 
 	data, err := ioutil.ReadFile(fn)
 	if err != nil {
 		if !os.IsNotExist(err) {
-			n.logf("ERROR: failed to read channel metadata from %s - %s", fn, err)
+			return fmt.Errorf("failed to read channel metadata from %s - %s", fn, err)
 		}
-		data, err = ioutil.ReadFile(oldFn)
-		if err != nil {
-			if !os.IsNotExist(err) {
-				n.logf("ERROR: failed to read channel metadata from %s - %s", fn, err)
-			}
-			return
+	}
+
+	dataID, errID := ioutil.ReadFile(fnID)
+	if errID != nil {
+		if !os.IsNotExist(errID) {
+			return fmt.Errorf("failed to read channel metadata from %s - %s", fnID, errID)
 		}
 	}
 
+	if err != nil && errID != nil {
+		return nil
+	}
+	if err == nil && errID == nil {
+		if bytes.Compare(data, dataID) != 0 {
+			return fmt.Errorf("metadata in %s and %s do not match (delete one)", fn, fnID)
+		}
+	}
+	if err != nil {
+		fn = fnID
+		data = dataID
+	}
+
 	var m meta
 	err = json.Unmarshal(data, &m)
 	if err != nil {
-		n.logf("ERROR: failed to parse metadata - %s", err)
-		return
+		return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)
 	}
 
 	for _, t := range m.Topics {
@@ -316,13 +330,15 @@ func (n *NSQD) LoadMetadata() {
 			}
 		}
 	}
+	return nil
 }
 
 func (n *NSQD) PersistMetadata() error {
 	// persist metadata about what topics/channels we have
 	// so that upon restart we can get back to the same state
 	fileName := path.Join(n.getOpts().DataPath, "nsqd.dat")
-	oldFileName := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
+	// old metadata filename with ID, maintained in parallel to enable roll-back
+	fileNameID := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
 	n.logf("NSQ: persisting topic/channel metadata to %s", fileName)
 
 	js := make(map[string]interface{})
@@ -365,7 +381,19 @@ func (n *NSQD) PersistMetadata() error {
 	if err != nil {
 		return err
 	}
+	_, err = f.Write(data)
+	if err != nil {
+		f.Close()
+		return err
+	}
+	f.Sync()
+	f.Close()
+	tmpFileNameID := fmt.Sprintf("%s.%d.tmp", fileNameID, rand.Int())
+	f, err = os.OpenFile(tmpFileNameID, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
+	if err != nil {
+		return err
+	}
 
 	_, err = f.Write(data)
 	if err != nil {
 		f.Close()
@@ -378,11 +406,11 @@ func (n *NSQD) PersistMetadata() error {
 	if err != nil {
 		return err
 	}
-
-	err = os.Remove(oldFileName)
-	if err != nil && !os.IsNotExist(err) {
-		n.logf("NSQ: WARNING: failed to delete old metadata file %s: %s", oldFileName, err)
+	err = atomicRename(tmpFileNameID, fileNameID)
+	if err != nil {
+		return err
 	}
+	// technically should fsync DataPath here
 
 	return nil
 }