diff --git a/integration/v3_alarm_test.go b/integration/v3_alarm_test.go new file mode 100644 index 000000000000..4582e1e0064e --- /dev/null +++ b/integration/v3_alarm_test.go @@ -0,0 +1,206 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "os" + "sync" + "testing" + "time" + + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/mvcc" + "github.com/coreos/etcd/mvcc/backend" + "github.com/coreos/etcd/pkg/testutil" + "golang.org/x/net/context" +) + +// TestV3StorageQuotaApply tests the V3 server respects quotas during apply +func TestV3StorageQuotaApply(t *testing.T) { + testutil.AfterTest(t) + quotasize := int64(16 * os.Getpagesize()) + + clus := NewClusterV3(t, &ClusterConfig{Size: 2}) + defer clus.Terminate(t) + kvc0 := toGRPC(clus.Client(0)).KV + kvc1 := toGRPC(clus.Client(1)).KV + + // Set a quota on one node + clus.Members[0].QuotaBackendBytes = quotasize + clus.Members[0].Stop(t) + clus.Members[0].Restart(t) + clus.waitLeader(t, clus.Members) + waitForRestart(t, kvc0) + + key := []byte("abc") + + // test small put still works + smallbuf := make([]byte, 1024) + _, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}) + if serr != nil { + t.Fatal(serr) + } + + // test big put + bigbuf := make([]byte, quotasize) + _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf}) + if err != nil { + t.Fatal(err) + } + + // quorum get should work regardless of whether alarm is raised + _, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")}) + if err != nil { + t.Fatal(err) + } + + // wait until alarm is raised for sure-- poll the alarms + stopc := time.After(5 * time.Second) + for { + req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET} + resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req) + if aerr != nil { + t.Fatal(aerr) + } + if len(resp.Alarms) != 0 { + break + } + select { + case <-stopc: + t.Fatalf("timed out waiting for alarm") + case <-time.After(10 * time.Millisecond): + } + } + + // small quota machine should reject put + if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { + t.Fatalf("past-quota instance should reject put") + } + + // large quota machine should reject put + if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { + t.Fatalf("past-quota instance should reject put") + } + + // reset large quota node to ensure alarm persisted + clus.Members[1].Stop(t) + clus.Members[1].Restart(t) + clus.waitLeader(t, clus.Members) + + if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { + t.Fatalf("alarmed instance should reject put after reset") + } +} + +// TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through. +func TestV3AlarmDeactivate(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + kvc := toGRPC(clus.RandClient()).KV + mt := toGRPC(clus.RandClient()).Maintenance + + alarmReq := &pb.AlarmRequest{ + MemberID: 123, + Action: pb.AlarmRequest_ACTIVATE, + Alarm: pb.AlarmType_NOSPACE, + } + if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil { + t.Fatal(err) + } + + key := []byte("abc") + smallbuf := make([]byte, 512) + _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}) + if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) { + t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace) + } + + alarmReq.Action = pb.AlarmRequest_DEACTIVATE + if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil { + t.Fatal(err) + } + + if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil { + t.Fatal(err) + } +} + +type fakeConsistentIndex struct{ rev uint64 } + +func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.rev } + +func TestV3CorruptAlarm(t *testing.T) { + oldInvl := etcdserver.CorruptCheckInterval + defer func() { etcdserver.CorruptCheckInterval = oldInvl }() + etcdserver.CorruptCheckInterval = 2 * time.Second + + defer testutil.AfterTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + var wg sync.WaitGroup + wg.Add(10) + for i := 0; i < 10; i++ { + go func() { + defer wg.Done() + if _, err := clus.Client(i%3).Put(context.TODO(), "k", "v"); err != nil { + t.Fatal(err) + } + }() + } + wg.Wait() + + // corrupt member 0 + clus.Members[0].Stop(t) + be := backend.NewDefaultBackend(clus.Members[0].DataDir + "/member/snap/db") + s := mvcc.NewStore(be, nil, &fakeConsistentIndex{13}) + if r, err := s.Range([]byte("k"), nil, mvcc.RangeOptions{}); err != nil || len(r.KVs) == 0 { + t.Fatalf("key 'k' does not exist (%v)", err) + } + s.Put([]byte("abc"), []byte("def"), 0) + s.Put([]byte("xyz"), []byte("123"), 0) + s.Compact(5) + s.Commit() + s.Close() + be.Close() + + clus.Client(1).Put(context.TODO(), "xyz", "321") + clus.Client(1).Put(context.TODO(), "abc", "fed") + + // member 0 restarts into split brain + clus.Members[0].Restart(t) + + resp0, err0 := clus.Client(0).Get(context.TODO(), "abc") + if err0 != nil { + t.Fatal(err0) + } + resp1, err1 := clus.Client(1).Get(context.TODO(), "abc") + if err1 != nil { + t.Fatal(err1) + } + + if resp0.Kvs[0].ModRevision == resp1.Kvs[0].ModRevision { + t.Fatalf("matching ModRevision values") + } + + time.Sleep(3 * time.Second) + + presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa") + if perr == nil || !eqErrGRPC(perr, rpctypes.ErrCorrupt) { + t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr) + } +} diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 13c2b875636f..fb131f090b87 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -1277,116 +1277,6 @@ func TestV3StorageQuotaAPI(t *testing.T) { } } -// TestV3StorageQuotaApply tests the V3 server respects quotas during apply -func TestV3StorageQuotaApply(t *testing.T) { - testutil.AfterTest(t) - quotasize := int64(16 * os.Getpagesize()) - - clus := NewClusterV3(t, &ClusterConfig{Size: 2}) - defer clus.Terminate(t) - kvc0 := toGRPC(clus.Client(0)).KV - kvc1 := toGRPC(clus.Client(1)).KV - - // Set a quota on one node - clus.Members[0].QuotaBackendBytes = quotasize - clus.Members[0].Stop(t) - clus.Members[0].Restart(t) - clus.waitLeader(t, clus.Members) - waitForRestart(t, kvc0) - - key := []byte("abc") - - // test small put still works - smallbuf := make([]byte, 1024) - _, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}) - if serr != nil { - t.Fatal(serr) - } - - // test big put - bigbuf := make([]byte, quotasize) - _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf}) - if err != nil { - t.Fatal(err) - } - - // quorum get should work regardless of whether alarm is raised - _, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")}) - if err != nil { - t.Fatal(err) - } - - // wait until alarm is raised for sure-- poll the alarms - stopc := time.After(5 * time.Second) - for { - req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET} - resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req) - if aerr != nil { - t.Fatal(aerr) - } - if len(resp.Alarms) != 0 { - break - } - select { - case <-stopc: - t.Fatalf("timed out waiting for alarm") - case <-time.After(10 * time.Millisecond): - } - } - - // small quota machine should reject put - if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { - t.Fatalf("past-quota instance should reject put") - } - - // large quota machine should reject put - if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { - t.Fatalf("past-quota instance should reject put") - } - - // reset large quota node to ensure alarm persisted - clus.Members[1].Stop(t) - clus.Members[1].Restart(t) - clus.waitLeader(t, clus.Members) - - if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { - t.Fatalf("alarmed instance should reject put after reset") - } -} - -// TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through. -func TestV3AlarmDeactivate(t *testing.T) { - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) - defer clus.Terminate(t) - kvc := toGRPC(clus.RandClient()).KV - mt := toGRPC(clus.RandClient()).Maintenance - - alarmReq := &pb.AlarmRequest{ - MemberID: 123, - Action: pb.AlarmRequest_ACTIVATE, - Alarm: pb.AlarmType_NOSPACE, - } - if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil { - t.Fatal(err) - } - - key := []byte("abc") - smallbuf := make([]byte, 512) - _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}) - if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) { - t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace) - } - - alarmReq.Action = pb.AlarmRequest_DEACTIVATE - if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil { - t.Fatal(err) - } - - if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil { - t.Fatal(err) - } -} - func TestV3RangeRequest(t *testing.T) { defer testutil.AfterTest(t) tests := []struct {