Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] storage: allow atomic removal/addition of multiple LEARNERs #40268

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 57 additions & 26 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -1336,11 +1336,11 @@ func writeTooOldRetryTimestamp(txn *Transaction, err *WriteTooOldError) hlc.Time

// Replicas returns all of the replicas present in the descriptor after this
// trigger applies.
func (crt ChangeReplicasTrigger) Replicas() []ReplicaDescriptor {
func (crt ChangeReplicasTrigger) Replicas() ReplicaDescriptors {
if crt.Desc != nil {
return crt.Desc.Replicas().All()
return crt.Desc.Replicas()
}
return crt.DeprecatedUpdatedReplicas
return MakeReplicaDescriptors(crt.DeprecatedUpdatedReplicas)
}

// ConfChange returns the configuration change described by the trigger.
Expand All @@ -1361,7 +1361,7 @@ func confChangeImpl(
crt interface {
Added() []ReplicaDescriptor
Removed() []ReplicaDescriptor
Replicas() []ReplicaDescriptor
Replicas() ReplicaDescriptors
alwaysV2() bool
},
encodedCtx []byte,
Expand All @@ -1371,7 +1371,7 @@ func confChangeImpl(
var sl []raftpb.ConfChangeSingle

checkExists := func(in ReplicaDescriptor) error {
for _, rDesc := range replicas {
for _, rDesc := range replicas.All() {
if rDesc.ReplicaID == in.ReplicaID {
if a, b := in.GetType(), rDesc.GetType(); a != b {
return errors.Errorf("have %s, but descriptor has %s", in, rDesc)
Expand All @@ -1382,7 +1382,7 @@ func confChangeImpl(
return errors.Errorf("%s missing from descriptors %v", in, replicas)
}
checkNotExists := func(in ReplicaDescriptor) error {
for _, rDesc := range replicas {
for _, rDesc := range replicas.All() {
if rDesc.ReplicaID == in.ReplicaID {
return errors.Errorf("%s must no longer be present in descriptor", in)
}
Expand Down Expand Up @@ -1445,35 +1445,66 @@ func confChangeImpl(
})
}

// Check whether we're entering a joint state. This is the case precisely when
// the resulting descriptors tells us that this is the case. Note that we've
// made sure above that all of the additions/removals are in tune with that
// descriptor already.
var enteringJoint bool
for _, rDesc := range replicas {
switch rDesc.GetType() {
case VOTER_INCOMING, VOTER_OUTGOING:
enteringJoint = true
default:
}
}
// Check whether we're entering a joint state. Note that there is a "gotcha"
// if we carry out multiple changes but none of them affect voters, that is,
// we're adding/removing multiple learners. This does not change the quorum
// requirements, so we *should* be able to get by without going through a
// joint state. In fact, we *have to*, unless we want to add additional
// state to the RangeDescriptor (like a Joint boolean or
// LEARNER_{INCOMING,OUTGOING} replica types).
//
// Unfortunately, etcd/raft *always requires* a joint consensus transition
// when it is handed more than one change. This is because as far as it it
// knows, every addition of a learner could be a demotion of a voter (thus
// changing the quorum size); the configuration change API does not specify
// the config on which the changes are to be applied, and the app needs to
// predictably know whether the change will be joint or not.
//
// However, in CRDB we know the base configuration on which the delta
// applies - it is the config represented by the previous descriptor, and as
// outlined above, in the "learners only" case it never reflects a joint
// configuration in this case. Luckily, we can work around this safely by
// giving etcd/raft the joint transition it needs, but transitioning out of
// it immediately when entering it (in the same entry). More precisely,
// after we call rawNode.ApplyConfChange, if we just moved into a joint
// configuration but our descriptor is not joint, we call ApplyConfChange
// again to transition out of the joint state again.
//
// In summary:
// - if crt.Desc.InAtomicReplicationChange(), then we've changed voters and
// will enter the joint configuration, and will leave it explicitly via
// another replication change txn (see wantLeaveJoint below).
// - if !crt.Desc.InAtomicReplicationChange() but there is more than one
// change, then we're only changing learners and don't need joint consensus,
// but etcd/raft wants it, and we go through the joint state but without
// spending any time in it. This amounts to letting etcd/raft trust us
// that we know what the underlying configuration is.
// - if !crt.Desc.InAtomicReplicationChange() and there is only one change,
// then we're adding/removing a single learner or voter without going
// through a joint transition.
raftEnteringJoint := len(added)+len(removed) > 1
// If there's an incoming/outgoing voter, then even if there is only
// one change apparently we are running it through a joint
// configuration. We're allowed to do that, though we typically
// don't do it outside of tests.
raftEnteringJoint = raftEnteringJoint || replicas.InAtomicReplicationChange()

// A trigger that neither adds nor removes anything is how we encode a request
// to leave a joint state. Note that this is only applicable in case 1 above,
// i.e. we're chaning something about the voters.
wantLeaveJoint := len(added)+len(removed) == 0
if !enteringJoint {
if len(added)+len(removed) > 1 {
return nil, errors.Errorf("change requires joint consensus")
}
} else if wantLeaveJoint {

if raftEnteringJoint && wantLeaveJoint {
return nil, errors.Errorf("descriptor enters joint state, but trigger is requesting to leave one")
}

var cc raftpb.ConfChangeI

if enteringJoint || crt.alwaysV2() {
if raftEnteringJoint || crt.alwaysV2() {
// V2 membership changes, which allow atomic replication changes. We
// track the joint state in the range descriptor and thus we need to be
// in charge of when to leave the joint state.
transition := raftpb.ConfChangeTransitionJointExplicit
if !enteringJoint {
if !raftEnteringJoint {
// If we're using V2 just to avoid V1 (and not because we actually
// have a change that requires V2), then use an auto transition
// which skips the joint state. This is necessary: our descriptor
Expand Down
26 changes: 26 additions & 0 deletions pkg/storage/client_atomic_membership_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/kr/pretty"
Expand Down Expand Up @@ -131,3 +132,28 @@ func TestAtomicReplicationChange(t *testing.T) {
// Only a lone voter on s5 should be left over.
checkDesc(desc, 5)
}

// TODO(tbg): finish this test, add comments.
func TestAtomicReplicationChangeMultipleLearners(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
knobs, ltk := makeReplicationTestKnobs()
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: knobs},
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)
db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
db.Exec(t, `SET CLUSTER SETTING kv.atomic_replication_changes.enabled = true`)

k := tc.ScratchRange(t)
var desc roachpb.RangeDescriptor
ltk.withStopAfterLearnerAtomic(func() {
desc = tc.AddReplicasOrFatal(t, k, tc.Target(1), tc.Target(2))
})
require.Len(t, desc.Replicas().Learners(), 2, desc)

desc = tc.RemoveReplicasOrFatal(t, k, tc.Target(1), tc.Target(2))
require.Len(t, desc.Replicas().Learners(), 0, desc)
}
23 changes: 22 additions & 1 deletion pkg/storage/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,28 @@ func (sm *replicaStateMachine) maybeApplyConfChange(ctx context.Context, cmd *re
return nil
}
return sm.r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) {
raftGroup.ApplyConfChange(cmd.confChange.ConfChangeI)
cc := cmd.confChange.ConfChangeI.AsV2()
raftGroup.ApplyConfChange(cc)
cc.Context = nil // for info below
newCfg := raftGroup.Status().Config
if !cmd.replicatedResult().ChangeReplicas.Desc.Replicas().InAtomicReplicationChange() && len(newCfg.Voters[1]) > 0 {
// If the RawNode entered a joint configuration but the descriptor does not describe one,
// then we have mutated only learners in this change and were only formally forced into
// joint consensus by a restriction in etcd/raft. Transition out of the joint config
// immediately so that the Raft config reflects the descriptor accurately.
//
// Note that the joint state it's in is equivalent to the non-joint config represented by
// the descriptor (since (1 2 3)&&(1 2 3) is equivalent to (1 2 3) alone), though if we
// didn't auto-transition here we'd hit an error the next time we want to change the
// configuration (can't run simple changes when in joint state, can't enter joint
// state when already in joint state, etc).
//
// See the comment within this method for more details:
_ = (*roachpb.ChangeReplicasTrigger)(nil).ConfChange

log.VEventf(ctx, 1, "transitioning out of joint config %s", newCfg.String())
raftGroup.ApplyConfChange(raftpb.ConfChangeV2{})
}
return true, nil
})
default:
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,8 @@ func addLearnerReplicas(
// This isn't crazy, we just need to transition out of the joint config
// before returning from this method, and it's unclear that it's worth
// doing.
//
// WIP(tbg): address the above TODO now.
for _, target := range targets {
iChgs := []internalReplicationChange{{target: target, typ: internalChangeTypeAddLearner}}
var err error
Expand Down