diff --git a/pkg/storage/client_replica_test.go b/pkg/storage/client_replica_test.go
index 48d3203557c6..8e37f6e47be3 100644
--- a/pkg/storage/client_replica_test.go
+++ b/pkg/storage/client_replica_test.go
@@ -2237,6 +2237,160 @@ func TestAdminRelocateRangeSafety(t *testing.T) {
 	}
 }
 
+// TestChangeReplicasLeaveAtomicRacesWithMerge exercises a hazardous case which
+// arises during concurrent AdminChangeReplicas requests. The code reads the
+// descriptor from the range-local keyspace, checks that the read descriptor
+// matches the expectation, and then uses the raw bytes of that read in a CPut
+// with the update. The code contains an optimization to transition out of
+// joint consensus even if the read descriptor does not match the expectation.
+// That optimization did not verify anything about the read descriptor, not
+// even whether it was nil.
+//
+// This test exercises that scenario. We need to get the replica into a state
+// where it has an outgoing voter and then have two different requests trying
+// to make changes, of which only the merge succeeds. The race is that the
+// second command will notice the outgoing voter and will attempt to remove
+// it. In order to do that it reads the range descriptor to ensure that it has
+// not changed (and to get the raw bytes of the range descriptor for use in a
+// CPut, as the current API only takes the in-memory value and the encoding is
+// not necessarily stable).
+//
+// The test also contains a variant whereby the range is re-split at the same
+// key, producing a range descriptor with a different range ID.
+//
+// See https://github.com/cockroachdb/cockroach/issues/40877.
+func TestChangeReplicasLeaveAtomicRacesWithMerge(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	testutils.RunTrueAndFalse(t, "resplit", func(t *testing.T, resplit bool) {
+		const numNodes = 4
+		var stopAfterJointConfig atomic.Value
+		stopAfterJointConfig.Store(false)
+		var rangeToBlockRangeDescriptorRead atomic.Value
+		rangeToBlockRangeDescriptorRead.Store(roachpb.RangeID(0))
+		blockRangeDescriptorReadChan := make(chan struct{}, 1)
+		blockOnChangeReplicasRead := storagebase.ReplicaRequestFilter(func(ba roachpb.BatchRequest) *roachpb.Error {
+			if req, isGet := ba.GetArg(roachpb.Get); !isGet ||
+				ba.RangeID != rangeToBlockRangeDescriptorRead.Load().(roachpb.RangeID) ||
+				!ba.IsSingleRequest() ||
+				!bytes.HasSuffix([]byte(req.(*roachpb.GetRequest).Key),
+					[]byte(keys.LocalRangeDescriptorSuffix)) {
+				return nil
+			}
+			// Block the descriptor read: the first receive consumes the token the
+			// test placed in the channel (signaling that we got here), the second
+			// blocks until the test closes the channel.
+			select {
+			case <-blockRangeDescriptorReadChan:
+				<-blockRangeDescriptorReadChan
+			default:
+			}
+			return nil
+		})
+		tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
+			ServerArgs: base.TestServerArgs{
+				Knobs: base.TestingKnobs{
+					Store: &storage.StoreTestingKnobs{
+						TestingRequestFilter: blockOnChangeReplicasRead,
+						ReplicaAddStopAfterJointConfig: func() bool {
+							return stopAfterJointConfig.Load().(bool)
+						},
+					},
+				},
+			},
+			ReplicationMode: base.ReplicationManual,
+		})
+		// Make a magical context which will allow us to use atomic replica changes.
+		ctx := context.WithValue(context.Background(), "testing", "testing")
+		defer tc.Stopper().Stop(ctx)
+
+		// We want to first get into a joint consensus scenario. Then we want to
+		// issue a ChangeReplicasRequest on a goroutine that will block trying to
+		// read the RHS's range descriptor. Then we'll merge the RHS away.
+
+		// Set up a userspace range to mess around with.
+		lhs := tc.ScratchRange(t)
+		_, err := tc.AddReplicas(lhs, tc.Targets(1, 2)...)
+		require.NoError(t, err)
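+
+		// At this point the scratch range lives on nodes 0, 1, and 2, and node 3
+		// is still empty; the swaps below move each side of the upcoming split
+		// onto node 3, freeing node 2 to be re-added during the race.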
+
+		// Split it and then we're going to try to up-replicate. We're going to
+		// have one goroutine trying to ADD the 4th node and another goroutine
+		// trying to move out of a joint config on both sides and then merge the
+		// range. We ensure that the first goroutine blocks and the second one
+		// succeeds. This will test that the first goroutine detects reading the
+		// nil descriptor.
+		rhs := append(lhs[:len(lhs):len(lhs)], 'a')
+		lhsDesc, rhsDesc := &roachpb.RangeDescriptor{}, &roachpb.RangeDescriptor{}
+		*lhsDesc, *rhsDesc, err = tc.SplitRange(rhs)
+		require.NoError(t, err)
+
+		err = tc.WaitForSplitAndInitialization(rhs)
+		require.NoError(t, err)
+
+		// Manually construct the batch because the (*DB).AdminChangeReplicas does
+		// not yet support atomic replication changes.
+		db := tc.Servers[0].DB()
+		swapReplicas := func(key roachpb.Key, desc roachpb.RangeDescriptor, add, remove int) (*roachpb.RangeDescriptor, error) {
+			return db.AdminChangeReplicas(ctx, key, desc, []roachpb.ReplicationChange{
+				{ChangeType: roachpb.ADD_REPLICA, Target: tc.Target(add)},
+				{ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(remove)},
+			})
+		}
+
+		// Move the RHS and LHS from node 2 to node 3.
+		_, err = swapReplicas(lhs, *lhsDesc, 3, 2)
+		require.NoError(t, err)
+		stopAfterJointConfig.Store(true) // keep the RHS in a joint config.
+		rhsDesc, err = swapReplicas(rhs, *rhsDesc, 3, 2)
+		require.NoError(t, err)
+		stopAfterJointConfig.Store(false)
+
+		// Run a goroutine which sends an AdminChangeReplicasRequest that will try
+		// to move the range out of the joint config but will end up blocking on
+		// blockRangeDescriptorReadChan until we close it later.
+		rangeToBlockRangeDescriptorRead.Store(rhsDesc.RangeID)
+		blockRangeDescriptorReadChan <- struct{}{}
+		var wg sync.WaitGroup
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			_, err := db.AdminChangeReplicas(
+				ctx, rhs, *rhsDesc, roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, tc.Target(2)),
+			)
+			// We'll ultimately fail because we're going to race with the work below.
+			msg := "descriptor changed"
+			if resplit {
+				// We don't convert ConditionFailedError to the "descriptor changed"
+				// error if the range ID changed.
+				msg = "unexpected value"
+			}
+			require.True(t, testutils.IsError(err, msg), err)
+		}()
+		// Wait until our goroutine is blocked.
+		testutils.SucceedsSoon(t, func() error {
+			if len(blockRangeDescriptorReadChan) != 0 {
+				return errors.New("not blocked yet")
+			}
+			return nil
+		})
+		// Add and remove a replica to exit the joint config.
+		_, err = tc.AddReplicas(rhs, tc.Target(2))
+		require.NoError(t, err)
+		_, err = tc.RemoveReplicas(rhs, tc.Target(2))
+		require.NoError(t, err)
+		// Merge the RHS away.
+		err = db.AdminMerge(ctx, lhs)
+		require.NoError(t, err)
+		if resplit {
+			require.NoError(t, db.AdminSplit(ctx, lhs, rhs, hlc.Timestamp{WallTime: math.MaxInt64}))
+			err = tc.WaitForSplitAndInitialization(rhs)
+			require.NoError(t, err)
+		}
+
+		// Unblock the original add on the separate goroutine to ensure that it
+		// properly handles reading a nil range descriptor.
+		close(blockRangeDescriptorReadChan)
+		wg.Wait()
+	})
+}
+
 // getRangeInfo retreives range info by performing a get against the provided
 // key and setting the ReturnRangeInfo flag to true.
 func getRangeInfo(
diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go
index 7ce6fde9d462..1e29805874ef 100644
--- a/pkg/storage/replica_command.go
+++ b/pkg/storage/replica_command.go
@@ -1542,7 +1542,9 @@ func execChangeReplicasTxn(
 	descKey := keys.RangeDescriptorKey(referenceDesc.StartKey)
 
 	check := func(kvDesc *roachpb.RangeDescriptor) bool {
-		if chgs.leaveJoint() {
+		// NB: We might fail to find the range if the range has been merged away,
+		// in which case we definitely want to fail the check below.
+		if kvDesc != nil && kvDesc.RangeID == referenceDesc.RangeID && chgs.leaveJoint() {
 			// If there are no changes, we're trying to leave a joint config,
 			// so that's all we care about. But since leaving a joint config
 			// is done opportunistically whenever one is encountered, this is
diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go
index e90c63c2f75d..cd436341a402 100644
--- a/pkg/testutils/testcluster/testcluster.go
+++ b/pkg/testutils/testcluster/testcluster.go
@@ -391,6 +391,16 @@ func (tc *TestCluster) Target(serverIdx int) roachpb.ReplicationTarget {
 	}
 }
 
+// Targets creates a slice of ReplicationTarget where each entry corresponds to
+// a call to tc.Target() for serverIdx in serverIdxs, e.g. tc.Targets(1, 2) is
+// equivalent to []roachpb.ReplicationTarget{tc.Target(1), tc.Target(2)}.
+func (tc *TestCluster) Targets(serverIdxs ...int) []roachpb.ReplicationTarget {
+	ret := make([]roachpb.ReplicationTarget, 0, len(serverIdxs))
+	for _, serverIdx := range serverIdxs {
+		ret = append(ret, tc.Target(serverIdx))
+	}
+	return ret
+}
+
 func (tc *TestCluster) changeReplicas(
 	changeType roachpb.ReplicaChangeType, startKey roachpb.RKey, targets ...roachpb.ReplicationTarget,
 ) (roachpb.RangeDescriptor, error) {
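
For readers outside the tree: the essence of the replica_command.go hunk is that the opportunistic leave-joint path now refuses to proceed unless the descriptor read back from KV is non-nil and still carries the RangeID we started from. A minimal standalone sketch of just that guard follows; the simplified RangeDescriptor type and the checkLeaveJoint helper are illustrations only, not the CockroachDB API, and the real condition additionally requires chgs.leaveJoint():

	package main

	import "fmt"

	// RangeDescriptor stands in for roachpb.RangeDescriptor; only the field
	// the guard consults is included here.
	type RangeDescriptor struct {
		RangeID int64
	}

	// checkLeaveJoint mirrors the condition added in execChangeReplicasTxn: a
	// nil read means the range was merged away, and a mismatched RangeID means
	// the same span was re-split; in both cases the caller must not use the
	// read bytes as the expected value of its descriptor CPut.
	func checkLeaveJoint(kvDesc, referenceDesc *RangeDescriptor) bool {
		return kvDesc != nil && kvDesc.RangeID == referenceDesc.RangeID
	}

	func main() {
		ref := &RangeDescriptor{RangeID: 7}
		fmt.Println(checkLeaveJoint(nil, ref))                          // false: merged away
		fmt.Println(checkLeaveJoint(&RangeDescriptor{RangeID: 9}, ref)) // false: re-split, new range ID
		fmt.Println(checkLeaveJoint(&RangeDescriptor{RangeID: 7}, ref)) // true: safe to leave the joint config
	}

The two failing cases correspond exactly to the test's two variants: the merge-only variant expects the "descriptor changed" error, while the resplit variant surfaces the raw CPut "unexpected value" error because the range ID changed.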