diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 60ff1092ea20..42cbbd336ba8 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -577,9 +577,11 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) break } + plog.Warningf("alarm %v raised by peer %+v", m.Alarm, types.ID(m.MemberID)) switch m.Alarm { + case pb.AlarmType_CORRUPT: + a.s.applyV3 = newApplierV3Corrupt(a) case pb.AlarmType_NOSPACE: - plog.Warningf("alarm raised %+v", m) a.s.applyV3 = newApplierV3Capped(a) default: plog.Errorf("unimplemented alarm activation (%+v)", m) @@ -596,7 +598,8 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) } switch m.Alarm { - case pb.AlarmType_NOSPACE: + case pb.AlarmType_NOSPACE, pb.AlarmType_CORRUPT: + // TODO: check kv hash before deactivating CORRUPT? plog.Infof("alarm disarmed %+v", ar) a.s.applyV3 = a.s.newApplierV3() default: diff --git a/etcdserver/corrupt.go b/etcdserver/corrupt.go new file mode 100644 index 000000000000..7d9c57e63937 --- /dev/null +++ b/etcdserver/corrupt.go @@ -0,0 +1,180 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + "fmt" + "time" + + "github.com/coreos/etcd/clientv3" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/mvcc" + "github.com/coreos/etcd/pkg/types" + + "golang.org/x/net/context" +) + +var CorruptCheckInterval = 5 * time.Minute + +func (s *EtcdServer) monitorKVHash() { + for { + select { + case <-s.stopping: + return + case <-time.After(CorruptCheckInterval): + } + if !s.isLeader() { + continue + } + if err := s.checkHashKV(); err != nil { + plog.Debugf("check hash kv failed %v", err) + } + } +} + +func (s *EtcdServer) checkHashKV() error { + h, rev, crev, err := s.kv.HashByRev(0) + if err != nil { + plog.Fatalf("failed to hash kv store (%v)", err) + } + resps := []*clientv3.HashKVResponse{} + for _, m := range s.cluster.Members() { + if m.ID == s.ID() { + continue + } + + cli, cerr := clientv3.New(clientv3.Config{Endpoints: m.PeerURLs}) + if cerr != nil { + continue + } + + respsLen := len(resps) + for _, c := range cli.Endpoints() { + ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) + resp, err := cli.HashKV(ctx, c, rev) + fmt.Printf("got %+v %v\n", resp, err) + cancel() + if err == nil { + cerr = err + resps = append(resps, resp) + break + } + } + cli.Close() + + if respsLen == len(resps) { + plog.Warningf("failed to hash kv for peer %v (%v)", types.ID(m.ID), cerr) + } + } + + ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) + err = s.linearizableReadNotify(ctx) + cancel() + if err != nil { + return err + } + + h2, rev2, crev2, err := s.kv.HashByRev(0) + if err != nil { + plog.Warningf("failed to hash kv store (%v)", err) + return err + } + + alarmed := false + mismatch := func() { + if alarmed { + return + } + alarmed = true + a := &pb.AlarmRequest{ + MemberID: uint64(s.ID()), + Action: pb.AlarmRequest_ACTIVATE, + Alarm: pb.AlarmType_CORRUPT, + } + s.goAttach(func() { + s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a}) + }) + } + + if h2 != h && rev2 == rev && crev == crev2 { + plog.Warningf("hash %d and %d for revision %d", h, h2, rev) + mismatch() + } + + for _, resp := range resps { + if resp.Header.Revision > rev2 { + plog.Warningf( + "revision %d from member %v, expected at most %d", + resp.Header.Revision, + types.ID(resp.Header.MemberId), + rev2) + mismatch() + } + if resp.CompactRevision > crev2 { + plog.Warningf( + "compact revision %d from member %v, expected at most %d", + resp.CompactRevision, + types.ID(resp.Header.MemberId), + crev2, + ) + mismatch() + } + if resp.CompactRevision == crev && resp.Hash != h { + plog.Warningf( + "hash %d at revision %d from member %v, expected hash %d", + resp.Hash, + rev, + types.ID(resp.Header.MemberId), + h, + ) + mismatch() + } + } + return nil +} + +type applierV3Corrupt struct { + applierV3 +} + +func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} } + +func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) { + return nil, ErrCorrupt +} + +func (a *applierV3Corrupt) Range(txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) { + return nil, ErrCorrupt +} + +func (a *applierV3Corrupt) DeleteRange(txn mvcc.TxnWrite, p *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { + return nil, ErrCorrupt +} + +func (a *applierV3Corrupt) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { + return nil, ErrCorrupt +} + +func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) { + return nil, nil, ErrCorrupt +} + +func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { + return nil, ErrCorrupt +} + +func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { + return nil, ErrCorrupt +} diff --git a/etcdserver/errors.go b/etcdserver/errors.go index 09571e56e31f..fb93c4b2a1dd 100644 --- a/etcdserver/errors.go +++ b/etcdserver/errors.go @@ -35,6 +35,7 @@ var ( ErrTooManyRequests = errors.New("etcdserver: too many requests") ErrUnhealthy = errors.New("etcdserver: unhealthy cluster") ErrKeyNotFound = errors.New("etcdserver: key not found") + ErrCorrupt = errors.New("etcdserver: corrupt cluster") ) type DiscoveryError struct { diff --git a/etcdserver/server.go b/etcdserver/server.go index 8fb830c1a8e8..ac7601ecb0ee 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -524,6 +524,7 @@ func (s *EtcdServer) Start() { s.goAttach(func() { monitorFileDescriptor(s.stopping) }) s.goAttach(s.monitorVersions) s.goAttach(s.linearizableReadLoop) + s.goAttach(s.monitorKVHash) } // start prepares and starts server in a new goroutine. It is no longer safe to @@ -1628,6 +1629,9 @@ func (s *EtcdServer) restoreAlarms() error { if len(as.Get(pb.AlarmType_NOSPACE)) > 0 { s.applyV3 = newApplierV3Capped(s.applyV3) } + if len(as.Get(pb.AlarmType_CORRUPT)) > 0 { + s.applyV3 = newApplierV3Corrupt(s.applyV3) + } return nil }