kv: support server-side refreshes of uncertainty errors #75905

Merged
27 changes: 14 additions & 13 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -2262,9 +2262,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
return txn.Run(ctx, b)
},
filter: newUncertaintyFilter(roachpb.Key([]byte("a"))),
// Expect a transaction coord retry, which should succeed.
txnCoordRetry: true,
filter: newUncertaintyFilter(roachpb.Key("a")),
// We expect the request to succeed after a server-side retry.
txnCoordRetry: false,
},
{
// Even if accounting for the refresh spans would have exhausted the
@@ -2952,8 +2952,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value"))
},
filter: newUncertaintyFilter(roachpb.Key([]byte("a"))),
txnCoordRetry: true,
filter: newUncertaintyFilter(roachpb.Key("a")),
// We expect the request to succeed after a server-side retry.
txnCoordRetry: false,
},
{
name: "cput within uncertainty interval after timestamp leaked",
@@ -2963,7 +2964,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value"))
},
filter: newUncertaintyFilter(roachpb.Key([]byte("a"))),
filter: newUncertaintyFilter(roachpb.Key("a")),
clientRetry: true,
tsLeaked: true,
},
@@ -2984,7 +2985,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value"))
},
filter: newUncertaintyFilter(roachpb.Key([]byte("ac"))),
filter: newUncertaintyFilter(roachpb.Key("ac")),
txnCoordRetry: true,
},
{
@@ -3007,7 +3008,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
return nil
},
filter: newUncertaintyFilter(roachpb.Key([]byte("ac"))),
filter: newUncertaintyFilter(roachpb.Key("ac")),
clientRetry: true, // note this txn is read-only but still restarts
},
{
Expand All @@ -3023,7 +3024,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value"))
return txn.CommitInBatch(ctx, b)
},
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
filter: newUncertaintyFilter(roachpb.Key("c")),
txnCoordRetry: true,
},
{
Expand All @@ -3045,7 +3046,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.CPut("a", "cput", kvclientutils.StrToCPutExistingValue("value"))
return txn.CommitInBatch(ctx, b)
},
filter: newUncertaintyFilter(roachpb.Key([]byte("a"))),
filter: newUncertaintyFilter(roachpb.Key("a")),
clientRetry: true, // will fail because of conflict on refresh span for the Get
},
{
Expand All @@ -3059,7 +3060,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value"))
return txn.CommitInBatch(ctx, b)
},
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
filter: newUncertaintyFilter(roachpb.Key("c")),
// Expect a transaction coord retry, which should succeed.
txnCoordRetry: true,
},
Expand All @@ -3069,7 +3070,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
_, err := txn.Scan(ctx, "a", "d", 0)
return err
},
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
filter: newUncertaintyFilter(roachpb.Key("c")),
// Expect a transaction coord retry, which should succeed.
txnCoordRetry: true,
},
Expand All @@ -3079,7 +3080,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
_, err := txn.DelRange(ctx, "a", "d", false /* returnKeys */)
return err
},
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
filter: newUncertaintyFilter(roachpb.Key("c")),
// Expect a transaction coord retry, which should succeed.
txnCoordRetry: true,
},
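
Note on the flipped expectations above: turning `txnCoordRetry` from `true` to `false` is the user-visible effect of this PR. A request that hits a `ReadWithinUncertaintyIntervalError` on a single range can now be refreshed on the server, so neither the TxnCoordSender nor the client observes a retry. The sketch below is a hedged illustration of how these expectation flags read; the two flag names come from the diff, but the struct is not the real test-case type used by `TestTxnCoordSenderRetries`.

```go
package main

import "fmt"

// retryExpectation loosely mirrors the flags toggled in the diff above
// (illustrative only; the real test cases carry more fields).
type retryExpectation struct {
	name          string
	txnCoordRetry bool // TxnCoordSender had to refresh and re-issue the batch
	clientRetry   bool // the retry error escaped all the way to the client
}

// classify describes where the uncertainty error was ultimately handled.
func classify(e retryExpectation) string {
	switch {
	case e.clientRetry:
		return e.name + ": client-side restart"
	case e.txnCoordRetry:
		return e.name + ": txn coordinator refresh"
	default:
		return e.name + ": handled by a server-side refresh"
	}
}

func main() {
	// After this PR, a point CPut hit by an uncertainty error is expected to
	// succeed without any coordinator- or client-visible retry.
	fmt.Println(classify(retryExpectation{name: "cput within uncertainty interval"}))
}
```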
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -658,6 +658,26 @@ func (g *Guard) AssertNoLatches() {
}
}

// IsolatedAtLaterTimestamps returns whether the request holding the guard would
// continue to be isolated from other requests / transactions even if it were to
// increase its request timestamp while evaluating. If the method returns false,
// the concurrency guard must be dropped and re-acquired with the new timestamp
// before the request can evaluate at that later timestamp.
func (g *Guard) IsolatedAtLaterTimestamps() bool {
// If the request acquired any read latches with bounded (MVCC) timestamps
// then it can not trivially bump its timestamp without dropping and
// re-acquiring those latches. Doing so could allow the request to read at an
// unprotected timestamp. We only look at global latch spans because local
// latch spans always use unbounded (NonMVCC) timestamps.
return len(g.Req.LatchSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanGlobal)) == 0 &&
// Similarly, if the request declared any global or local read lock spans
// then it can not trivially bump its timestamp without dropping its
// lockTableGuard and re-scanning the lockTable. Doing so could allow the
// request to conflict with locks that it previously did not conflict with.
len(g.Req.LockSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanGlobal)) == 0 &&
len(g.Req.LockSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanLocal)) == 0
}
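
A minimal, self-contained sketch of the predicate above, using toy span containers in place of `spanset.SpanSet`; the real method consults `g.Req.LatchSpans` and `g.Req.LockSpans` exactly as the diff shows, and the scope/access constants are simplified away here.

```go
package main

import "fmt"

// spans is a toy stand-in for the read-only portion of a spanset.SpanSet,
// split by the global/local scope distinction the real check relies on.
type spans struct {
	readOnlyGlobal []string // keys with timestamp-aware (MVCC) read protection
	readOnlyLocal  []string // local keys (always NonMVCC in the real code)
}

// isolatedAtLaterTimestamps mirrors Guard.IsolatedAtLaterTimestamps: a request
// may bump its timestamp in place only if it holds no bounded read latches and
// declared no read lock spans that a later timestamp could newly conflict with.
func isolatedAtLaterTimestamps(latches, locks spans) bool {
	if len(latches.readOnlyGlobal) > 0 {
		// Bounded read latches only protect reads up to the acquisition
		// timestamp, so a bump could read at an unprotected timestamp.
		return false
	}
	// A later timestamp can also conflict with locks the original lock-table
	// scan ignored, so any declared read lock span blocks the bump too.
	return len(locks.readOnlyGlobal) == 0 && len(locks.readOnlyLocal) == 0
}

func main() {
	blindWrite := spans{}                           // e.g. a batch of Puts
	withGet := spans{readOnlyGlobal: []string{"a"}} // e.g. a batch containing a Get
	fmt.Println(isolatedAtLaterTimestamps(blindWrite, spans{})) // true
	fmt.Println(isolatedAtLaterTimestamps(withGet, spans{}))    // false
}
```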

// CheckOptimisticNoConflicts checks that the {latch,lock}SpansRead do not
// have a conflicting latch, lock.
func (g *Guard) CheckOptimisticNoConflicts(
44 changes: 17 additions & 27 deletions pkg/kv/kvserver/replica_batch_updates.go
@@ -14,7 +14,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -183,7 +183,7 @@ func maybeStripInFlightWrites(ba *roachpb.BatchRequest) (*roachpb.BatchRequest,
// works for batches that exclusively contain writes; reads cannot be bumped
// like this because they've already acquired timestamp-aware latches.
func maybeBumpReadTimestampToWriteTimestamp(
ctx context.Context, ba *roachpb.BatchRequest, latchSpans *spanset.SpanSet,
ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard,
) bool {
if ba.Txn == nil {
return false
@@ -202,53 +202,43 @@
if batcheval.IsEndTxnExceedingDeadline(ba.Txn.WriteTimestamp, et.Deadline) {
return false
}
return tryBumpBatchTimestamp(ctx, ba, ba.Txn.WriteTimestamp, latchSpans)
return tryBumpBatchTimestamp(ctx, ba, g, ba.Txn.WriteTimestamp)
}

// tryBumpBatchTimestamp attempts to bump ba's read and write timestamps to ts.
//
// This function is called both below and above latching, which is indicated by
// the concurrency guard argument. The concurrency guard, if not nil, indicates
// that the caller is holding latches and cannot adjust its timestamp beyond the
// limits of what is protected by those latches. If the concurrency guard is
// nil, the caller indicates that it is not holding latches and can therefore
// more freely adjust its timestamp because it will re-acquire latches at
// whatever timestamp the batch is bumped to.
//
// Returns true if the timestamp was bumped. Returns false if the timestamp could
// not be bumped.
func tryBumpBatchTimestamp(
ctx context.Context, ba *roachpb.BatchRequest, ts hlc.Timestamp, latchSpans *spanset.SpanSet,
ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard, ts hlc.Timestamp,
) bool {
if len(latchSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanGlobal)) > 0 {
// If the batch acquired any read latches with bounded (MVCC) timestamps
// then we can not trivially bump the batch's timestamp without dropping
// and re-acquiring those latches. Doing so could allow the request to
// read at an unprotected timestamp. We only look at global latch spans
// because local latch spans always use unbounded (NonMVCC) timestamps.
//
// NOTE: even if we hold read latches with high enough timestamps to
// fully cover ("protect") the batch at the new timestamp, we still
// don't want to allow the bump. This is because a batch with read spans
// and a higher timestamp may now conflict with locks that it previously
// did not. However, server-side retries don't re-scan the lock table.
// This can lead to requests missing unreplicated locks in the lock
// table that they should have seen or discovering replicated intents in
// MVCC that they should not have seen (from the perspective of the lock
// table's AddDiscoveredLock method).
//
// NOTE: we could consider adding a retry-loop above the latch
// acquisition to allow this to be retried, but given that we try not to
// mix read-only and read-write requests, doing so doesn't seem worth
// it.
if g != nil && !g.IsolatedAtLaterTimestamps() {
return false
}
if ts.Less(ba.Timestamp) {
log.Fatalf(ctx, "trying to bump to %s <= ba.Timestamp: %s", ts, ba.Timestamp)
}
ba.Timestamp = ts
if ba.Txn == nil {
log.VEventf(ctx, 2, "bumping batch timestamp to %s from %s", ts, ba.Timestamp)
ba.Timestamp = ts
return true
}
if ts.Less(ba.Txn.ReadTimestamp) || ts.Less(ba.Txn.WriteTimestamp) {
log.Fatalf(ctx, "trying to bump to %s inconsistent with ba.Txn.ReadTimestamp: %s, "+
"ba.Txn.WriteTimestamp: %s", ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
}
log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s)",
log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s",
ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
ba.Txn = ba.Txn.Clone()
ba.Txn.Refresh(ts)
ba.Timestamp = ba.Txn.ReadTimestamp // Refresh just updated ReadTimestamp
return true
}
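
A self-contained sketch of the two call modes the new comment describes (above vs. below latching). The types are simplified stand-ins for `roachpb.BatchRequest`, `hlc.Timestamp`, and `concurrency.Guard`, and the transactional branch is only a rough analogue of `Txn.Refresh`; it is meant to show the control flow, not the real implementation.

```go
package main

import "fmt"

type timestamp int64

type guard struct{ isolatedAtLaterTimestamps bool }

// batch is a toy stand-in for a BatchRequest plus its transaction's
// read/write timestamps (zero values mean "non-transactional").
type batch struct {
	ts      timestamp
	readTS  timestamp
	writeTS timestamp
}

// tryBump mirrors the structure of tryBumpBatchTimestamp: a nil guard means
// the caller sits above latching and is free to move; a non-nil guard must
// prove that the held latches and lock spans still isolate it after the bump.
func tryBump(b *batch, g *guard, to timestamp) bool {
	if g != nil && !g.isolatedAtLaterTimestamps {
		return false
	}
	if to < b.ts {
		panic("cannot bump a batch backwards in time")
	}
	b.ts = to
	if b.readTS != 0 || b.writeTS != 0 {
		// Transactional case: bumping is a server-side refresh of the txn.
		if to > b.readTS {
			b.readTS = to
		}
		if to > b.writeTS {
			b.writeTS = to
		}
		b.ts = b.readTS
	}
	return true
}

func main() {
	b := &batch{ts: 10, readTS: 10, writeTS: 12}
	fmt.Println(tryBump(b, &guard{isolatedAtLaterTimestamps: false}, 12)) // false: latches/locks too narrow
	fmt.Println(tryBump(b, &guard{isolatedAtLaterTimestamps: true}, 12))  // true: refreshed in place
	fmt.Println(b.readTS, b.writeTS)                                      // 12 12
}
```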
34 changes: 18 additions & 16 deletions pkg/kv/kvserver/replica_evaluate.go
@@ -16,8 +16,8 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
@@ -518,8 +518,16 @@ func evaluateCommand(
// for transactional requests, retrying is possible if the transaction had not
// performed any prior reads that need refreshing.
//
// This function is called both below and above latching, which is indicated by
// the concurrency guard argument. The concurrency guard, if not nil, indicates
// that the caller is holding latches and cannot adjust its timestamp beyond the
// limits of what is protected by those latches. If the concurrency guard is
// nil, the caller indicates that it is not holding latches and can therefore
// more freely adjust its timestamp because it will re-acquire latches at
// whatever timestamp the batch is bumped to.
//
// deadline, if not nil, specifies the highest timestamp (exclusive) at which
// the request can be evaluated. If ba is a transactional request, then dealine
// the request can be evaluated. If ba is a transactional request, then deadline
// cannot be specified; a transaction's deadline comes from it's EndTxn request.
//
// If true is returned, ba and ba.Txn will have been updated with the new
@@ -529,7 +537,7 @@ func canDoServersideRetry(
pErr *roachpb.Error,
ba *roachpb.BatchRequest,
br *roachpb.BatchResponse,
latchSpans *spanset.SpanSet,
g *concurrency.Guard,
deadline *hlc.Timestamp,
) bool {
if ba.Txn != nil {
@@ -548,17 +556,6 @@ func canDoServersideRetry(
var newTimestamp hlc.Timestamp
if ba.Txn != nil {
if pErr != nil {
// TODO(nvanbenschoten): This is intentionally not allowing server-side
// refreshes of ReadWithinUncertaintyIntervalErrors for now, even though
// that is the eventual goal here. Lifting that limitation will likely
// need to be accompanied by an above-latching retry loop, because read
// latches will usually prevent below-latch retries of
// ReadWithinUncertaintyIntervalErrors. See the comment in
// tryBumpBatchTimestamp.
if _, ok := pErr.GetDetail().(*roachpb.ReadWithinUncertaintyIntervalError); ok {
return false
}

var ok bool
ok, newTimestamp = roachpb.TransactionRefreshTimestamp(pErr)
if !ok {
@@ -576,7 +573,12 @@ func canDoServersideRetry(
}
switch tErr := pErr.GetDetail().(type) {
case *roachpb.WriteTooOldError:
newTimestamp = tErr.ActualTimestamp
newTimestamp = tErr.RetryTimestamp()

// TODO(nvanbenschoten): give non-txn requests uncertainty intervals. #73732.
//case *roachpb.ReadWithinUncertaintyIntervalError:
// newTimestamp = tErr.RetryTimestamp()

default:
return false
}
Expand All @@ -585,5 +587,5 @@ func canDoServersideRetry(
if batcheval.IsEndTxnExceedingDeadline(newTimestamp, deadline) {
return false
}
return tryBumpBatchTimestamp(ctx, ba, newTimestamp, latchSpans)
return tryBumpBatchTimestamp(ctx, ba, g, newTimestamp)
}
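
A hedged sketch of the error-to-timestamp step above. `WriteTooOldError.RetryTimestamp()` appears in the diff, and the commented-out case suggests `ReadWithinUncertaintyIntervalError` will use the same method once non-transactional requests get uncertainty intervals (#73732); the concrete types and the `+1` bump below are simplified stand-ins, not the real roachpb error payloads.

```go
package main

import "fmt"

type timestamp int64

// retryableError captures the shape this PR leans on: a retryable error knows
// the timestamp at which a retry could succeed. The concrete types below are
// hypothetical stand-ins for roachpb.WriteTooOldError and
// roachpb.ReadWithinUncertaintyIntervalError.
type retryableError interface {
	error
	RetryTimestamp() timestamp
}

type writeTooOld struct{ actual timestamp }

func (e writeTooOld) Error() string             { return "write too old" }
func (e writeTooOld) RetryTimestamp() timestamp { return e.actual }

type readWithinUncertainty struct{ valueTS timestamp }

func (e readWithinUncertainty) Error() string             { return "read within uncertainty interval" }
func (e readWithinUncertainty) RetryTimestamp() timestamp { return e.valueTS + 1 }

// serversideRetryTimestamp decides whether a server-side retry is possible
// and, if so, at what timestamp, given an optional exclusive deadline
// (zero means no deadline). It mirrors only the error-to-timestamp step of
// canDoServersideRetry; refresh-span and latch checks are handled elsewhere.
func serversideRetryTimestamp(err error, deadline timestamp) (timestamp, bool) {
	re, ok := err.(retryableError)
	if !ok {
		return 0, false
	}
	ts := re.RetryTimestamp()
	if deadline != 0 && deadline <= ts {
		return 0, false // bumping would push the request past its deadline
	}
	return ts, true
}

func main() {
	ts, ok := serversideRetryTimestamp(writeTooOld{actual: 15}, 0)
	fmt.Println(ts, ok) // 15 true
	ts, ok = serversideRetryTimestamp(readWithinUncertainty{valueTS: 20}, 18)
	fmt.Println(ts, ok) // 0 false
}
```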
13 changes: 5 additions & 8 deletions pkg/kv/kvserver/replica_proposal.go
@@ -21,10 +21,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -773,7 +773,7 @@ func (r *Replica) evaluateProposal(
idKey kvserverbase.CmdIDKey,
ba *roachpb.BatchRequest,
ui uncertainty.Interval,
latchSpans *spanset.SpanSet,
g *concurrency.Guard,
) (*result.Result, bool, *roachpb.Error) {
if ba.Timestamp.IsEmpty() {
return nil, false, roachpb.NewErrorf("can't propose Raft command with zero timestamp")
@@ -789,7 +789,7 @@ func (r *Replica) evaluateProposal(
//
// TODO(tschottdorf): absorb all returned values in `res` below this point
// in the call stack as well.
batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, ui, latchSpans)
batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, ui, g)

// Note: reusing the proposer's batch when applying the command on the
// proposer was explored as an optimization but resulted in no performance
@@ -875,18 +875,15 @@ func (r *Replica) evaluateProposal(
// evaluating it. The returned ProposalData is partially valid even
// on a non-nil *roachpb.Error and should be proposed through Raft
// if ProposalData.command is non-nil.
//
// TODO(nvanbenschoten): combine idKey, ba, and latchSpans into a
// `serializedRequest` struct.
func (r *Replica) requestToProposal(
ctx context.Context,
idKey kvserverbase.CmdIDKey,
ba *roachpb.BatchRequest,
st kvserverpb.LeaseStatus,
ui uncertainty.Interval,
latchSpans *spanset.SpanSet,
g *concurrency.Guard,
) (*ProposalData, *roachpb.Error) {
res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, ui, latchSpans)
res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, ui, g)

// Fill out the results even if pErr != nil; we'll return the error below.
proposal := &ProposalData{
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_raft.go
@@ -107,7 +107,7 @@ func (r *Replica) evalAndPropose(
) (chan proposalResult, func(), kvserverbase.CmdIDKey, *roachpb.Error) {
defer tok.DoneIfNotMoved(ctx)
idKey := makeIDKey()
proposal, pErr := r.requestToProposal(ctx, idKey, ba, st, ui, g.LatchSpans())
proposal, pErr := r.requestToProposal(ctx, idKey, ba, st, ui, g)
log.Event(proposal.ctx, "evaluated request")

// If the request hit a server-side concurrency retry error, immediately
13 changes: 5 additions & 8 deletions pkg/kv/kvserver/replica_read.go
@@ -49,8 +49,7 @@ func (r *Replica) executeReadOnlyBatch(
ui := uncertainty.ComputeInterval(ba.Txn, st)

// Evaluate read-only batch command.
spans := g.LatchSpans()
rec := NewReplicaEvalContext(r, spans)
rec := NewReplicaEvalContext(r, g.LatchSpans())

// TODO(irfansharif): It's unfortunate that in this read-only code path,
// we're stuck with a ReadWriter because of the way evaluateBatch is
@@ -62,7 +61,7 @@ func (r *Replica) executeReadOnlyBatch(
panic("expected consistent iterators")
}
if util.RaceEnabled {
rw = spanset.NewReadWriterAt(rw, spans, ba.Timestamp)
rw = spanset.NewReadWriterAt(rw, g.LatchSpans(), ba.Timestamp)
}
defer rw.Close()

@@ -81,9 +80,7 @@ func (r *Replica) executeReadOnlyBatch(
// the latches are released.

var result result.Result
br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(
ctx, rw, rec, ba, ui, spans,
)
br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, ui, g)

// If the request hit a server-side concurrency retry error, immediately
// propagate the error. Don't assume ownership of the concurrency guard.
@@ -237,7 +234,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
rec batcheval.EvalContext,
ba *roachpb.BatchRequest,
ui uncertainty.Interval,
latchSpans *spanset.SpanSet,
g *concurrency.Guard,
) (br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) {
log.Event(ctx, "executing read-only batch")

@@ -290,7 +287,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, ui, true /* readOnly */)
// If we can retry, set a higher batch timestamp and continue.
// Allow one retry only.
if pErr == nil || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, latchSpans, nil /* deadline */) {
if pErr == nil || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, g, nil /* deadline */) {
break
}
}
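
The loop above allows exactly one server-side refresh per read-only batch. Below is a minimal sketch of that control flow with evaluation and the retry decision stubbed out; `evaluate` and `canRetryServerside` are hypothetical stand-ins for `evaluateBatch` and `canDoServersideRetry`.

```go
package main

import (
	"errors"
	"fmt"
)

var errRetryable = errors.New("retryable: read within uncertainty interval")

// evaluate is a stand-in for evaluateBatch: it fails on the first attempt and
// succeeds once the (notional) batch timestamp has been bumped.
func evaluate(attempt int) error {
	if attempt == 0 {
		return errRetryable
	}
	return nil
}

// canRetryServerside is a stand-in for canDoServersideRetry.
func canRetryServerside(err error) bool { return errors.Is(err, errRetryable) }

func main() {
	var err error
	for retries := 0; ; retries++ {
		err = evaluate(retries)
		// If we can retry, bump the batch timestamp and continue.
		// Allow one retry only.
		if err == nil || retries > 0 || !canRetryServerside(err) {
			break
		}
	}
	fmt.Println("result:", err) // result: <nil> — succeeded on the server-side retry
}
```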