-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
client_atomic_membership_change_test.go
159 lines (141 loc) · 5.43 KB
/
client_atomic_membership_change_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package storage_test
import (
"context"
"strings"
"testing"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/kr/pretty"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/confchange"
"go.etcd.io/etcd/raft/tracker"
)
// TestAtomicReplicationChange is a simple smoke test for atomic membership
// changes. It starts a six-node cluster with manual replication, enables the
// kv.atomic_replication_changes.enabled cluster setting, and then issues two
// multi-target AdminChangeReplicas requests against a scratch range, verifying
// after each one that the replicas found on the servers match the returned
// range descriptor.
func TestAtomicReplicationChange(t *testing.T) {
	defer leaktest.AfterTest(t)()
	ctx := context.Background()

	tc := testcluster.StartTestCluster(t, 6, base.TestClusterArgs{
		ReplicationMode: base.ReplicationManual,
		ServerArgs: base.TestServerArgs{
			Knobs: base.TestingKnobs{
				Store: &storage.StoreTestingKnobs{},
			},
		},
	})
	defer tc.Stopper().Stop(ctx)

	_, err := tc.ServerConn(0).Exec(`SET CLUSTER SETTING kv.atomic_replication_changes.enabled = true`)
	require.NoError(t, err)

	// Create a range and put it on n1, n2, n3. Intentionally do this one at a
	// time so we're not using atomic replication changes yet. Only the
	// descriptor returned by the second addition is needed afterwards.
	scratchKey := tc.ScratchRange(t)
	_, err = tc.AddReplicas(scratchKey, tc.Target(1))
	require.NoError(t, err)
	desc, err := tc.AddReplicas(scratchKey, tc.Target(2))
	require.NoError(t, err)

	// applyChanges sends a single AdminChangeReplicas request carrying all of
	// changes through the first server's DB handle, using expected as the
	// expected descriptor, and returns the descriptor from the response. Any
	// error fails the test immediately.
	applyChanges := func(
		expected roachpb.RangeDescriptor, changes []roachpb.ReplicationChange,
	) roachpb.RangeDescriptor {
		t.Helper()
		// TODO(tbg): when 19.2 is out, remove this "feature gate" here and in
		// AdminChangeReplicas.
		gatedCtx := context.WithValue(ctx, "testing", "testing")
		updated, err := tc.Servers[0].DB().AdminChangeReplicas(gatedCtx, scratchKey, expected, changes)
		require.NoError(t, err)
		return *updated
	}

	// verifyReplicas retries until every server that holds a replica of the
	// range agrees with want, and then asserts that the stores on which a
	// replica was found are exactly wantStores, in server order.
	verifyReplicas := func(want roachpb.RangeDescriptor, wantStores ...roachpb.StoreID) {
		testutils.SucceedsSoon(t, func() error {
			var gotStores []roachpb.StoreID
			for _, srv := range tc.Servers {
				repl, _ := srv.Stores().GetReplicaForRangeID(want.RangeID)
				if repl == nil {
					// This server has no replica of the range.
					continue
				}
				storeID := repl.StoreID()
				if _, ok := want.GetReplicaDescriptor(storeID); !ok {
					// There's a replica but it's not in the new descriptor, so
					// it should be replicaGC'ed soon.
					return errors.Errorf("%s should have been removed", repl)
				}
				gotStores = append(gotStores, storeID)

				// The replica's in-memory descriptor has to be up to date.
				if diff := pretty.Diff(&want, repl.Desc()); len(diff) > 0 {
					return errors.Errorf("diff(want, have):\n%s", strings.Join(diff, "\n"))
				}

				// The conf state has to be up to date as well. This can fail
				// even though the descriptor already matches since the
				// descriptor is updated a hair earlier. The expected voter
				// config is derived from the descriptor's conf state.
				changer := confchange.Changer{
					Tracker:   tracker.MakeProgressTracker(1),
					LastIndex: 1,
				}
				wantCfg, _, err := confchange.Restore(changer, want.Replicas().ConfState())
				require.NoError(t, err)
				if diff := pretty.Diff(wantCfg.Voters, repl.RaftStatus().Config.Voters); len(diff) > 0 {
					return errors.Errorf("diff(exp,act):\n%s", strings.Join(diff, "\n"))
				}
			}
			assert.Equal(t, wantStores, gotStores)
			return nil
		})
	}

	// Run a fairly general change: three additions and one removal at once.
	rebalanceUp := []roachpb.ReplicationChange{
		{ChangeType: roachpb.ADD_REPLICA, Target: tc.Target(3)},
		{ChangeType: roachpb.ADD_REPLICA, Target: tc.Target(5)},
		{ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(2)},
		{ChangeType: roachpb.ADD_REPLICA, Target: tc.Target(4)},
	}
	desc = applyChanges(desc, rebalanceUp)

	// Replicas should now live on all stores except s3.
	verifyReplicas(desc, 1, 2, 4, 5, 6)

	// Transfer the lease to s5.
	err = tc.TransferRangeLease(desc, tc.Target(4))
	require.NoError(t, err)

	// Rebalance back down all the way.
	rebalanceDown := []roachpb.ReplicationChange{
		{ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(0)},
		{ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(1)},
		{ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(3)},
		{ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(5)},
	}
	desc = applyChanges(desc, rebalanceDown)

	// Only a lone voter on s5 should be left over.
	verifyReplicas(desc, 5)
}
// TestAtomicReplicationChangeMultipleLearners adds two replicas to a scratch
// range in a single call and checks that the resulting descriptor contains
// two learners, then removes both replicas again and checks that no learners
// remain.
//
// TODO(tbg): finish this test, add comments.
func TestAtomicReplicationChangeMultipleLearners(t *testing.T) {
	defer leaktest.AfterTest(t)()
	ctx := context.Background()

	knobs, ltk := makeReplicationTestKnobs()
	clusterArgs := base.TestClusterArgs{
		ServerArgs:      base.TestServerArgs{Knobs: knobs},
		ReplicationMode: base.ReplicationManual,
	}
	tc := testcluster.StartTestCluster(t, 3, clusterArgs)
	defer tc.Stopper().Stop(ctx)

	// Turn on both learner replicas and atomic replication changes.
	runner := sqlutils.MakeSQLRunner(tc.ServerConn(0))
	for _, stmt := range []string{
		`SET CLUSTER SETTING kv.learner_replicas.enabled = true`,
		`SET CLUSTER SETTING kv.atomic_replication_changes.enabled = true`,
	} {
		runner.Exec(t, stmt)
	}

	scratchKey := tc.ScratchRange(t)
	second, third := tc.Target(1), tc.Target(2)

	// Add both targets while the knob helper is active.
	// NOTE(review): withStopAfterLearnerAtomic is defined elsewhere; presumably
	// it halts the replication change once the learners have been added, which
	// is what the assertion below relies on -- confirm against its definition.
	var withLearners roachpb.RangeDescriptor
	ltk.withStopAfterLearnerAtomic(func() {
		withLearners = tc.AddReplicasOrFatal(t, scratchKey, second, third)
	})
	require.Len(t, withLearners.Replicas().Learners(), 2, withLearners)

	// Removing the same two targets must leave no learners behind.
	afterRemoval := tc.RemoveReplicasOrFatal(t, scratchKey, second, third)
	require.Len(t, afterRemoval.Replicas().Learners(), 0, afterRemoval)
}