fix(server): joinIndex is not set after recovery from full outage #819

Merged
5 commits merged on Jun 2, 2014
2 changes: 2 additions & 0 deletions etcd/etcd.go
@@ -232,6 +232,7 @@ func (e *Etcd) Run() {
DataDir: e.Config.DataDir,
}
e.StandbyServer = server.NewStandbyServer(ssConfig, client)
e.StandbyServer.SetRaftServer(raftServer)

// Generating the config could be slow.
// Do it here so that listening can start immediately after the peer server starts.
@@ -347,6 +348,7 @@ func (e *Etcd) runServer() {
raftServer.SetElectionTimeout(electionTimeout)
raftServer.SetHeartbeatInterval(heartbeatInterval)
e.PeerServer.SetRaftServer(raftServer, e.Config.Snapshot)
e.StandbyServer.SetRaftServer(raftServer)

e.PeerServer.SetJoinIndex(e.StandbyServer.JoinIndex())
e.setMode(PeerMode)
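The two hunks above wire the freshly created raft server into the standby server as well as the peer server. A minimal sketch of that wiring, using simplified stand-in types rather than the real etcd/raft APIs:

```go
package main

import "fmt"

// raftServer is a stand-in for the small part of raft.Server this fix relies
// on: the standby only needs to be able to read the commit index.
type raftServer interface {
	CommitIndex() uint64
}

type inMemoryRaft struct{ commit uint64 }

func (r *inMemoryRaft) CommitIndex() uint64 { return r.commit }

type peerServer struct{ raft raftServer }
type standbyServer struct{ raft raftServer }

func (p *peerServer) SetRaftServer(r raftServer)    { p.raft = r }
func (s *standbyServer) SetRaftServer(r raftServer) { s.raft = r }

func main() {
	raft := &inMemoryRaft{commit: 128}
	peer := &peerServer{}
	standby := &standbyServer{}

	// Mirrors the change above: wherever the peer server receives the raft
	// instance, the standby server now receives the same instance.
	peer.SetRaftServer(raft)
	standby.SetRaftServer(raft)

	fmt.Println(standby.raft.CommitIndex()) // 128
}
```

Without this wiring, the standby's raftServer field would still be nil when the join-index lookup added further down tries to read the commit index.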
1 change: 1 addition & 0 deletions server/peer_server.go
@@ -214,6 +214,7 @@ func (s *PeerServer) FindCluster(discoverURL string, peers []string) (toStart bo
// TODO(yichengq): Think about the action that should be taken
// if it cannot connect to any of the previously known nodes.
log.Debugf("%s is restarting the cluster %v", name, possiblePeers)
s.SetJoinIndex(s.raftServer.CommitIndex())
toStart = true
return
}
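The hunk above is the full-outage path in FindCluster: when none of the previously known peers respond, the node restarts the old cluster from its local log and now also seeds its join index from the raft commit index instead of leaving it at zero. A rough sketch of that decision, with illustrative names (reachedPeer and setJoinIndex are not the real etcd API, and the real control flow is larger):

```go
package recovery

// restartFromLocalLog sketches the branch in PeerServer.FindCluster that
// handles recovery after a full outage.
func restartFromLocalLog(reachedPeer bool, commitIndex uint64, setJoinIndex func(uint64)) (toStart bool) {
	if reachedPeer {
		// Normal path: some known peer answered; the real code rejoins or
		// syncs through it instead of restarting on its own.
		return false
	}
	// Full outage: nobody answered, so restart the cluster from the local
	// raft log and record the current commit index as the join index.
	setJoinIndex(commitIndex)
	return true
}
```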
11 changes: 8 additions & 3 deletions server/standby_server.go
@@ -36,8 +36,9 @@ type standbyInfo struct {
}

type StandbyServer struct {
- Config StandbyServerConfig
- client *Client
+ Config     StandbyServerConfig
+ client     *Client
+ raftServer raft.Server

standbyInfo
joinIndex uint64
@@ -62,6 +63,10 @@ func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer
return s
}

func (s *StandbyServer) SetRaftServer(raftServer raft.Server) {
s.raftServer = raftServer
}

func (s *StandbyServer) Start() {
s.Lock()
defer s.Unlock()
@@ -237,7 +242,7 @@ func (s *StandbyServer) syncCluster(peerURLs []string) error {
func (s *StandbyServer) join(peer string) error {
for _, url := range s.ClusterURLs() {
if s.Config.PeerURL == url {
- s.joinIndex = 0
+ s.joinIndex = s.raftServer.CommitIndex()
return nil
}
}
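Taken together with the struct and setter changes above, the join() change means a standby that finds its own peer URL already registered in the cluster now records the raft server's commit index as its join index instead of zero. A self-contained sketch of that behavior (the types are simplified stand-ins, not the real StandbyServer):

```go
package main

import "fmt"

type committer interface {
	CommitIndex() uint64
}

type fakeRaft struct{ commit uint64 }

func (f *fakeRaft) CommitIndex() uint64 { return f.commit }

// standby is a simplified stand-in for StandbyServer.
type standby struct {
	peerURL    string
	raftServer committer
	joinIndex  uint64
}

// join mirrors the patched logic: if this node's peer URL is already part of
// the cluster, take the current commit index as the join index rather than 0.
func (s *standby) join(clusterURLs []string) {
	for _, url := range clusterURLs {
		if url == s.peerURL {
			s.joinIndex = s.raftServer.CommitIndex()
			return
		}
	}
	// Otherwise the real server would ask one of the peers to add it.
}

func main() {
	s := &standby{
		peerURL:    "http://127.0.0.1:7001",
		raftServer: &fakeRaft{commit: 42},
	}
	s.join([]string{"http://127.0.0.1:7001", "http://127.0.0.1:7002"})
	fmt.Println(s.joinIndex) // 42 instead of the old 0
}
```

The peer server later reads this value through StandbyServer.JoinIndex() in etcd.go's runServer, which is why a zero join index after a full outage was a problem.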
68 changes: 68 additions & 0 deletions tests/functional/multi_node_kill_all_and_recovery_test.go
@@ -4,6 +4,7 @@ import (
"bytes"
"os"
"strconv"
"strings"
"testing"
"time"

@@ -100,6 +101,8 @@ func TestTLSMultiNodeKillAllAndRecovery(t *testing.T) {
t.Fatal("cannot create cluster")
}

time.Sleep(time.Second)

c := etcd.NewClient(nil)

go Monitor(clusterSize, clusterSize, leaderChan, all, stop)
@@ -239,3 +242,68 @@ func TestMultiNodeKillAllAndRecoveryWithStandbys(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), 7)
}

// Create a five-node cluster.
// Kill all the nodes, restart them, then remove the leader.
func TestMultiNodeKillAllAndRecoveryAndRemoveLeader(t *testing.T) {
procAttr := new(os.ProcAttr)
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}

stop := make(chan bool)
leaderChan := make(chan string, 1)
all := make(chan bool, 1)

clusterSize := 5
argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
defer DestroyCluster(etcds)

if err != nil {
t.Fatal("cannot create cluster")
}

c := etcd.NewClient(nil)

go Monitor(clusterSize, clusterSize, leaderChan, all, stop)
<-all
<-leaderChan
stop <- true

c.SyncCluster()

// kill all
DestroyCluster(etcds)

time.Sleep(time.Second)

stop = make(chan bool)
leaderChan = make(chan string, 1)
all = make(chan bool, 1)

time.Sleep(time.Second)

for i := 0; i < clusterSize; i++ {
etcds[i], err = os.StartProcess(EtcdBinPath, argGroup[i], procAttr)
}

go Monitor(clusterSize, 1, leaderChan, all, stop)

<-all
leader := <-leaderChan

_, err = c.Set("foo", "bar", 0)
if err != nil {
t.Fatalf("Recovery error: %s", err)
}

port, _ := strconv.Atoi(strings.Split(leader, ":")[2])
num := port - 7000
resp, _ := tests.Delete(leader+"/v2/admin/machines/node"+strconv.Itoa(num), "application/json", nil)
if !assert.Equal(t, resp.StatusCode, 200) {
t.FailNow()
}

// check the old leader is in standby mode now
time.Sleep(time.Second)
resp, _ = tests.Get(leader + "/name")
assert.Equal(t, resp.StatusCode, 404)
}
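The port arithmetic in this test relies on the functional-test convention that node N listens on peer port 7000+N, so the leader URL's port identifies which machine entry to delete. A hypothetical helper that mirrors that parsing (nodeNameFromLeaderURL is illustrative and not part of the test suite):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// nodeNameFromLeaderURL maps a leader URL such as "http://127.0.0.1:7003"
// to the machine name "node3", using the same port arithmetic as the test.
func nodeNameFromLeaderURL(leader string) (string, error) {
	parts := strings.Split(leader, ":")
	if len(parts) < 3 {
		return "", fmt.Errorf("unexpected leader URL %q", leader)
	}
	port, err := strconv.Atoi(parts[2])
	if err != nil {
		return "", err
	}
	return "node" + strconv.Itoa(port-7000), nil
}

func main() {
	name, err := nodeNameFromLeaderURL("http://127.0.0.1:7003")
	fmt.Println(name, err) // node3 <nil>
}
```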
3 changes: 2 additions & 1 deletion tests/functional/remove_node_test.go
@@ -169,7 +169,8 @@ func TestRemovePausedNode(t *testing.T) {
if !assert.Equal(t, r.StatusCode, 200) {
t.FailNow()
}
- time.Sleep(2 * time.Second)
+ // Wait for the standby instances to update their cluster config.
+ time.Sleep(6 * time.Second)

resp, err := c.Get("_etcd/machines", false, false)
if err != nil {
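The longer sleep gives the standby instances time to notice the cluster-config change. As a design note, a small polling helper can bound that kind of wait without hard-coding a single interval; a minimal sketch (waitFor is illustrative and not part of the existing test helpers):

```go
package functional

import (
	"errors"
	"time"
)

// waitFor polls cond every interval until it returns true or timeout
// elapses. It is a generic alternative to a fixed time.Sleep in tests.
func waitFor(timeout, interval time.Duration, cond func() bool) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return nil
		}
		time.Sleep(interval)
	}
	return errors.New("timed out waiting for condition")
}
```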
2 changes: 1 addition & 1 deletion tests/functional/simple_snapshot_test.go
@@ -89,7 +89,7 @@ func TestSnapshot(t *testing.T) {

index, _ = strconv.Atoi(snapshots[0].Name()[2:6])

- if index < 1010 || index > 1025 {
+ if index < 1010 || index > 1029 {
t.Fatal("wrong name of snapshot :", snapshots[0].Name())
}
}