Skip to content

Commit

Permalink
nsqd: save both old and new metadata filenames
Browse files (browse the repository at this point in the history)
and when loading, if both exist, ensure they match

this makes rolling back possible without losing messages
  • Loading branch information
ploxiln committed Jan 5, 2017
1 parent 960f2b8 commit e1cd409
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 18 deletions.
7 changes: 5 additions & 2 deletions apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,11 @@ func (p *program) Start() error {
options.Resolve(opts, flagSet, cfg)
nsqd := nsqd.New(opts)

nsqd.LoadMetadata()
err := nsqd.PersistMetadata()
err := nsqd.LoadMetadata()
if err != nil {
log.Fatalf("ERROR: %s", err.Error())
}
err = nsqd.PersistMetadata()
if err != nil {
log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
}
Expand Down
60 changes: 44 additions & 16 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nsqd

import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
Expand Down Expand Up @@ -267,32 +268,45 @@ type meta struct {
} `json:"topics"`
}

func (n *NSQD) LoadMetadata() {
func (n *NSQD) LoadMetadata() error {
atomic.StoreInt32(&n.isLoading, 1)
defer atomic.StoreInt32(&n.isLoading, 0)

fn := path.Join(n.getOpts().DataPath, "nsqd.dat")
oldFn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
// old metadata filename with ID, maintained in parallel to enable roll-back
fnID := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)

data, err := ioutil.ReadFile(fn)
if err != nil {
if !os.IsNotExist(err) {
n.logf("ERROR: failed to read channel metadata from %s - %s", fn, err)
return fmt.Errorf("failed to read channel metadata from %s - %s", fn, err)
}
data, err = ioutil.ReadFile(oldFn)
if err != nil {
if !os.IsNotExist(err) {
n.logf("ERROR: failed to read channel metadata from %s - %s", fn, err)
}
return
}

dataID, errID := ioutil.ReadFile(fnID)
if errID != nil {
if !os.IsNotExist(errID) {
return fmt.Errorf("failed to read channel metadata from %s - %s", fnID, errID)
}
}

if err != nil && errID != nil {
return nil
}
if err == nil && errID == nil {
if bytes.Compare(data, dataID) != 0 {
return fmt.Errorf("metadata in %s and %s do not match (delete one)", fn, fnID)
}
}
if err != nil {
fn = fnID
data = dataID
}

var m meta
err = json.Unmarshal(data, &m)
if err != nil {
n.logf("ERROR: failed to parse metadata - %s", err)
return
return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)
}

for _, t := range m.Topics {
Expand All @@ -316,13 +330,15 @@ func (n *NSQD) LoadMetadata() {
}
}
}
return nil
}

func (n *NSQD) PersistMetadata() error {
// persist metadata about what topics/channels we have
// so that upon restart we can get back to the same state
fileName := path.Join(n.getOpts().DataPath, "nsqd.dat")
oldFileName := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
// old metadata filename with ID, maintained in parallel to enable roll-back
fileNameID := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
n.logf("NSQ: persisting topic/channel metadata to %s", fileName)

js := make(map[string]interface{})
Expand Down Expand Up @@ -365,7 +381,19 @@ func (n *NSQD) PersistMetadata() error {
if err != nil {
return err
}
_, err = f.Write(data)
if err != nil {
f.Close()
return err
}
f.Sync()
f.Close()

tmpFileNameID := fmt.Sprintf("%s.%d.tmp", fileNameID, rand.Int())
f, err = os.OpenFile(tmpFileNameID, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return err
}
_, err = f.Write(data)
if err != nil {
f.Close()
Expand All @@ -378,11 +406,11 @@ func (n *NSQD) PersistMetadata() error {
if err != nil {
return err
}

err = os.Remove(oldFileName)
if err != nil && !os.IsNotExist(err) {
n.logf("NSQ: WARNING: failed to delete old metadata file %s: %s", oldFileName, err)
err = atomicRename(tmpFileNameID, fileNameID)
if err != nil {
return err
}
// technically should fsync DataPath here

return nil
}
Expand Down

0 comments on commit e1cd409

Please sign in to comment.