Merge pull request #819 from unihorn/97
fix(server): joinIndex is not set after recovery from full outage
yichengq committed Jun 2, 2014
2 parents d5bfca9 + 7cb1269 commit 2387ef3
Showing 6 changed files with 82 additions and 5 deletions.
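In short: when a cluster recovered from a full outage, the restart path never set joinIndex, leaving it at its zero value; this commit seeds it from the raft server's CommitIndex() in both the peer restart path (server/peer_server.go) and the standby self-join path (server/standby_server.go). Below is a minimal, self-contained Go sketch of the invariant being restored; peerServer and raftState are illustrative stand-ins, not etcd's actual types.

// Toy model of the fix, not etcd code: after a full-outage recovery the
// join index must be re-derived from the raft log, not left at zero.
package main

import "fmt"

type raftState struct{ commitIndex uint64 }

func (r raftState) CommitIndex() uint64 { return r.commitIndex }

type peerServer struct{ joinIndex uint64 }

func (p *peerServer) SetJoinIndex(i uint64) { p.joinIndex = i }

// recoverCluster mirrors the restart branch of FindCluster below: a node
// restarting an existing cluster records the current commit index.
func recoverCluster(p *peerServer, r raftState) {
    p.SetJoinIndex(r.CommitIndex())
}

func main() {
    p := &peerServer{}
    recoverCluster(p, raftState{commitIndex: 42})
    fmt.Println(p.joinIndex) // prints 42, not 0
}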
etcd/etcd.go: 2 additions, 0 deletions
@@ -232,6 +232,7 @@ func (e *Etcd) Run() {
         DataDir: e.Config.DataDir,
     }
     e.StandbyServer = server.NewStandbyServer(ssConfig, client)
+    e.StandbyServer.SetRaftServer(raftServer)
 
     // Generating config could be slow.
     // Put it here to make listen happen immediately after peer-server starting.
@@ -347,6 +348,7 @@ func (e *Etcd) runServer() {
     raftServer.SetElectionTimeout(electionTimeout)
     raftServer.SetHeartbeatInterval(heartbeatInterval)
     e.PeerServer.SetRaftServer(raftServer, e.Config.Snapshot)
+    e.StandbyServer.SetRaftServer(raftServer)
 
     e.PeerServer.SetJoinIndex(e.StandbyServer.JoinIndex())
     e.setMode(PeerMode)
server/peer_server.go: 1 addition, 0 deletions
@@ -214,6 +214,7 @@ func (s *PeerServer) FindCluster(discoverURL string, peers []string) (toStart bo
             // TODO(yichengq): Think about the action that should be done
             // if it cannot connect any of the previous known node.
             log.Debugf("%s is restarting the cluster %v", name, possiblePeers)
+            s.SetJoinIndex(s.raftServer.CommitIndex())
             toStart = true
             return
         }
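Why a zero joinIndex is harmful is not visible in this diff, so the following is a hypothetical illustration only: assuming joinIndex marks where in the raft log the node's current membership began, and membership entries below it are ignored, a zero value lets stale pre-outage entries (such as an old removal) be applied to the recovered node.

// Hypothetical guard, not etcd code; the >= joinIndex rule is an assumption.
package main

import "fmt"

func concernsCurrentMembership(entryIndex, joinIndex uint64) bool {
    return entryIndex >= joinIndex
}

func main() {
    stale := uint64(17) // a membership entry from before the outage
    fmt.Println(concernsCurrentMembership(stale, 0))  // true: zero joinIndex admits stale entries
    fmt.Println(concernsCurrentMembership(stale, 42)) // false: a seeded joinIndex filters them
}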
server/standby_server.go: 8 additions, 3 deletions
@@ -36,8 +36,9 @@ type standbyInfo struct {
 }
 
 type StandbyServer struct {
-    Config StandbyServerConfig
-    client *Client
+    Config     StandbyServerConfig
+    client     *Client
+    raftServer raft.Server
 
     standbyInfo
     joinIndex uint64
@@ -62,6 +63,10 @@ func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer
     return s
 }
 
+func (s *StandbyServer) SetRaftServer(raftServer raft.Server) {
+    s.raftServer = raftServer
+}
+
 func (s *StandbyServer) Start() {
     s.Lock()
     defer s.Unlock()
@@ -237,7 +242,7 @@ func (s *StandbyServer) syncCluster(peerURLs []string) error {
 func (s *StandbyServer) join(peer string) error {
     for _, url := range s.ClusterURLs() {
         if s.Config.PeerURL == url {
-            s.joinIndex = 0
+            s.joinIndex = s.raftServer.CommitIndex()
             return nil
         }
     }
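The standby server previously held no reference to the raft state, which is why the self-join branch above could only write 0. The new SetRaftServer hook, called from both sites in etcd/etcd.go, closes that gap. A wiring sketch with stand-in types (fakeRaft and the simplified join helper are illustrative, not etcd's API):

// Wiring sketch: peer and standby share one raft server, so the standby
// can read CommitIndex() when it discovers it is already a member.
package main

import "fmt"

type RaftServer interface{ CommitIndex() uint64 }

type fakeRaft struct{ idx uint64 }

func (f fakeRaft) CommitIndex() uint64 { return f.idx }

type StandbyServer struct{ raft RaftServer }

func (s *StandbyServer) SetRaftServer(r RaftServer) { s.raft = r }

// joinIndexOnSelfJoin mirrors the fixed branch: an already-member node
// records the current commit index instead of resetting to zero.
func (s *StandbyServer) joinIndexOnSelfJoin() uint64 {
    return s.raft.CommitIndex()
}

func main() {
    s := &StandbyServer{}
    s.SetRaftServer(fakeRaft{idx: 42})
    fmt.Println(s.joinIndexOnSelfJoin()) // 42
}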
tests/functional/multi_node_kill_all_and_recovery_test.go: 68 additions, 0 deletions
@@ -4,6 +4,7 @@ import (
     "bytes"
     "os"
     "strconv"
+    "strings"
     "testing"
     "time"
 
@@ -100,6 +101,8 @@ func TestTLSMultiNodeKillAllAndRecovery(t *testing.T) {
         t.Fatal("cannot create cluster")
     }
 
+    time.Sleep(time.Second)
+
     c := etcd.NewClient(nil)
 
     go Monitor(clusterSize, clusterSize, leaderChan, all, stop)
@@ -239,3 +242,68 @@ func TestMultiNodeKillAllAndRecoveryWithStandbys(t *testing.T) {
     assert.NoError(t, err)
     assert.Equal(t, len(result.Node.Nodes), 7)
 }
+
+// Create a five-node cluster.
+// Kill all the nodes and restart them, then remove the leader.
+func TestMultiNodeKillAllAndRecoveryAndRemoveLeader(t *testing.T) {
+    procAttr := new(os.ProcAttr)
+    procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+    stop := make(chan bool)
+    leaderChan := make(chan string, 1)
+    all := make(chan bool, 1)
+
+    clusterSize := 5
+    argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
+    defer DestroyCluster(etcds)
+
+    if err != nil {
+        t.Fatal("cannot create cluster")
+    }
+
+    c := etcd.NewClient(nil)
+
+    go Monitor(clusterSize, clusterSize, leaderChan, all, stop)
+    <-all
+    <-leaderChan
+    stop <- true
+
+    c.SyncCluster()
+
+    // kill all
+    DestroyCluster(etcds)
+
+    time.Sleep(time.Second)
+
+    stop = make(chan bool)
+    leaderChan = make(chan string, 1)
+    all = make(chan bool, 1)
+
+    time.Sleep(time.Second)
+
+    for i := 0; i < clusterSize; i++ {
+        etcds[i], err = os.StartProcess(EtcdBinPath, argGroup[i], procAttr)
+    }
+
+    go Monitor(clusterSize, 1, leaderChan, all, stop)
+
+    <-all
+    leader := <-leaderChan
+
+    _, err = c.Set("foo", "bar", 0)
+    if err != nil {
+        t.Fatalf("Recovery error: %s", err)
+    }
+
+    port, _ := strconv.Atoi(strings.Split(leader, ":")[2])
+    num := port - 7000
+    resp, _ := tests.Delete(leader+"/v2/admin/machines/node"+strconv.Itoa(num), "application/json", nil)
+    if !assert.Equal(t, resp.StatusCode, 200) {
+        t.FailNow()
+    }
+
+    // check the old leader is in standby mode now
+    time.Sleep(time.Second)
+    resp, _ = tests.Get(leader + "/name")
+    assert.Equal(t, resp.StatusCode, 404)
+}
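One non-obvious step in the test above is how it names the node to delete: it relies on the harness convention, visible in the test body, that node N serves on port 7000+N, so subtracting 7000 from the leader URL's port recovers N. The final assertion then expects GET /name on the removed leader to return 404 once the node has dropped back to standby mode. A stand-alone sketch of the port arithmetic (the URL shape is assumed from the split on ":"):

// Mirrors the test's node-name arithmetic; the 7000+N port convention is
// the functional-test harness's, taken from the test body above.
package main

import (
    "fmt"
    "strconv"
    "strings"
)

func nodeNameFromLeaderURL(leader string) string {
    // "http://127.0.0.1:7002" splits on ":" into
    // ["http", "//127.0.0.1", "7002"]; element 2 is the port.
    port, _ := strconv.Atoi(strings.Split(leader, ":")[2])
    return "node" + strconv.Itoa(port-7000)
}

func main() {
    fmt.Println(nodeNameFromLeaderURL("http://127.0.0.1:7002")) // node2
}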
tests/functional/remove_node_test.go: 2 additions, 1 deletion
@@ -169,7 +169,8 @@ func TestRemovePausedNode(t *testing.T) {
     if !assert.Equal(t, r.StatusCode, 200) {
         t.FailNow()
     }
-    time.Sleep(2 * time.Second)
+    // Wait for the standby instances to update their cluster config
+    time.Sleep(6 * time.Second)
 
     resp, err := c.Get("_etcd/machines", false, false)
     if err != nil {
tests/functional/simple_snapshot_test.go: 1 addition, 1 deletion
@@ -89,7 +89,7 @@ func TestSnapshot(t *testing.T) {
 
     index, _ = strconv.Atoi(snapshots[0].Name()[2:6])
 
-    if index < 1010 || index > 1025 {
+    if index < 1010 || index > 1029 {
         t.Fatal("wrong name of snapshot :", snapshots[0].Name())
     }
 }
