diff --git a/pkg/kv/kvserver/client_relocate_range_test.go b/pkg/kv/kvserver/client_relocate_range_test.go
index 8820544a8698..2742b9ec9fc7 100644
--- a/pkg/kv/kvserver/client_relocate_range_test.go
+++ b/pkg/kv/kvserver/client_relocate_range_test.go
@@ -373,29 +373,82 @@ func TestAdminRelocateRangeRandom(t *testing.T) {
 // Regression test for https://github.com/cockroachdb/cockroach/issues/64325
 // which makes sure an in-flight read operation during replica removal won't
 // return empty results.
-func TestReplicaRemovalDuringRequestEvaluation(t *testing.T) {
+func TestReplicaRemovalDuringGet(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	ctx := context.Background()
+	tc, key, evalDuringReplicaRemoval := setupReplicaRemovalTest(t, ctx)
+	defer tc.Stopper().Stop(ctx)
+
+	// Perform write.
+	pArgs := putArgs(key, []byte("foo"))
+	_, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs)
+	require.Nil(t, pErr)
+
+	// Perform delayed read during replica removal.
+	resp, pErr := evalDuringReplicaRemoval(ctx, getArgs(key))
+	require.Nil(t, pErr)
+	require.NotNil(t, resp)
+	require.NotNil(t, resp.(*roachpb.GetResponse).Value)
+	val, err := resp.(*roachpb.GetResponse).Value.GetBytes()
+	require.NoError(t, err)
+	require.Equal(t, []byte("foo"), val)
+}
+
+// Regression test for https://github.com/cockroachdb/cockroach/issues/46329
+// which makes sure an in-flight conditional put operation during replica
+// removal won't spuriously error due to an unexpectedly missing value.
+func TestReplicaRemovalDuringCPut(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	tc, key, evalDuringReplicaRemoval := setupReplicaRemovalTest(t, ctx)
+	defer tc.Stopper().Stop(ctx)
+
+	// Perform write.
+	pArgs := putArgs(key, []byte("foo"))
+	_, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs)
+	require.Nil(t, pErr)
+
+	// Perform delayed conditional put during replica removal. This will cause
+	// an ambiguous result error, as outstanding proposals in the leaseholder
+	// replica's proposal queue will be aborted when the replica is removed.
+	// If the replica was removed from under us, it would instead return a
+	// ConditionFailedError since it finds nil in place of "foo".
+	req := cPutArgs(key, []byte("bar"), []byte("foo"))
+	_, pErr = evalDuringReplicaRemoval(ctx, req)
+	require.NotNil(t, pErr)
+	require.IsType(t, &roachpb.AmbiguousResultError{}, pErr.GetDetail())
+}
+
+// setupReplicaRemovalTest sets up a test cluster that can be used to test
+// request evaluation during replica removal. It returns a running test
+// cluster, the first key of a blank scratch range on the replica to be
+// removed, and a function that can execute a delayed request just as the
+// replica is being removed.
+func setupReplicaRemovalTest(
+	t *testing.T, ctx context.Context,
+) (
+	*testcluster.TestCluster,
+	roachpb.Key,
+	func(context.Context, roachpb.Request) (roachpb.Response, *roachpb.Error),
+) {
+	t.Helper()
+
 	type magicKey struct{}
 
-	// delayReadC is used to synchronize the in-flight read request with the main
-	// test goroutine. It is read from twice:
-	//
-	// 1. The first read allows the test to block until the request eval filter
-	//    is called, i.e. when the read request is ready.
-	// 2. The second read allows the test to close the channel to unblock
-	//    the eval filter, causing the read request to be evaluated.
-	delayReadC := make(chan struct{})
+	requestReadyC := make(chan struct{}) // signals main thread that request is teed up
+	requestEvalC := make(chan struct{})  // signals cluster to evaluate the request
 	evalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error {
 		if args.Ctx.Value(magicKey{}) != nil {
-			<-delayReadC
-			<-delayReadC
+			requestReadyC <- struct{}{}
+			<-requestEvalC
 		}
 		return nil
 	}
-	ctx := context.Background()
 	manual := hlc.NewHybridManualClock()
 	args := base.TestClusterArgs{
 		ReplicationMode: base.ReplicationManual,
@@ -415,62 +468,57 @@ func TestReplicaRemovalDuringRequestEvaluation(t *testing.T) {
 		},
 	}
 	tc := testcluster.StartTestCluster(t, 2, args)
-	defer tc.Stopper().Stop(ctx)
 
 	// Create range and upreplicate.
 	key := tc.ScratchRange(t)
 	tc.AddVotersOrFatal(t, key, tc.Target(1))
 
-	// Perform write.
-	pArgs := putArgs(key, []byte("foo"))
-	_, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs)
-	require.Nil(t, pErr)
+	// Return a function that can be used to evaluate a delayed request
+	// during replica removal.
+	evalDuringReplicaRemoval := func(ctx context.Context, req roachpb.Request) (roachpb.Response, *roachpb.Error) {
+		// Submit request and wait for it to block.
+		type result struct {
+			resp roachpb.Response
+			err  *roachpb.Error
+		}
+		resultC := make(chan result)
+		err := tc.Stopper().RunAsyncTask(ctx, "request", func(ctx context.Context) {
+			reqCtx := context.WithValue(ctx, magicKey{}, struct{}{})
+			resp, pErr := kv.SendWrapped(reqCtx, tc.Servers[0].DistSender(), req)
+			resultC <- result{resp, pErr}
+		})
+		require.NoError(t, err)
+		<-requestReadyC
+
+		// Transfer leaseholder to other store.
+		rangeDesc, err := tc.LookupRange(key)
+		require.NoError(t, err)
+		repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(rangeDesc.RangeID)
+		require.NoError(t, err)
+		err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual)
+		require.NoError(t, err)
+
+		// Remove first store from raft group.
+		tc.RemoveVotersOrFatal(t, key, tc.Target(0))
+
+		// Wait for replica removal. This is a bit iffy. We want to make sure
+		// that, in the buggy case, we will typically fail (i.e. the request
+		// returns incorrect results because the replica was removed). However,
+		// in the non-buggy case the in-flight request will be holding
+		// readOnlyCmdMu until evaluated, blocking the replica removal, so
+		// waiting for replica removal would deadlock. We therefore take the
+		// easy way out by starting an async replica GC and sleeping for a bit.
+		err = tc.Stopper().RunAsyncTask(ctx, "replicaGC", func(ctx context.Context) {
+			assert.NoError(t, tc.GetFirstStoreFromServer(t, 0).ManualReplicaGC(repl))
+		})
+		require.NoError(t, err)
+		time.Sleep(500 * time.Millisecond)
 
-	// Perform read on write and wait for read to block.
-	type reply struct {
-		resp roachpb.Response
-		err  *roachpb.Error
+		// Allow request to resume, and return the result.
+		close(requestEvalC)
+		r := <-resultC
+		return r.resp, r.err
 	}
-	readResC := make(chan reply)
-	err := tc.Stopper().RunAsyncTask(ctx, "get", func(ctx context.Context) {
-		readCtx := context.WithValue(ctx, magicKey{}, struct{}{})
-		gArgs := getArgs(key)
-		resp, pErr := kv.SendWrapped(readCtx, tc.Servers[0].DistSender(), gArgs)
-		readResC <- reply{resp, pErr}
-	})
-	require.NoError(t, err)
-	delayReadC <- struct{}{}
-
-	// Transfer leaseholder to other store.
-	rangeDesc, err := tc.LookupRange(key)
-	require.NoError(t, err)
-	repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(rangeDesc.RangeID)
-	require.NoError(t, err)
-	err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual)
-	require.NoError(t, err)
-
-	// Remove first store from raft group.
-	tc.RemoveVotersOrFatal(t, key, tc.Target(0))
-
-	// This is a bit iffy. We want to make sure that, in the buggy case, we
-	// will typically fail (i.e. the read returns empty because the replica was
-	// removed). However, in the non-buggy case the in-flight read request will
-	// be holding readOnlyCmdMu until evaluated, blocking the replica removal,
-	// so waiting for replica removal would deadlock. We therefore take the
-	// easy way out by starting an async replica GC and sleeping for a bit.
-	err = tc.Stopper().RunAsyncTask(ctx, "replicaGC", func(ctx context.Context) {
-		assert.NoError(t, tc.GetFirstStoreFromServer(t, 0).ManualReplicaGC(repl))
-	})
-	require.NoError(t, err)
-	time.Sleep(500 * time.Millisecond)
-
-	// Allow read to resume. Should return "foo".
-	close(delayReadC)
-	r := <-readResC
-	require.Nil(t, r.err)
-	require.NotNil(t, r.resp)
-	require.NotNil(t, r.resp.(*roachpb.GetResponse).Value)
-	val, err := r.resp.(*roachpb.GetResponse).Value.GetBytes()
-	require.NoError(t, err)
-	require.Equal(t, []byte("foo"), val)
+
+	return tc, key, evalDuringReplicaRemoval
 }
diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go
index c01e05915014..54405929f758 100644
--- a/pkg/kv/kvserver/client_test.go
+++ b/pkg/kv/kvserver/client_test.go
@@ -56,6 +56,23 @@ func putArgs(key roachpb.Key, value []byte) *roachpb.PutRequest {
 	}
 }
 
+// cPutArgs returns a ConditionalPutRequest addressed to the default replica
+// for the specified key and value, with the given expected value.
+func cPutArgs(key roachpb.Key, value, expValue []byte) *roachpb.ConditionalPutRequest {
+	var expBytes []byte
+	if expValue != nil {
+		expBytes = roachpb.MakeValueFromBytes(expValue).TagAndDataBytes()
+	}
+
+	return &roachpb.ConditionalPutRequest{
+		RequestHeader: roachpb.RequestHeader{
+			Key: key,
+		},
+		Value:    roachpb.MakeValueFromBytes(value),
+		ExpBytes: expBytes,
+	}
+}
+
 // incrementArgs returns an IncrementRequest addressed to the default replica
 // for the specified key.
 func incrementArgs(key roachpb.Key, inc int64) *roachpb.IncrementRequest {
diff --git a/pkg/kv/kvserver/replica_corruption.go b/pkg/kv/kvserver/replica_corruption.go
index 061aa2d714d7..c8379f6ae54a 100644
--- a/pkg/kv/kvserver/replica_corruption.go
+++ b/pkg/kv/kvserver/replica_corruption.go
@@ -20,12 +20,13 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 )
 
-// maybeSetCorrupt is a stand-in for proper handling of failing replicas. Such a
-// failure is indicated by a call to maybeSetCorrupt with a ReplicaCorruptionError.
-// Currently any error is passed through, but prospectively it should stop the
-// range from participating in progress, trigger a rebalance operation and
-// decide on an error-by-error basis whether the corruption is limited to the
-// range, store, node or cluster with corresponding actions taken.
+// setCorruptRaftMuLocked is a stand-in for proper handling of failing replicas.
+// Such a failure is indicated by a call to setCorruptRaftMuLocked with a
+// ReplicaCorruptionError. Currently any error is passed through, but
+// prospectively it should stop the range from participating in progress,
+// trigger a rebalance operation and decide on an error-by-error basis whether
+// the corruption is limited to the range, store, node or cluster with
+// corresponding actions taken.
 //
 // Despite the fatal log call below this message we still return for the
 // sake of testing.
@@ -36,15 +37,6 @@ import (
 // best bet is to not have any of those.
 // @bdarnell remarks: Corruption errors should be rare so we may want the store
 // to just recompute its stats in the background when one occurs.
-func (r *Replica) maybeSetCorrupt(ctx context.Context, pErr *roachpb.Error) *roachpb.Error {
-	if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
-		r.raftMu.Lock()
-		defer r.raftMu.Unlock()
-		return r.setCorruptRaftMuLocked(ctx, cErr)
-	}
-	return pErr
-}
-
 func (r *Replica) setCorruptRaftMuLocked(
 	ctx context.Context, cErr *roachpb.ReplicaCorruptionError,
 ) *roachpb.Error {
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 780d017bf295..886b9f34d10f 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -771,8 +771,9 @@ func (r *Replica) evaluateProposal(
 	}
 
 	// Evaluate the commands. If this returns without an error, the batch should
-	// be committed. Note that we don't hold any locks at this point. This is
-	// important since evaluating a proposal is expensive.
+	// be committed. Note that we don't hold any locks at this point, except a
+	// shared RLock on readOnlyCmdMu. This is important since evaluating a
+	// proposal is expensive.
 	//
 	// Note that, during evaluation, ba's read and write timestamps might get
 	// bumped (see evaluateWriteBatchWithServersideRefreshes).
@@ -789,7 +790,9 @@ func (r *Replica) evaluateProposal(
 	}
 
 	if pErr != nil {
-		pErr = r.maybeSetCorrupt(ctx, pErr)
+		if _, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
+			return &res, false /* needConsensus */, pErr
+		}
 
 		txn := pErr.GetTxn()
 		if txn != nil && ba.Txn == nil {
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index c6cbcda6f5da..8c5c871c296a 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -63,6 +63,10 @@ func makeIDKey() kvserverbase.CmdIDKey {
 // tok.Move() it into this method. It will be used to untrack the request once
 // it comes out of the proposal buffer.
 //
+// Nothing here or below can take out a raftMu lock, since executeWriteBatch()
+// is already holding readOnlyCmdMu when calling this. Locking raftMu after it
+// would violate the locking order specified for Store.mu.
+//
 // Return values:
 // - a channel which receives a response or error upon application
 // - a closure used to attempt to abandon the command. When called, it unbinds
@@ -90,6 +94,8 @@ func (r *Replica) evalAndPropose(
 		if isConcurrencyRetryError(pErr) {
 			pErr = maybeAttachLease(pErr, &st.Lease)
 			return nil, nil, 0, pErr
+		} else if _, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
+			return nil, nil, 0, pErr
 		}
 
 		// Attach the endCmds to the proposal and assume responsibility for
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index f85356fe52bd..529db86220a5 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -3326,6 +3326,9 @@ func TestReplicaNoTSCacheIncrementWithinTxn(t *testing.T) {
 // TestReplicaAbortSpanReadError verifies that an error is returned
 // to the client in the event that a AbortSpan entry is found but is
 // not decodable.
+//
+// This doubles as a test that replica corruption errors are propagated
+// and handled correctly.
 func TestReplicaAbortSpanReadError(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go
index 30b90f715baa..27bb683a185b 100644
--- a/pkg/kv/kvserver/replica_write.go
+++ b/pkg/kv/kvserver/replica_write.go
@@ -69,15 +69,17 @@ func (r *Replica) executeWriteBatch(
 ) (br *roachpb.BatchResponse, _ *concurrency.Guard, pErr *roachpb.Error) {
 	startTime := timeutil.Now()
 
-	// TODO(nvanbenschoten): unlike on the read-path (executeReadOnlyBatch), we
-	// don't synchronize with r.readOnlyCmdMu here. Is that ok? What if the
-	// replica is destroyed concurrently with a write? We won't be able to
-	// successfully propose as the lease will presumably have changed, but what
-	// if we hit an error during evaluation (e.g. a ConditionFailedError)?
+	// Even though we're not a read-only operation by definition, we have to
+	// take out a read lock on readOnlyCmdMu while performing any reads during
+	// pre-Raft evaluation (e.g. conditional puts), otherwise we can race with
+	// replica removal and get evaluated on an empty replica. We must release
+	// this lock before Raft execution, to avoid deadlocks.
+	r.readOnlyCmdMu.RLock()
 
 	// Verify that the batch can be executed.
 	st, err := r.checkExecutionCanProceed(ctx, ba, g)
 	if err != nil {
+		r.readOnlyCmdMu.RUnlock()
 		return nil, g, roachpb.NewError(err)
 	}
 
@@ -135,6 +137,7 @@ func (r *Replica) executeWriteBatch(
 	// Checking the context just before proposing can help avoid ambiguous errors.
 	if err := ctx.Err(); err != nil {
 		log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary())
+		r.readOnlyCmdMu.RUnlock()
 		return nil, g, roachpb.NewError(errors.Wrapf(err, "aborted before proposing"))
 	}
 
@@ -143,6 +146,13 @@ func (r *Replica) executeWriteBatch(
 	// evalAndPropose.
 	ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, ba, g, st, localUncertaintyLimit, tok.Move(ctx))
 	if pErr != nil {
+		r.readOnlyCmdMu.RUnlock()
+		if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
+			r.raftMu.Lock()
+			defer r.raftMu.Unlock()
+			// This exits with a fatal error, but returns in tests.
+			return nil, g, r.setCorruptRaftMuLocked(ctx, cErr)
+		}
 		if maxLeaseIndex != 0 {
 			log.Fatalf(
 				ctx, "unexpected max lease index %d assigned to failed proposal: %s, error %s",
@@ -162,6 +172,10 @@ func (r *Replica) executeWriteBatch(
 		untrack(ctx, ctpb.Epoch(st.Lease.Epoch), r.RangeID, ctpb.LAI(maxLeaseIndex))
 	}
 
+	// We are done with pre-Raft evaluation at this point, and have to release the
+	// read-only command lock to avoid deadlocks during Raft evaluation.
+	r.readOnlyCmdMu.RUnlock()
+
 	// If the command was accepted by raft, wait for the range to apply it.
 	ctxDone := ctx.Done()
 	shouldQuiesce := r.store.stopper.ShouldQuiesce()
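
The test helper in this patch hinges on a two-channel handshake with the injected evaluation filter: the filter parks the in-flight request on requestReadyC until the test has transferred the lease, removed the voter, and kicked off the replica GC, and then requestEvalC releases it. The following is a minimal, self-contained sketch of that handshake pattern in plain Go; it uses no CockroachDB APIs, and slowRequest, the channel names, and the sleep are illustrative only.

package main

import (
	"fmt"
	"time"
)

// slowRequest stands in for a request whose evaluation the test wants to
// pause. The two channels mirror requestReadyC/requestEvalC in
// setupReplicaRemovalTest: the first reports "the request is teed up inside
// the filter", the second releases it for evaluation.
func slowRequest(readyC chan<- struct{}, evalC <-chan struct{}, resultC chan<- string) {
	readyC <- struct{}{} // request has reached the eval filter
	<-evalC              // block until the test allows evaluation
	resultC <- "evaluated"
}

func main() {
	readyC := make(chan struct{})
	evalC := make(chan struct{})
	resultC := make(chan string)

	go slowRequest(readyC, evalC, resultC)

	// 1. Wait until the request is parked inside the "filter".
	<-readyC

	// 2. Reconfigure while the request is in flight. In the real test this is
	//    the lease transfer, voter removal, and async replica GC.
	time.Sleep(10 * time.Millisecond)

	// 3. Release the request and collect its result.
	close(evalC)
	fmt.Println(<-resultC)
}

Closing requestEvalC rather than sending on it, as the patch does, releases the waiter exactly once without ever blocking the test goroutine, and any later request that reaches the filter falls through immediately.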