Skip to content

Commit

Permalink
Merge pull request #7513 from gyuho/raft-applied-term
Browse files Browse the repository at this point in the history
etcdserver: remove possibly compacted entry look-up
  • Loading branch information
gyuho committed Mar 15, 2017
2 parents 902c676 + 80c10e1 commit 5856c8b
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 16 deletions.
18 changes: 10 additions & 8 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
type etcdProgress struct {
confState raftpb.ConfState
snapi uint64
appliedt uint64
appliedi uint64
}

Expand Down Expand Up @@ -676,6 +677,7 @@ func (s *EtcdServer) run() {
ep := etcdProgress{
confState: sn.Metadata.ConfState,
snapi: sn.Metadata.Index,
appliedt: sn.Metadata.Term,
appliedi: sn.Metadata.Index,
}

Expand Down Expand Up @@ -777,7 +779,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
select {
// snapshot requested via send()
case m := <-s.r.msgSnapC:
merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
s.sendMergedSnap(merged)
default:
}
Expand Down Expand Up @@ -879,6 +881,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
}
plog.Info("finished adding peers from new cluster configuration into network...")

ep.appliedt = apply.snapshot.Metadata.Term
ep.appliedi = apply.snapshot.Metadata.Index
ep.snapi = ep.appliedi
ep.confState = apply.snapshot.Metadata.ConfState
Expand All @@ -900,7 +903,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
return
}
var shouldstop bool
if ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
}
}
Expand Down Expand Up @@ -1254,9 +1257,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
// apply takes entries received from Raft (after it has been committed) and
// applies them to the current state of the EtcdServer.
// The given entries should not be empty.
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint64, bool) {
var applied uint64
var shouldstop bool
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appliedt uint64, appliedi uint64, shouldStop bool) {
for i := range es {
e := es[i]
switch e.Type {
Expand All @@ -1266,16 +1267,17 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
removedSelf, err := s.applyConfChange(cc, confState)
shouldstop = shouldstop || removedSelf
shouldStop = shouldStop || removedSelf
s.w.Trigger(cc.ID, err)
default:
plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
}
atomic.StoreUint64(&s.r.index, e.Index)
atomic.StoreUint64(&s.r.term, e.Term)
applied = e.Index
appliedt = e.Term
appliedi = e.Index
}
return applied, shouldstop
return appliedt, appliedi, shouldStop
}

// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
ents = append(ents, ent)
}

_, shouldStop := srv.apply(ents, &raftpb.ConfState{})
_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
if !shouldStop {
t.Errorf("shouldStop = %t, want %t", shouldStop, true)
}
Expand Down
8 changes: 1 addition & 7 deletions etcdserver/snapshot_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package etcdserver

import (
"io"
"log"

"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/raft/raftpb"
Expand All @@ -26,12 +25,7 @@ import (
// createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf),
// a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
// as ReadCloser.
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64, confState raftpb.ConfState) snap.Message {
snapt, err := s.r.raftStorage.Term(snapi)
if err != nil {
log.Panicf("get term should never fail: %v", err)
}

func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
// get a snapshot of v2 store as []byte
clone := s.store.Clone()
d, err := clone.SaveNoCopy()
Expand Down

0 comments on commit 5856c8b

Please sign in to comment.