Merge pull request #64604 from erikgrinaker/backport20.1-64471
release-20.1: kvserver: synchronize replica removal with read-write requests
erikgrinaker authored May 4, 2021
2 parents 1a0b6fa + 733ad65 commit 3ae6b59
Showing 7 changed files with 176 additions and 87 deletions.
176 changes: 112 additions & 64 deletions pkg/kv/kvserver/client_relocate_range_test.go
@@ -210,29 +210,82 @@ func TestAdminRelocateRange(t *testing.T) {
// Regression test for https://github.com/cockroachdb/cockroach/issues/64325
// which makes sure an in-flight read operation during replica removal won't
// return empty results.
func TestReplicaRemovalDuringRequestEvaluation(t *testing.T) {
func TestReplicaRemovalDuringGet(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc, key, evalDuringReplicaRemoval := setupReplicaRemovalTest(t, ctx)
defer tc.Stopper().Stop(ctx)

// Perform write.
pArgs := putArgs(key, []byte("foo"))
_, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs)
require.Nil(t, pErr)

// Perform delayed read during replica removal.
resp, pErr := evalDuringReplicaRemoval(ctx, getArgs(key))
require.Nil(t, pErr)
require.NotNil(t, resp)
require.NotNil(t, resp.(*roachpb.GetResponse).Value)
val, err := resp.(*roachpb.GetResponse).Value.GetBytes()
require.NoError(t, err)
require.Equal(t, []byte("foo"), val)
}

// Regression test for https://github.com/cockroachdb/cockroach/issues/46329
// which makes sure an in-flight conditional put operation during replica
// removal won't spuriously error due to an unexpectedly missing value.
func TestReplicaRemovalDuringCPut(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc, key, evalDuringReplicaRemoval := setupReplicaRemovalTest(t, ctx)
defer tc.Stopper().Stop(ctx)

// Perform write.
pArgs := putArgs(key, []byte("foo"))
_, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs)
require.Nil(t, pErr)

// Perform delayed conditional put during replica removal. This will cause
// an ambiguous result error, as outstanding proposals in the leaseholder
// replica's proposal queue will be aborted when the replica is removed.
// If the replica were removed out from under us (the buggy case), it would
// instead spuriously return a ConditionFailedError, since evaluation would
// find nil in place of "foo".
req := cPutArgs(key, []byte("bar"), []byte("foo"))
_, pErr = evalDuringReplicaRemoval(ctx, req)
require.NotNil(t, pErr)
require.IsType(t, &roachpb.AmbiguousResultError{}, pErr.GetDetail())
}

// setupReplicaRemovalTest sets up a test cluster that can be used to test
// request evaluation during replica removal. It returns a running test
// cluster, the first key of a blank scratch range on the replica to be
// removed, and a function that can execute a delayed request just as the
// replica is being removed.
func setupReplicaRemovalTest(
t *testing.T, ctx context.Context,
) (
*testcluster.TestCluster,
roachpb.Key,
func(context.Context, roachpb.Request) (roachpb.Response, *roachpb.Error),
) {
t.Helper()

type magicKey struct{}

// delayReadC is used to synchronize the in-flight read request with the main
// test goroutine. It is read from twice:
//
// 1. The first read allows the test to block until the request eval filter
// is called, i.e. when the read request is ready.
// 2. The second read allows the test to close the channel to unblock
// the eval filter, causing the read request to be evaluated.
delayReadC := make(chan struct{})
requestReadyC := make(chan struct{}) // signals main thread that request is teed up
requestEvalC := make(chan struct{}) // signals cluster to evaluate the request
evalFilter := func(args storagebase.FilterArgs) *roachpb.Error {
if args.Ctx.Value(magicKey{}) != nil {
<-delayReadC
<-delayReadC
requestReadyC <- struct{}{}
<-requestEvalC
}
return nil
}

ctx := context.Background()
manual := hlc.NewHybridManualClock()
args := base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
@@ -250,64 +303,59 @@ func TestReplicaRemovalDuringRequestEvaluation(t *testing.T) {
},
}
tc := testcluster.StartTestCluster(t, 2, args)
defer tc.Stopper().Stop(ctx)

// Create range and upreplicate.
key := tc.ScratchRange(t)
tc.AddReplicasOrFatal(t, key, tc.Target(1))

// Perform write.
pArgs := putArgs(key, []byte("foo"))
_, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs)
require.Nil(t, pErr)
// Return a function that can be used to evaluate a delayed request
// during replica removal.
evalDuringReplicaRemoval := func(ctx context.Context, req roachpb.Request) (roachpb.Response, *roachpb.Error) {
// Submit request and wait for it to block.
type result struct {
resp roachpb.Response
err *roachpb.Error
}
resultC := make(chan result)
err := tc.Stopper().RunAsyncTask(ctx, "request", func(ctx context.Context) {
reqCtx := context.WithValue(ctx, magicKey{}, struct{}{})
resp, pErr := kv.SendWrapped(reqCtx, tc.Servers[0].DistSender(), req)
resultC <- result{resp, pErr}
})
require.NoError(t, err)
<-requestReadyC

// Perform read on write and wait for read to block.
type reply struct {
resp roachpb.Response
err *roachpb.Error
}
readResC := make(chan reply)
err := tc.Stopper().RunAsyncTask(ctx, "get", func(ctx context.Context) {
readCtx := context.WithValue(ctx, magicKey{}, struct{}{})
gArgs := getArgs(key)
resp, pErr := kv.SendWrapped(readCtx, tc.Servers[0].DistSender(), gArgs)
readResC <- reply{resp, pErr}
})
require.NoError(t, err)
delayReadC <- struct{}{}
// Transfer leaseholder to other store.
rangeDesc, err := tc.LookupRange(key)
require.NoError(t, err)
store, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore(tc.Server(0).GetFirstStoreID())
require.NoError(t, err)
repl, err := store.GetReplica(rangeDesc.RangeID)
require.NoError(t, err)
err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual)
require.NoError(t, err)

// Transfer leaseholder to other store.
rangeDesc, err := tc.LookupRange(key)
require.NoError(t, err)
store, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore(tc.Server(0).GetFirstStoreID())
require.NoError(t, err)
repl, err := store.GetReplica(rangeDesc.RangeID)
require.NoError(t, err)
err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual)
require.NoError(t, err)
// Remove first store from raft group.
tc.RemoveReplicasOrFatal(t, key, tc.Target(0))

// Remove first store from raft group.
tc.RemoveReplicasOrFatal(t, key, tc.Target(0))

// This is a bit iffy. We want to make sure that, in the buggy case, we
// will typically fail (i.e. the read returns empty because the replica was
// removed). However, in the non-buggy case the in-flight read request will
// be holding readOnlyCmdMu until evaluated, blocking the replica removal,
// so waiting for replica removal would deadlock. We therefore take the
// easy way out by starting an async replica GC and sleeping for a bit.
err = tc.Stopper().RunAsyncTask(ctx, "replicaGC", func(ctx context.Context) {
assert.NoError(t, store.ManualReplicaGC(repl))
})
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)

// Allow read to resume. Should return "foo".
close(delayReadC)
r := <-readResC
require.Nil(t, r.err)
require.NotNil(t, r.resp)
require.NotNil(t, r.resp.(*roachpb.GetResponse).Value)
val, err := r.resp.(*roachpb.GetResponse).Value.GetBytes()
require.NoError(t, err)
require.Equal(t, []byte("foo"), val)
// Wait for replica removal. This is a bit iffy. We want to make sure
// that, in the buggy case, we will typically fail (i.e. the request
// returns incorrect results because the replica was removed). However,
// in the non-buggy case the in-flight request will be holding
// readOnlyCmdMu until evaluated, blocking the replica removal, so
// waiting for replica removal would deadlock. We therefore take the
// easy way out by starting an async replica GC and sleeping for a bit.
err = tc.Stopper().RunAsyncTask(ctx, "replicaGC", func(ctx context.Context) {
assert.NoError(t, store.ManualReplicaGC(repl))
})
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)

// Allow request to resume, and return the result.
close(requestEvalC)
r := <-resultC
return r.resp, r.err
}

return tc, key, evalDuringReplicaRemoval
}
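The two-channel handshake above (requestReadyC/requestEvalC) is the heart of the new test harness: an eval filter parks the request mid-evaluation until the test has finished mutating cluster state. The same pattern can be illustrated standalone with just the standard library; this is a sketch with made-up names, not CockroachDB code:

package main

import (
	"context"
	"fmt"
)

type magicKey struct{}

func main() {
	requestReadyC := make(chan struct{}) // request signals that it is teed up
	requestEvalC := make(chan struct{})  // test signals that evaluation may proceed

	// Stand-in for the testing eval filter: only requests whose context
	// carries the magic key are intercepted.
	filter := func(ctx context.Context) {
		if ctx.Value(magicKey{}) != nil {
			requestReadyC <- struct{}{}
			<-requestEvalC
		}
	}

	resultC := make(chan string)
	go func() {
		ctx := context.WithValue(context.Background(), magicKey{}, struct{}{})
		filter(ctx) // the request parks here, mid-evaluation
		resultC <- "response"
	}()

	<-requestReadyC
	// ... the real test transfers the lease and GCs the replica here ...
	close(requestEvalC) // unblock the filter; evaluation resumes
	fmt.Println(<-resultC)
}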
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/client_test.go
@@ -1481,6 +1481,24 @@ func putArgs(key roachpb.Key, value []byte) *roachpb.PutRequest {
}
}

// cPutArgs returns a ConditionalPutRequest addressed to the default replica
// for the specified key and value, with the given expected value.
func cPutArgs(key roachpb.Key, value, expValue []byte) *roachpb.ConditionalPutRequest {
var ev *roachpb.Value
if expValue != nil {
value := roachpb.MakeValueFromBytes(expValue)
ev = &value
}

return &roachpb.ConditionalPutRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
},
Value: roachpb.MakeValueFromBytes(value),
ExpValue: ev,
}
}

// incrementArgs returns an IncrementRequest addressed to the default replica
// for the specified key.
func incrementArgs(key roachpb.Key, inc int64) *roachpb.IncrementRequest {
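For reference, TestReplicaRemovalDuringCPut in the first file exercises this helper exactly as in the first hunk; note also that passing a nil expected value leaves ExpValue unset, which per ConditionalPut semantics should make the request succeed only if the key does not exist yet. A short usage sketch (variable names assumed from the test context):

// Expect the key to currently hold "foo" and replace it with "bar";
// this is the call the new test makes.
req := cPutArgs(key, []byte("bar"), []byte("foo"))

// A nil expected value builds a request asserting the key is absent.
reqIfAbsent := cPutArgs(key, []byte("bar"), nil)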
22 changes: 7 additions & 15 deletions pkg/kv/kvserver/replica_corruption.go
@@ -21,12 +21,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// maybeSetCorrupt is a stand-in for proper handling of failing replicas. Such a
// failure is indicated by a call to maybeSetCorrupt with a ReplicaCorruptionError.
// Currently any error is passed through, but prospectively it should stop the
// range from participating in progress, trigger a rebalance operation and
// decide on an error-by-error basis whether the corruption is limited to the
// range, store, node or cluster with corresponding actions taken.
// setCorruptRaftMuLocked is a stand-in for proper handling of failing replicas.
// Such a failure is indicated by a call to setCorruptRaftMuLocked with a
// ReplicaCorruptionError. Currently any error is passed through, but
// prospectively it should stop the range from participating in progress,
// trigger a rebalance operation and decide on an error-by-error basis whether
// the corruption is limited to the range, store, node or cluster with
// corresponding actions taken.
//
// Despite the fatal log call below this message we still return for the
// sake of testing.
@@ -37,15 +38,6 @@ import (
// best bet is to not have any of those.
// @bdarnell remarks: Corruption errors should be rare so we may want the store
// to just recompute its stats in the background when one occurs.
func (r *Replica) maybeSetCorrupt(ctx context.Context, pErr *roachpb.Error) *roachpb.Error {
if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
r.raftMu.Lock()
defer r.raftMu.Unlock()
return r.setCorruptRaftMuLocked(ctx, cErr)
}
return pErr
}

func (r *Replica) setCorruptRaftMuLocked(
ctx context.Context, cErr *roachpb.ReplicaCorruptionError,
) *roachpb.Error {
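The reason maybeSetCorrupt had to go is lock ordering: it acquired raftMu internally, and the rewritten write path evaluates while holding a shared lock on readOnlyCmdMu, on top of which raftMu must not be taken. A standalone sketch of the hazard, with illustrative names only (the corrected control flow appears after the replica_write.go hunks below):

package main

import "sync"

type replica struct {
	raftMu        sync.Mutex
	readOnlyCmdMu sync.RWMutex
}

// Old shape: acquires raftMu internally.
func (r *replica) maybeSetCorrupt() {
	r.raftMu.Lock()
	defer r.raftMu.Unlock()
	// ... mark the replica corrupt ...
}

func (r *replica) evaluate() {
	r.readOnlyCmdMu.RLock()
	defer r.readOnlyCmdMu.RUnlock()
	// WRONG under the new write path: this would acquire raftMu while
	// the shared readOnlyCmdMu is still held, inverting the lock order
	// documented in replica_raft.go.
	r.maybeSetCorrupt()
}

func main() { (&replica{}).evaluate() }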
13 changes: 10 additions & 3 deletions pkg/kv/kvserver/replica_proposal.go
@@ -721,8 +721,13 @@ func (r *Replica) evaluateProposal(
}

// Evaluate the commands. If this returns without an error, the batch should
// be committed. Note that we don't hold any locks at this point. This is
// important since evaluating a proposal is expensive.
// be committed. Note that we don't hold any locks at this point, except a
// shared RLock on readOnlyCmdMu. This is important since evaluating a
// proposal is expensive.
//
// Note that, during evaluation, ba's read and write timestamps might get
// bumped (see evaluateWriteBatchWithServersideRefreshes).
//
// TODO(tschottdorf): absorb all returned values in `res` below this point
// in the call stack as well.
batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, latchSpans)
@@ -735,7 +740,9 @@
}

if pErr != nil {
pErr = r.maybeSetCorrupt(ctx, pErr)
if _, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
return &res, false /* needConsensus */, pErr
}

txn := pErr.GetTxn()
if txn != nil && ba.Txn == nil {
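The comment added here points out that evaluation now runs under a shared RLock on readOnlyCmdMu. The value of a shared lock is that arbitrarily many expensive evaluations can proceed concurrently, while replica removal, which needs the exclusive lock, waits for all of them to drain. A self-contained illustration using only the standard library (an assumed simplification, not the real types):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var readOnlyCmdMu sync.RWMutex

	readOnlyCmdMu.RLock() // a request begins (expensive) evaluation

	removed := make(chan struct{})
	go func() {
		readOnlyCmdMu.Lock() // replica removal blocks here...
		defer readOnlyCmdMu.Unlock()
		close(removed)
	}()

	time.Sleep(50 * time.Millisecond) // evaluation in progress
	select {
	case <-removed:
		fmt.Println("bug: removal ran during evaluation")
	default:
		fmt.Println("removal waits while evaluation holds the shared lock")
	}

	readOnlyCmdMu.RUnlock() // evaluation done...
	<-removed               // ...and only now can removal proceed
	fmt.Println("replica removed after evaluation finished")
}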
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/replica_raft.go
@@ -57,6 +57,10 @@ func makeIDKey() storagebase.CmdIDKey {
// caller should relinquish all ownership of it. If it does return an error, the
// caller retains full ownership over the guard.
//
// Nothing here or below can take out a raftMu lock, since executeWriteBatch()
// is already holding readOnlyCmdMu when calling this. Locking raftMu after it
// would violate the locking order specified for Store.mu.
//
// Return values:
// - a channel which receives a response or error upon application
// - a closure used to attempt to abandon the command. When called, it unbinds
@@ -77,6 +81,8 @@ func (r *Replica) evalAndPropose(
// propagate the error. Don't assume ownership of the concurrency guard.
if isConcurrencyRetryError(pErr) {
return nil, nil, 0, pErr
} else if _, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
return nil, nil, 0, pErr
}

// Attach the endCmds to the proposal and assume responsibility for
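The new branch above treats a ReplicaCorruptionError like a concurrency retry error: evalAndPropose returns it untouched so that executeWriteBatch can escalate it after dropping the shared lock. The detection is a plain type assertion on the error detail; a minimal analogue with ordinary Go errors (hypothetical types, not the roachpb API):

package main

import (
	"errors"
	"fmt"
)

type corruptionError struct{ msg string }

func (e *corruptionError) Error() string { return e.msg }

func evalAndPropose() error {
	return &corruptionError{msg: "replica corruption (processed=false)"}
}

func executeWriteBatch() error {
	if err := evalAndPropose(); err != nil {
		var cErr *corruptionError
		if errors.As(err, &cErr) {
			// Don't handle corruption here: bubble it up to the layer
			// that can legally take the stronger lock.
			return cErr
		}
		return err
	}
	return nil
}

func main() { fmt.Println(executeWriteBatch()) }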
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/replica_test.go
@@ -3168,6 +3168,9 @@ func TestReplicaNoTimestampIncrementWithinTxn(t *testing.T) {
// TestReplicaAbortSpanReadError verifies that an error is returned
// to the client in the event that an AbortSpan entry is found but is
// not decodable.
//
// This doubles as a test that replica corruption errors are propagated
// and handled correctly.
func TestReplicaAbortSpanReadError(t *testing.T) {
defer leaktest.AfterTest(t)()

25 changes: 20 additions & 5 deletions pkg/kv/kvserver/replica_write.go
@@ -67,18 +67,20 @@ func (r *Replica) executeWriteBatch(
) (br *roachpb.BatchResponse, _ *concurrency.Guard, pErr *roachpb.Error) {
startTime := timeutil.Now()

// TODO(nvanbenschoten): unlike on the read-path (executeReadOnlyBatch), we
// don't synchronize with r.readOnlyCmdMu here. Is that ok? What if the
// replica is destroyed concurrently with a write? We won't be able to
// successfully propose as the lease will presumably have changed, but what
// if we hit an error during evaluation (e.g. a ConditionFailedError)?
// Even though we're not a read-only operation by definition, we have to
// take out a read lock on readOnlyCmdMu while performing any reads during
// pre-Raft evaluation (e.g. conditional puts), otherwise we can race with
// replica removal and get evaluated on an empty replica. We must release
// this lock before Raft execution, to avoid deadlocks.
r.readOnlyCmdMu.RLock()

// Verify that the batch can be executed.
// NB: we only need to check that the request is in the Range's key bounds
// at proposal time, not at application time, because the spanlatch manager
// will synchronize all requests (notably EndTxn with SplitTrigger) that may
// cause this condition to change.
if err := r.checkExecutionCanProceed(ba, g, &st); err != nil {
r.readOnlyCmdMu.RUnlock()
return nil, g, roachpb.NewError(err)
}

Expand Down Expand Up @@ -110,6 +112,7 @@ func (r *Replica) executeWriteBatch(

// Checking the context just before proposing can help avoid ambiguous errors.
if err := ctx.Err(); err != nil {
r.readOnlyCmdMu.RUnlock()
log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary())
return nil, g, roachpb.NewError(errors.Wrap(err, "aborted before proposing"))
}
@@ -121,6 +124,7 @@
// other transactions to be released while sequencing in the concurrency
// manager.
if curLease, _ := r.GetLease(); curLease.Sequence > st.Lease.Sequence {
r.readOnlyCmdMu.RUnlock()
curLeaseCpy := curLease // avoid letting curLease escape
err := newNotLeaseHolderError(&curLeaseCpy, r.store.StoreID(), r.Desc())
log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary())
@@ -132,6 +136,13 @@
// evalAndPropose.
ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, ba, g, &st.Lease)
if pErr != nil {
r.readOnlyCmdMu.RUnlock()
if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
r.raftMu.Lock()
defer r.raftMu.Unlock()
// This exits with a fatal error, but returns in tests.
return nil, g, r.setCorruptRaftMuLocked(ctx, cErr)
}
if maxLeaseIndex != 0 {
log.Fatalf(
ctx, "unexpected max lease index %d assigned to failed proposal: %s, error %s",
@@ -150,6 +161,10 @@
untrack(ctx, ctpb.Epoch(st.Lease.Epoch), r.RangeID, ctpb.LAI(maxLeaseIndex))
}

// We are done with pre-Raft evaluation at this point, and have to release the
// read-only command lock to avoid deadlocks during Raft evaluation.
r.readOnlyCmdMu.RUnlock()

// If the command was accepted by raft, wait for the range to apply it.
ctxDone := ctx.Done()
shouldQuiesce := r.store.stopper.ShouldQuiesce()
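Taken together, the changes in this file follow one rule: acquire the shared lock before checking whether execution can proceed, release it on every early-error return, and release it before blocking on Raft so that replica removal (which takes the exclusive lock) cannot deadlock against in-flight writes. A condensed, self-contained sketch of that control flow, with assumed names and stubbed-out internals:

package main

import (
	"context"
	"errors"
	"sync"
)

type replica struct {
	raftMu        sync.Mutex
	readOnlyCmdMu sync.RWMutex
}

var errCorrupt = errors.New("replica corruption")

func (r *replica) checkExecutionCanProceed() error          { return nil }
func (r *replica) evalAndPropose(context.Context) error     { return errCorrupt }
func (r *replica) setCorruptRaftMuLocked()                  {}
func (r *replica) waitForApplication(context.Context) error { return nil }

func (r *replica) executeWriteBatch(ctx context.Context) error {
	r.readOnlyCmdMu.RLock()

	if err := r.checkExecutionCanProceed(); err != nil {
		r.readOnlyCmdMu.RUnlock() // every early return releases the lock
		return err
	}

	if err := r.evalAndPropose(ctx); err != nil {
		r.readOnlyCmdMu.RUnlock()
		if errors.Is(err, errCorrupt) {
			// Escalate only after the shared lock is dropped: taking
			// raftMu while still holding readOnlyCmdMu would invert
			// the documented lock order.
			r.raftMu.Lock()
			defer r.raftMu.Unlock()
			r.setCorruptRaftMuLocked()
		}
		return err
	}

	// Pre-Raft evaluation is done; release before waiting on consensus,
	// or replica removal could deadlock against this request.
	r.readOnlyCmdMu.RUnlock()
	return r.waitForApplication(ctx)
}

func main() { _ = (&replica{}).executeWriteBatch(context.Background()) }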
