storage: fix nil pointer panic during replica changes #40961

Merged
154 changes: 154 additions & 0 deletions pkg/storage/client_replica_test.go
@@ -2237,6 +2237,160 @@ func TestAdminRelocateRangeSafety(t *testing.T) {
}
}

// TestChangeReplicasLeaveAtomicRacesWithMerge exercises a hazardous case which
// arises during concurrent AdminChangeReplicas requests. The code reads the
// descriptor from the range-ID local keyspace, checks that the read descriptor
// matches the expectation, and then uses the bytes of the read value in a CPut
// with the update. The code contains an optimization to transition out of
// joint consensus even if the read descriptor does not match the expectation.
// That optimization did not verify anything about the read descriptor, not
// even whether it was nil.
//
// This test exercises that scenario. We need to get the replica into a state
// where it has an outgoing voter, and then we need two different requests
// trying to make changes where only the merge succeeds. The race is that the
// second command will notice the outgoing voter and will attempt to fix it.
// In order to do that it reads the range descriptor to ensure that it has not
// changed (and to get the raw bytes of the range descriptor for use in a CPut,
// since the current API only uses the in-memory value and the encoding is not
// necessarily stable).
//
// The test also contains a variant whereby the range is re-split at the
// same key producing a range descriptor with a different range ID.
//
// See https://github.com/cockroachdb/cockroach/issues/40877.
func TestChangeReplicasLeaveAtomicRacesWithMerge(t *testing.T) {
defer leaktest.AfterTest(t)()
testutils.RunTrueAndFalse(t, "resplit", func(t *testing.T, resplit bool) {
const numNodes = 4
var stopAfterJointConfig atomic.Value
stopAfterJointConfig.Store(false)
var rangeToBlockRangeDescriptorRead atomic.Value
rangeToBlockRangeDescriptorRead.Store(roachpb.RangeID(0))
blockRangeDescriptorReadChan := make(chan struct{}, 1)
blockOnChangeReplicasRead := storagebase.ReplicaRequestFilter(func(ba roachpb.BatchRequest) *roachpb.Error {
if req, isGet := ba.GetArg(roachpb.Get); !isGet ||
ba.RangeID != rangeToBlockRangeDescriptorRead.Load().(roachpb.RangeID) ||
!ba.IsSingleRequest() ||
!bytes.HasSuffix([]byte(req.(*roachpb.GetRequest).Key),
[]byte(keys.LocalRangeDescriptorSuffix)) {
return nil
}
select {
case <-blockRangeDescriptorReadChan:
<-blockRangeDescriptorReadChan
default:
}
return nil
})
tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &storage.StoreTestingKnobs{
TestingRequestFilter: blockOnChangeReplicasRead,
ReplicaAddStopAfterJointConfig: func() bool {
return stopAfterJointConfig.Load().(bool)
},
},
},
},
ReplicationMode: base.ReplicationManual,
})
// Make a magical context which will allow us to use atomic replica changes.
ctx := context.WithValue(context.Background(), "testing", "testing")
defer tc.Stopper().Stop(ctx)

// We want to first get into a joint consensus scenario.
// Then we want to issue a ChangeReplicasRequest on a goroutine that will
// block trying to read the RHS's range descriptor. Then we'll merge the RHS
// away.

// Set up a userspace range to mess around with.
lhs := tc.ScratchRange(t)
_, err := tc.AddReplicas(lhs, tc.Targets(1, 2)...)
require.NoError(t, err)

// Split it and then we're going to try to up-replicate.
// We're going to have one goroutine trying to ADD the 4th node
// and another goroutine trying to move out of a joint config on both
// sides and then merge the range. We ensure that the first goroutine
// blocks and the second one succeeds. This tests that the first
// goroutine detects reading the nil descriptor.
rhs := append(lhs[:len(lhs):len(lhs)], 'a')
lhsDesc, rhsDesc := &roachpb.RangeDescriptor{}, &roachpb.RangeDescriptor{}
*lhsDesc, *rhsDesc, err = tc.SplitRange(rhs)
require.NoError(t, err)

err = tc.WaitForSplitAndInitialization(rhs)
require.NoError(t, err)

// Manually construct the batch because the (*DB).AdminChangeReplicas does
// not yet support atomic replication changes.
db := tc.Servers[0].DB()
swapReplicas := func(key roachpb.Key, desc roachpb.RangeDescriptor, add, remove int) (*roachpb.RangeDescriptor, error) {
return db.AdminChangeReplicas(ctx, key, desc, []roachpb.ReplicationChange{
{ChangeType: roachpb.ADD_REPLICA, Target: tc.Target(add)},
{ChangeType: roachpb.REMOVE_REPLICA, Target: tc.Target(remove)},
})
}

// Move a replica from node 2 to node 3 on both the LHS and the RHS.
_, err = swapReplicas(lhs, *lhsDesc, 3, 2)
require.NoError(t, err)
stopAfterJointConfig.Store(true) // keep the RHS in a joint config.
rhsDesc, err = swapReplicas(rhs, *rhsDesc, 3, 2)
require.NoError(t, err)
stopAfterJointConfig.Store(false)

// Run a goroutine which sends an AdminChangeReplicasRequest which will try to
// move the range out of joint config but will end up blocking on
// blockRangeDescriptorReadChan until we close it later.
rangeToBlockRangeDescriptorRead.Store(rhsDesc.RangeID)
blockRangeDescriptorReadChan <- struct{}{}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_, err := db.AdminChangeReplicas(
ctx, rhs, *rhsDesc, roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, tc.Target(2)),
)
// We'll ultimately fail because we're going to race with the work below.
msg := "descriptor changed"
if resplit {
// We don't convert ConditionFailedError to the "descriptor changed"
// error if the range ID changed.
msg = "unexpected value"
}
require.True(t, testutils.IsError(err, msg), err)
}()
// Wait until our goroutine is blocked.
testutils.SucceedsSoon(t, func() error {
if len(blockRangeDescriptorReadChan) != 0 {
return errors.New("not blocked yet")
}
return nil
})
// Add and remove a replica to exit the joint config.
_, err = tc.AddReplicas(rhs, tc.Target(2))
require.NoError(t, err)
_, err = tc.RemoveReplicas(rhs, tc.Target(2))
require.NoError(t, err)
// Merge the RHS away.
err = db.AdminMerge(ctx, lhs)
require.NoError(t, err)
if resplit {
require.NoError(t, db.AdminSplit(ctx, lhs, rhs, hlc.Timestamp{WallTime: math.MaxInt64}))
err = tc.WaitForSplitAndInitialization(rhs)
require.NoError(t, err)
}

// Unblock the original add on the separate goroutine to ensure that it
// properly handles reading a nil range descriptor.
close(blockRangeDescriptorReadChan)
wg.Wait()
})
}

// getRangeInfo retrieves range info by performing a get against the provided
// key and setting the ReturnRangeInfo flag to true.
func getRangeInfo(
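A note on the blocking mechanism in TestChangeReplicasLeaveAtomicRacesWithMerge above: the request filter relies on a capacity-one channel that is "armed" with a single token. The intercepted descriptor read drains that token (which is what the SucceedsSoon loop watches for) and then parks on a second receive until the channel is closed. Below is a minimal, self-contained sketch of that idiom; the names (blockCh, filter) are illustrative and not part of the actual test.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	// Capacity-one channel: a single buffered token "arms" the filter.
	blockCh := make(chan struct{}, 1)

	// Stand-in for the TestingRequestFilter: if armed, drain the token to
	// signal that the intercepted request has arrived, then block until the
	// channel is closed. If not armed, let the request through immediately.
	filter := func() {
		select {
		case <-blockCh:
			<-blockCh // parks here until close(blockCh)
		default:
		}
	}

	blockCh <- struct{}{} // arm the filter

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		filter() // the AdminChangeReplicas goroutine blocks in here
		fmt.Println("request released after the racing work finished")
	}()

	// The test polls (via SucceedsSoon) until the token has been drained,
	// meaning the goroutine is parked inside the filter.
	for len(blockCh) != 0 {
		time.Sleep(time.Millisecond)
	}

	// ... the racing replica changes, merge, and optional re-split go here ...

	close(blockCh) // release the parked request
	wg.Wait()
}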
4 changes: 3 additions & 1 deletion pkg/storage/replica_command.go
@@ -1542,7 +1542,9 @@ func execChangeReplicasTxn(
descKey := keys.RangeDescriptorKey(referenceDesc.StartKey)

check := func(kvDesc *roachpb.RangeDescriptor) bool {
if chgs.leaveJoint() {
// NB: We might fail to find the range if the range has been merged away,
// in which case we definitely want to fail the check below.
if kvDesc != nil && kvDesc.RangeID == referenceDesc.RangeID && chgs.leaveJoint() {
// If there are no changes, we're trying to leave a joint config,
// so that's all we care about. But since leaving a joint config
// is done opportunistically whenever one is encountered, this is
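The added guard above is the substance of the fix: if the descriptor read back from the range-ID local key is nil (the range was merged away) or carries a different RangeID (the span was re-split under a new range), the leave-joint fast path must not fire, and the comparison against the reference descriptor fails the CPut expectation instead of dereferencing a nil pointer. Below is a toy model of that predicate using a stand-in RangeDescriptor type rather than the real roachpb one; it is a sketch of the check's shape, not the actual implementation.

package main

import "fmt"

// RangeDescriptor stands in for roachpb.RangeDescriptor; only the field the
// guard inspects is modeled here.
type RangeDescriptor struct {
	RangeID int64
}

// check mirrors the shape of the fixed predicate in execChangeReplicasTxn: the
// leave-joint fast path applies only when the descriptor we read back belongs
// to the same range; otherwise we fall through to a strict comparison, which
// fails for a nil (merged-away) or re-split descriptor.
func check(kvDesc, referenceDesc *RangeDescriptor, leaveJoint bool) bool {
	if kvDesc != nil && kvDesc.RangeID == referenceDesc.RangeID && leaveJoint {
		return true // same range: leaving the joint config is all we care about
	}
	return kvDesc != nil && *kvDesc == *referenceDesc
}

func main() {
	ref := &RangeDescriptor{RangeID: 7}
	fmt.Println(check(nil, ref, true))                          // false: range merged away
	fmt.Println(check(&RangeDescriptor{RangeID: 9}, ref, true)) // false: re-split under a new range ID
	fmt.Println(check(&RangeDescriptor{RangeID: 7}, ref, true)) // true: still the same range
}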
10 changes: 10 additions & 0 deletions pkg/testutils/testcluster/testcluster.go
@@ -391,6 +391,16 @@ func (tc *TestCluster) Target(serverIdx int) roachpb.ReplicationTarget {
}
}

// Targets creates a slice of ReplicationTarget where each entry corresponds to
// a call to tc.Target() for serverIdx in serverIdxs.
func (tc *TestCluster) Targets(serverIdxs ...int) []roachpb.ReplicationTarget {
ret := make([]roachpb.ReplicationTarget, 0, len(serverIdxs))
for _, serverIdx := range serverIdxs {
ret = append(ret, tc.Target(serverIdx))
}
return ret
}

func (tc *TestCluster) changeReplicas(
changeType roachpb.ReplicaChangeType, startKey roachpb.RKey, targets ...roachpb.ReplicationTarget,
) (roachpb.RangeDescriptor, error) {
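The Targets helper added above is a small convenience over calling Target once per server index, as the test does when up-replicating the scratch range. A hedged usage sketch follows; it assumes a started TestCluster, a scratch key, and the usual test imports (testing, testcluster, roachpb, require), and the helper name upReplicate is illustrative rather than part of the change.

// upReplicate adds replicas on servers 1 and 2 for the range containing key.
func upReplicate(t *testing.T, tc *testcluster.TestCluster, key roachpb.Key) {
	// Equivalent to tc.AddReplicas(key, tc.Target(1), tc.Target(2)); Targets
	// simply expands the server indices into ReplicationTargets.
	_, err := tc.AddReplicas(key, tc.Targets(1, 2)...)
	require.NoError(t, err)
}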