From 626922c46cccf4d914f5119cc9fbf63e124b0c0f Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 23 Aug 2019 15:24:13 +0200 Subject: [PATCH] storage: allow atomic replication changes in ChangeReplicas They default to OFF. This needs a lot more tests which will be added separately in the course of switching the default to ON and will focus on the interactions of joint states with everything else in the system. We'll also need another audit of consumers of the replica descriptors to make sure nothing was missed in the first pass. Release note: None --- docs/generated/settings/settings.html | 3 +- .../client_atomic_membership_change_test.go | 118 ++++++++-- pkg/storage/replica.go | 10 - pkg/storage/replica_application_cmd.go | 2 +- pkg/storage/replica_command.go | 210 ++++++++++++------ pkg/storage/replicate_queue.go | 3 +- 6 files changed, 238 insertions(+), 108 deletions(-) diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 2fd81563fbb5..49d32aba3344 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -27,6 +27,7 @@ kv.allocator.load_based_rebalancingenumerationleases and replicaswhether to rebalance based on the distribution of QPS across stores [off = 0, leases = 1, leases and replicas = 2] kv.allocator.qps_rebalance_thresholdfloat0.25minimum fraction away from the mean a store's QPS (such as queries per second) can be before it is considered overfull or underfull kv.allocator.range_rebalance_thresholdfloat0.05minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull +kv.atomic_replication_changes.enabledbooleanfalseuse atomic replication changes kv.bulk_ingest.batch_sizebyte size16 MiBthe maximum size of the payload in an AddSSTable request kv.bulk_ingest.buffer_incrementbyte size32 MiBthe size by which the BulkAdder attempts to grow its buffer before flushing kv.bulk_ingest.index_buffer_sizebyte size32 MiBthe initial size of the BulkAdder buffer handling secondary index imports @@ -129,6 +130,6 @@ trace.debug.enablebooleanfalseif set, traces for recent requests can be seen in the /debug page trace.lightstep.tokenstringif set, traces go to Lightstep using this token trace.zipkin.collectorstringif set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set -versioncustom validation19.1-8set the active cluster version in the format '.' +versioncustom validation19.1-9set the active cluster version in the format '.' 
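For orientation before the diffs that follow: with the new kv.atomic_replication_changes.enabled setting turned on, a caller can hand AdminChangeReplicas several ReplicationChanges in one call and have them applied as a single raft membership change. A minimal sketch of such a call, assuming a test cluster tc, scratch key k, current descriptor desc, and *testing.T t as set up in the test below (the context value is the temporary "feature gate" that the TODOs in the test say will be removed once 19.2 is out):

    // Sketch only; mirrors the test added in this patch.
    ctx := context.WithValue(context.Background(), "testing", "testing") // temporary feature gate
    chgs := []roachpb.ReplicationChange{
        {ChangeType: roachpb.ADD_REPLICA, Target: tc.Target(3)},
        {ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(2)},
    }
    newDesc, err := tc.Servers[0].DB().AdminChangeReplicas(ctx, k, desc, chgs)
    if err != nil {
        t.Fatal(err)
    }
    _ = newDesc // reflects both changes, applied atomically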
diff --git a/pkg/storage/client_atomic_membership_change_test.go b/pkg/storage/client_atomic_membership_change_test.go index a731d01f0cfc..53bc697b2b85 100644 --- a/pkg/storage/client_atomic_membership_change_test.go +++ b/pkg/storage/client_atomic_membership_change_test.go @@ -12,50 +12,122 @@ package storage_test import ( "context" + "strings" "testing" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/kr/pretty" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.etcd.io/etcd/raft/confchange" + "go.etcd.io/etcd/raft/tracker" ) -func TestAtomicMembershipChange(t *testing.T) { +// TestAtomicReplicationChange is a simple smoke test for atomic membership +// changes. +func TestAtomicReplicationChange(t *testing.T) { defer leaktest.AfterTest(t)() - - // This is a simple smoke test to spot obvious issues with atomic replica changes. - // These aren't implemented at the time of writing. The compound change below is - // internally unwound and executed step by step (see executeAdminBatch()). ctx := context.Background() args := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &storage.StoreTestingKnobs{}, + }, + }, ReplicationMode: base.ReplicationManual, } - tc := testcluster.StartTestCluster(t, 4, args) + tc := testcluster.StartTestCluster(t, 6, args) defer tc.Stopper().Stop(ctx) + _, err := tc.ServerConn(0).Exec(`SET CLUSTER SETTING kv.atomic_replication_changes.enabled = true`) + require.NoError(t, err) + + // Create a range and put it on n1, n2, n3. Intentionally do this one at a + // time so we're not using atomic replication changes yet. k := tc.ScratchRange(t) - expDesc, err := tc.AddReplicas(k, tc.Target(1), tc.Target(2)) + desc, err := tc.AddReplicas(k, tc.Target(1)) + require.NoError(t, err) + desc, err = tc.AddReplicas(k, tc.Target(2)) require.NoError(t, err) - // Range is now on s1,s2,s3. "Atomically" add it to s4 while removing from s3. - // This isn't really atomic yet. + runChange := func(expDesc roachpb.RangeDescriptor, chgs []roachpb.ReplicationChange) roachpb.RangeDescriptor { + t.Helper() + desc, err := tc.Servers[0].DB().AdminChangeReplicas( + // TODO(tbg): when 19.2 is out, remove this "feature gate" here and in + // AdminChangeReplicas. + context.WithValue(ctx, "testing", "testing"), + k, expDesc, chgs, + ) + require.NoError(t, err) - chgs := []roachpb.ReplicationChange{ - {ChangeType: roachpb.ADD_REPLICA, Target: tc.Target(3)}, - {ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(2)}, + return *desc } - // TODO(tbg): when 19.2 is out, remove this "feature gate" here and in - // AdminChangeReplicas. 
- ctx = context.WithValue(ctx, "testing", "testing") - desc, err := tc.Servers[0].DB().AdminChangeReplicas(ctx, k, expDesc, chgs) - require.NoError(t, err) - var stores []roachpb.StoreID - for _, rDesc := range desc.Replicas().All() { - stores = append(stores, rDesc.StoreID) + + checkDesc := func(desc roachpb.RangeDescriptor, expStores ...roachpb.StoreID) { + testutils.SucceedsSoon(t, func() error { + var sawStores []roachpb.StoreID + for _, s := range tc.Servers { + r, _ := s.Stores().GetReplicaForRangeID(desc.RangeID) + if r == nil { + continue + } + if _, found := desc.GetReplicaDescriptor(r.StoreID()); !found { + // There's a replica but it's not in the new descriptor, so + // it should be replicaGC'ed soon. + return errors.Errorf("%s should have been removed", r) + } + sawStores = append(sawStores, r.StoreID()) + // Check that in-mem descriptor of repl is up-to-date. + if diff := pretty.Diff(&desc, r.Desc()); len(diff) > 0 { + return errors.Errorf("diff(want, have):\n%s", strings.Join(diff, "\n")) + } + // Check that conf state is up to date. This can fail even though + // the descriptor already matches since the descriptor is updated + // a hair earlier. + cfg, _, err := confchange.Restore(confchange.Changer{ + Tracker: tracker.MakeProgressTracker(1), + LastIndex: 1, + }, desc.Replicas().ConfState()) + require.NoError(t, err) + act := r.RaftStatus().Config.Voters + if diff := pretty.Diff(cfg.Voters, act); len(diff) > 0 { + return errors.Errorf("diff(exp,act):\n%s", strings.Join(diff, "\n")) + } + } + assert.Equal(t, expStores, sawStores) + return nil + }) } - exp := []roachpb.StoreID{1, 2, 4} - // TODO(tbg): test more details and scenarios (learners etc). - require.Equal(t, exp, stores) + + // Run a fairly general change. + desc = runChange(desc, []roachpb.ReplicationChange{ + {ChangeType: roachpb.ADD_REPLICA, Target: tc.Target(3)}, + {ChangeType: roachpb.ADD_REPLICA, Target: tc.Target(5)}, + {ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(2)}, + {ChangeType: roachpb.ADD_REPLICA, Target: tc.Target(4)}, + }) + + // Replicas should now live on all stores except s3. + checkDesc(desc, 1, 2, 4, 5, 6) + + // Transfer the lease to s5. + require.NoError(t, tc.TransferRangeLease(desc, tc.Target(4))) + + // Rebalance back down all the way. + desc = runChange(desc, []roachpb.ReplicationChange{ + {ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(0)}, + {ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(1)}, + {ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(3)}, + {ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(5)}, + }) + + // Only a lone voter on s5 should be left over. + checkDesc(desc, 5) } diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 2d84d179b663..304c20532d33 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -92,16 +92,6 @@ var useAtomicReplicationChanges = settings.RegisterBoolSetting( false, ) -// FatalAtomicReplicationChangeUnimplemented is called by code that will need -// to change when atomic replication changes are available to make sure it is -// updated accordingly. -// -// TODO(tbg): update all callers and remove this method in the commit that allows -// atomic replication changes. -func FatalAtomicReplicationChangeUnimplemented(ctx context.Context) { - log.Fatalf(ctx, "atomic configuration changes not yet implemented") -} - // MaxCommandSizeFloor is the minimum allowed value for the MaxCommandSize // cluster setting. 
const MaxCommandSizeFloor = 4 << 20 // 4MB diff --git a/pkg/storage/replica_application_cmd.go b/pkg/storage/replica_application_cmd.go index 53e06bc37f92..d9bb3b2cfc92 100644 --- a/pkg/storage/replica_application_cmd.go +++ b/pkg/storage/replica_application_cmd.go @@ -170,7 +170,7 @@ func (d *decodedRaftEntry) decode(ctx context.Context, e *raftpb.Entry) error { switch e.Type { case raftpb.EntryNormal: return d.decodeNormalEntry(e) - case raftpb.EntryConfChange: + case raftpb.EntryConfChange, raftpb.EntryConfChangeV2: return d.decodeConfChangeEntry(e) default: log.Fatalf(ctx, "unexpected Raft entry: %v", e) diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index dd5a2980458a..e791cebf7817 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -143,6 +143,15 @@ func (r *Replica) adminSplitWithDescriptor( delayable bool, reason string, ) (roachpb.AdminSplitResponse, error) { + var err error + // The split queue doesn't care about the set of replicas, so if we somehow + // are being handed one that's in a joint state, finalize that before + // continuing. + desc, err = r.maybeLeaveAtomicChangeReplicas(ctx, desc) + if err != nil { + return roachpb.AdminSplitResponse{}, err + } + var reply roachpb.AdminSplitResponse // Determine split key if not provided with args. This scan is @@ -570,10 +579,11 @@ func (r *Replica) AdminMerge( // Should never happen, but just in case. return errors.Errorf("ranges are not adjacent; %s != %s", origLeftDesc.EndKey, rightDesc.StartKey) } - // For simplicity, don't handle learner replicas, expect the caller to - // resolve them first. (Defensively, we check that there are no non-voter - // replicas, in case some third type is later added). This behavior can be - // changed later if the complexity becomes worth it, but it's not right now. + // For simplicity, don't handle learner replicas or joint states, expect + // the caller to resolve them first. (Defensively, we check that there + // are no non-voter replicas, in case some third type is later added). + // This behavior can be changed later if the complexity becomes worth + // it, but it's not right now. lReplicas, rReplicas := origLeftDesc.Replicas(), rightDesc.Replicas() if len(lReplicas.Voters()) != len(lReplicas.All()) { return errors.Errorf("cannot merge range with non-voter replicas on lhs: %s", lReplicas) @@ -808,9 +818,6 @@ func IsSnapshotError(err error) bool { // leaseholder (which in particular implies that we can never remove all // replicas). // -// NB: at the time of writing, atomic replication changes are not implemented -// yet. Only a single change is supported, everything else returns an error. -// // The returned RangeDescriptor is the new value of the range's descriptor // following the successful commit of the transaction. // @@ -849,29 +856,12 @@ func IsSnapshotError(err error) bool { // protocol; otherwise it has to use joint consensus. In this latter mechanism, // a first configuration change is made which results in a configuration ("joint // configuration") in which a quorum of both the old replicas and the new -// replica sets is required for decision making. Since this joint configuration -// is not represented in the RangeDescriptor (which is the source of truth of -// the replication configuration), additional information about the joint state -// is persisted under RangeDescriptorJointKey, a replicated key located on the -// range similar to the range descriptor (but not versioned). 
Raft will -// automatically transition out of this joint configuration as soon as it has -// properly been applied (and we clear the extra replicated state atomically); -// ChangeReplicas ensures the final configuration is active before returning. -// -// TODO(tbg): figure out how the "waiting to transition out of the joint config" -// will happen. We want Raft to auto-transition out (rather than doing it -// manually) because that way we know we'll leave that joint state even if the -// coordinator crashes etc. On the other hand, the polling seems a little -// difficult to do idiomatically. If many replication changes are carried out -// back to back, what do we wait for? We only need to know that some replica -// (typically the leader) has transitioned out (i.e. there's no requirement that -// the local replica has done so). It seems most straightforward to stash the -// joint state in an inline key that can be read through KV (mutated below -// Raft), and make sure the generation of the replication change is preserved. -// Then we just need to poll that the key goes away or has a larger generation -// (indicating that we transitioned out of "our" conf change and into another -// joint config driven by someone else). We can avoid the poll in the common -// case by proposing an empty entry first. +// replica sets is required for decision making. To transition into this joint +// configuration, the RangeDescriptor (which is the source of truth of the +// replication configuration) is updated so that the affected replicas carry +// the types VoterIncoming and VoterOutgoing. Immediately after committing +// this change, a second transition updates the descriptor once more, removing +// the joint state and activating the final configuration. // // A replica that learns that it was removed will queue itself for replicaGC. // Note that a removed replica may never apply the configuration change removing
@@ -917,9 +907,6 @@ func (r *Replica) ChangeReplicas( } return desc, nil } - if len(chgs) > 1 { - FatalAtomicReplicationChangeUnimplemented(ctx) - } // Atomic replication change. return r.changeReplicasImpl(ctx, desc, priority, reason, details, chgs) }
@@ -932,10 +919,13 @@ func (r *Replica) changeReplicasImpl( details string, chgs roachpb.ReplicationChanges, ) (updatedDesc *roachpb.RangeDescriptor, _ error) { - if len(chgs) != 1 { - // TODO(tbg): lift this restriction when atomic membership changes are - // plumbed into raft. - return nil, errors.Errorf("need exactly one change, got %+v", chgs) + var err error + // If in a joint config, clean up. The assumption here is that the caller + // of ChangeReplicas didn't even realize that they were holding on to a + // joint descriptor and would rather not have to deal with that fact. + desc, err = r.maybeLeaveAtomicChangeReplicas(ctx, desc) + if err != nil { + return nil, err } if err := validateReplicationChanges(desc, chgs); err != nil {
@@ -955,7 +945,7 @@ func (r *Replica) changeReplicasImpl( return r.addReplicaLegacyPreemptiveSnapshot(ctx, chgs[0].Target, desc, priority, reason, details) } // We're removing a single voter. - return r.finalizeChangeReplicas(ctx, desc, priority, reason, details, chgs) + return r.atomicReplicationChange(ctx, desc, priority, reason, details, chgs) } if adds := chgs.Additions(); len(adds) > 0 {
@@ -972,8 +962,13 @@ func (r *Replica) changeReplicasImpl( // Catch up any learners, then run the atomic replication change that adds the // final voters and removes any undesirable replicas.
- desc, err := r.finalizeChangeReplicas(ctx, desc, priority, reason, details, chgs) + desc, err = r.atomicReplicationChange(ctx, desc, priority, reason, details, chgs) if err != nil { + // If the error occurred while transitioning out of an atomic replication change, + // try again here with a fresh descriptor; this is a noop otherwise. + if _, err := r.maybeLeaveAtomicChangeReplicas(ctx, r.Desc()); err != nil { + return nil, err + } if fn := r.store.cfg.TestingKnobs.ReplicaAddSkipLearnerRollback; fn != nil && fn() { return nil, err }
@@ -986,7 +981,29 @@ func (r *Replica) changeReplicasImpl( } return nil, err } - return desc, nil + return desc, err +} + +// maybeLeaveAtomicChangeReplicas transitions out of the joint configuration if +// the descriptor indicates one. This involves running a distributed transaction +// updating said descriptor, the result of which will be returned. +func (r *Replica) maybeLeaveAtomicChangeReplicas( + ctx context.Context, desc *roachpb.RangeDescriptor, +) (*roachpb.RangeDescriptor, error) { + // We want execChangeReplicasTxn to be able to make sure it's only tasked + // with leaving a joint state when it's in one, so make sure we don't call + // it if we're not. + if !desc.Replicas().InAtomicReplicationChange() { + return desc, nil + } + + // NB: reason and detail won't be used because no range log event will be + // emitted. + // + // TODO(tbg): reconsider this. + return execChangeReplicasTxn( + ctx, r.store, desc, storagepb.ReasonUnknown /* unused */, "", nil, /* iChgs */ + ) } func validateReplicationChanges(
@@ -1070,13 +1087,18 @@ func addLearnerReplicas( return desc, nil } -// finalizeChangeReplicas carries out the atomic membership change that finalizes -// the addition and/or removal of replicas. Any voters in the process of being -// added (as reflected by the replication changes) must have been added as -// learners already and will be caught up before being promoted to voters. Any -// replica removals (from the replication changes) will be processed. All of this -// occurs in one atomic raft membership change. -func (r *Replica) finalizeChangeReplicas( +// atomicReplicationChange carries out the atomic membership change that +// finalizes the addition and/or removal of replicas. Any voters in the process +// of being added (as reflected by the replication changes) must have been added +// as learners already and will be caught up before being promoted to voters. +// Any replica removals (from the replication changes) will be processed. All of +// this occurs in one atomic raft membership change, which is carried out across +// two distributed transactions (the second transitions out of the joint config). +// On error, it is possible that the range is left in the intermediate ("joint") +// configuration, in which a quorum of both the old and new sets of voters is +// required. If a range is encountered in this state, +// r.maybeLeaveAtomicChangeReplicas can fix this, but it is the caller's job to do so. +func (r *Replica) atomicReplicationChange( ctx context.Context, desc *roachpb.RangeDescriptor, priority SnapshotRequest_Priority,
@@ -1137,7 +1159,13 @@ func (r *Replica) finalizeChangeReplicas( iChgs = append(iChgs, internalReplicationChange{target: target, typ: internalChangeTypeRemove}) } - return execChangeReplicasTxn(ctx, r.store, desc, reason, details, iChgs) + var err error + desc, err = execChangeReplicasTxn(ctx, r.store, desc, reason, details, iChgs) + if err != nil { + return nil, err + } + // Leave the joint config if we entered one.
+ return r.maybeLeaveAtomicChangeReplicas(ctx, desc) } // tryRollbackLearnerReplica attempts to remove a learner specified by the @@ -1281,32 +1309,66 @@ func execChangeReplicasTxn( } var added, removed []roachpb.ReplicaDescriptor - - for _, chg := range chgs { - switch chg.typ { - case internalChangeTypeAddVoterViaPreemptiveSnap: - // Legacy code. - added = append(added, - updatedDesc.AddReplica(chg.target.NodeID, chg.target.StoreID, roachpb.ReplicaType_VoterFull)) - case internalChangeTypeAddLearner: - added = append(added, - updatedDesc.AddReplica(chg.target.NodeID, chg.target.StoreID, roachpb.ReplicaType_Learner)) - case internalChangeTypePromoteLearner: - // TODO(tbg): set to VoterIncoming when going through joint config. - rDesc, ok := updatedDesc.SetReplicaType(chg.target.NodeID, chg.target.StoreID, roachpb.ReplicaType_VoterFull) - if !ok { - return nil, errors.Errorf("cannot promote target %v which is missing as Learner", chg.target) + if len(chgs) > 0 { + if desc.Replicas().InAtomicReplicationChange() { + return nil, errors.Errorf("must transition out of joint config first: %s", desc) + } + + useJoint := len(chgs) > 1 + for _, chg := range chgs { + switch chg.typ { + case internalChangeTypeAddVoterViaPreemptiveSnap: + // Legacy code. + added = append(added, + updatedDesc.AddReplica(chg.target.NodeID, chg.target.StoreID, roachpb.ReplicaType_VoterFull)) + case internalChangeTypeAddLearner: + added = append(added, + updatedDesc.AddReplica(chg.target.NodeID, chg.target.StoreID, roachpb.ReplicaType_Learner)) + case internalChangeTypePromoteLearner: + typ := roachpb.ReplicaType_VoterFull + if useJoint { + typ = roachpb.ReplicaType_VoterIncoming + } + rDesc, ok := updatedDesc.SetReplicaType(chg.target.NodeID, chg.target.StoreID, typ) + if !ok { + return nil, errors.Errorf("cannot promote target %v which is missing as Learner", chg.target) + } + added = append(added, rDesc) + case internalChangeTypeRemove: + var rDesc roachpb.ReplicaDescriptor + var ok bool + if !useJoint { + rDesc, ok = updatedDesc.RemoveReplica(chg.target.NodeID, chg.target.StoreID) + } else { + rDesc, ok = updatedDesc.SetReplicaType(chg.target.NodeID, chg.target.StoreID, roachpb.ReplicaType_VoterOutgoing) + } + if !ok { + return nil, errors.Errorf("cannot remove nonexistent target %v", chg.target) + } + removed = append(removed, rDesc) + default: + return nil, errors.Errorf("unsupported internal change type %d", chg.typ) } - added = append(added, rDesc) - case internalChangeTypeRemove: - // TODO(tbg): set to VoterOutgoing when going through joint config instead. - rDesc, ok := updatedDesc.RemoveReplica(chg.target.NodeID, chg.target.StoreID) - if !ok { - return nil, errors.Errorf("cannot remove nonexistent target %v", chg.target) + } + } else { + // Want to leave a joint config. Note that we're not populating 'added' or 'removed', this + // is intentional; leaving the joint config corresponds to an "empty" raft conf change. + var isJoint bool + // NB: the DeepCopy is needed or we'll skip over an entry every time we + // call RemoveReplica below. 
+ for _, rDesc := range updatedDesc.Replicas().DeepCopy().All() { + switch rDesc.GetType() { + case roachpb.ReplicaType_VoterIncoming: + updatedDesc.SetReplicaType(rDesc.NodeID, rDesc.StoreID, roachpb.ReplicaType_VoterFull) + isJoint = true + case roachpb.ReplicaType_VoterOutgoing: + updatedDesc.RemoveReplica(rDesc.NodeID, rDesc.StoreID) + isJoint = true + default: } - removed = append(removed, rDesc) - default: - return nil, errors.Errorf("unsupported internal change type %d", chg.typ) + } + if !isJoint { + return nil, errors.Errorf("cannot leave a joint config; desc not joint: %s", &updatedDesc) } } @@ -1338,6 +1400,10 @@ func execChangeReplicasTxn( } } + if _, err := crt.ConfChange(nil); err != nil { + return nil, errors.Wrapf(err, "programming error: malformed trigger created from desc %s to %s", desc, &updatedDesc) + } + descKey := keys.RangeDescriptorKey(desc.StartKey) if err := store.DB().Txn(ctx, func(ctx context.Context, txn *client.Txn) error { log.Event(ctx, "attempting txn") diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go index 4d6735621372..a1e2a40af580 100644 --- a/pkg/storage/replicate_queue.go +++ b/pkg/storage/replicate_queue.go @@ -347,7 +347,8 @@ func (rq *replicateQueue) processOneChange( case AllocatorConsiderRebalance: return rq.considerRebalance(ctx, repl, voterReplicas, canTransferLease, dryRun) case AllocatorFinalizeAtomicReplicationChange: - FatalAtomicReplicationChangeUnimplemented(ctx) + _, err := repl.maybeLeaveAtomicChangeReplicas(ctx, repl.Desc()) + return false, err } return true, nil }
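To summarize the descriptor manipulation introduced above: the first call to execChangeReplicasTxn moves the range into the joint configuration (promoted learners become VoterIncoming, removed voters become VoterOutgoing), and maybeLeaveAtomicChangeReplicas then issues a second, change-free call that collapses the joint state. A condensed sketch, not part of the patch, using the same roachpb helpers the hunks above call (hypothetical function names; error handling and the ok return values elided):

    // import "github.com/cockroachdb/cockroach/pkg/roachpb"

    // Illustration only: the two descriptor rewrites behind an atomic change.
    func enterJointSketch(desc *roachpb.RangeDescriptor, add, remove roachpb.ReplicationTarget) {
        // Promote the previously added learner to an incoming voter...
        desc.SetReplicaType(add.NodeID, add.StoreID, roachpb.ReplicaType_VoterIncoming)
        // ...and demote the departing replica instead of dropping it outright.
        desc.SetReplicaType(remove.NodeID, remove.StoreID, roachpb.ReplicaType_VoterOutgoing)
    }

    func leaveJointSketch(desc *roachpb.RangeDescriptor) {
        // DeepCopy, as in the loop above, so removals don't skip entries.
        for _, rDesc := range desc.Replicas().DeepCopy().All() {
            switch rDesc.GetType() {
            case roachpb.ReplicaType_VoterIncoming:
                desc.SetReplicaType(rDesc.NodeID, rDesc.StoreID, roachpb.ReplicaType_VoterFull)
            case roachpb.ReplicaType_VoterOutgoing:
                desc.RemoveReplica(rDesc.NodeID, rDesc.StoreID)
            }
        }
    }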
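The crt.ConfChange(nil) assertion above checks that the descriptor transition translates into a well-formed raft configuration change; for a joint change this is an EntryConfChangeV2, which the decoder in replica_application_cmd.go now accepts. As a rough approximation only (the actual mapping lives behind the trigger's ConfChange method and is not shown in this diff), entering the joint config corresponds to a payload along these lines:

    // import "go.etcd.io/etcd/raft/raftpb"

    // Hedged illustration; the field choices are assumptions, not the real mapping.
    func enterJointConfChangeSketch(incoming, outgoing uint64) raftpb.ConfChangeV2 {
        return raftpb.ConfChangeV2{
            // An explicit transition keeps raft in the joint config until an
            // empty ConfChangeV2 is proposed, which matches the separate
            // "leave joint" transaction issued by maybeLeaveAtomicChangeReplicas.
            Transition: raftpb.ConfChangeTransitionJointExplicit,
            Changes: []raftpb.ConfChangeSingle{
                {Type: raftpb.ConfChangeAddNode, NodeID: incoming},
                {Type: raftpb.ConfChangeRemoveNode, NodeID: outgoing},
            },
        }
    }
    // Leaving the joint config then corresponds to the empty raftpb.ConfChangeV2{},
    // consistent with the "empty" raft conf change mentioned in the comment above.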