Skip to content

Commit

Permalink
tests: Add a functional test that sends sigkill under high stress
Browse files Browse the repository at this point in the history
  • Loading branch information
serathius committed Mar 25, 2022
1 parent 4019c59 commit b8e4b38
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 259 deletions.
19 changes: 19 additions & 0 deletions tests/functional/agent/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func (srv *Server) handleTesterRequest(req *rpcpb.Request) (resp *rpcpb.Response

case rpcpb.Operation_SIGTERM_ETCD:
return srv.handle_SIGTERM_ETCD()
case rpcpb.Operation_SIGKILL_ETCD:
return srv.handle_SIGKILL_ETCD()
case rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA:
return srv.handle_SIGQUIT_ETCD_AND_REMOVE_DATA()

Expand Down Expand Up @@ -551,6 +553,23 @@ func (srv *Server) handle_SIGTERM_ETCD() (*rpcpb.Response, error) {
}, nil
}

// handle_SIGKILL_ETCD stops the local etcd process with SIGKILL while
// keeping its data directories and previous configuration intact (see
// Operation_SIGKILL_ETCD in rpc.proto), so the member can later be
// restarted with RESTART_ETCD. Buffered log output is flushed before
// responding so the tester sees complete logs for the killed member.
func (srv *Server) handle_SIGKILL_ETCD() (*rpcpb.Response, error) {
	if err := srv.stopEtcd(syscall.SIGKILL); err != nil {
		return nil, err
	}

	// Flush any buffered log output. Sync failures are deliberately
	// ignored: the process is already gone and a flush error must not
	// fail the operation.
	if srv.etcdServer != nil {
		_ = srv.etcdServer.GetLogger().Sync()
	} else {
		_ = srv.etcdLogFile.Sync()
	}

	return &rpcpb.Response{
		Success: true,
		Status:  "killed etcd",
	}, nil
}

func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA() (*rpcpb.Response, error) {
err := srv.stopEtcd(syscall.SIGQUIT)
if err != nil {
Expand Down
50 changes: 26 additions & 24 deletions tests/functional/functional.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -177,28 +177,30 @@ tester-config:
# For full descriptions,
# https://pkg.go.dev/go.etcd.io/etcd/tests/v3/functional/rpcpb#Case
cases:
- SIGTERM_ONE_FOLLOWER
- SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
- SIGTERM_LEADER
- SIGTERM_LEADER_UNTIL_TRIGGER_SNAPSHOT
- SIGTERM_QUORUM
- SIGTERM_ALL
- SIGQUIT_AND_REMOVE_ONE_FOLLOWER
- SIGQUIT_AND_REMOVE_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
- BLACKHOLE_PEER_PORT_TX_RX_LEADER
- BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT
- BLACKHOLE_PEER_PORT_TX_RX_QUORUM
- BLACKHOLE_PEER_PORT_TX_RX_ALL
- DELAY_PEER_PORT_TX_RX_LEADER
- RANDOM_DELAY_PEER_PORT_TX_RX_LEADER
- DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT
- RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT
- DELAY_PEER_PORT_TX_RX_QUORUM
- RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM
- DELAY_PEER_PORT_TX_RX_ALL
- RANDOM_DELAY_PEER_PORT_TX_RX_ALL
- NO_FAIL_WITH_STRESS
- NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS
# - SIGTERM_ONE_FOLLOWER
# - SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
# - SIGTERM_LEADER
# - SIGTERM_LEADER_UNTIL_TRIGGER_SNAPSHOT
# - SIGTERM_QUORUM
# - SIGTERM_ALL
# - SIGQUIT_AND_REMOVE_ONE_FOLLOWER
- SIGKILL_FOLLOWER
- SIGKILL_LEADER
# - SIGQUIT_AND_REMOVE_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
# - BLACKHOLE_PEER_PORT_TX_RX_LEADER
# - BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT
# - BLACKHOLE_PEER_PORT_TX_RX_QUORUM
# - BLACKHOLE_PEER_PORT_TX_RX_ALL
# - DELAY_PEER_PORT_TX_RX_LEADER
# - RANDOM_DELAY_PEER_PORT_TX_RX_LEADER
# - DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT
# - RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT
# - DELAY_PEER_PORT_TX_RX_QUORUM
# - RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM
# - DELAY_PEER_PORT_TX_RX_ALL
# - RANDOM_DELAY_PEER_PORT_TX_RX_ALL
# - NO_FAIL_WITH_STRESS
# - NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS
# - FAILPOINTS_WITH_DISK_IO_LATENCY

# TODO: use iptables for discarding outbound rafthttp traffic to peer port
Expand Down Expand Up @@ -255,5 +257,5 @@ tester-config:
stress-key-suffix-range-txn: 100
stress-key-txn-ops: 10

stress-clients: 100
stress-qps: 2000
stress-clients: 400
stress-qps: 8000
400 changes: 206 additions & 194 deletions tests/functional/rpcpb/rpc.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions tests/functional/rpcpb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ enum Operation {
// SIGQUIT_ETCD_AND_REMOVE_DATA kills etcd process and removes all data
// directories to simulate destroying the whole machine.
SIGQUIT_ETCD_AND_REMOVE_DATA = 21;
// SIGKILL_ETCD kills etcd process while keeping data directories
// and previous etcd configurations.
SIGKILL_ETCD = 22;

// SAVE_SNAPSHOT is sent to trigger local member to download its snapshot
// onto its local disk with the specified path from tester.
Expand Down Expand Up @@ -426,6 +429,8 @@ enum Case {
// each member must be able to process client requests.
SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH = 14;

SIGKILL_FOLLOWER = 15;
SIGKILL_LEADER = 16;
// BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER drops all outgoing/incoming
// packets from/to the peer port on a randomly chosen follower
// (non-leader), and waits for "delay-ms" until recovery.
Expand Down
42 changes: 42 additions & 0 deletions tests/functional/tester/case_sigquit_remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,30 @@ func recover_SIGQUIT_ETCD_AND_REMOVE_DATA(clus *Cluster, idx1 int) error {
return err
}

// inject_SIGKILL asks the agent of the member at "index" to SIGKILL its
// etcd process, logging before and after the RPC.
func inject_SIGKILL(clus *Cluster, index int) error {
	ep := clus.Members[index].EtcdClientEndpoint

	clus.lg.Info(
		"disastrous machine failure START",
		zap.String("target-endpoint", ep),
	)

	err := clus.sendOp(index, rpcpb.Operation_SIGKILL_ETCD)

	clus.lg.Info(
		"disastrous machine failure END",
		zap.String("target-endpoint", ep),
		zap.Error(err),
	)
	return err
}

// recover_SIGKILL restarts the etcd process of the member at "idx1"
// after a SIGKILL injection and logs the outcome.
func recover_SIGKILL(clus *Cluster, idx1 int) error {
	ep := clus.Members[idx1].EtcdClientEndpoint
	err := clus.sendOp(idx1, rpcpb.Operation_RESTART_ETCD)
	clus.lg.Info("restart machine", zap.String("target-endpoint", ep), zap.Error(err))
	return err
}

func new_Case_SIGQUIT_AND_REMOVE_ONE_FOLLOWER(clus *Cluster) Case {
cc := caseByFunc{
rpcpbCase: rpcpb.Case_SIGQUIT_AND_REMOVE_ONE_FOLLOWER,
Expand All @@ -187,6 +211,24 @@ func new_Case_SIGQUIT_AND_REMOVE_ONE_FOLLOWER(clus *Cluster) Case {
}
}

// new_Case_SIGKILL_FOLLOWER builds the Case that SIGKILLs a follower
// member's etcd process and recovers it by restarting etcd.
func new_Case_SIGKILL_FOLLOWER() Case {
	return &caseFollower{
		caseByFunc{
			rpcpbCase:     rpcpb.Case_SIGKILL_FOLLOWER,
			injectMember:  inject_SIGKILL,
			recoverMember: recover_SIGKILL,
		},
		-1,
		-1,
	}
}

// new_Case_SIGKILL_LEADER builds the Case that SIGKILLs the leader
// member's etcd process and recovers it by restarting etcd.
func new_Case_SIGKILL_LEADER() Case {
	return &caseLeader{
		caseByFunc{
			rpcpbCase:     rpcpb.Case_SIGKILL_LEADER,
			injectMember:  inject_SIGKILL,
			recoverMember: recover_SIGKILL,
		},
		-1,
		-1,
	}
}

func new_Case_SIGQUIT_AND_REMOVE_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT(clus *Cluster) Case {
return &caseUntilSnapshot{
rpcpbCase: rpcpb.Case_SIGQUIT_AND_REMOVE_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT,
Expand Down
6 changes: 6 additions & 0 deletions tests/functional/tester/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ func (clus *Cluster) updateCases() {
case "SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH":
clus.cases = append(clus.cases,
new_Case_SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH(clus))
case "SIGKILL_FOLLOWER":
clus.cases = append(clus.cases,
new_Case_SIGKILL_FOLLOWER())
case "SIGKILL_LEADER":
clus.cases = append(clus.cases,
new_Case_SIGKILL_LEADER())

case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER":
clus.cases = append(clus.cases,
Expand Down
63 changes: 35 additions & 28 deletions tests/functional/tester/cluster_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,6 @@ func (clus *Cluster) doTestCase(t *testing.T, fa Case) {
zap.String("desc", fa.Desc()),
)

clus.lg.Info("wait health before injecting failures")
if err := clus.WaitHealth(); err != nil {
t.Fatalf("wait full health error: %v", err)
}

stressStarted := false
fcase := fa.TestCase()
if fcase != rpcpb.Case_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS {
Expand All @@ -191,30 +186,42 @@ func (clus *Cluster) doTestCase(t *testing.T, fa Case) {
}
stressStarted = true
}
done := time.After(time.Minute * 10)
inject:
for {
select {
case <- done:
break inject
default:
}

clus.lg.Info(
"inject START",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.Int("case-total", len(clus.cases)),
zap.String("desc", fa.Desc()),
)
if err := fa.Inject(clus); err != nil {
t.Fatalf("injection error: %v", err)
}

// if run local, recovering server may conflict
// with stressing client ports
// TODO: use unix for local tests
clus.lg.Info(
"recover START",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.Int("case-total", len(clus.cases)),
zap.String("desc", fa.Desc()),
)
if err := fa.Recover(clus); err != nil {
t.Fatalf("recovery error: %v", err)
clus.lg.Info("wait health before injecting failure")
if err := clus.WaitHealth(); err != nil {
t.Fatalf("wait full health error: %v", err)
}
clus.lg.Info(
"inject START",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.Int("case-total", len(clus.cases)),
zap.String("desc", fa.Desc()),
)
if err := fa.Inject(clus); err != nil {
t.Fatalf("injection error: %v", err)
}
// if run local, recovering server may conflict
// with stressing client ports
// TODO: use unix for local tests
clus.lg.Info(
"recover START",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.Int("case-total", len(clus.cases)),
zap.String("desc", fa.Desc()),
)
if err := fa.Recover(clus); err != nil {
t.Fatalf("recovery error: %v", err)
}
}

if stressStarted {
Expand Down
13 changes: 0 additions & 13 deletions tests/functional/tester/stresser_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"math/rand"
"reflect"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -129,10 +128,6 @@ func (s *keyStresser) run() {
continue
}

if !s.isRetryableError(err) {
return
}

// only record errors before pausing stressers
s.emu.Lock()
if !s.paused {
Expand Down Expand Up @@ -183,14 +178,6 @@ func (s *keyStresser) isRetryableError(err error) bool {
return true
}

s.lg.Warn(
"stress run exiting",
zap.String("stress-type", "KV"),
zap.String("endpoint", s.m.EtcdClientEndpoint),
zap.String("error-type", reflect.TypeOf(err).String()),
zap.String("error-desc", rpctypes.ErrorDesc(err)),
zap.Error(err),
)
return false
}

Expand Down

0 comments on commit b8e4b38

Please sign in to comment.