Skip to content

Commit

Permalink
etcdserver: renaming db happens before snapshot persists to wal and s…
Browse files Browse the repository at this point in the history
…nap files

In the case that a follower receives a snapshot from the leader
and crashes before renaming xxx.snap.db to db but after the
snapshot has been persisted to .wal and .snap, restarting the
follower results in loading the old db, the new .wal, and the new .snap.
This causes an index mismatch between the snap metadata index
and the consistent index from db.

This PR forces an ordering where saving/renaming db must
happen before the snapshot is persisted to the wal and snap files.
This ensures that the db file can never be older than the wal and snap files.
Hence, it guarantees the invariant snapshot.Metadata.Index <= db.ConsistentIndex()
in NewServer() when checking the validity of the db and snap files.

FIXES etcd-io#7628
  • Loading branch information
fanminshi committed May 5, 2017
1 parent 505bf8c commit 0121c23
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
16 changes: 9 additions & 7 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type RaftTimer interface {
// apply carries the committed entries and the snapshot taken from a single
// raft Ready, plus the channel used to synchronize the raft routine with
// the etcdserver apply routine.
// NOTE(review): this is a diff view; raftDone appears to be the removed
// field that notifyc replaces — confirm against the actual file.
type apply struct {
entries []raftpb.Entry
snapshot raftpb.Snapshot
raftDone <-chan struct{} // rx {} after raft has persisted messages
notifyc chan struct{} // notifyc acts as a bridge between etcdserver and raftNode
}

type raftNode struct {
Expand Down Expand Up @@ -190,11 +190,11 @@ func (r *raftNode) start(rh *raftReadyHandler) {
}
}

raftDone := make(chan struct{}, 1)
notifyc := make(chan struct{}, 1)
ap := apply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
raftDone: raftDone,
notifyc: notifyc,
}

updateCommittedIndex(&ap, rh)
Expand Down Expand Up @@ -223,6 +223,8 @@ func (r *raftNode) start(rh *raftReadyHandler) {
// gofail: var raftAfterSave struct{}

if !raft.IsEmptySnap(rd.Snapshot) {
// waits for etcd server to finish renaming snap db to db.
<-notifyc
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
plog.Fatalf("raft save snapshot error: %v", err)
Expand All @@ -240,7 +242,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
msgs := r.processMessages(rd.Messages)

// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
raftDone <- struct{}{}
notifyc <- struct{}{}

// Candidate or follower needs to wait for all pending configuration
// changes to be applied before sending messages.
Expand All @@ -259,9 +261,9 @@ func (r *raftNode) start(rh *raftReadyHandler) {
if waitApply {
// blocks until 'applyAll' calls 'applyWait.Trigger'
// to be in sync with scheduled config-change job
// (assume raftDone has cap of 1)
// (assume notifyc has cap of 1)
select {
case raftDone <- struct{}{}:
case notifyc <- struct{}{}:
case <-r.stopped:
return
}
Expand All @@ -271,7 +273,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
r.transport.Send(msgs)
} else {
// leader already processed 'MsgSnap' and signaled
raftDone <- struct{}{}
notifyc <- struct{}{}
}

r.Advance()
Expand Down
4 changes: 3 additions & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
// wait for the raft routine to finish the disk writes before triggering a
// snapshot. or applied index might be greater than the last index in raft
// storage, since the raft routine might be slower than apply routine.
<-apply.raftDone
<-apply.notifyc

s.triggerSnapshot(ep)
select {
Expand Down Expand Up @@ -812,6 +812,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
if err := os.Rename(snapfn, fn); err != nil {
plog.Panicf("rename snapshot file error: %v", err)
}
// notifies raftNode that db has been replaced.
apply.notifyc <- struct{}{}

newbe := newBackend(fn, s.Cfg.QuotaBackendBytes)

Expand Down

0 comments on commit 0121c23

Please sign in to comment.