Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-21.1: kvserver: synchronize replica removal with read-write requests #64598

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 110 additions & 62 deletions pkg/kv/kvserver/client_relocate_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,29 +373,82 @@ func TestAdminRelocateRangeRandom(t *testing.T) {
// Regression test for https://github.com/cockroachdb/cockroach/issues/64325
// which makes sure an in-flight read operation during replica removal won't
// return empty results.
func TestReplicaRemovalDuringRequestEvaluation(t *testing.T) {
func TestReplicaRemovalDuringGet(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc, key, evalDuringReplicaRemoval := setupReplicaRemovalTest(t, ctx)
defer tc.Stopper().Stop(ctx)

// Perform write.
pArgs := putArgs(key, []byte("foo"))
_, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs)
require.Nil(t, pErr)

// Perform delayed read during replica removal.
resp, pErr := evalDuringReplicaRemoval(ctx, getArgs(key))
require.Nil(t, pErr)
require.NotNil(t, resp)
require.NotNil(t, resp.(*roachpb.GetResponse).Value)
val, err := resp.(*roachpb.GetResponse).Value.GetBytes()
require.NoError(t, err)
require.Equal(t, []byte("foo"), val)
}

// Regression test for https://github.com/cockroachdb/cockroach/issues/46329
// which makes sure an in-flight conditional put operation during replica
// removal won't spuriously error due to an unexpectedly missing value.
func TestReplicaRemovalDuringCPut(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc, key, evalDuringReplicaRemoval := setupReplicaRemovalTest(t, ctx)
defer tc.Stopper().Stop(ctx)

// Perform write.
pArgs := putArgs(key, []byte("foo"))
_, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs)
require.Nil(t, pErr)

// Perform delayed conditional put during replica removal. This will cause
// an ambiguous result error, as outstanding proposals in the leaseholder
// replica's proposal queue will be aborted when the replica is removed.
// If the replica was removed from under us, it would instead return a
// ConditionFailedError since it finds nil in place of "foo".
req := cPutArgs(key, []byte("bar"), []byte("foo"))
_, pErr = evalDuringReplicaRemoval(ctx, req)
require.NotNil(t, pErr)
require.IsType(t, &roachpb.AmbiguousResultError{}, pErr.GetDetail())
}

// setupReplicaRemovalTest sets up a test cluster that can be used to test
// request evaluation during replica removal. It returns a running test
// cluster, the first key of a blank scratch range on the replica to be
// removed, and a function that can execute a delayed request just as the
// replica is being removed.
func setupReplicaRemovalTest(
t *testing.T, ctx context.Context,
) (
*testcluster.TestCluster,
roachpb.Key,
func(context.Context, roachpb.Request) (roachpb.Response, *roachpb.Error),
) {
t.Helper()

type magicKey struct{}

// delayReadC is used to synchronize the in-flight read request with the main
// test goroutine. It is read from twice:
//
// 1. The first read allows the test to block until the request eval filter
// is called, i.e. when the read request is ready.
// 2. The second read allows the test to close the channel to unblock
// the eval filter, causing the read request to be evaluated.
delayReadC := make(chan struct{})
requestReadyC := make(chan struct{}) // signals main thread that request is teed up
requestEvalC := make(chan struct{}) // signals cluster to evaluate the request
evalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error {
if args.Ctx.Value(magicKey{}) != nil {
<-delayReadC
<-delayReadC
requestReadyC <- struct{}{}
<-requestEvalC
}
return nil
}

ctx := context.Background()
manual := hlc.NewHybridManualClock()
args := base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
Expand All @@ -415,62 +468,57 @@ func TestReplicaRemovalDuringRequestEvaluation(t *testing.T) {
},
}
tc := testcluster.StartTestCluster(t, 2, args)
defer tc.Stopper().Stop(ctx)

// Create range and upreplicate.
key := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, key, tc.Target(1))

// Perform write.
pArgs := putArgs(key, []byte("foo"))
_, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs)
require.Nil(t, pErr)
// Return a function that can be used to evaluate a delayed request
// during replica removal.
evalDuringReplicaRemoval := func(ctx context.Context, req roachpb.Request) (roachpb.Response, *roachpb.Error) {
// Submit request and wait for it to block.
type result struct {
resp roachpb.Response
err *roachpb.Error
}
resultC := make(chan result)
err := tc.Stopper().RunAsyncTask(ctx, "request", func(ctx context.Context) {
reqCtx := context.WithValue(ctx, magicKey{}, struct{}{})
resp, pErr := kv.SendWrapped(reqCtx, tc.Servers[0].DistSender(), req)
resultC <- result{resp, pErr}
})
require.NoError(t, err)
<-requestReadyC

// Transfer leaseholder to other store.
rangeDesc, err := tc.LookupRange(key)
require.NoError(t, err)
repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(rangeDesc.RangeID)
require.NoError(t, err)
err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual)
require.NoError(t, err)

// Remove first store from raft group.
tc.RemoveVotersOrFatal(t, key, tc.Target(0))

// Wait for replica removal. This is a bit iffy. We want to make sure
// that, in the buggy case, we will typically fail (i.e. the request
// returns incorrect results because the replica was removed). However,
// in the non-buggy case the in-flight request will be holding
// readOnlyCmdMu until evaluated, blocking the replica removal, so
// waiting for replica removal would deadlock. We therefore take the
// easy way out by starting an async replica GC and sleeping for a bit.
err = tc.Stopper().RunAsyncTask(ctx, "replicaGC", func(ctx context.Context) {
assert.NoError(t, tc.GetFirstStoreFromServer(t, 0).ManualReplicaGC(repl))
})
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)

// Perform read on write and wait for read to block.
type reply struct {
resp roachpb.Response
err *roachpb.Error
// Allow request to resume, and return the result.
close(requestEvalC)
r := <-resultC
return r.resp, r.err
}
readResC := make(chan reply)
err := tc.Stopper().RunAsyncTask(ctx, "get", func(ctx context.Context) {
readCtx := context.WithValue(ctx, magicKey{}, struct{}{})
gArgs := getArgs(key)
resp, pErr := kv.SendWrapped(readCtx, tc.Servers[0].DistSender(), gArgs)
readResC <- reply{resp, pErr}
})
require.NoError(t, err)
delayReadC <- struct{}{}

// Transfer leaseholder to other store.
rangeDesc, err := tc.LookupRange(key)
require.NoError(t, err)
repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(rangeDesc.RangeID)
require.NoError(t, err)
err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual)
require.NoError(t, err)

// Remove first store from raft group.
tc.RemoveVotersOrFatal(t, key, tc.Target(0))

// This is a bit iffy. We want to make sure that, in the buggy case, we
// will typically fail (i.e. the read returns empty because the replica was
// removed). However, in the non-buggy case the in-flight read request will
// be holding readOnlyCmdMu until evaluated, blocking the replica removal,
// so waiting for replica removal would deadlock. We therefore take the
// easy way out by starting an async replica GC and sleeping for a bit.
err = tc.Stopper().RunAsyncTask(ctx, "replicaGC", func(ctx context.Context) {
assert.NoError(t, tc.GetFirstStoreFromServer(t, 0).ManualReplicaGC(repl))
})
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)

// Allow read to resume. Should return "foo".
close(delayReadC)
r := <-readResC
require.Nil(t, r.err)
require.NotNil(t, r.resp)
require.NotNil(t, r.resp.(*roachpb.GetResponse).Value)
val, err := r.resp.(*roachpb.GetResponse).Value.GetBytes()
require.NoError(t, err)
require.Equal(t, []byte("foo"), val)
return tc, key, evalDuringReplicaRemoval
}
17 changes: 17 additions & 0 deletions pkg/kv/kvserver/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1474,6 +1474,23 @@ func putArgs(key roachpb.Key, value []byte) *roachpb.PutRequest {
}
}

// cPutArgs returns a ConditionPutRequest to the default replica
// for the specified key and value, with the given expected value.
func cPutArgs(key roachpb.Key, value, expValue []byte) *roachpb.ConditionalPutRequest {
var expBytes []byte
if expValue != nil {
expBytes = roachpb.MakeValueFromBytes(expValue).TagAndDataBytes()
}

return &roachpb.ConditionalPutRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
},
Value: roachpb.MakeValueFromBytes(value),
ExpBytes: expBytes,
}
}

// incrementArgs returns an IncrementRequest addressed to the default replica
// for the specified key.
func incrementArgs(key roachpb.Key, inc int64) *roachpb.IncrementRequest {
Expand Down
22 changes: 7 additions & 15 deletions pkg/kv/kvserver/replica_corruption.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// maybeSetCorrupt is a stand-in for proper handling of failing replicas. Such a
// failure is indicated by a call to maybeSetCorrupt with a ReplicaCorruptionError.
// Currently any error is passed through, but prospectively it should stop the
// range from participating in progress, trigger a rebalance operation and
// decide on an error-by-error basis whether the corruption is limited to the
// range, store, node or cluster with corresponding actions taken.
// setCorruptRaftMuLocked is a stand-in for proper handling of failing replicas.
// Such a failure is indicated by a call to setCorruptRaftMuLocked with a
// ReplicaCorruptionError. Currently any error is passed through, but
// prospectively it should stop the range from participating in progress,
// trigger a rebalance operation and decide on an error-by-error basis whether
// the corruption is limited to the range, store, node or cluster with
// corresponding actions taken.
//
// Despite the fatal log call below this message we still return for the
// sake of testing.
Expand All @@ -36,15 +37,6 @@ import (
// best bet is to not have any of those.
// @bdarnell remarks: Corruption errors should be rare so we may want the store
// to just recompute its stats in the background when one occurs.
func (r *Replica) maybeSetCorrupt(ctx context.Context, pErr *roachpb.Error) *roachpb.Error {
if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
r.raftMu.Lock()
defer r.raftMu.Unlock()
return r.setCorruptRaftMuLocked(ctx, cErr)
}
return pErr
}

func (r *Replica) setCorruptRaftMuLocked(
ctx context.Context, cErr *roachpb.ReplicaCorruptionError,
) *roachpb.Error {
Expand Down
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,8 +746,9 @@ func (r *Replica) evaluateProposal(
}

// Evaluate the commands. If this returns without an error, the batch should
// be committed. Note that we don't hold any locks at this point. This is
// important since evaluating a proposal is expensive.
// be committed. Note that we don't hold any locks at this point, except a
// shared RLock on raftMuReadOnlyMu. This is important since evaluating a
// proposal is expensive.
//
// Note that, during evaluation, ba's read and write timestamps might get
// bumped (see evaluateWriteBatchWithServersideRefreshes).
Expand All @@ -764,7 +765,9 @@ func (r *Replica) evaluateProposal(
}

if pErr != nil {
pErr = r.maybeSetCorrupt(ctx, pErr)
if _, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
return &res, false /* needConsensus */, pErr
}

txn := pErr.GetTxn()
if txn != nil && ba.Txn == nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func makeIDKey() kvserverbase.CmdIDKey {
// tok.Move() it into this method. It will be used to untrack the request once
// it comes out of the proposal buffer.
//
// Nothing here or below can take out a raftMu lock, since executeWriteBatch()
// is already holding readOnlyCmdMu when calling this. Locking raftMu after it
// would violate the locking order specified for Store.mu.
//
// Return values:
// - a channel which receives a response or error upon application
// - a closure used to attempt to abandon the command. When called, it unbinds
Expand Down Expand Up @@ -90,6 +94,8 @@ func (r *Replica) evalAndPropose(
if isConcurrencyRetryError(pErr) {
pErr = maybeAttachLease(pErr, &st.Lease)
return nil, nil, 0, pErr
} else if _, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
return nil, nil, 0, pErr
}

// Attach the endCmds to the proposal and assume responsibility for
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3325,6 +3325,9 @@ func TestReplicaNoTSCacheIncrementWithinTxn(t *testing.T) {
// TestReplicaAbortSpanReadError verifies that an error is returned
// to the client in the event that a AbortSpan entry is found but is
// not decodable.
//
// This doubles as a test that replica corruption errors are propagated
// and handled correctly.
func TestReplicaAbortSpanReadError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
24 changes: 19 additions & 5 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,17 @@ func (r *Replica) executeWriteBatch(
) (br *roachpb.BatchResponse, _ *concurrency.Guard, pErr *roachpb.Error) {
startTime := timeutil.Now()

// TODO(nvanbenschoten): unlike on the read-path (executeReadOnlyBatch), we
// don't synchronize with r.readOnlyCmdMu here. Is that ok? What if the
// replica is destroyed concurrently with a write? We won't be able to
// successfully propose as the lease will presumably have changed, but what
// if we hit an error during evaluation (e.g. a ConditionFailedError)?
// Even though we're not a read-only operation by definition, we have to
// take out a read lock on readOnlyCmdMu while performing any reads during
// pre-Raft evaluation (e.g. conditional puts), otherwise we can race with
// replica removal and get evaluated on an empty replica. We must release
// this lock before Raft execution, to avoid deadlocks.
r.readOnlyCmdMu.RLock()

// Verify that the batch can be executed.
st, err := r.checkExecutionCanProceed(ctx, ba, g)
if err != nil {
r.readOnlyCmdMu.RUnlock()
return nil, g, roachpb.NewError(err)
}

Expand Down Expand Up @@ -135,6 +137,7 @@ func (r *Replica) executeWriteBatch(
// Checking the context just before proposing can help avoid ambiguous errors.
if err := ctx.Err(); err != nil {
log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary())
r.readOnlyCmdMu.RUnlock()
return nil, g, roachpb.NewError(errors.Wrapf(err, "aborted before proposing"))
}

Expand All @@ -143,6 +146,13 @@ func (r *Replica) executeWriteBatch(
// evalAndPropose.
ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, ba, g, st, localUncertaintyLimit, tok.Move(ctx))
if pErr != nil {
r.readOnlyCmdMu.RUnlock()
if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
r.raftMu.Lock()
defer r.raftMu.Unlock()
// This exits with a fatal error, but returns in tests.
return nil, g, r.setCorruptRaftMuLocked(ctx, cErr)
}
if maxLeaseIndex != 0 {
log.Fatalf(
ctx, "unexpected max lease index %d assigned to failed proposal: %s, error %s",
Expand All @@ -162,6 +172,10 @@ func (r *Replica) executeWriteBatch(
untrack(ctx, ctpb.Epoch(st.Lease.Epoch), r.RangeID, ctpb.LAI(maxLeaseIndex))
}

// We are done with pre-Raft evaluation at this point, and have to release the
// read-only command lock to avoid deadlocks during Raft evaluation.
r.readOnlyCmdMu.RUnlock()

// If the command was accepted by raft, wait for the range to apply it.
ctxDone := ctx.Done()
shouldQuiesce := r.store.stopper.ShouldQuiesce()
Expand Down