From 04cfb4c660e05f83889dd6ee5de29901b9ff37bb Mon Sep 17 00:00:00 2001 From: caojiamingalan Date: Thu, 18 May 2023 16:55:32 -0500 Subject: [PATCH] etcdserver: add cluster id check for hashKVHandler Signed-off-by: caojiamingalan Signed-off-by: Marek Siarkowicz --- api/v3rpc/rpctypes/error.go | 3 + server/etcdserver/api/rafthttp/http.go | 4 +- server/etcdserver/api/rafthttp/stream.go | 6 +- server/etcdserver/api/rafthttp/util.go | 4 +- server/etcdserver/corrupt.go | 25 ++++- server/etcdserver/corrupt_test.go | 119 +++++++++++++++++++++++ tests/e2e/cluster_test.go | 15 +++ tests/e2e/corrupt_test.go | 85 ++++++++++++++++ tests/integration/hashkv_test.go | 9 +- 9 files changed, 257 insertions(+), 13 deletions(-) diff --git a/api/v3rpc/rpctypes/error.go b/api/v3rpc/rpctypes/error.go index ae112ae131b..23201302e83 100644 --- a/api/v3rpc/rpctypes/error.go +++ b/api/v3rpc/rpctypes/error.go @@ -45,6 +45,7 @@ var ( ErrGRPCMemberNotLearner = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member").Err() ErrGRPCLearnerNotReady = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member which is in sync with leader").Err() ErrGRPCTooManyLearners = status.New(codes.FailedPrecondition, "etcdserver: too many learner members in cluster").Err() + ErrGRPCClusterIdMismatch = status.New(codes.FailedPrecondition, "etcdserver: cluster ID mismatch").Err() ErrGRPCRequestTooLarge = status.New(codes.InvalidArgument, "etcdserver: request is too large").Err() ErrGRPCRequestTooManyRequests = status.New(codes.ResourceExhausted, "etcdserver: too many requests").Err() @@ -114,6 +115,7 @@ var ( ErrorDesc(ErrGRPCMemberNotLearner): ErrGRPCMemberNotLearner, ErrorDesc(ErrGRPCLearnerNotReady): ErrGRPCLearnerNotReady, ErrorDesc(ErrGRPCTooManyLearners): ErrGRPCTooManyLearners, + ErrorDesc(ErrGRPCClusterIdMismatch): ErrGRPCClusterIdMismatch, ErrorDesc(ErrGRPCRequestTooLarge): ErrGRPCRequestTooLarge, ErrorDesc(ErrGRPCRequestTooManyRequests): ErrGRPCRequestTooManyRequests, @@ -200,6 +202,7 @@ var ( ErrInvalidAuthToken = Error(ErrGRPCInvalidAuthToken) ErrAuthOldRevision = Error(ErrGRPCAuthOldRevision) ErrInvalidAuthMgmt = Error(ErrGRPCInvalidAuthMgmt) + ErrClusterIdMismatch = Error(ErrGRPCClusterIdMismatch) ErrNoLeader = Error(ErrGRPCNoLeader) ErrNotLeader = Error(ErrGRPCNotLeader) diff --git a/server/etcdserver/api/rafthttp/http.go b/server/etcdserver/api/rafthttp/http.go index 149c50b79c2..f60383f4791 100644 --- a/server/etcdserver/api/rafthttp/http.go +++ b/server/etcdserver/api/rafthttp/http.go @@ -54,7 +54,7 @@ var ( RaftSnapshotPrefix = path.Join(RaftPrefix, "snapshot") errIncompatibleVersion = errors.New("incompatible version") - errClusterIDMismatch = errors.New("cluster ID mismatch") + ErrClusterIDMismatch = errors.New("cluster ID mismatch") ) type peerGetter interface { @@ -508,7 +508,7 @@ func checkClusterCompatibilityFromHeader(lg *zap.Logger, localID types.ID, heade zap.String("remote-peer-server-minimum-cluster-version", remoteMinClusterVs), zap.String("remote-peer-cluster-id", gcid), ) - return errClusterIDMismatch + return ErrClusterIDMismatch } return nil } diff --git a/server/etcdserver/api/rafthttp/stream.go b/server/etcdserver/api/rafthttp/stream.go index 321fd5283de..1b180fca3f7 100644 --- a/server/etcdserver/api/rafthttp/stream.go +++ b/server/etcdserver/api/rafthttp/stream.go @@ -648,7 +648,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { } return nil, errIncompatibleVersion - case errClusterIDMismatch.Error(): + case ErrClusterIDMismatch.Error(): if cr.lg != nil { cr.lg.Warn( "request sent was ignored by remote peer due to cluster ID mismatch", @@ -656,10 +656,10 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { zap.String("remote-peer-cluster-id", resp.Header.Get("X-Etcd-Cluster-ID")), zap.String("local-member-id", cr.tr.ID.String()), zap.String("local-member-cluster-id", cr.tr.ClusterID.String()), - zap.Error(errClusterIDMismatch), + zap.Error(ErrClusterIDMismatch), ) } - return nil, errClusterIDMismatch + return nil, ErrClusterIDMismatch default: return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b)) diff --git a/server/etcdserver/api/rafthttp/util.go b/server/etcdserver/api/rafthttp/util.go index 91bc6884e4b..6d0e6bdd6b0 100644 --- a/server/etcdserver/api/rafthttp/util.go +++ b/server/etcdserver/api/rafthttp/util.go @@ -94,7 +94,7 @@ func checkPostResponse(lg *zap.Logger, resp *http.Response, body []byte, req *ht ) } return errIncompatibleVersion - case errClusterIDMismatch.Error(): + case ErrClusterIDMismatch.Error(): if lg != nil { lg.Error( "request sent was ignored due to cluster ID mismatch", @@ -103,7 +103,7 @@ func checkPostResponse(lg *zap.Logger, resp *http.Response, body []byte, req *ht zap.String("local-member-cluster-id", req.Header.Get("X-Etcd-Cluster-ID")), ) } - return errClusterIDMismatch + return ErrClusterIDMismatch default: return fmt.Errorf("unhandled error %q when precondition failed", string(body)) } diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index a9ad631eaec..6b465125a6f 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -30,6 +30,7 @@ import ( "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/pkg/v3/traceutil" + "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/mvcc" "go.uber.org/zap" @@ -155,6 +156,17 @@ func (cm *corruptionChecker) InitialCheck() error { zap.Strings("remote-peer-endpoints", p.eps), zap.Error(err), ) + case rpctypes.ErrClusterIdMismatch: + cm.lg.Warn( + "cluster ID mismatch", + zap.String("local-member-id", cm.hasher.MemberId().String()), + zap.Int64("local-member-revision", h.Revision), + zap.Int64("local-member-compact-revision", h.CompactRevision), + zap.Uint32("local-member-hash", h.Hash), + zap.String("remote-peer-id", p.id.String()), + zap.Strings("remote-peer-endpoints", p.eps), + zap.Error(err), + ) } } } @@ -389,7 +401,7 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) []*peerHashKVResp { var lastErr error for _, ep := range p.eps { ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) - resp, lastErr := HashByRev(ctx, cc, ep, rev) + resp, lastErr := HashByRev(ctx, s.cluster.ID(), cc, ep, rev) cancel() if lastErr == nil { resps = append(resps, &peerHashKVResp{peerInfo: p, resp: resp, err: nil}) @@ -467,6 +479,10 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "bad path", http.StatusBadRequest) return } + if gcid := r.Header.Get("X-Etcd-Cluster-ID"); gcid != "" && gcid != h.server.cluster.ID().String() { + http.Error(w, rafthttp.ErrClusterIDMismatch.Error(), http.StatusPreconditionFailed) + return + } defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) @@ -505,7 +521,7 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // HashByRev fetch hash of kv store at the given rev via http call to the given url -func HashByRev(ctx context.Context, cc *http.Client, url string, rev int64) (*pb.HashKVResponse, error) { +func HashByRev(ctx context.Context, cid types.ID, cc *http.Client, url string, rev int64) (*pb.HashKVResponse, error) { hashReq := &pb.HashKVRequest{Revision: rev} hashReqBytes, err := json.Marshal(hashReq) if err != nil { @@ -518,6 +534,7 @@ func HashByRev(ctx context.Context, cc *http.Client, url string, rev int64) (*pb } req = req.WithContext(ctx) req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Etcd-Cluster-ID", cid.String()) req.Cancel = ctx.Done() resp, err := cc.Do(req) @@ -537,6 +554,10 @@ func HashByRev(ctx context.Context, cc *http.Client, url string, rev int64) (*pb if strings.Contains(string(b), mvcc.ErrFutureRev.Error()) { return nil, rpctypes.ErrFutureRev } + } else if resp.StatusCode == http.StatusPreconditionFailed { + if strings.Contains(string(b), rafthttp.ErrClusterIDMismatch.Error()) { + return nil, rpctypes.ErrClusterIdMismatch + } } if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("unknown error: %s", string(b)) diff --git a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go index 614ee3c325b..3ff1760999f 100644 --- a/server/etcdserver/corrupt_test.go +++ b/server/etcdserver/corrupt_test.go @@ -15,16 +15,28 @@ package etcdserver import ( + "bytes" "context" + "encoding/json" "fmt" + "io" + "net/http" + "net/http/httptest" + "strconv" + "strings" "testing" "time" + "go.uber.org/zap" + + "go.etcd.io/etcd/server/v3/lease" + "github.com/stretchr/testify/assert" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/server/v3/mvcc" + betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" "go.uber.org/zap/zaptest" ) @@ -85,6 +97,13 @@ func TestInitialCheck(t *testing.T) { hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 2}}}}, expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, }, + { + name: "Cluster ID Mismatch does not fail CorruptionChecker.InitialCheck()", + hasher: fakeHasher{ + peerHashes: []*peerHashKVResp{{err: rpctypes.ErrClusterIdMismatch}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, + }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { @@ -202,6 +221,13 @@ func TestPeriodicCheck(t *testing.T) { expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(0)"}, expectCorrupt: true, }, + { + name: "Cluster ID Mismatch does not fail CorruptionChecker.PeriodicCheck()", + hasher: fakeHasher{ + peerHashes: []*peerHashKVResp{{err: rpctypes.ErrClusterIdMismatch}}, + }, + expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"}, + }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { @@ -290,6 +316,14 @@ func TestCompactHashCheck(t *testing.T) { }, expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)"}, }, + { + name: "Cluster ID Mismatch does not fail CorruptionChecker.CompactHashCheck()", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}}, + peerHashes: []*peerHashKVResp{{err: rpctypes.ErrClusterIdMismatch}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)"}, + }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { @@ -375,3 +409,88 @@ func (f *fakeHasher) TriggerCorruptAlarm(memberId types.ID) { f.actions = append(f.actions, fmt.Sprintf("TriggerCorruptAlarm(%d)", memberId)) f.alarmTriggered = true } + +func TestHashKVHandler(t *testing.T) { + var remoteClusterID = 111195 + var localClusterID = 111196 + var revision = 1 + + etcdSrv := &EtcdServer{} + etcdSrv.cluster = newTestCluster(t, nil) + etcdSrv.cluster.SetID(types.ID(localClusterID), types.ID(localClusterID)) + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + etcdSrv.kv = mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) + ph := &hashKVHandler{ + lg: zap.NewNop(), + server: etcdSrv, + } + srv := httptest.NewServer(ph) + defer srv.Close() + + tests := []struct { + name string + remoteClusterID int + wcode int + wKeyWords string + }{ + { + name: "HashKV returns 200 if cluster hash matches", + remoteClusterID: localClusterID, + wcode: http.StatusOK, + wKeyWords: "", + }, + { + name: "HashKV returns 400 if cluster hash doesn't matche", + remoteClusterID: remoteClusterID, + wcode: http.StatusPreconditionFailed, + wKeyWords: "cluster ID mismatch", + }, + } + for i, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hashReq := &pb.HashKVRequest{Revision: int64(revision)} + hashReqBytes, err := json.Marshal(hashReq) + if err != nil { + t.Fatalf("failed to marshal request: %v", err) + } + req, err := http.NewRequest(http.MethodGet, srv.URL+PeerHashKVPath, bytes.NewReader(hashReqBytes)) + if err != nil { + t.Fatalf("failed to create request: %v", err) + } + req.Header.Set("X-Etcd-Cluster-ID", strconv.FormatUint(uint64(tt.remoteClusterID), 16)) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("failed to get http response: %v", err) + } + body, err := io.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + t.Fatalf("unexpected io.ReadAll error: %v", err) + } + if resp.StatusCode != tt.wcode { + t.Fatalf("#%d: code = %d, want %d", i, resp.StatusCode, tt.wcode) + } + if resp.StatusCode != http.StatusOK { + if !strings.Contains(string(body), tt.wKeyWords) { + t.Errorf("#%d: body: %s, want body to contain keywords: %s", i, string(body), tt.wKeyWords) + } + return + } + + hashKVResponse := pb.HashKVResponse{} + err = json.Unmarshal(body, &hashKVResponse) + if err != nil { + t.Fatalf("unmarshal response error: %v", err) + } + hashValue, _, err := etcdSrv.KV().HashStorage().HashByRev(int64(revision)) + if err != nil { + t.Fatalf("etcd server hash failed: %v", err) + } + if hashKVResponse.Hash != hashValue.Hash { + t.Fatalf("hash value inconsistent: %d != %d", hashKVResponse.Hash, hashValue) + } + }) + } +} diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go index bc39a5ac48a..43156cd1ad3 100644 --- a/tests/e2e/cluster_test.go +++ b/tests/e2e/cluster_test.go @@ -181,6 +181,17 @@ type etcdProcessClusterConfig struct { // newEtcdProcessCluster launches a new cluster from etcd processes, returning // a new etcdProcessCluster once all nodes are ready to accept client requests. func newEtcdProcessCluster(t testing.TB, cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) { + epc, err := initEtcdProcessCluster(t, cfg) + if err != nil { + return nil, err + } + + return startEtcdProcessCluster(epc, cfg) +} + +// initEtcdProcessCluster initializes a new cluster based on the given config. +// It doesn't start the cluster. +func initEtcdProcessCluster(t testing.TB, cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) { skipInShortMode(t) etcdCfgs := cfg.etcdServerProcessConfigs(t) @@ -199,7 +210,11 @@ func newEtcdProcessCluster(t testing.TB, cfg *etcdProcessClusterConfig) (*etcdPr } epc.procs[i] = proc } + return epc, nil +} +// startEtcdProcessCluster launches a new cluster from etcd processes. +func startEtcdProcessCluster(epc *etcdProcessCluster, cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) { if cfg.rollingStart { if err := epc.RollingStart(); err != nil { return nil, fmt.Errorf("Cannot rolling-start: %v", err) diff --git a/tests/e2e/corrupt_test.go b/tests/e2e/corrupt_test.go index 5eb9cafb66e..451311a6f74 100644 --- a/tests/e2e/corrupt_test.go +++ b/tests/e2e/corrupt_test.go @@ -96,6 +96,91 @@ func corruptTest(cx ctlCtx) { waitReadyExpectProc(proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)}) } +func TestInPlaceRecovery(t *testing.T) { + basePort := 20000 + BeforeTest(t) + + // Initialize the cluster. + epcOld, err := newEtcdProcessCluster(t, + &etcdProcessClusterConfig{ + clusterSize: 3, + initialToken: "old", + keepDataDir: false, + CorruptCheckTime: time.Second, + basePort: basePort, + }, + ) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + t.Cleanup(func() { + if errC := epcOld.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }) + t.Log("old cluster started.") + + //Put some data into the old cluster, so that after recovering from a blank db, the hash diverges. + t.Log("putting 10 keys...") + + oldCc := NewEtcdctl(epcOld.EndpointsV3(), clientNonTLS, false, false) + for i := 0; i < 10; i++ { + err := oldCc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i)) + assert.NoError(t, err, "error on put") + } + + // Create a new cluster config, but with the same port numbers. In this way the new servers can stay in + // contact with the old ones. + epcNewConfig := &etcdProcessClusterConfig{ + clusterSize: 3, + initialToken: "new", + keepDataDir: false, + CorruptCheckTime: time.Second, + basePort: basePort, + initialCorruptCheck: true, + } + epcNew, err := initEtcdProcessCluster(t, epcNewConfig) + if err != nil { + t.Fatalf("could not init etcd process cluster (%v)", err) + } + t.Cleanup(func() { + if errC := epcNew.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }) + + newCc := NewEtcdctl(epcNew.EndpointsV3(), clientNonTLS, false, false) + assert.NoError(t, err) + + // Rolling recovery of the servers. + t.Log("rolling updating servers in place...") + for i, newProc := range epcNew.procs { + oldProc := epcOld.procs[i] + err = oldProc.Close() + if err != nil { + t.Fatalf("could not stop etcd process (%v)", err) + } + t.Logf("old cluster server %d: %s stopped.", i, oldProc.Config().name) + err = newProc.Start() + if err != nil { + t.Fatalf("could not start etcd process (%v)", err) + } + t.Logf("new cluster server %d: %s started in-place with blank db.", i, newProc.Config().name) + t.Log("sleeping 5 sec to let nodes do periodical check...") + time.Sleep(5 * time.Second) + } + t.Log("new cluster started.") + + alarmResponse, err := newCc.AlarmList() + assert.NoError(t, err, "error on alarm list") + for _, alarm := range alarmResponse.Alarms { + if alarm.Alarm == etcdserverpb.AlarmType_CORRUPT { + t.Fatalf("there is no corruption after in-place recovery, but corruption reported.") + } + } + t.Log("no corruption detected.") +} + func TestPeriodicCheckDetectsCorruption(t *testing.T) { checkTime := time.Second BeforeTest(t) diff --git a/tests/integration/hashkv_test.go b/tests/integration/hashkv_test.go index b99173f7ba6..81fa4cd6164 100644 --- a/tests/integration/hashkv_test.go +++ b/tests/integration/hashkv_test.go @@ -52,13 +52,14 @@ func TestCompactionHash(t *testing.T) { }, } - testutil.TestCompactionHash(context.Background(), t, hashTestCase{cc, clus.Members[0].GRPCURL(), client}, 1000) + testutil.TestCompactionHash(context.Background(), t, hashTestCase{cc, clus.Members[0].GRPCURL(), client, clus.Members[0].s}, 1000) } type hashTestCase struct { *clientv3.Client - url string - http *http.Client + url string + http *http.Client + server *etcdserver.EtcdServer } func (tc hashTestCase) Put(ctx context.Context, key, value string) error { @@ -72,7 +73,7 @@ func (tc hashTestCase) Delete(ctx context.Context, key string) error { } func (tc hashTestCase) HashByRev(ctx context.Context, rev int64) (testutil.KeyValueHash, error) { - resp, err := etcdserver.HashByRev(ctx, tc.http, "http://unix", rev) + resp, err := etcdserver.HashByRev(ctx, tc.server.Cluster().ID(), tc.http, "http://unix", rev) return testutil.KeyValueHash{Hash: resp.Hash, CompactRevision: resp.CompactRevision, Revision: resp.Header.Revision}, err }