storage: allow atomic replication changes in ChangeReplicas
They default to OFF, gated behind the new cluster setting kv.atomic_replication_changes.enabled.

This needs a lot more tests, which will be added separately in the course
of switching the default to ON; those tests will focus on the interactions
of joint states with everything else in the system.

We'll also need another audit of consumers of the replica descriptors to
make sure nothing was missed in the first pass.

Release note: None
tbg committed Aug 26, 2019
1 parent f71a407 commit 626922c
Showing 6 changed files with 238 additions and 108 deletions.
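
For orientation before the diffs: the commit makes AdminChangeReplicas accept a batch of changes that is carried out as one atomic operation instead of being unwound into single steps. Below is a minimal sketch of a caller, modeled on the test further down; db (a *client.DB), key, and curDesc (the range's current descriptor) are assumed to be in scope, and the targets are illustrative. Note that at this commit the call also requires the "testing" feature-gate context value and the cluster setting shown below, both slated for removal once 19.2 is out.

	// Add a voter on n4/s4 and remove the voter on n3/s3 in one atomic
	// operation, so the range never passes through an intermediate
	// even-replica configuration with weaker fault tolerance.
	chgs := []roachpb.ReplicationChange{
		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 4}},
		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3}},
	}
	// On success, the returned descriptor reflects both changes at once.
	newDesc, err := db.AdminChangeReplicas(ctx, key, curDesc, chgs)
	if err != nil {
		return err
	}
	_ = newDesc
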
3 changes: 2 additions & 1 deletion docs/generated/settings/settings.html
@@ -27,6 +27,7 @@
 <tr><td><code>kv.allocator.load_based_rebalancing</code></td><td>enumeration</td><td><code>leases and replicas</code></td><td>whether to rebalance based on the distribution of QPS across stores [off = 0, leases = 1, leases and replicas = 2]</td></tr>
 <tr><td><code>kv.allocator.qps_rebalance_threshold</code></td><td>float</td><td><code>0.25</code></td><td>minimum fraction away from the mean a store's QPS (such as queries per second) can be before it is considered overfull or underfull</td></tr>
 <tr><td><code>kv.allocator.range_rebalance_threshold</code></td><td>float</td><td><code>0.05</code></td><td>minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull</td></tr>
+<tr><td><code>kv.atomic_replication_changes.enabled</code></td><td>boolean</td><td><code>false</code></td><td>use atomic replication changes</td></tr>
 <tr><td><code>kv.bulk_ingest.batch_size</code></td><td>byte size</td><td><code>16 MiB</code></td><td>the maximum size of the payload in an AddSSTable request</td></tr>
 <tr><td><code>kv.bulk_ingest.buffer_increment</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the size by which the BulkAdder attempts to grow its buffer before flushing</td></tr>
 <tr><td><code>kv.bulk_ingest.index_buffer_size</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the initial size of the BulkAdder buffer handling secondary index imports</td></tr>
@@ -129,6 +130,6 @@
 <tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
 <tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
 <tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set</td></tr>
-<tr><td><code>version</code></td><td>custom validation</td><td><code>19.1-8</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
+<tr><td><code>version</code></td><td>custom validation</td><td><code>19.1-9</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
 </tbody>
 </table>
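
The new kv.atomic_replication_changes.enabled setting is the feature gate referenced in the commit message. A self-contained sketch of flipping it from a Go client, the same thing the test below does via tc.ServerConn(0); the connection string is an assumption for a local insecure node:

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // pgwire driver; CockroachDB speaks the Postgres protocol.
)

func main() {
	// Assumed connection string for a local insecure single-node cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	// Turn on the gate added by this commit (it defaults to false).
	if _, err := db.Exec(`SET CLUSTER SETTING kv.atomic_replication_changes.enabled = true`); err != nil {
		log.Fatal(err)
	}
}
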
118 changes: 95 additions & 23 deletions pkg/storage/client_atomic_membership_change_test.go
@@ -12,50 +12,122 @@ package storage_test
 
 import (
 	"context"
+	"strings"
 	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/kr/pretty"
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.etcd.io/etcd/raft/confchange"
+	"go.etcd.io/etcd/raft/tracker"
 )
 
-func TestAtomicMembershipChange(t *testing.T) {
+// TestAtomicReplicationChange is a simple smoke test for atomic membership
+// changes.
+func TestAtomicReplicationChange(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
-	// This is a simple smoke test to spot obvious issues with atomic replica changes.
-	// These aren't implemented at the time of writing. The compound change below is
-	// internally unwound and executed step by step (see executeAdminBatch()).
 	ctx := context.Background()
 
 	args := base.TestClusterArgs{
 		ServerArgs: base.TestServerArgs{
 			Knobs: base.TestingKnobs{
 				Store: &storage.StoreTestingKnobs{},
 			},
 		},
 		ReplicationMode: base.ReplicationManual,
 	}
-	tc := testcluster.StartTestCluster(t, 4, args)
+	tc := testcluster.StartTestCluster(t, 6, args)
 	defer tc.Stopper().Stop(ctx)
 
+	_, err := tc.ServerConn(0).Exec(`SET CLUSTER SETTING kv.atomic_replication_changes.enabled = true`)
+	require.NoError(t, err)
+
+	// Create a range and put it on n1, n2, n3. Intentionally do this one at a
+	// time so we're not using atomic replication changes yet.
 	k := tc.ScratchRange(t)
-	expDesc, err := tc.AddReplicas(k, tc.Target(1), tc.Target(2))
+	desc, err := tc.AddReplicas(k, tc.Target(1))
 	require.NoError(t, err)
+	desc, err = tc.AddReplicas(k, tc.Target(2))
+	require.NoError(t, err)
 
-	// Range is now on s1,s2,s3. "Atomically" add it to s4 while removing from s3.
-	// This isn't really atomic yet.
-	chgs := []roachpb.ReplicationChange{
-		{ChangeType: roachpb.ADD_REPLICA, Target: tc.Target(3)},
-		{ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(2)},
+	runChange := func(expDesc roachpb.RangeDescriptor, chgs []roachpb.ReplicationChange) roachpb.RangeDescriptor {
+		t.Helper()
+		desc, err := tc.Servers[0].DB().AdminChangeReplicas(
+			// TODO(tbg): when 19.2 is out, remove this "feature gate" here and in
+			// AdminChangeReplicas.
+			context.WithValue(ctx, "testing", "testing"),
+			k, expDesc, chgs,
+		)
+		require.NoError(t, err)
+
+		return *desc
 	}
-	// TODO(tbg): when 19.2 is out, remove this "feature gate" here and in
-	// AdminChangeReplicas.
-	ctx = context.WithValue(ctx, "testing", "testing")
-	desc, err := tc.Servers[0].DB().AdminChangeReplicas(ctx, k, expDesc, chgs)
-	require.NoError(t, err)
-	var stores []roachpb.StoreID
-	for _, rDesc := range desc.Replicas().All() {
-		stores = append(stores, rDesc.StoreID)
+
+	checkDesc := func(desc roachpb.RangeDescriptor, expStores ...roachpb.StoreID) {
+		testutils.SucceedsSoon(t, func() error {
+			var sawStores []roachpb.StoreID
+			for _, s := range tc.Servers {
+				r, _ := s.Stores().GetReplicaForRangeID(desc.RangeID)
+				if r == nil {
+					continue
+				}
+				if _, found := desc.GetReplicaDescriptor(r.StoreID()); !found {
+					// There's a replica but it's not in the new descriptor, so
+					// it should be replicaGC'ed soon.
+					return errors.Errorf("%s should have been removed", r)
+				}
+				sawStores = append(sawStores, r.StoreID())
+				// Check that in-mem descriptor of repl is up-to-date.
+				if diff := pretty.Diff(&desc, r.Desc()); len(diff) > 0 {
+					return errors.Errorf("diff(want, have):\n%s", strings.Join(diff, "\n"))
+				}
+				// Check that conf state is up to date. This can fail even though
+				// the descriptor already matches since the descriptor is updated
+				// a hair earlier.
+				cfg, _, err := confchange.Restore(confchange.Changer{
+					Tracker:   tracker.MakeProgressTracker(1),
+					LastIndex: 1,
+				}, desc.Replicas().ConfState())
+				require.NoError(t, err)
+				act := r.RaftStatus().Config.Voters
+				if diff := pretty.Diff(cfg.Voters, act); len(diff) > 0 {
+					return errors.Errorf("diff(exp,act):\n%s", strings.Join(diff, "\n"))
+				}
+			}
+			assert.Equal(t, expStores, sawStores)
+			return nil
+		})
 	}
-	exp := []roachpb.StoreID{1, 2, 4}
-	// TODO(tbg): test more details and scenarios (learners etc).
-	require.Equal(t, exp, stores)
+
+	// Run a fairly general change.
+	desc = runChange(desc, []roachpb.ReplicationChange{
+		{ChangeType: roachpb.ADD_REPLICA, Target: tc.Target(3)},
+		{ChangeType: roachpb.ADD_REPLICA, Target: tc.Target(5)},
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(2)},
+		{ChangeType: roachpb.ADD_REPLICA, Target: tc.Target(4)},
+	})
+
+	// Replicas should now live on all stores except s3.
+	checkDesc(desc, 1, 2, 4, 5, 6)
+
+	// Transfer the lease to s5.
+	require.NoError(t, tc.TransferRangeLease(desc, tc.Target(4)))
+
+	// Rebalance back down all the way.
+	desc = runChange(desc, []roachpb.ReplicationChange{
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(0)},
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(1)},
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(3)},
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(5)},
+	})
+
+	// Only a lone voter on s5 should be left over.
+	checkDesc(desc, 5)
 }
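
The checkDesc closure above verifies more than the in-memory descriptor: it rebuilds the raft configuration implied by the descriptor's ConfState via confchange.Restore and diffs it against the replica's live RaftStatus().Config. A self-contained sketch of that mechanism using the same etcd/raft packages; the replica IDs and the joint state are illustrative:

package main

import (
	"fmt"
	"log"

	"go.etcd.io/etcd/raft/confchange"
	"go.etcd.io/etcd/raft/raftpb"
	"go.etcd.io/etcd/raft/tracker"
)

func main() {
	// Illustrative joint configuration mid-change: voters {1,2,3} are on the
	// way out and {1,2,4} are incoming, as a descriptor might encode it.
	cs := raftpb.ConfState{
		Voters:         []uint64{1, 2, 4},
		VotersOutgoing: []uint64{1, 2, 3},
	}
	// Restore replays the ConfState into a fresh tracker, yielding the raft
	// config it implies; the test diffs this against the live raft status.
	cfg, _, err := confchange.Restore(confchange.Changer{
		Tracker:   tracker.MakeProgressTracker(1),
		LastIndex: 1,
	}, cs)
	if err != nil {
		log.Fatal(err)
	}
	// In a joint config both the incoming and the outgoing majority must
	// agree; this prints something like (1 2 4)&&(1 2 3).
	fmt.Println(cfg.Voters)
}
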
10 changes: 0 additions & 10 deletions pkg/storage/replica.go
@@ -92,16 +92,6 @@ var useAtomicReplicationChanges = settings.RegisterBoolSetting(
 	false,
 )
 
-// FatalAtomicReplicationChangeUnimplemented is called by code that will need
-// to change when atomic replication changes are available to make sure it is
-// updated accordingly.
-//
-// TODO(tbg): update all callers and remove this method in the commit that allows
-// atomic replication changes.
-func FatalAtomicReplicationChangeUnimplemented(ctx context.Context) {
-	log.Fatalf(ctx, "atomic configuration changes not yet implemented")
-}
-
 // MaxCommandSizeFloor is the minimum allowed value for the MaxCommandSize
 // cluster setting.
 const MaxCommandSizeFloor = 4 << 20 // 4MB
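
For context, the registration that heads this hunk is not shown in full by the diff; reconstructed from the hunk header and the generated settings table above, it plausibly reads:

// Reconstructed for context; only the tail of this declaration appears in the
// diff. Key and description are taken from docs/generated/settings/settings.html.
var useAtomicReplicationChanges = settings.RegisterBoolSetting(
	"kv.atomic_replication_changes.enabled",
	"use atomic replication changes",
	false,
)
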
2 changes: 1 addition & 1 deletion pkg/storage/replica_application_cmd.go
@@ -170,7 +170,7 @@ func (d *decodedRaftEntry) decode(ctx context.Context, e *raftpb.Entry) error {
 	switch e.Type {
 	case raftpb.EntryNormal:
 		return d.decodeNormalEntry(e)
-	case raftpb.EntryConfChange:
+	case raftpb.EntryConfChange, raftpb.EntryConfChangeV2:
 		return d.decodeConfChangeEntry(e)
 	default:
 		log.Fatalf(ctx, "unexpected Raft entry: %v", e)
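
Entry decoding now has to accept raft's EntryConfChangeV2, the entry type that can carry several membership changes and drive joint-consensus transitions. A sketch (not the commit's decodeConfChangeEntry) of how the two entry types unmarshal with the etcd/raft API; raftpb.ConfChangeI is the interface both ConfChange and ConfChangeV2 satisfy, and the package name is hypothetical:

package raftexample

import (
	"fmt"

	"go.etcd.io/etcd/raft/raftpb"
)

// decodeConfChange unmarshals either conf-change entry type into the common
// raftpb.ConfChangeI interface.
func decodeConfChange(e raftpb.Entry) (raftpb.ConfChangeI, error) {
	switch e.Type {
	case raftpb.EntryConfChange: // legacy single-change entries
		var cc raftpb.ConfChange
		if err := cc.Unmarshal(e.Data); err != nil {
			return nil, err
		}
		return cc, nil
	case raftpb.EntryConfChangeV2: // may carry multiple changes (joint consensus)
		var cc raftpb.ConfChangeV2
		if err := cc.Unmarshal(e.Data); err != nil {
			return nil, err
		}
		return cc, nil
	default:
		return nil, fmt.Errorf("%v is not a config change entry", e.Type)
	}
}
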
[diff truncated: 2 of the 6 changed files not loaded]