Skip to content

Commit

Permalink
etcdserver: rework update committed index logic
Browse files Browse the repository at this point in the history
  • Loading branch information
fanminshi committed Dec 22, 2016
1 parent bd62e35 commit 7bf2163
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
15 changes: 15 additions & 0 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ func (r *raftNode) start(rh *raftReadyHandler) {
return
}

updateCommittedIndex(&ap, rh)

// the leader can write to its disk in parallel with replicating to the followers and them
// writing to their disks.
// For more details, check raft thesis 10.2.1
Expand Down Expand Up @@ -231,6 +233,19 @@ func (r *raftNode) start(rh *raftReadyHandler) {
}()
}

func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
var ci uint64
if len(ap.entries) != 0 {
ci = ap.entries[len(ap.entries)-1].Index
}
if ap.snapshot.Metadata.Index > ci {
ci = ap.snapshot.Metadata.Index
}
if ci != 0 {
rh.committedIdxUpdate(ci)
}
}

func (r *raftNode) sendMessages(ms []raftpb.Message) {
sentAppResp := false
for i := len(ms) - 1; i >= 0; i-- {
Expand Down
19 changes: 8 additions & 11 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,8 @@ type etcdProgress struct {
// and helps decouple state machine logic from Raft algorithms.
// TODO: add a state machine interface to apply the commit entries and do snapshot/recover
type raftReadyHandler struct {
leadershipUpdate func()
leadershipUpdate func()
committedIdxUpdate func(uint64)
}

func (s *EtcdServer) run() {
Expand Down Expand Up @@ -648,6 +649,12 @@ func (s *EtcdServer) run() {
s.r.td.Reset()
}
},
committedIdxUpdate: func(ci uint64) {
cci := s.getCommittedIndex()
if ci > cci {
s.setCommittedIndex(ci)
}
},
}
s.r.start(rh)

Expand Down Expand Up @@ -701,16 +708,6 @@ func (s *EtcdServer) run() {
for {
select {
case ap := <-s.r.apply():
var ci uint64
if len(ap.entries) != 0 {
ci = ap.entries[len(ap.entries)-1].Index
}
if ap.snapshot.Metadata.Index > ci {
ci = ap.snapshot.Metadata.Index
}
if ci != 0 {
s.setCommittedIndex(ci)
}
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(f)
case leases := <-expiredLeaseC:
Expand Down

0 comments on commit 7bf2163

Please sign in to comment.