Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: check data corruption on boot #8554

Merged
merged 4 commits into from
Nov 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ type etcdProcessClusterConfig struct {
isClientAutoTLS bool
isClientCRL bool

forceNewCluster bool
initialToken string
quotaBackendBytes int64
noStrictReconfig bool
forceNewCluster bool
initialToken string
quotaBackendBytes int64
noStrictReconfig bool
initialCorruptCheck bool
}

// newEtcdProcessCluster launches a new cluster from etcd processes, returning
Expand Down Expand Up @@ -224,6 +225,9 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
if cfg.noStrictReconfig {
args = append(args, "--strict-reconfig-check=false")
}
if cfg.initialCorruptCheck {
args = append(args, "--experimental-initial-corrupt-check")
}
var murl string
if cfg.metricsURLScheme != "" {
murl = (&url.URL{
Expand Down
14 changes: 14 additions & 0 deletions e2e/ctl_v3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type ctlCtx struct {
t *testing.T
cfg etcdProcessClusterConfig
quotaBackendBytes int64
corruptFunc func(string) error
noStrictReconfig bool

epc *etcdProcessCluster
Expand All @@ -69,6 +70,8 @@ type ctlCtx struct {
user string
pass string

initialCorruptCheck bool

// for compaction
compactPhysical bool
}
Expand Down Expand Up @@ -105,6 +108,14 @@ func withCompactPhysical() ctlOption {
return func(cx *ctlCtx) { cx.compactPhysical = true }
}

// withInitialCorruptCheck returns a ctlOption that switches on the
// initialCorruptCheck field of the ctlCtx it is applied to.
func withInitialCorruptCheck() ctlOption {
	enable := func(c *ctlCtx) {
		c.initialCorruptCheck = true
	}
	return enable
}

// withCorruptFunc returns a ctlOption that stores f as the corruptFunc of the
// ctlCtx it is applied to. f receives a file path and reports any failure as
// an error.
func withCorruptFunc(f func(string) error) ctlOption {
	install := func(c *ctlCtx) {
		c.corruptFunc = f
	}
	return install
}

// withNoStrictReconfig returns a ctlOption that sets the noStrictReconfig
// field of the ctlCtx it is applied to.
func withNoStrictReconfig() ctlOption {
	disable := func(c *ctlCtx) {
		c.noStrictReconfig = true
	}
	return disable
}
Expand All @@ -131,6 +142,9 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
ret.cfg.quotaBackendBytes = ret.quotaBackendBytes
}
ret.cfg.noStrictReconfig = ret.noStrictReconfig
if ret.initialCorruptCheck {
ret.cfg.initialCorruptCheck = ret.initialCorruptCheck
}

epc, err := newEtcdProcessCluster(&ret.cfg)
if err != nil {
Expand Down
129 changes: 129 additions & 0 deletions e2e/etcd_corrupt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"

bolt "github.com/coreos/bbolt"
)

// TODO: test with embedded etcd in integration package

// TestEtcdCorruptHash runs corruptTest against a non-TLS cluster started with
// the initial corruption check enabled, using corruptHash to damage the
// backend of one member.
func TestEtcdCorruptHash(t *testing.T) {
	// Run with EXPECT_DEBUG=1 and put the environment back exactly as it was:
	// a variable that was unset before the test is unset again afterwards
	// rather than being left behind as an empty string.
	oldenv, wasSet := os.LookupEnv("EXPECT_DEBUG")
	defer func() {
		if wasSet {
			os.Setenv("EXPECT_DEBUG", oldenv)
			return
		}
		os.Unsetenv("EXPECT_DEBUG")
	}()
	os.Setenv("EXPECT_DEBUG", "1")

	cfg := configNoTLS

	// trigger snapshot so that restart member can load peers from disk
	cfg.snapCount = 3

	testCtl(t, corruptTest, withQuorum(),
		withCfg(cfg),
		withInitialCorruptCheck(),
		withCorruptFunc(corruptHash),
	)
}

// corruptTest writes ten keys, stops the first member, damages its backend
// file on disk with cx.corruptFunc, and then requires that restarting the
// member prints the "found data inconsistency with peers" message.
//
// NOTE(review): this relies on Stop and waitReadyExpectProc returning an
// error value — confirm against their definitions.
func corruptTest(cx ctlCtx) {
	for i := 0; i < 10; i++ {
		if err := ctlV3Put(cx, fmt.Sprintf("foo%05d", i), fmt.Sprintf("v%05d", i), ""); err != nil {
			if cx.dialTimeout > 0 && !isGRPCTimedout(err) {
				cx.t.Fatalf("corruptTest ctlV3Put error (%v)", err)
			}
		}
	}
	// enough time for all nodes to sync on the same data
	time.Sleep(3 * time.Second)

	eps := cx.epc.EndpointsV3()
	// The test talks to the second member about the first one, so it cannot
	// run against fewer than two endpoints.
	if len(eps) < 2 {
		cx.t.Fatalf("corruptTest needs at least 2 endpoints, got %d", len(eps))
	}
	cli1, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[1]}, DialTimeout: 3 * time.Second})
	if err != nil {
		cx.t.Fatal(err)
	}
	defer cli1.Close()

	// Remember the first member's ID; it is part of the expected log line.
	sresp, err := cli1.Status(context.TODO(), eps[0])
	if err != nil {
		cx.t.Fatal(err)
	}
	id0 := sresp.Header.GetMemberId()

	// The backend file is modified next, so a failed stop must abort the test
	// instead of being ignored.
	if err = cx.epc.procs[0].Stop(); err != nil {
		cx.t.Fatal(err)
	}

	// corrupt first member by modifying backend offline.
	fp := filepath.Join(cx.epc.procs[0].Config().dataDirPath, "member", "snap", "db")
	if err = cx.corruptFunc(fp); err != nil {
		cx.t.Fatal(err)
	}

	ep := cx.epc.procs[0]
	proc, err := spawnCmd(append([]string{ep.Config().execPath}, ep.Config().args...))
	if err != nil {
		cx.t.Fatal(err)
	}
	defer proc.Stop()

	// restarting corrupted member should fail; without checking the result the
	// test would pass even if the inconsistency was never reported.
	want := fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)
	if err = waitReadyExpectProc(proc, []string{want}); err != nil {
		cx.t.Fatalf("expected %q from restarted member (%v)", want, err)
	}
}

// corruptHash opens the bolt database at fpath and rewrites every record in
// its "key" bucket: each value is decoded as an mvccpb.KeyValue, the first
// byte of its Key and of its Value is incremented, and the re-encoded record
// is stored back under the same bucket key.
//
// It returns an error if the database cannot be opened, the "key" bucket is
// missing, or a record cannot be decoded, encoded, or written.
func corruptHash(fpath string) error {
	// Bound the wait for the file lock so that a database still held by
	// another process yields an error instead of blocking indefinitely.
	db, derr := bolt.Open(fpath, os.ModePerm, &bolt.Options{Timeout: 5 * time.Second})
	if derr != nil {
		return derr
	}
	defer db.Close()

	return db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("key"))
		if b == nil {
			return errors.New("got nil bucket for 'key'")
		}

		// Collect the replacements first and write them after the scan, so the
		// bucket is not modified while the cursor is walking it.
		var keys, vals [][]byte
		c := b.Cursor()
		for k, v := c.First(); k != nil; k, v = c.Next() {
			var kv mvccpb.KeyValue
			if uerr := kv.Unmarshal(v); uerr != nil {
				return uerr
			}
			// A record with an empty key or value has no first byte to flip;
			// leave that field alone instead of panicking on the index.
			if len(kv.Key) > 0 {
				kv.Key[0]++
			}
			if len(kv.Value) > 0 {
				kv.Value[0]++
			}
			v2, v2err := kv.Marshal()
			if v2err != nil {
				return v2err
			}
			// Keep a private copy of the cursor key: it is held across the
			// writes below, which change the bucket it was read from.
			keys = append(keys, append([]byte(nil), k...))
			vals = append(vals, v2)
		}
		for i := range keys {
			if perr := b.Put(keys[i], vals[i]); perr != nil {
				return perr
			}
		}
		return nil
	})
}
5 changes: 3 additions & 2 deletions embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,9 @@ type Config struct {

// Experimental flags

ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
}

// configYAML holds the config suitable for yaml parsing
Expand Down
13 changes: 13 additions & 0 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
token string
)

memberInitialized := true
if !isMemberInitialized(cfg) {
memberInitialized = false
urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd")
if err != nil {
return e, fmt.Errorf("error setting up initial cluster: %v", err)
Expand Down Expand Up @@ -175,6 +177,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
}

Expand All @@ -185,6 +188,16 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
// buffer channel so goroutines on closed connections won't wait forever
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))

// newly started member ("memberInitialized==false")
// does not need corruption check
if memberInitialized {
if err = e.Server.CheckInitialHashKV(); err != nil {
// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
// (nothing to close since rafthttp transports have not been started)
e.Server = nil
return e, err
}
}
e.Server.Start()

if err = e.servePeers(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func newConfig() *config {
fs.StringVar(&cfg.ec.AuthToken, "auth-token", cfg.ec.AuthToken, "Specify auth token specific options.")

// experimental
fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.")
fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")

// ignored
Expand Down
4 changes: 3 additions & 1 deletion etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,10 @@ auth flags:
Specify a v3 authentication token type and its options ('simple' or 'jwt').

experimental flags:
--experimental-initial-corrupt-check 'false'
enable to check data corruption before serving any client/peer traffic.
--experimental-corrupt-check-time '0s'
duration of time between cluster corruption check passes.
duration of time between cluster corruption check passes.
--experimental-enable-v2v3 ''
serve v2 requests through the v3 backend under a given prefix.
`
Expand Down
5 changes: 4 additions & 1 deletion etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ type ServerConfig struct {

AuthToken string

CorruptCheckTime time.Duration
// InitialCorruptCheck is true to check data corruption on boot
// before serving any peer/client traffic.
InitialCorruptCheck bool
CorruptCheckTime time.Duration
}

// VerifyBootstrap sanity-checks the initial config for bootstrap case
Expand Down
Loading