From a20cc3d6e021b850fe8e0ebd96b05776f6704014 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 30 Apr 2021 13:14:52 +0000 Subject: [PATCH] kvserver: synchronize replica removal with read-write requests Replica removal did not synchronize with in-flight read-write requests, which could cause them to be evaluated on a removed (empty) replica. The request would not be able to persist any writes, since it's unable to submit Raft proposals. However, it can affect conditional writes, for example causing a `ConditionalPutRequest` to error because it finds a missing value instead of the expected one. This patch fixes the problem by taking out `Replica.readOnlyCmdMu` during pre-Raft evaluation, to synchronize with replica removal. This can cause such requests to return `AmbiguousResultError` as the write is evaluated. Release note (bug fix): Fixed a race condition where read-write requests during replica removal (e.g. during range merges or rebalancing) could be evaluated on the removed replica. These will not have been able to write any data to persistent storage, but could behave unexpectedly, e.g. returning errors that they should not have returned. --- pkg/kv/kvserver/client_relocate_range_test.go | 170 +++++++++++------- pkg/kv/kvserver/client_test.go | 19 ++ pkg/kv/kvserver/replica_corruption.go | 22 +-- pkg/kv/kvserver/replica_proposal.go | 9 +- pkg/kv/kvserver/replica_raft.go | 2 + pkg/kv/kvserver/replica_test.go | 3 + pkg/kv/kvserver/replica_write.go | 24 ++- 7 files changed, 164 insertions(+), 85 deletions(-) diff --git a/pkg/kv/kvserver/client_relocate_range_test.go b/pkg/kv/kvserver/client_relocate_range_test.go index 8820544a8698..0246c4c0968e 100644 --- a/pkg/kv/kvserver/client_relocate_range_test.go +++ b/pkg/kv/kvserver/client_relocate_range_test.go @@ -373,29 +373,80 @@ func TestAdminRelocateRangeRandom(t *testing.T) { // Regression test for https://github.com/cockroachdb/cockroach/issues/64325 // which makes sure an in-flight read operation during replica removal won't // return empty results. -func TestReplicaRemovalDuringRequestEvaluation(t *testing.T) { +func TestReplicaRemovalDuringGet(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() + tc, key, evalDuringReplicaRemoval := setupReplicaRemovalTest(t, ctx) + defer tc.Stopper().Stop(ctx) + + // Perform write. + pArgs := putArgs(key, []byte("foo")) + _, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs) + require.Nil(t, pErr) + + // Perform delayed read during replica removal. + resp, pErr := evalDuringReplicaRemoval(ctx, getArgs(key)) + require.Nil(t, pErr) + require.NotNil(t, resp) + require.NotNil(t, resp.(*roachpb.GetResponse).Value) + val, err := resp.(*roachpb.GetResponse).Value.GetBytes() + require.NoError(t, err) + require.Equal(t, []byte("foo"), val) +} + +// Regression test for https://github.com/cockroachdb/cockroach/issues/46329 +// which makes sure an in-flight conditional put operation during replica +// removal won't spuriously error due to an unexpectedly missing value. +func TestReplicaRemovalDuringCPut(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + tc, key, evalDuringReplicaRemoval := setupReplicaRemovalTest(t, ctx) + defer tc.Stopper().Stop(ctx) + + // Perform write. + pArgs := putArgs(key, []byte("foo")) + _, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs) + require.Nil(t, pErr) + + // Perform delayed conditional put during replica removal. This will cause + // an ambiguous result error, as outstanding proposals in the leaseholder + // replica's proposal queue will be aborted when the replica is removed. + req := cPutArgs(key, []byte("bar"), []byte("foo")) + _, pErr = evalDuringReplicaRemoval(ctx, req) + require.NotNil(t, pErr) + require.IsType(t, &roachpb.AmbiguousResultError{}, pErr.GetDetail()) +} + +// setupReplicaRemovalTest sets up a test cluster that can be used to test +// request evaluation during replica removal. It returns a running test +// cluster, the first key of a blank scratch range on the replica to be +// removed, and a function that can execute a delayed request just as the +// replica is being removed. +func setupReplicaRemovalTest( + t *testing.T, ctx context.Context, +) ( + *testcluster.TestCluster, + roachpb.Key, + func(context.Context, roachpb.Request) (roachpb.Response, *roachpb.Error), +) { + t.Helper() + type magicKey struct{} - // delayReadC is used to synchronize the in-flight read request with the main - // test goroutine. It is read from twice: - // - // 1. The first read allows the test to block until the request eval filter - // is called, i.e. when the read request is ready. - // 2. The second read allows the test to close the channel to unblock - // the eval filter, causing the read request to be evaluated. - delayReadC := make(chan struct{}) + requestReadyC := make(chan struct{}) // signals main thread that request is teed up + requestEvalC := make(chan struct{}) // signals cluster to evaluate the request evalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error { if args.Ctx.Value(magicKey{}) != nil { - <-delayReadC - <-delayReadC + requestReadyC <- struct{}{} + <-requestEvalC } return nil } - ctx := context.Background() manual := hlc.NewHybridManualClock() args := base.TestClusterArgs{ ReplicationMode: base.ReplicationManual, @@ -415,62 +466,57 @@ func TestReplicaRemovalDuringRequestEvaluation(t *testing.T) { }, } tc := testcluster.StartTestCluster(t, 2, args) - defer tc.Stopper().Stop(ctx) // Create range and upreplicate. key := tc.ScratchRange(t) tc.AddVotersOrFatal(t, key, tc.Target(1)) - // Perform write. - pArgs := putArgs(key, []byte("foo")) - _, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs) - require.Nil(t, pErr) + // Return a function that can be used to evaluate a delayed request + // during replica removal. + evalDuringReplicaRemoval := func(ctx context.Context, req roachpb.Request) (roachpb.Response, *roachpb.Error) { + // Submit request and wait for it to block. + type result struct { + resp roachpb.Response + err *roachpb.Error + } + resultC := make(chan result) + err := tc.Stopper().RunAsyncTask(ctx, "request", func(ctx context.Context) { + reqCtx := context.WithValue(ctx, magicKey{}, struct{}{}) + resp, pErr := kv.SendWrapped(reqCtx, tc.Servers[0].DistSender(), req) + resultC <- result{resp, pErr} + }) + require.NoError(t, err) + <-requestReadyC + + // Transfer leaseholder to other store. + rangeDesc, err := tc.LookupRange(key) + require.NoError(t, err) + repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(rangeDesc.RangeID) + require.NoError(t, err) + err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual) + require.NoError(t, err) + + // Remove first store from raft group. + tc.RemoveVotersOrFatal(t, key, tc.Target(0)) + + // Wait for replica removal. This is a bit iffy. We want to make sure + // that, in the buggy case, we will typically fail (i.e. the request + // returns incorrect results because the replica was removed). However, + // in the non-buggy case the in-flight request will be holding + // readOnlyCmdMu until evaluated, blocking the replica removal, so + // waiting for replica removal would deadlock. We therefore take the + // easy way out by starting an async replica GC and sleeping for a bit. + err = tc.Stopper().RunAsyncTask(ctx, "replicaGC", func(ctx context.Context) { + assert.NoError(t, tc.GetFirstStoreFromServer(t, 0).ManualReplicaGC(repl)) + }) + require.NoError(t, err) + time.Sleep(500 * time.Millisecond) - // Perform read on write and wait for read to block. - type reply struct { - resp roachpb.Response - err *roachpb.Error + // Allow request to resume, and return the result. + close(requestEvalC) + r := <-resultC + return r.resp, r.err } - readResC := make(chan reply) - err := tc.Stopper().RunAsyncTask(ctx, "get", func(ctx context.Context) { - readCtx := context.WithValue(ctx, magicKey{}, struct{}{}) - gArgs := getArgs(key) - resp, pErr := kv.SendWrapped(readCtx, tc.Servers[0].DistSender(), gArgs) - readResC <- reply{resp, pErr} - }) - require.NoError(t, err) - delayReadC <- struct{}{} - - // Transfer leaseholder to other store. - rangeDesc, err := tc.LookupRange(key) - require.NoError(t, err) - repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(rangeDesc.RangeID) - require.NoError(t, err) - err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual) - require.NoError(t, err) - // Remove first store from raft group. - tc.RemoveVotersOrFatal(t, key, tc.Target(0)) - - // This is a bit iffy. We want to make sure that, in the buggy case, we - // will typically fail (i.e. the read returns empty because the replica was - // removed). However, in the non-buggy case the in-flight read request will - // be holding readOnlyCmdMu until evaluated, blocking the replica removal, - // so waiting for replica removal would deadlock. We therefore take the - // easy way out by starting an async replica GC and sleeping for a bit. - err = tc.Stopper().RunAsyncTask(ctx, "replicaGC", func(ctx context.Context) { - assert.NoError(t, tc.GetFirstStoreFromServer(t, 0).ManualReplicaGC(repl)) - }) - require.NoError(t, err) - time.Sleep(500 * time.Millisecond) - - // Allow read to resume. Should return "foo". - close(delayReadC) - r := <-readResC - require.Nil(t, r.err) - require.NotNil(t, r.resp) - require.NotNil(t, r.resp.(*roachpb.GetResponse).Value) - val, err := r.resp.(*roachpb.GetResponse).Value.GetBytes() - require.NoError(t, err) - require.Equal(t, []byte("foo"), val) + return tc, key, evalDuringReplicaRemoval } diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index c01e05915014..8c7afa389f93 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -56,6 +56,25 @@ func putArgs(key roachpb.Key, value []byte) *roachpb.PutRequest { } } +// cPutArgs returns a ConditionPutRequest to the default replica +// for the specified key and value, with the given expected value. +func cPutArgs(key roachpb.Key, value, expValue []byte) *roachpb.ConditionalPutRequest { + var expBytes []byte + if expValue != nil { + var e roachpb.Value + e.SetBytes(expValue) + expBytes = e.TagAndDataBytes() + } + + return &roachpb.ConditionalPutRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: key, + }, + Value: roachpb.MakeValueFromBytes(value), + ExpBytes: expBytes, + } +} + // incrementArgs returns an IncrementRequest addressed to the default replica // for the specified key. func incrementArgs(key roachpb.Key, inc int64) *roachpb.IncrementRequest { diff --git a/pkg/kv/kvserver/replica_corruption.go b/pkg/kv/kvserver/replica_corruption.go index 061aa2d714d7..c8379f6ae54a 100644 --- a/pkg/kv/kvserver/replica_corruption.go +++ b/pkg/kv/kvserver/replica_corruption.go @@ -20,12 +20,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" ) -// maybeSetCorrupt is a stand-in for proper handling of failing replicas. Such a -// failure is indicated by a call to maybeSetCorrupt with a ReplicaCorruptionError. -// Currently any error is passed through, but prospectively it should stop the -// range from participating in progress, trigger a rebalance operation and -// decide on an error-by-error basis whether the corruption is limited to the -// range, store, node or cluster with corresponding actions taken. +// setCorruptRaftMuLocked is a stand-in for proper handling of failing replicas. +// Such a failure is indicated by a call to setCorruptRaftMuLocked with a +// ReplicaCorruptionError. Currently any error is passed through, but +// prospectively it should stop the range from participating in progress, +// trigger a rebalance operation and decide on an error-by-error basis whether +// the corruption is limited to the range, store, node or cluster with +// corresponding actions taken. // // Despite the fatal log call below this message we still return for the // sake of testing. @@ -36,15 +37,6 @@ import ( // best bet is to not have any of those. // @bdarnell remarks: Corruption errors should be rare so we may want the store // to just recompute its stats in the background when one occurs. -func (r *Replica) maybeSetCorrupt(ctx context.Context, pErr *roachpb.Error) *roachpb.Error { - if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok { - r.raftMu.Lock() - defer r.raftMu.Unlock() - return r.setCorruptRaftMuLocked(ctx, cErr) - } - return pErr -} - func (r *Replica) setCorruptRaftMuLocked( ctx context.Context, cErr *roachpb.ReplicaCorruptionError, ) *roachpb.Error { diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 780d017bf295..886b9f34d10f 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -771,8 +771,9 @@ func (r *Replica) evaluateProposal( } // Evaluate the commands. If this returns without an error, the batch should - // be committed. Note that we don't hold any locks at this point. This is - // important since evaluating a proposal is expensive. + // be committed. Note that we don't hold any locks at this point, except a + // shared RLock on raftMuReadOnlyMu. This is important since evaluating a + // proposal is expensive. // // Note that, during evaluation, ba's read and write timestamps might get // bumped (see evaluateWriteBatchWithServersideRefreshes). @@ -789,7 +790,9 @@ func (r *Replica) evaluateProposal( } if pErr != nil { - pErr = r.maybeSetCorrupt(ctx, pErr) + if _, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok { + return &res, false /* needConsensus */, pErr + } txn := pErr.GetTxn() if txn != nil && ba.Txn == nil { diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index c6cbcda6f5da..4417d11046fe 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -90,6 +90,8 @@ func (r *Replica) evalAndPropose( if isConcurrencyRetryError(pErr) { pErr = maybeAttachLease(pErr, &st.Lease) return nil, nil, 0, pErr + } else if _, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok { + return nil, nil, 0, pErr } // Attach the endCmds to the proposal and assume responsibility for diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index f85356fe52bd..529db86220a5 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -3326,6 +3326,9 @@ func TestReplicaNoTSCacheIncrementWithinTxn(t *testing.T) { // TestReplicaAbortSpanReadError verifies that an error is returned // to the client in the event that a AbortSpan entry is found but is // not decodable. +// +// This doubles as a test that replica corruption errors are propagated +// and handled correctly. func TestReplicaAbortSpanReadError(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 30b90f715baa..27bb683a185b 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -69,15 +69,17 @@ func (r *Replica) executeWriteBatch( ) (br *roachpb.BatchResponse, _ *concurrency.Guard, pErr *roachpb.Error) { startTime := timeutil.Now() - // TODO(nvanbenschoten): unlike on the read-path (executeReadOnlyBatch), we - // don't synchronize with r.readOnlyCmdMu here. Is that ok? What if the - // replica is destroyed concurrently with a write? We won't be able to - // successfully propose as the lease will presumably have changed, but what - // if we hit an error during evaluation (e.g. a ConditionFailedError)? + // Even though we're not a read-only operation by definition, we have to + // take out a read lock on readOnlyCmdMu while performing any reads during + // pre-Raft evaluation (e.g. conditional puts), otherwise we can race with + // replica removal and get evaluated on an empty replica. We must release + // this lock before Raft execution, to avoid deadlocks. + r.readOnlyCmdMu.RLock() // Verify that the batch can be executed. st, err := r.checkExecutionCanProceed(ctx, ba, g) if err != nil { + r.readOnlyCmdMu.RUnlock() return nil, g, roachpb.NewError(err) } @@ -135,6 +137,7 @@ func (r *Replica) executeWriteBatch( // Checking the context just before proposing can help avoid ambiguous errors. if err := ctx.Err(); err != nil { log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary()) + r.readOnlyCmdMu.RUnlock() return nil, g, roachpb.NewError(errors.Wrapf(err, "aborted before proposing")) } @@ -143,6 +146,13 @@ func (r *Replica) executeWriteBatch( // evalAndPropose. ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, ba, g, st, localUncertaintyLimit, tok.Move(ctx)) if pErr != nil { + r.readOnlyCmdMu.RUnlock() + if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok { + r.raftMu.Lock() + defer r.raftMu.Unlock() + // This exits with a fatal error, but returns in tests. + return nil, g, r.setCorruptRaftMuLocked(ctx, cErr) + } if maxLeaseIndex != 0 { log.Fatalf( ctx, "unexpected max lease index %d assigned to failed proposal: %s, error %s", @@ -162,6 +172,10 @@ func (r *Replica) executeWriteBatch( untrack(ctx, ctpb.Epoch(st.Lease.Epoch), r.RangeID, ctpb.LAI(maxLeaseIndex)) } + // We are done with pre-Raft evaluation at this point, and have to release the + // read-only command lock to avoid deadlocks during Raft evaluation. + r.readOnlyCmdMu.RUnlock() + // If the command was accepted by raft, wait for the range to apply it. ctxDone := ctx.Done() shouldQuiesce := r.store.stopper.ShouldQuiesce()