Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserver: renaming db happens after snapshot persists to wal and snap files #7876

Merged
merged 2 commits into from
May 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ type RaftTimer interface {
type apply struct {
entries []raftpb.Entry
snapshot raftpb.Snapshot
raftDone <-chan struct{} // rx {} after raft has persisted messages
// notifyc synchronizes etcd server applies with the raft node
notifyc chan struct{}
}

type raftNode struct {
Expand Down Expand Up @@ -190,11 +191,11 @@ func (r *raftNode) start(rh *raftReadyHandler) {
}
}

raftDone := make(chan struct{}, 1)
notifyc := make(chan struct{}, 1)
ap := apply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
raftDone: raftDone,
notifyc: notifyc,
}

updateCommittedIndex(&ap, rh)
Expand Down Expand Up @@ -227,6 +228,9 @@ func (r *raftNode) start(rh *raftReadyHandler) {
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
plog.Fatalf("raft save snapshot error: %v", err)
}
// etcdserver now claim the snapshot has been persisted onto the disk
notifyc <- struct{}{}

// gofail: var raftAfterSaveSnap struct{}
r.raftStorage.ApplySnapshot(rd.Snapshot)
plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
Expand All @@ -240,7 +244,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
msgs := r.processMessages(rd.Messages)

// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
raftDone <- struct{}{}
notifyc <- struct{}{}

// Candidate or follower needs to wait for all pending configuration
// changes to be applied before sending messages.
Expand All @@ -259,9 +263,9 @@ func (r *raftNode) start(rh *raftReadyHandler) {
if waitApply {
// blocks until 'applyAll' calls 'applyWait.Trigger'
// to be in sync with scheduled config-change job
// (assume raftDone has cap of 1)
// (assume notifyc has cap of 1)
select {
case raftDone <- struct{}{}:
case notifyc <- struct{}{}:
case <-r.stopped:
return
}
Expand All @@ -271,7 +275,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
r.transport.Send(msgs)
} else {
// leader already processed 'MsgSnap' and signaled
raftDone <- struct{}{}
notifyc <- struct{}{}
}

r.Advance()
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func TestConfgChangeBlocksApply(t *testing.T) {
}

// finish apply, unblock raft routine
<-ap.raftDone
<-ap.notifyc

select {
case <-continueC:
Expand Down
25 changes: 10 additions & 15 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,20 +274,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
bepath := filepath.Join(cfg.SnapDir(), databaseFilename)
beExist := fileutil.Exist(bepath)

var be backend.Backend
beOpened := make(chan struct{})
go func() {
be = newBackend(bepath, cfg.QuotaBackendBytes)
beOpened <- struct{}{}
}()

select {
case <-beOpened:
case <-time.After(time.Second):
plog.Warningf("another etcd process is running with the same data dir and holding the file lock.")
plog.Warningf("waiting for it to exit before starting...")
<-beOpened
}
be := openBackend(bepath, cfg.QuotaBackendBytes)

defer func() {
if err != nil {
Expand Down Expand Up @@ -385,6 +372,11 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
plog.Panicf("recovered store from snapshot error: %v", err)
}
plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably move this part to a new func.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do.

be, err = checkAndRecoverDB(snapshot, be, cfg.QuotaBackendBytes, cfg.SnapDir())
if err != nil {
plog.Panicf("recovering backend from snapshot error: %v", err)
}
}
cfg.Print()
if !cfg.ForceNewCluster {
Expand Down Expand Up @@ -778,7 +770,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
// wait for the raft routine to finish the disk writes before triggering a
// snapshot. or applied index might be greater than the last index in raft
// storage, since the raft routine might be slower than apply routine.
<-apply.raftDone
<-apply.notifyc

s.triggerSnapshot(ep)
select {
Expand All @@ -803,6 +795,9 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
apply.snapshot.Metadata.Index, ep.appliedi)
}

// wait for raftNode to persist snashot onto the disk
<-apply.notifyc

snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index)
if err != nil {
plog.Panicf("get database snapshot file path error: %v", err)
Expand Down
83 changes: 83 additions & 0 deletions etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,89 @@ func TestSnapshot(t *testing.T) {
<-ch
}

// TestSnapshotOrdering ensures raft persists snapshot onto disk before
// snapshot db is applied.
func TestSnapshotOrdering(t *testing.T) {
n := newNopReadyNode()
st := store.New()
cl := membership.NewCluster("abc")
cl.SetStore(st)

testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
if err != nil {
t.Fatalf("couldn't open tempdir (%v)", err)
}
defer os.RemoveAll(testdir)
if err := os.MkdirAll(testdir+"/member/snap", 0755); err != nil {
t.Fatalf("couldn't make snap dir (%v)", err)
}

rs := raft.NewMemoryStorage()
p := mockstorage.NewStorageRecorderStream(testdir)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

p := mockstorage.NewStorageRecorder(testdir)

otherwise hangs on failure

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

guess not, NewStorageRecorder doesn't work with the fix...

tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
r := newRaftNode(raftNodeConfig{
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: n,
transport: tr,
storage: p,
raftStorage: rs,
})
s := &EtcdServer{
Cfg: &ServerConfig{
DataDir: testdir,
},
r: *r,
store: st,
cluster: cl,
SyncTicker: &time.Ticker{},
}
s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}

be, tmpPath := backend.NewDefaultTmpBackend()
defer os.RemoveAll(tmpPath)
s.kv = mvcc.New(be, &lease.FakeLessor{}, &s.consistIndex)
s.be = be

s.start()
defer s.Stop()

actionc := p.Chan()
n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
if ac := <-actionc; ac.Name != "Save" {
// MsgSnap triggers raftNode to call Save()
t.Fatalf("expect save() is called, but got %v", ac.Name)
}

// get the snapshot sent by the transport
snapMsg := <-snapDoneC

// Snapshot first triggers raftnode to persists the snapshot onto disk
// before renaming db snapshot file to db
snapMsg.Snapshot.Metadata.Index = 1
n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot}
var seenSaveSnap bool
timer := time.After(5 * time.Second)
for {
select {
case ac := <-actionc:
switch ac.Name {
// DBFilePath() is called immediately before snapshot renaming.
case "DBFilePath":
if !seenSaveSnap {
t.Fatalf("DBFilePath called before SaveSnap")
}
return
case "SaveSnap":
seenSaveSnap = true
default:
continue
}
case <-timer:
t.Fatalf("timeout waiting on actions")
}
}
}

// Applied > SnapCount should trigger a SaveSnap event
func TestTriggerSnap(t *testing.T) {
be, tmpPath := backend.NewDefaultTmpBackend()
Expand Down
59 changes: 59 additions & 0 deletions etcdserver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,18 @@
package etcdserver

import (
"fmt"
"os"
"time"

"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/etcd/snap"
)

// isConnectedToQuorumSince checks whether the local member is connected to the
Expand Down Expand Up @@ -95,3 +102,55 @@ func (nc *notifier) notify(err error) {
nc.err = err
close(nc.c)
}

// checkAndRecoverDB attempts to recover db in the scenario when
// etcd server crashes before updating its in-state db
// and after persisting snapshot to disk from syncing with leader,
// snapshot can be newer than db where
// (snapshot.Metadata.Index > db.consistentIndex ).
//
// when that happen:
// 1. find xxx.snap.db that matches snap index.
// 2. rename xxx.snap.db to db.
// 3. open the new db as the backend.
func checkAndRecoverDB(snapshot *raftpb.Snapshot, oldbe backend.Backend, quotaBackendBytes int64, snapdir string) (be backend.Backend, err error) {
var cIndex consistentIndex
kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex)
defer kv.Close()
kvindex := kv.ConsistentIndex()
if snapshot.Metadata.Index <= kvindex {
return oldbe, nil
}

id := snapshot.Metadata.Index
snapfn, err := snap.DBFilePathFromID(snapdir, id)
if err != nil {
return nil, fmt.Errorf("finding %v error: %v", snapdir+fmt.Sprintf("%016x.snap.db", id), err)
}

bepath := snapdir + databaseFilename
if err := os.Rename(snapfn, bepath); err != nil {
return nil, fmt.Errorf("rename snapshot file error: %v", err)
}

oldbe.Close()
be = openBackend(bepath, quotaBackendBytes)
return be, nil
}

func openBackend(bepath string, quotaBackendBytes int64) (be backend.Backend) {
beOpened := make(chan struct{})
go func() {
be = newBackend(bepath, quotaBackendBytes)
beOpened <- struct{}{}
}()

select {
case <-beOpened:
case <-time.After(time.Second):
plog.Warningf("another etcd process is running with the same data dir and holding the file lock.")
plog.Warningf("waiting for it to exit before starting...")
<-beOpened
}
return be
}
13 changes: 10 additions & 3 deletions snap/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package snap

import (
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -24,6 +25,8 @@ import (
"github.com/coreos/etcd/pkg/fileutil"
)

var ErrNoDBSnapshot = errors.New("snap: snapshot file doesn't exist")

// SaveDBFrom saves snapshot of the database from the given reader. It
// guarantees the save operation is atomic.
func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
Expand Down Expand Up @@ -60,15 +63,19 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
// DBFilePath returns the file path for the snapshot of the database with
// given id. If the snapshot does not exist, it returns error.
func (s *Snapshotter) DBFilePath(id uint64) (string, error) {
fns, err := fileutil.ReadDir(s.dir)
return DBFilePathFromID(s.dir, id)
}

func DBFilePathFromID(dbPath string, id uint64) (string, error) {
fns, err := fileutil.ReadDir(dbPath)
if err != nil {
return "", err
}
wfn := fmt.Sprintf("%016x.snap.db", id)
for _, fn := range fns {
if fn == wfn {
return filepath.Join(s.dir, fn), nil
return filepath.Join(dbPath, fn), nil
}
}
return "", fmt.Errorf("snap: snapshot file doesn't exist")
return "", ErrNoDBSnapshot
}