From 733ad65d6c7c716adf4503e1ce6112f6d0469d5b Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 30 Apr 2021 13:14:52 +0000 Subject: [PATCH] kvserver: synchronize replica removal with read-write requests Replica removal did not synchronize with in-flight read-write requests, which could cause them to be evaluated on a removed (empty) replica. The request would not be able to persist any writes, since it's unable to submit Raft proposals. However, it can affect conditional writes, for example causing a `ConditionalPutRequest` to error because it finds a missing value instead of the expected one. This patch fixes the problem by taking out `Replica.readOnlyCmdMu` during pre-Raft evaluation, to synchronize with replica removal. This can cause such requests to return `AmbiguousResultError` as the write is evaluated. Release note (bug fix): Fixed a race condition where read-write requests during replica removal (e.g. during range merges or rebalancing) could be evaluated on the removed replica. These will not have been able to write any data to persistent storage, but could behave unexpectedly, e.g. returning errors that they should not have returned. --- pkg/kv/kvserver/client_relocate_range_test.go | 176 +++++++++++------- pkg/kv/kvserver/client_test.go | 18 ++ pkg/kv/kvserver/replica_corruption.go | 22 +-- pkg/kv/kvserver/replica_proposal.go | 13 +- pkg/kv/kvserver/replica_raft.go | 6 + pkg/kv/kvserver/replica_test.go | 3 + pkg/kv/kvserver/replica_write.go | 25 ++- 7 files changed, 176 insertions(+), 87 deletions(-) diff --git a/pkg/kv/kvserver/client_relocate_range_test.go b/pkg/kv/kvserver/client_relocate_range_test.go index 77996e2f22a9..536378006abf 100644 --- a/pkg/kv/kvserver/client_relocate_range_test.go +++ b/pkg/kv/kvserver/client_relocate_range_test.go @@ -210,29 +210,82 @@ func TestAdminRelocateRange(t *testing.T) { // Regression test for https://github.com/cockroachdb/cockroach/issues/64325 // which makes sure an in-flight read operation during replica removal won't // return empty results. -func TestReplicaRemovalDuringRequestEvaluation(t *testing.T) { +func TestReplicaRemovalDuringGet(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() + tc, key, evalDuringReplicaRemoval := setupReplicaRemovalTest(t, ctx) + defer tc.Stopper().Stop(ctx) + + // Perform write. + pArgs := putArgs(key, []byte("foo")) + _, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs) + require.Nil(t, pErr) + + // Perform delayed read during replica removal. + resp, pErr := evalDuringReplicaRemoval(ctx, getArgs(key)) + require.Nil(t, pErr) + require.NotNil(t, resp) + require.NotNil(t, resp.(*roachpb.GetResponse).Value) + val, err := resp.(*roachpb.GetResponse).Value.GetBytes() + require.NoError(t, err) + require.Equal(t, []byte("foo"), val) +} + +// Regression test for https://github.com/cockroachdb/cockroach/issues/46329 +// which makes sure an in-flight conditional put operation during replica +// removal won't spuriously error due to an unexpectedly missing value. +func TestReplicaRemovalDuringCPut(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + tc, key, evalDuringReplicaRemoval := setupReplicaRemovalTest(t, ctx) + defer tc.Stopper().Stop(ctx) + + // Perform write. + pArgs := putArgs(key, []byte("foo")) + _, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs) + require.Nil(t, pErr) + + // Perform delayed conditional put during replica removal. This will cause + // an ambiguous result error, as outstanding proposals in the leaseholder + // replica's proposal queue will be aborted when the replica is removed. + // If the replica was removed from under us, it would instead return a + // ConditionFailedError since it finds nil in place of "foo". + req := cPutArgs(key, []byte("bar"), []byte("foo")) + _, pErr = evalDuringReplicaRemoval(ctx, req) + require.NotNil(t, pErr) + require.IsType(t, &roachpb.AmbiguousResultError{}, pErr.GetDetail()) +} + +// setupReplicaRemovalTest sets up a test cluster that can be used to test +// request evaluation during replica removal. It returns a running test +// cluster, the first key of a blank scratch range on the replica to be +// removed, and a function that can execute a delayed request just as the +// replica is being removed. +func setupReplicaRemovalTest( + t *testing.T, ctx context.Context, +) ( + *testcluster.TestCluster, + roachpb.Key, + func(context.Context, roachpb.Request) (roachpb.Response, *roachpb.Error), +) { + t.Helper() + type magicKey struct{} - // delayReadC is used to synchronize the in-flight read request with the main - // test goroutine. It is read from twice: - // - // 1. The first read allows the test to block until the request eval filter - // is called, i.e. when the read request is ready. - // 2. The second read allows the test to close the channel to unblock - // the eval filter, causing the read request to be evaluated. - delayReadC := make(chan struct{}) + requestReadyC := make(chan struct{}) // signals main thread that request is teed up + requestEvalC := make(chan struct{}) // signals cluster to evaluate the request evalFilter := func(args storagebase.FilterArgs) *roachpb.Error { if args.Ctx.Value(magicKey{}) != nil { - <-delayReadC - <-delayReadC + requestReadyC <- struct{}{} + <-requestEvalC } return nil } - ctx := context.Background() manual := hlc.NewHybridManualClock() args := base.TestClusterArgs{ ReplicationMode: base.ReplicationManual, @@ -250,64 +303,59 @@ func TestReplicaRemovalDuringRequestEvaluation(t *testing.T) { }, } tc := testcluster.StartTestCluster(t, 2, args) - defer tc.Stopper().Stop(ctx) // Create range and upreplicate. key := tc.ScratchRange(t) tc.AddReplicasOrFatal(t, key, tc.Target(1)) - // Perform write. - pArgs := putArgs(key, []byte("foo")) - _, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs) - require.Nil(t, pErr) + // Return a function that can be used to evaluate a delayed request + // during replica removal. + evalDuringReplicaRemoval := func(ctx context.Context, req roachpb.Request) (roachpb.Response, *roachpb.Error) { + // Submit request and wait for it to block. + type result struct { + resp roachpb.Response + err *roachpb.Error + } + resultC := make(chan result) + err := tc.Stopper().RunAsyncTask(ctx, "request", func(ctx context.Context) { + reqCtx := context.WithValue(ctx, magicKey{}, struct{}{}) + resp, pErr := kv.SendWrapped(reqCtx, tc.Servers[0].DistSender(), req) + resultC <- result{resp, pErr} + }) + require.NoError(t, err) + <-requestReadyC - // Perform read on write and wait for read to block. - type reply struct { - resp roachpb.Response - err *roachpb.Error - } - readResC := make(chan reply) - err := tc.Stopper().RunAsyncTask(ctx, "get", func(ctx context.Context) { - readCtx := context.WithValue(ctx, magicKey{}, struct{}{}) - gArgs := getArgs(key) - resp, pErr := kv.SendWrapped(readCtx, tc.Servers[0].DistSender(), gArgs) - readResC <- reply{resp, pErr} - }) - require.NoError(t, err) - delayReadC <- struct{}{} + // Transfer leaseholder to other store. + rangeDesc, err := tc.LookupRange(key) + require.NoError(t, err) + store, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore(tc.Server(0).GetFirstStoreID()) + require.NoError(t, err) + repl, err := store.GetReplica(rangeDesc.RangeID) + require.NoError(t, err) + err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual) + require.NoError(t, err) - // Transfer leaseholder to other store. - rangeDesc, err := tc.LookupRange(key) - require.NoError(t, err) - store, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore(tc.Server(0).GetFirstStoreID()) - require.NoError(t, err) - repl, err := store.GetReplica(rangeDesc.RangeID) - require.NoError(t, err) - err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual) - require.NoError(t, err) + // Remove first store from raft group. + tc.RemoveReplicasOrFatal(t, key, tc.Target(0)) - // Remove first store from raft group. - tc.RemoveReplicasOrFatal(t, key, tc.Target(0)) - - // This is a bit iffy. We want to make sure that, in the buggy case, we - // will typically fail (i.e. the read returns empty because the replica was - // removed). However, in the non-buggy case the in-flight read request will - // be holding readOnlyCmdMu until evaluated, blocking the replica removal, - // so waiting for replica removal would deadlock. We therefore take the - // easy way out by starting an async replica GC and sleeping for a bit. - err = tc.Stopper().RunAsyncTask(ctx, "replicaGC", func(ctx context.Context) { - assert.NoError(t, store.ManualReplicaGC(repl)) - }) - require.NoError(t, err) - time.Sleep(500 * time.Millisecond) - - // Allow read to resume. Should return "foo". - close(delayReadC) - r := <-readResC - require.Nil(t, r.err) - require.NotNil(t, r.resp) - require.NotNil(t, r.resp.(*roachpb.GetResponse).Value) - val, err := r.resp.(*roachpb.GetResponse).Value.GetBytes() - require.NoError(t, err) - require.Equal(t, []byte("foo"), val) + // Wait for replica removal. This is a bit iffy. We want to make sure + // that, in the buggy case, we will typically fail (i.e. the request + // returns incorrect results because the replica was removed). However, + // in the non-buggy case the in-flight request will be holding + // readOnlyCmdMu until evaluated, blocking the replica removal, so + // waiting for replica removal would deadlock. We therefore take the + // easy way out by starting an async replica GC and sleeping for a bit. + err = tc.Stopper().RunAsyncTask(ctx, "replicaGC", func(ctx context.Context) { + assert.NoError(t, store.ManualReplicaGC(repl)) + }) + require.NoError(t, err) + time.Sleep(500 * time.Millisecond) + + // Allow request to resume, and return the result. + close(requestEvalC) + r := <-resultC + return r.resp, r.err + } + + return tc, key, evalDuringReplicaRemoval } diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index 22ec45356ca7..1bceca0a1a0d 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -1481,6 +1481,24 @@ func putArgs(key roachpb.Key, value []byte) *roachpb.PutRequest { } } +// cPutArgs returns a ConditionPutRequest to the default replica +// for the specified key and value, with the given expected value. +func cPutArgs(key roachpb.Key, value, expValue []byte) *roachpb.ConditionalPutRequest { + var ev *roachpb.Value + if expValue != nil { + value := roachpb.MakeValueFromBytes(expValue) + ev = &value + } + + return &roachpb.ConditionalPutRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: key, + }, + Value: roachpb.MakeValueFromBytes(value), + ExpValue: ev, + } +} + // incrementArgs returns an IncrementRequest addressed to the default replica // for the specified key. func incrementArgs(key roachpb.Key, inc int64) *roachpb.IncrementRequest { diff --git a/pkg/kv/kvserver/replica_corruption.go b/pkg/kv/kvserver/replica_corruption.go index 7cdf7a048b45..1b30b01e65e6 100644 --- a/pkg/kv/kvserver/replica_corruption.go +++ b/pkg/kv/kvserver/replica_corruption.go @@ -21,12 +21,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" ) -// maybeSetCorrupt is a stand-in for proper handling of failing replicas. Such a -// failure is indicated by a call to maybeSetCorrupt with a ReplicaCorruptionError. -// Currently any error is passed through, but prospectively it should stop the -// range from participating in progress, trigger a rebalance operation and -// decide on an error-by-error basis whether the corruption is limited to the -// range, store, node or cluster with corresponding actions taken. +// setCorruptRaftMuLocked is a stand-in for proper handling of failing replicas. +// Such a failure is indicated by a call to setCorruptRaftMuLocked with a +// ReplicaCorruptionError. Currently any error is passed through, but +// prospectively it should stop the range from participating in progress, +// trigger a rebalance operation and decide on an error-by-error basis whether +// the corruption is limited to the range, store, node or cluster with +// corresponding actions taken. // // Despite the fatal log call below this message we still return for the // sake of testing. @@ -37,15 +38,6 @@ import ( // best bet is to not have any of those. // @bdarnell remarks: Corruption errors should be rare so we may want the store // to just recompute its stats in the background when one occurs. -func (r *Replica) maybeSetCorrupt(ctx context.Context, pErr *roachpb.Error) *roachpb.Error { - if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok { - r.raftMu.Lock() - defer r.raftMu.Unlock() - return r.setCorruptRaftMuLocked(ctx, cErr) - } - return pErr -} - func (r *Replica) setCorruptRaftMuLocked( ctx context.Context, cErr *roachpb.ReplicaCorruptionError, ) *roachpb.Error { diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index b4f24469f653..ec9b907d1030 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -721,8 +721,13 @@ func (r *Replica) evaluateProposal( } // Evaluate the commands. If this returns without an error, the batch should - // be committed. Note that we don't hold any locks at this point. This is - // important since evaluating a proposal is expensive. + // be committed. Note that we don't hold any locks at this point, except a + // shared RLock on readOnlyCmdMu. This is important since evaluating a + // proposal is expensive. + // + // Note that, during evaluation, ba's read and write timestamps might get + // bumped (see evaluateWriteBatchWithServersideRefreshes). + // // TODO(tschottdorf): absorb all returned values in `res` below this point // in the call stack as well. batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, latchSpans) @@ -735,7 +740,9 @@ func (r *Replica) evaluateProposal( } if pErr != nil { - pErr = r.maybeSetCorrupt(ctx, pErr) + if _, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok { + return &res, false /* needConsensus */, pErr + } txn := pErr.GetTxn() if txn != nil && ba.Txn == nil { diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index b18c98553b88..860aa0cead23 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -57,6 +57,10 @@ func makeIDKey() storagebase.CmdIDKey { // caller should relinquish all ownership of it. If it does return an error, the // caller retains full ownership over the guard. // +// Nothing here or below can take out a raftMu lock, since executeWriteBatch() +// is already holding readOnlyCmdMu when calling this. Locking raftMu after it +// would violate the locking order specified for Store.mu. +// // Return values: // - a channel which receives a response or error upon application // - a closure used to attempt to abandon the command. When called, it unbinds @@ -77,6 +81,8 @@ func (r *Replica) evalAndPropose( // proagate the error. Don't assume ownership of the concurrency guard. if isConcurrencyRetryError(pErr) { return nil, nil, 0, pErr + } else if _, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok { + return nil, nil, 0, pErr } // Attach the endCmds to the proposal and assume responsibility for diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index d177de7f15f0..4e845b72129f 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -3168,6 +3168,9 @@ func TestReplicaNoTimestampIncrementWithinTxn(t *testing.T) { // TestReplicaAbortSpanReadError verifies that an error is returned // to the client in the event that a AbortSpan entry is found but is // not decodable. +// +// This doubles as a test that replica corruption errors are propagated +// and handled correctly. func TestReplicaAbortSpanReadError(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index d64e38ae5f59..cd2bd1d195c1 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -67,11 +67,12 @@ func (r *Replica) executeWriteBatch( ) (br *roachpb.BatchResponse, _ *concurrency.Guard, pErr *roachpb.Error) { startTime := timeutil.Now() - // TODO(nvanbenschoten): unlike on the read-path (executeReadOnlyBatch), we - // don't synchronize with r.readOnlyCmdMu here. Is that ok? What if the - // replica is destroyed concurrently with a write? We won't be able to - // successfully propose as the lease will presumably have changed, but what - // if we hit an error during evaluation (e.g. a ConditionFailedError)? + // Even though we're not a read-only operation by definition, we have to + // take out a read lock on readOnlyCmdMu while performing any reads during + // pre-Raft evaluation (e.g. conditional puts), otherwise we can race with + // replica removal and get evaluated on an empty replica. We must release + // this lock before Raft execution, to avoid deadlocks. + r.readOnlyCmdMu.RLock() // Verify that the batch can be executed. // NB: we only need to check that the request is in the Range's key bounds @@ -79,6 +80,7 @@ func (r *Replica) executeWriteBatch( // will synchronize all requests (notably EndTxn with SplitTrigger) that may // cause this condition to change. if err := r.checkExecutionCanProceed(ba, g, &st); err != nil { + r.readOnlyCmdMu.RUnlock() return nil, g, roachpb.NewError(err) } @@ -110,6 +112,7 @@ func (r *Replica) executeWriteBatch( // Checking the context just before proposing can help avoid ambiguous errors. if err := ctx.Err(); err != nil { + r.readOnlyCmdMu.RUnlock() log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary()) return nil, g, roachpb.NewError(errors.Wrap(err, "aborted before proposing")) } @@ -121,6 +124,7 @@ func (r *Replica) executeWriteBatch( // other transactions to be released while sequencing in the concurrency // manager. if curLease, _ := r.GetLease(); curLease.Sequence > st.Lease.Sequence { + r.readOnlyCmdMu.RUnlock() curLeaseCpy := curLease // avoid letting curLease escape err := newNotLeaseHolderError(&curLeaseCpy, r.store.StoreID(), r.Desc()) log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary()) @@ -132,6 +136,13 @@ func (r *Replica) executeWriteBatch( // evalAndPropose. ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, ba, g, &st.Lease) if pErr != nil { + r.readOnlyCmdMu.RUnlock() + if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok { + r.raftMu.Lock() + defer r.raftMu.Unlock() + // This exits with a fatal error, but returns in tests. + return nil, g, r.setCorruptRaftMuLocked(ctx, cErr) + } if maxLeaseIndex != 0 { log.Fatalf( ctx, "unexpected max lease index %d assigned to failed proposal: %s, error %s", @@ -150,6 +161,10 @@ func (r *Replica) executeWriteBatch( untrack(ctx, ctpb.Epoch(st.Lease.Epoch), r.RangeID, ctpb.LAI(maxLeaseIndex)) } + // We are done with pre-Raft evaluation at this point, and have to release the + // read-only command lock to avoid deadlocks during Raft evaluation. + r.readOnlyCmdMu.RUnlock() + // If the command was accepted by raft, wait for the range to apply it. ctxDone := ctx.Done() shouldQuiesce := r.store.stopper.ShouldQuiesce()