From a095546265339624db9ba0db939c98b596c983c7 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 2 Feb 2022 15:33:52 -0500 Subject: [PATCH] kv: support server-side refreshes of uncertainty errors Extracted from #73732, with relevant comments addressed. This commit adds support for server-side refreshes of `ReadWithinUncertaintyIntervalError`s. This serves as a performance optimization for transactional requests, which now benefit from this new capability to refresh away `ReadWithinUncertaintyIntervalError`s early in their transaction, before they've accumulated any refresh spans. There's some complexity around supporting this form of server-side retry, because it must be done above latching, instead of below. However, the recent refactoring in #73557 has made this possible to support cleanly. Specifically, we now handle `ReadWithinUncertaintyIntervalError` as a concurrency error in the `executeBatchWithConcurrencyRetries` retry loop. This is different from other server-side retries, which are hit during writes and can be handled without releasing latches. This difference stems from how read and write latches behave. Write latches protect their MVCC timestamp and any later time. Meanwhile, read latches protect their MVCC timestamp and any earlier time. This means that a request holding read latches that hits an uncertainty error can't refresh without dropping those latches and acquiring new ones. This is also a prerequisite to giving non-transactional requests uncertainty intervals (#73732), because we don't want `ReadWithinUncertaintyIntervalError`s to reach the client for non-transactional requests. Conveniently, because non-transactional requests are always scoped to a single range, those that hit uncertainty errors will always be able to retry on the server, so these errors will never bubble up to the client that initiated the request. Release note (performance improvement): Certain forms of automatically retried "read uncertainty" errors are now retried more efficiently, avoiding a network round trip. --- .../kvcoord/dist_sender_server_test.go | 27 ++--- .../concurrency/concurrency_manager.go | 20 ++++ pkg/kv/kvserver/replica_batch_updates.go | 44 +++---- pkg/kv/kvserver/replica_evaluate.go | 34 +++--- pkg/kv/kvserver/replica_proposal.go | 13 +-- pkg/kv/kvserver/replica_raft.go | 2 +- pkg/kv/kvserver/replica_read.go | 13 +-- pkg/kv/kvserver/replica_send.go | 110 ++++++++++++++++-- pkg/kv/kvserver/replica_test.go | 76 +++++++++++- pkg/kv/kvserver/replica_write.go | 41 +++---- pkg/roachpb/data.go | 45 +------ pkg/roachpb/errors.go | 39 +++++++ pkg/sql/txn_restart_test.go | 51 ++++---- 13 files changed, 337 insertions(+), 178 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index 7bd1da8fe740..be9e0a0e9bde 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -2262,9 +2262,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { } return txn.Run(ctx, b) }, - filter: newUncertaintyFilter(roachpb.Key([]byte("a"))), - // Expect a transaction coord retry, which should succeed. - txnCoordRetry: true, + filter: newUncertaintyFilter(roachpb.Key("a")), + // We expect the request to succeed after a server-side retry.
+ txnCoordRetry: false, }, { // Even if accounting for the refresh spans would have exhausted the @@ -2952,8 +2952,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value")) }, - filter: newUncertaintyFilter(roachpb.Key([]byte("a"))), - txnCoordRetry: true, + filter: newUncertaintyFilter(roachpb.Key("a")), + // We expect the request to succeed after a server-side retry. + txnCoordRetry: false, }, { name: "cput within uncertainty interval after timestamp leaked", @@ -2963,7 +2964,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value")) }, - filter: newUncertaintyFilter(roachpb.Key([]byte("a"))), + filter: newUncertaintyFilter(roachpb.Key("a")), clientRetry: true, tsLeaked: true, }, @@ -2984,7 +2985,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { } return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value")) }, - filter: newUncertaintyFilter(roachpb.Key([]byte("ac"))), + filter: newUncertaintyFilter(roachpb.Key("ac")), txnCoordRetry: true, }, { @@ -3007,7 +3008,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { } return nil }, - filter: newUncertaintyFilter(roachpb.Key([]byte("ac"))), + filter: newUncertaintyFilter(roachpb.Key("ac")), clientRetry: true, // note this txn is read-only but still restarts }, { @@ -3023,7 +3024,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value")) return txn.CommitInBatch(ctx, b) }, - filter: newUncertaintyFilter(roachpb.Key([]byte("c"))), + filter: newUncertaintyFilter(roachpb.Key("c")), txnCoordRetry: true, }, { @@ -3045,7 +3046,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.CPut("a", "cput", kvclientutils.StrToCPutExistingValue("value")) return txn.CommitInBatch(ctx, b) }, - filter: newUncertaintyFilter(roachpb.Key([]byte("a"))), + filter: newUncertaintyFilter(roachpb.Key("a")), clientRetry: true, // will fail because of conflict on refresh span for the Get }, { @@ -3059,7 +3060,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value")) return txn.CommitInBatch(ctx, b) }, - filter: newUncertaintyFilter(roachpb.Key([]byte("c"))), + filter: newUncertaintyFilter(roachpb.Key("c")), // Expect a transaction coord retry, which should succeed. txnCoordRetry: true, }, @@ -3069,7 +3070,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { _, err := txn.Scan(ctx, "a", "d", 0) return err }, - filter: newUncertaintyFilter(roachpb.Key([]byte("c"))), + filter: newUncertaintyFilter(roachpb.Key("c")), // Expect a transaction coord retry, which should succeed. txnCoordRetry: true, }, @@ -3079,7 +3080,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { _, err := txn.DelRange(ctx, "a", "d", false /* returnKeys */) return err }, - filter: newUncertaintyFilter(roachpb.Key([]byte("c"))), + filter: newUncertaintyFilter(roachpb.Key("c")), // Expect a transaction coord retry, which should succeed. 
txnCoordRetry: true, }, diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go index 2089a348e7cd..5f63d0a05265 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go @@ -658,6 +658,26 @@ func (g *Guard) AssertNoLatches() { } } +// IsolatedAtLaterTimestamps returns whether the request holding the guard would +// continue to be isolated from other requests / transactions even if it were to +// increase its request timestamp while evaluating. If the method returns false, +// the concurrency guard must be dropped and re-acquired with the new timestamp +// before the request can evaluate at that later timestamp. +func (g *Guard) IsolatedAtLaterTimestamps() bool { + // If the request acquired any read latches with bounded (MVCC) timestamps + // then it can not trivially bump its timestamp without dropping and + // re-acquiring those latches. Doing so could allow the request to read at an + // unprotected timestamp. We only look at global latch spans because local + // latch spans always use unbounded (NonMVCC) timestamps. + return len(g.Req.LatchSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanGlobal)) == 0 && + // Similarly, if the request declared any global or local read lock spans + // then it can not trivially bump its timestamp without dropping its + // lockTableGuard and re-scanning the lockTable. Doing so could allow the + // request to conflict with locks that it previously did not conflict with. + len(g.Req.LockSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanGlobal)) == 0 && + len(g.Req.LockSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanLocal)) == 0 +} + // CheckOptimisticNoConflicts checks that the {latch,lock}SpansRead do not // have a conflicting latch, lock. func (g *Guard) CheckOptimisticNoConflicts( diff --git a/pkg/kv/kvserver/replica_batch_updates.go b/pkg/kv/kvserver/replica_batch_updates.go index 2c1bd69904e0..52ea4f37d9fb 100644 --- a/pkg/kv/kvserver/replica_batch_updates.go +++ b/pkg/kv/kvserver/replica_batch_updates.go @@ -14,7 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -183,7 +183,7 @@ func maybeStripInFlightWrites(ba *roachpb.BatchRequest) (*roachpb.BatchRequest, // works for batches that exclusively contain writes; reads cannot be bumped // like this because they've already acquired timestamp-aware latches. func maybeBumpReadTimestampToWriteTimestamp( - ctx context.Context, ba *roachpb.BatchRequest, latchSpans *spanset.SpanSet, + ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard, ) bool { if ba.Txn == nil { return false @@ -202,53 +202,43 @@ func maybeBumpReadTimestampToWriteTimestamp( if batcheval.IsEndTxnExceedingDeadline(ba.Txn.WriteTimestamp, et.Deadline) { return false } - return tryBumpBatchTimestamp(ctx, ba, ba.Txn.WriteTimestamp, latchSpans) + return tryBumpBatchTimestamp(ctx, ba, g, ba.Txn.WriteTimestamp) } // tryBumpBatchTimestamp attempts to bump ba's read and write timestamps to ts. // +// This function is called both below and above latching, which is indicated by +// the concurrency guard argument. 
The concurrency guard, if not nil, indicates +// that the caller is holding latches and cannot adjust its timestamp beyond the +// limits of what is protected by those latches. If the concurrency guard is +// nil, the caller indicates that it is not holding latches and can therefore +// more freely adjust its timestamp because it will re-acquire latches at +// whatever timestamp the batch is bumped to. +// // Returns true if the timestamp was bumped. Returns false if the timestamp could // not be bumped. func tryBumpBatchTimestamp( - ctx context.Context, ba *roachpb.BatchRequest, ts hlc.Timestamp, latchSpans *spanset.SpanSet, + ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard, ts hlc.Timestamp, ) bool { - if len(latchSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanGlobal)) > 0 { - // If the batch acquired any read latches with bounded (MVCC) timestamps - // then we can not trivially bump the batch's timestamp without dropping - // and re-acquiring those latches. Doing so could allow the request to - // read at an unprotected timestamp. We only look at global latch spans - // because local latch spans always use unbounded (NonMVCC) timestamps. - // - // NOTE: even if we hold read latches with high enough timestamps to - // fully cover ("protect") the batch at the new timestamp, we still - // don't want to allow the bump. This is because a batch with read spans - // and a higher timestamp may now conflict with locks that it previously - // did not. However, server-side retries don't re-scan the lock table. - // This can lead to requests missing unreplicated locks in the lock - // table that they should have seen or discovering replicated intents in - // MVCC that they should not have seen (from the perspective of the lock - // table's AddDiscoveredLock method). - // - // NOTE: we could consider adding a retry-loop above the latch - // acquisition to allow this to be retried, but given that we try not to - // mix read-only and read-write requests, doing so doesn't seem worth - // it. 
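+ // A nil guard indicates that the caller is not holding latches and will
+ // re-sequence the request at whatever timestamp the batch is bumped to, so
+ // the bump is always safe. Otherwise, the bump is only safe if the guard's
+ // latch and lock spans keep the request isolated at later timestamps (see
+ // IsolatedAtLaterTimestamps).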
+ if g != nil && !g.IsolatedAtLaterTimestamps() { return false } if ts.Less(ba.Timestamp) { log.Fatalf(ctx, "trying to bump to %s <= ba.Timestamp: %s", ts, ba.Timestamp) } - ba.Timestamp = ts if ba.Txn == nil { + log.VEventf(ctx, 2, "bumping batch timestamp to %s from %s", ts, ba.Timestamp) + ba.Timestamp = ts return true } if ts.Less(ba.Txn.ReadTimestamp) || ts.Less(ba.Txn.WriteTimestamp) { log.Fatalf(ctx, "trying to bump to %s inconsistent with ba.Txn.ReadTimestamp: %s, "+ "ba.Txn.WriteTimestamp: %s", ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp) } - log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s)", + log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s", ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp) ba.Txn = ba.Txn.Clone() ba.Txn.Refresh(ts) + ba.Timestamp = ba.Txn.ReadTimestamp // Refresh just updated ReadTimestamp return true } diff --git a/pkg/kv/kvserver/replica_evaluate.go b/pkg/kv/kvserver/replica_evaluate.go index a58bbbf91814..dfee7f2071e1 100644 --- a/pkg/kv/kvserver/replica_evaluate.go +++ b/pkg/kv/kvserver/replica_evaluate.go @@ -16,8 +16,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -518,8 +518,16 @@ func evaluateCommand( // for transactional requests, retrying is possible if the transaction had not // performed any prior reads that need refreshing. // +// This function is called both below and above latching, which is indicated by +// the concurrency guard argument. The concurrency guard, if not nil, indicates +// that the caller is holding latches and cannot adjust its timestamp beyond the +// limits of what is protected by those latches. If the concurrency guard is +// nil, the caller indicates that it is not holding latches and can therefore +// more freely adjust its timestamp because it will re-acquire latches at +// whatever timestamp the batch is bumped to. +// // deadline, if not nil, specifies the highest timestamp (exclusive) at which -// the request can be evaluated. If ba is a transactional request, then dealine +// the request can be evaluated. If ba is a transactional request, then deadline // cannot be specified; a transaction's deadline comes from it's EndTxn request. // // If true is returned, ba and ba.Txn will have been updated with the new @@ -529,7 +537,7 @@ func canDoServersideRetry( pErr *roachpb.Error, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, - latchSpans *spanset.SpanSet, + g *concurrency.Guard, deadline *hlc.Timestamp, ) bool { if ba.Txn != nil { @@ -548,17 +556,6 @@ func canDoServersideRetry( var newTimestamp hlc.Timestamp if ba.Txn != nil { if pErr != nil { - // TODO(nvanbenschoten): This is intentionally not allowing server-side - // refreshes of ReadWithinUncertaintyIntervalErrors for now, even though - // that is the eventual goal here. Lifting that limitation will likely - // need to be accompanied by an above-latching retry loop, because read - // latches will usually prevent below-latch retries of - // ReadWithinUncertaintyIntervalErrors. See the comment in - // tryBumpBatchTimestamp. 
- if _, ok := pErr.GetDetail().(*roachpb.ReadWithinUncertaintyIntervalError); ok { - return false - } - var ok bool ok, newTimestamp = roachpb.TransactionRefreshTimestamp(pErr) if !ok { @@ -576,7 +573,12 @@ func canDoServersideRetry( } switch tErr := pErr.GetDetail().(type) { case *roachpb.WriteTooOldError: - newTimestamp = tErr.ActualTimestamp + newTimestamp = tErr.RetryTimestamp() + + // TODO(nvanbenschoten): give non-txn requests uncertainty intervals. #73732. + //case *roachpb.ReadWithinUncertaintyIntervalError: + // newTimestamp = tErr.RetryTimestamp() + default: return false } @@ -585,5 +587,5 @@ func canDoServersideRetry( if batcheval.IsEndTxnExceedingDeadline(newTimestamp, deadline) { return false } - return tryBumpBatchTimestamp(ctx, ba, newTimestamp, latchSpans) + return tryBumpBatchTimestamp(ctx, ba, g, newTimestamp) } diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index effe9fcdb977..7a940b4440a6 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -21,10 +21,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -773,7 +773,7 @@ func (r *Replica) evaluateProposal( idKey kvserverbase.CmdIDKey, ba *roachpb.BatchRequest, ui uncertainty.Interval, - latchSpans *spanset.SpanSet, + g *concurrency.Guard, ) (*result.Result, bool, *roachpb.Error) { if ba.Timestamp.IsEmpty() { return nil, false, roachpb.NewErrorf("can't propose Raft command with zero timestamp") @@ -789,7 +789,7 @@ func (r *Replica) evaluateProposal( // // TODO(tschottdorf): absorb all returned values in `res` below this point // in the call stack as well. - batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, ui, latchSpans) + batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, ui, g) // Note: reusing the proposer's batch when applying the command on the // proposer was explored as an optimization but resulted in no performance @@ -875,18 +875,15 @@ func (r *Replica) evaluateProposal( // evaluating it. The returned ProposalData is partially valid even // on a non-nil *roachpb.Error and should be proposed through Raft // if ProposalData.command is non-nil. -// -// TODO(nvanbenschoten): combine idKey, ba, and latchSpans into a -// `serializedRequest` struct. func (r *Replica) requestToProposal( ctx context.Context, idKey kvserverbase.CmdIDKey, ba *roachpb.BatchRequest, st kvserverpb.LeaseStatus, ui uncertainty.Interval, - latchSpans *spanset.SpanSet, + g *concurrency.Guard, ) (*ProposalData, *roachpb.Error) { - res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, ui, latchSpans) + res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, ui, g) // Fill out the results even if pErr != nil; we'll return the error below. 
proposal := &ProposalData{ diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index a3b824a26604..9084a612a453 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -107,7 +107,7 @@ func (r *Replica) evalAndPropose( ) (chan proposalResult, func(), kvserverbase.CmdIDKey, *roachpb.Error) { defer tok.DoneIfNotMoved(ctx) idKey := makeIDKey() - proposal, pErr := r.requestToProposal(ctx, idKey, ba, st, ui, g.LatchSpans()) + proposal, pErr := r.requestToProposal(ctx, idKey, ba, st, ui, g) log.Event(proposal.ctx, "evaluated request") // If the request hit a server-side concurrency retry error, immediately diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index c7050b1a66e0..21cf12b33c61 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -49,8 +49,7 @@ func (r *Replica) executeReadOnlyBatch( ui := uncertainty.ComputeInterval(ba.Txn, st) // Evaluate read-only batch command. - spans := g.LatchSpans() - rec := NewReplicaEvalContext(r, spans) + rec := NewReplicaEvalContext(r, g.LatchSpans()) // TODO(irfansharif): It's unfortunate that in this read-only code path, // we're stuck with a ReadWriter because of the way evaluateBatch is @@ -62,7 +61,7 @@ func (r *Replica) executeReadOnlyBatch( panic("expected consistent iterators") } if util.RaceEnabled { - rw = spanset.NewReadWriterAt(rw, spans, ba.Timestamp) + rw = spanset.NewReadWriterAt(rw, g.LatchSpans(), ba.Timestamp) } defer rw.Close() @@ -81,9 +80,7 @@ func (r *Replica) executeReadOnlyBatch( // the latches are released. var result result.Result - br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes( - ctx, rw, rec, ba, ui, spans, - ) + br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, ui, g) // If the request hit a server-side concurrency retry error, immediately // propagate the error. Don't assume ownership of the concurrency guard. @@ -237,7 +234,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes( rec batcheval.EvalContext, ba *roachpb.BatchRequest, ui uncertainty.Interval, - latchSpans *spanset.SpanSet, + g *concurrency.Guard, ) (br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) { log.Event(ctx, "executing read-only batch") @@ -290,7 +287,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes( br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, ui, true /* readOnly */) // If we can retry, set a higher batch timestamp and continue. // Allow one retry only. - if pErr == nil || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, latchSpans, nil /* deadline */) { + if pErr == nil || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, g, nil /* deadline */) { break } } diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 1a92fda9cf5f..31c6b736a76b 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -400,17 +400,9 @@ var _ batchExecutionFn = (*Replica).executeReadOnlyBatch func (r *Replica) executeBatchWithConcurrencyRetries( ctx context.Context, ba *roachpb.BatchRequest, fn batchExecutionFn, ) (br *roachpb.BatchResponse, pErr *roachpb.Error) { - // Determine the maximal set of key spans that the batch will operate on. - // This is used below to sequence the request in the concurrency manager. 
- latchSpans, lockSpans, requestEvalKind, err := r.collectSpans(ba) - if err != nil { - return nil, roachpb.NewError(err) - } - - // Handle load-based splitting. - r.recordBatchForLoadBasedSplitting(ctx, ba, latchSpans) - // Try to execute command; exit retry loop on success. + var latchSpans, lockSpans *spanset.SpanSet + var requestEvalKind concurrency.RequestEvalKind var g *concurrency.Guard defer func() { // NB: wrapped to delay g evaluation to its value when returning. @@ -418,12 +410,31 @@ func (r *Replica) executeBatchWithConcurrencyRetries( r.concMgr.FinishReq(g) } }() - for { + for first := true; ; first = false { // Exit loop if context has been canceled or timed out. if err := ctx.Err(); err != nil { return nil, roachpb.NewError(errors.Wrap(err, "aborted during Replica.Send")) } + // Determine the maximal set of key spans that the batch will operate on. + // This is used below to sequence the request in the concurrency manager. + // + // Only do so if the latchSpans and lockSpans are not being preserved from a + // prior iteration, either directly or in a concurrency guard that we intend + // to re-use during sequencing. + if latchSpans == nil && g == nil { + var err error + latchSpans, lockSpans, requestEvalKind, err = r.collectSpans(ba) + if err != nil { + return nil, roachpb.NewError(err) + } + } + + // Handle load-based splitting, if necessary. + if first { + r.recordBatchForLoadBasedSplitting(ctx, ba, latchSpans) + } + // Acquire latches to prevent overlapping requests from executing until // this request completes. After latching, wait on any conflicting locks // to ensure that the request has full isolation during evaluation. This @@ -495,6 +506,25 @@ func (r *Replica) executeBatchWithConcurrencyRetries( if pErr = r.handleIndeterminateCommitError(ctx, ba, pErr, t); pErr != nil { return nil, pErr } + case *roachpb.ReadWithinUncertaintyIntervalError: + // Drop latches and lock wait-queues. + r.concMgr.FinishReq(g) + g = nil + // If the batch is able to perform a server-side retry in order to avoid + // the uncertainty error, it will have a new timestamp. Force a refresh of + // the latch and lock spans. + latchSpans, lockSpans = nil, nil + // Attempt to adjust the batch's timestamp to avoid the uncertainty error + // and allow for a server-side retry. For transactional requests, there + // are strict conditions that must be met for this to be permitted. For + // non-transactional requests, this is always allowed. If successful, an + // updated BatchRequest will be returned. If unsuccessful, the provided + // read within uncertainty interval error will be returned so that we can + // propagate it. + ba, pErr = r.handleReadWithinUncertaintyIntervalError(ctx, ba, pErr, t) + if pErr != nil { + return nil, pErr + } case *roachpb.InvalidLeaseError: // Drop latches and lock wait-queues. latchSpans, lockSpans = g.TakeSpanSets() @@ -555,6 +585,34 @@ func isConcurrencyRetryError(pErr *roachpb.Error) bool { // the pushee is aborted or committed, so the request must kick off the // "transaction recovery procedure" to resolve this ambiguity before // retrying. + case *roachpb.ReadWithinUncertaintyIntervalError: + // If a request hits a ReadWithinUncertaintyIntervalError, it was performing + // a non-locking read [1] and encountered a (committed or provisional) write + // within the uncertainty interval of the reader. Depending on the state of + // the request (see conditions in canDoServersideRetry), it may be able to + // adjust its timestamp and retry on the server. 
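+ //
+ // That retry is handled above latching, in executeBatchWithConcurrencyRetries:
+ // the concurrency guard is dropped, handleReadWithinUncertaintyIntervalError
+ // bumps the batch timestamp, and the request is re-sequenced with new latches
+ // at the higher timestamp.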
+ // + // This is similar to other server-side retries that we allow below + // latching, like for WriteTooOld errors. However, because uncertainty + // errors are specific to non-locking reads, they can not [2] be retried + // without first dropping and re-acquiring their read latches at a higher + // timestamp. This is unfortunate for uncertainty errors, as it leads to + // some extra work. + // + // On the other hand, it is more important for other forms of retry errors + // to be handled without dropping latches because they could be starved by + // repeated conflicts. For instance, if WriteTooOld errors caused a write + // request to drop and re-acquire latches, it is possible that the request + // could return after each retry to find a new WriteTooOld conflict, never + // managing to complete. This is not the case for uncertainty errors, which + // can not occur indefinitely. A request (transactional or otherwise) has a + // fixed uncertainty window and, once exhausted, will never hit an + // uncertainty error again. + // + // [1] if a locking read observes a write at a later timestamp, it returns a + // WriteTooOld error. Its uncertainty interval does not matter. + // [2] in practice, this is enforced by tryBumpBatchTimestamp's call to + // (*concurrency.Guard).IsolatedAtLaterTimestamps. case *roachpb.InvalidLeaseError: // If a request hits an InvalidLeaseError, the replica it is being // evaluated against does not have a valid lease under which it can @@ -699,6 +757,36 @@ func (r *Replica) handleIndeterminateCommitError( return nil } +func (r *Replica) handleReadWithinUncertaintyIntervalError( + ctx context.Context, + ba *roachpb.BatchRequest, + pErr *roachpb.Error, + t *roachpb.ReadWithinUncertaintyIntervalError, +) (*roachpb.BatchRequest, *roachpb.Error) { + // Attempt a server-side retry of the request. Note that we pass a nil + // concurrency guard, because we have already released our latches and plan to + // re-acquire them if the retry is allowed. + if !canDoServersideRetry(ctx, pErr, ba, nil /* br */, nil /* g */, nil /* deadline */) { + return nil, pErr + } + // TODO(nvanbenschoten): give non-txn requests uncertainty intervals. #73732. + //if ba.Txn == nil && ba.Timestamp.Synthetic { + // // If the request is non-transactional and it was refreshed into the future + // // after observing a value with a timestamp in the future, immediately sleep + // // until its new read timestamp becomes present. We don't need to do this + // // for transactional requests because they will do this during their + // // commit-wait sleep after committing. + // // + // // See TxnCoordSender.maybeCommitWait for a discussion about why doing this + // // is necessary to preserve real-time ordering for transactions that write + // // into the future. + // if err := r.Clock().SleepUntil(ctx, ba.Timestamp); err != nil { + // return nil, roachpb.NewError(err) + // } + //} + return ba, nil +} + func (r *Replica) handleInvalidLeaseError( ctx context.Context, ba *roachpb.BatchRequest, ) *roachpb.Error { diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 42553b6965d0..36cb73be835e 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -475,7 +475,7 @@ func TestIsOnePhaseCommit(t *testing.T) { // Emulate what a server actually does and bump the write timestamp when // possible. This makes some batches with diverged read and write // timestamps pass isOnePhaseCommit().
- maybeBumpReadTimestampToWriteTimestamp(ctx, &ba, &spanset.SpanSet{}) + maybeBumpReadTimestampToWriteTimestamp(ctx, &ba, allSpansGuard()) if is1PC := isOnePhaseCommit(&ba); is1PC != c.exp1PC { t.Errorf("expected 1pc=%t; got %t", c.exp1PC, is1PC) @@ -8079,7 +8079,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { ba.Timestamp = tc.Clock().Now() ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(id)}}) st := r.CurrentLeaseStatus(ctx) - cmd, pErr := r.requestToProposal(ctx, kvserverbase.CmdIDKey(id), &ba, st, uncertainty.Interval{}, allSpans()) + cmd, pErr := r.requestToProposal(ctx, kvserverbase.CmdIDKey(id), &ba, st, uncertainty.Interval{}, allSpansGuard()) if pErr != nil { t.Fatal(pErr) } @@ -8201,7 +8201,7 @@ func TestReplicaRefreshMultiple(t *testing.T) { incCmdID = makeIDKey() atomic.StoreInt32(&filterActive, 1) - proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, repl.CurrentLeaseStatus(ctx), uncertainty.Interval{}, allSpans()) + proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, repl.CurrentLeaseStatus(ctx), uncertainty.Interval{}, allSpansGuard()) if pErr != nil { t.Fatal(pErr) } @@ -8734,7 +8734,7 @@ func TestReplicaEvaluationNotTxnMutation(t *testing.T) { assignSeqNumsForReqs(txn, &txnPut, &txnPut2) origTxn := txn.Clone() - batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, makeIDKey(), &ba, uncertainty.Interval{}, allSpans()) + batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, makeIDKey(), &ba, uncertainty.Interval{}, allSpansGuard()) defer batch.Close() if pErr != nil { t.Fatal(pErr) @@ -10695,6 +10695,72 @@ func TestReplicaServersideRefreshes(t *testing.T) { // EndTransaction. This is hard to do at the moment, though, because we // never defer the handling of the write too old conditions to the end of // the transaction (but we might in the future). 
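+ // The following cases exercise server-side refreshes of
+ // ReadWithinUncertaintyIntervalError. The refresh is only permitted when
+ // the batch can forward its read timestamp (CanForwardReadTimestamp, i.e.
+ // the txn has no prior reads); the "with prior reads" variants expect the
+ // error to be returned instead.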
+ { + name: "serverside-refresh of read within uncertainty interval error on get in non-1PC txn", + setupFn: func() (hlc.Timestamp, error) { + return put("a", "put") + }, + batchFn: func(ts hlc.Timestamp) (ba roachpb.BatchRequest, expTS hlc.Timestamp) { + expTS = ts.Next() + ts = ts.Prev() + ba.Txn = newTxn("a", ts) + ba.Txn.GlobalUncertaintyLimit = expTS + ba.CanForwardReadTimestamp = true // necessary to indicate serverside-refresh is possible + get := getArgs(roachpb.Key("a")) + ba.Add(&get) + return + }, + }, + { + name: "serverside-refresh of read within uncertainty interval error on get in non-1PC txn with prior reads", + setupFn: func() (hlc.Timestamp, error) { + return put("a", "put") + }, + batchFn: func(ts hlc.Timestamp) (ba roachpb.BatchRequest, expTS hlc.Timestamp) { + ts = ts.Prev() + ba.Txn = newTxn("a", ts) + ba.Txn.GlobalUncertaintyLimit = ts.Next() + get := getArgs(roachpb.Key("a")) + ba.Add(&get) + return + }, + expErr: "ReadWithinUncertaintyIntervalError", + }, + { + name: "serverside-refresh of read within uncertainty interval error on get in 1PC txn", + setupFn: func() (hlc.Timestamp, error) { + return put("a", "put") + }, + batchFn: func(ts hlc.Timestamp) (ba roachpb.BatchRequest, expTS hlc.Timestamp) { + expTS = ts.Next() + ts = ts.Prev() + ba.Txn = newTxn("a", ts) + ba.Txn.GlobalUncertaintyLimit = expTS + ba.CanForwardReadTimestamp = true // necessary to indicate serverside-refresh is possible + get := getArgs(roachpb.Key("a")) + et, _ := endTxnArgs(ba.Txn, true /* commit */) + ba.Add(&get, &et) + assignSeqNumsForReqs(ba.Txn, &get, &et) + return + }, + }, + { + name: "serverside-refresh of read within uncertainty interval error on get in 1PC txn with prior reads", + setupFn: func() (hlc.Timestamp, error) { + return put("a", "put") + }, + batchFn: func(ts hlc.Timestamp) (ba roachpb.BatchRequest, expTS hlc.Timestamp) { + ts = ts.Prev() + ba.Txn = newTxn("a", ts) + ba.Txn.GlobalUncertaintyLimit = ts.Next() + get := getArgs(roachpb.Key("a")) + et, _ := endTxnArgs(ba.Txn, true /* commit */) + ba.Add(&get, &et) + assignSeqNumsForReqs(ba.Txn, &get, &et) + return + }, + expErr: "ReadWithinUncertaintyIntervalError", + }, } for _, test := range testCases { @@ -12939,7 +13005,7 @@ func TestContainsEstimatesClampProposal(t *testing.T) { ba.Timestamp = tc.Clock().Now() req := putArgs(roachpb.Key("some-key"), []byte("some-value")) ba.Add(&req) - proposal, err := tc.repl.requestToProposal(ctx, cmdIDKey, &ba, tc.repl.CurrentLeaseStatus(ctx), uncertainty.Interval{}, allSpans()) + proposal, err := tc.repl.requestToProposal(ctx, cmdIDKey, &ba, tc.repl.CurrentLeaseStatus(ctx), uncertainty.Interval{}, allSpansGuard()) if err != nil { t.Error(err) } diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 110b6124324c..276a07aef347 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -316,7 +316,7 @@ func (r *Replica) executeWriteBatch( // canAttempt1PCEvaluation looks at the batch and decides whether it can be // executed as 1PC. func (r *Replica) canAttempt1PCEvaluation( - ctx context.Context, ba *roachpb.BatchRequest, latchSpans *spanset.SpanSet, + ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard, ) bool { if !isOnePhaseCommit(ba) { return false @@ -343,7 +343,7 @@ func (r *Replica) canAttempt1PCEvaluation( ba.Txn.WriteTimestamp = minCommitTS // We can only evaluate at the new timestamp if we manage to bump the read // timestamp. 
- return maybeBumpReadTimestampToWriteTimestamp(ctx, ba, latchSpans) + return maybeBumpReadTimestampToWriteTimestamp(ctx, ba, g) } return true } @@ -362,19 +362,19 @@ func (r *Replica) evaluateWriteBatch( idKey kvserverbase.CmdIDKey, ba *roachpb.BatchRequest, ui uncertainty.Interval, - latchSpans *spanset.SpanSet, + g *concurrency.Guard, ) (storage.Batch, enginepb.MVCCStats, *roachpb.BatchResponse, result.Result, *roachpb.Error) { log.Event(ctx, "executing read-write batch") // If the transaction has been pushed but it can commit at the higher // timestamp, let's evaluate the batch at the bumped timestamp. This will // allow it commit, and also it'll allow us to attempt the 1PC code path. - maybeBumpReadTimestampToWriteTimestamp(ctx, ba, latchSpans) + maybeBumpReadTimestampToWriteTimestamp(ctx, ba, g) // Attempt 1PC execution, if applicable. If not transactional or there are // indications that the batch's txn will require retry, execute as normal. - if r.canAttempt1PCEvaluation(ctx, ba, latchSpans) { - res := r.evaluate1PC(ctx, idKey, ba, latchSpans) + if r.canAttempt1PCEvaluation(ctx, ba, g) { + res := r.evaluate1PC(ctx, idKey, ba, g) switch res.success { case onePCSucceeded: return res.batch, res.stats, res.br, res.res, nil @@ -403,9 +403,9 @@ func (r *Replica) evaluateWriteBatch( } ms := new(enginepb.MVCCStats) - rec := NewReplicaEvalContext(r, latchSpans) + rec := NewReplicaEvalContext(r, g.LatchSpans()) batch, br, res, pErr := r.evaluateWriteBatchWithServersideRefreshes( - ctx, idKey, rec, ms, ba, ui, latchSpans, nil /* deadline */) + ctx, idKey, rec, ms, ba, ui, g, nil /* deadline */) return batch, *ms, br, res, pErr } @@ -445,10 +445,7 @@ type onePCResult struct { // efficient - we're avoiding writing the transaction record and writing and the // immediately deleting intents. func (r *Replica) evaluate1PC( - ctx context.Context, - idKey kvserverbase.CmdIDKey, - ba *roachpb.BatchRequest, - latchSpans *spanset.SpanSet, + ctx context.Context, idKey kvserverbase.CmdIDKey, ba *roachpb.BatchRequest, g *concurrency.Guard, ) (onePCRes onePCResult) { log.VEventf(ctx, 2, "attempting 1PC execution") @@ -471,7 +468,7 @@ func (r *Replica) evaluate1PC( // Is this relying on the batch being write-only? 
ui := uncertainty.Interval{} - rec := NewReplicaEvalContext(r, latchSpans) + rec := NewReplicaEvalContext(r, g.LatchSpans()) var br *roachpb.BatchResponse var res result.Result var pErr *roachpb.Error @@ -483,10 +480,10 @@ func (r *Replica) evaluate1PC( ms := new(enginepb.MVCCStats) if ba.CanForwardReadTimestamp { batch, br, res, pErr = r.evaluateWriteBatchWithServersideRefreshes( - ctx, idKey, rec, ms, &strippedBa, ui, latchSpans, etArg.Deadline) + ctx, idKey, rec, ms, &strippedBa, ui, g, etArg.Deadline) } else { batch, br, res, pErr = r.evaluateWriteBatchWrapper( - ctx, idKey, rec, ms, &strippedBa, ui, latchSpans) + ctx, idKey, rec, ms, &strippedBa, ui, g) } if pErr != nil || (!ba.CanForwardReadTimestamp && ba.Timestamp != br.Timestamp) { @@ -577,7 +574,7 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes( ms *enginepb.MVCCStats, ba *roachpb.BatchRequest, ui uncertainty.Interval, - latchSpans *spanset.SpanSet, + g *concurrency.Guard, deadline *hlc.Timestamp, ) (batch storage.Batch, br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) { goldenMS := *ms @@ -591,7 +588,7 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes( batch.Close() } - batch, br, res, pErr = r.evaluateWriteBatchWrapper(ctx, idKey, rec, ms, ba, ui, latchSpans) + batch, br, res, pErr = r.evaluateWriteBatchWrapper(ctx, idKey, rec, ms, ba, ui, g) var success bool if pErr == nil { @@ -604,7 +601,7 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes( // If we can retry, set a higher batch timestamp and continue. // Allow one retry only; a non-txn batch containing overlapping // spans will always experience WriteTooOldError. - if success || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, latchSpans, deadline) { + if success || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, g, deadline) { break } } @@ -620,9 +617,9 @@ func (r *Replica) evaluateWriteBatchWrapper( ms *enginepb.MVCCStats, ba *roachpb.BatchRequest, ui uncertainty.Interval, - latchSpans *spanset.SpanSet, + g *concurrency.Guard, ) (storage.Batch, *roachpb.BatchResponse, result.Result, *roachpb.Error) { - batch, opLogger := r.newBatchedEngine(ba, latchSpans) + batch, opLogger := r.newBatchedEngine(ba, g) br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, ms, ba, ui, false /* readOnly */) if pErr == nil { if opLogger != nil { @@ -639,7 +636,7 @@ func (r *Replica) evaluateWriteBatchWrapper( // OpLogger is attached to the returned engine.Batch, recording all operations. // Its recording should be attached to the Result of request evaluation. func (r *Replica) newBatchedEngine( - ba *roachpb.BatchRequest, latchSpans *spanset.SpanSet, + ba *roachpb.BatchRequest, g *concurrency.Guard, ) (storage.Batch, *storage.OpLoggerBatch) { batch := r.store.Engine().NewBatch() if !batch.ConsistentIterators() { @@ -690,7 +687,7 @@ func (r *Replica) newBatchedEngine( // safe as we're only ever writing at timestamps higher than the timestamp // any write latch would be declared at. But because of this, we don't // assert on access timestamps using spanset.NewBatchAt. - batch = spanset.NewBatch(batch, latchSpans) + batch = spanset.NewBatch(batch, g.LatchSpans()) } return batch, opLogger } diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go index 4064b0b608e3..1d8fdacd8317 100644 --- a/pkg/roachpb/data.go +++ b/pkg/roachpb/data.go @@ -1472,7 +1472,7 @@ func PrepareTransactionForRetry( // Use the priority communicated back by the server. 
txn.Priority = errTxnPri case *ReadWithinUncertaintyIntervalError: - txn.WriteTimestamp.Forward(readWithinUncertaintyIntervalRetryTimestamp(tErr)) + txn.WriteTimestamp.Forward(tErr.RetryTimestamp()) case *TransactionPushError: // Increase timestamp if applicable, ensuring that we're just ahead of // the pushee. @@ -1503,7 +1503,7 @@ func PrepareTransactionForRetry( } case *WriteTooOldError: // Increase the timestamp to the ts at which we've actually written. - txn.WriteTimestamp.Forward(writeTooOldRetryTimestamp(tErr)) + txn.WriteTimestamp.Forward(tErr.RetryTimestamp()) default: log.Fatalf(ctx, "invalid retryable err (%T): %s", pErr.GetDetail(), pErr) } @@ -1537,52 +1537,15 @@ func TransactionRefreshTimestamp(pErr *Error) (bool, hlc.Timestamp) { // error, obviously the refresh will fail. It might be worth trying to // detect these cases and save the futile attempt; we'd need to have access // to the key that generated the error. - timestamp.Forward(writeTooOldRetryTimestamp(err)) + timestamp.Forward(err.RetryTimestamp()) case *ReadWithinUncertaintyIntervalError: - timestamp.Forward(readWithinUncertaintyIntervalRetryTimestamp(err)) + timestamp.Forward(err.RetryTimestamp()) default: return false, hlc.Timestamp{} } return true, timestamp } -func readWithinUncertaintyIntervalRetryTimestamp( - err *ReadWithinUncertaintyIntervalError, -) hlc.Timestamp { - // If the reader encountered a newer write within the uncertainty interval, - // we advance the txn's timestamp just past the uncertain value's timestamp. - // This ensures that we read above the uncertain value on a retry. - ts := err.ExistingTimestamp.Next() - // In addition to advancing past the uncertainty value's timestamp, we also - // advance the txn's timestamp up to the local uncertainty limit on the node - // which hit the error. This ensures that no future read after the retry on - // this node (ignoring lease complications in ComputeLocalUncertaintyLimit - // and values with synthetic timestamps) will throw an uncertainty error, - // even when reading other keys. - // - // Note that if the request was not able to establish a local uncertainty - // limit due to a missing observed timestamp (for instance, if the request - // was evaluated on a follower replica and the txn had never visited the - // leaseholder), then LocalUncertaintyLimit will be empty and the Forward - // will be a no-op. In this case, we could advance all the way past the - // global uncertainty limit, but this time would likely be in the future, so - // this would necessitate a commit-wait period after committing. - // - // In general, we expect the local uncertainty limit, if set, to be above - // the uncertainty value's timestamp. So we expect this Forward to advance - // ts. However, this is not always the case. The one exception is if the - // uncertain value had a synthetic timestamp, so it was compared against the - // global uncertainty limit to determine uncertainty (see IsUncertain). In - // such cases, we're ok advancing just past the value's timestamp. Either - // way, we won't see the same value in our uncertainty interval on a retry. - ts.Forward(err.LocalUncertaintyLimit) - return ts -} - -func writeTooOldRetryTimestamp(err *WriteTooOldError) hlc.Timestamp { - return err.ActualTimestamp -} - // Replicas returns all of the replicas present in the descriptor after this // trigger applies. 
func (crt ChangeReplicasTrigger) Replicas() []ReplicaDescriptor { diff --git a/pkg/roachpb/errors.go b/pkg/roachpb/errors.go index 78cb10e18733..8ce4c5fe509a 100644 --- a/pkg/roachpb/errors.go +++ b/pkg/roachpb/errors.go @@ -949,6 +949,12 @@ func (e *WriteTooOldError) Type() ErrorDetailType { return WriteTooOldErrType } +// RetryTimestamp returns the timestamp that should be used to retry an +// operation after encountering a WriteTooOldError. +func (e *WriteTooOldError) RetryTimestamp() hlc.Timestamp { + return e.ActualTimestamp +} + var _ ErrorDetailInterface = &WriteTooOldError{} var _ transactionRestartError = &WriteTooOldError{} @@ -1009,6 +1015,39 @@ func (*ReadWithinUncertaintyIntervalError) canRestartTransaction() TransactionRe return TransactionRestart_IMMEDIATE } +// RetryTimestamp returns the timestamp that should be used to retry an +// operation after encountering a ReadWithinUncertaintyIntervalError. +func (e *ReadWithinUncertaintyIntervalError) RetryTimestamp() hlc.Timestamp { + // If the reader encountered a newer write within the uncertainty interval, + // we advance the txn's timestamp just past the uncertain value's timestamp. + // This ensures that we read above the uncertain value on a retry. + ts := e.ExistingTimestamp.Next() + // In addition to advancing past the uncertainty value's timestamp, we also + // advance the txn's timestamp up to the local uncertainty limit on the node + // which hit the error. This ensures that no future read after the retry on + // this node (ignoring lease complications in ComputeLocalUncertaintyLimit + // and values with synthetic timestamps) will throw an uncertainty error, + // even when reading other keys. + // + // Note that if the request was not able to establish a local uncertainty + // limit due to a missing observed timestamp (for instance, if the request + // was evaluated on a follower replica and the txn had never visited the + // leaseholder), then LocalUncertaintyLimit will be empty and the Forward + // will be a no-op. In this case, we could advance all the way past the + // global uncertainty limit, but this time would likely be in the future, so + // this would necessitate a commit-wait period after committing. + // + // In general, we expect the local uncertainty limit, if set, to be above + // the uncertainty value's timestamp. So we expect this Forward to advance + // ts. However, this is not always the case. The one exception is if the + // uncertain value had a synthetic timestamp, so it was compared against the + // global uncertainty limit to determine uncertainty (see IsUncertain). In + // such cases, we're ok advancing just past the value's timestamp. Either + // way, we won't see the same value in our uncertainty interval on a retry. 
+ ts.Forward(e.LocalUncertaintyLimit) + return ts +} + var _ ErrorDetailInterface = &ReadWithinUncertaintyIntervalError{} var _ transactionRestartError = &ReadWithinUncertaintyIntervalError{} diff --git a/pkg/sql/txn_restart_test.go b/pkg/sql/txn_restart_test.go index a8b229b97e97..f4edd908ed54 100644 --- a/pkg/sql/txn_restart_test.go +++ b/pkg/sql/txn_restart_test.go @@ -1145,39 +1145,25 @@ func TestReacquireLeaseOnRestart(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - advancement := 2 * base.DefaultDescriptorLeaseDuration - - var cmdFilters tests.CommandFilters - cmdFilters.AppendFilter(tests.CheckEndTxnTrigger, true) - - testKey := []byte("test_key") - storeTestingKnobs := &kvserver.StoreTestingKnobs{ - EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ - TestingEvalFilter: cmdFilters.RunFilters, - }, - DisableMaxOffsetCheck: true, - } - const refreshAttempts = 3 clientTestingKnobs := &kvcoord.ClientTestingKnobs{ MaxTxnRefreshAttempts: refreshAttempts, } - params, _ := tests.CreateTestServerParams() - params.Knobs.Store = storeTestingKnobs - params.Knobs.KVClient = clientTestingKnobs - s, sqlDB, _ := serverutils.StartServer(t, params) - defer s.Stopper().Stop(context.Background()) - + testKey := []byte("test_key") + var s serverutils.TestServerInterface var clockUpdate, restartDone int32 - cleanupFilter := cmdFilters.AppendFilter( - func(args kvserverbase.FilterArgs) *roachpb.Error { - if req, ok := args.Req.(*roachpb.GetRequest); ok { + testingResponseFilter := func( + ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse, + ) *roachpb.Error { + for _, ru := range ba.Requests { + if req := ru.GetGet(); req != nil { if bytes.Contains(req.Key, testKey) && !kv.TestingIsRangeLookupRequest(req) { if atomic.LoadInt32(&clockUpdate) == 0 { atomic.AddInt32(&clockUpdate, 1) // Hack to advance the transaction timestamp on a // transaction restart. + const advancement = 2 * base.DefaultDescriptorLeaseDuration now := s.Clock().NowAsClockTimestamp() now.WallTime += advancement.Nanoseconds() s.Clock().Update(now) @@ -1189,7 +1175,7 @@ func TestReacquireLeaseOnRestart(t *testing.T) { atomic.AddInt32(&restartDone, 1) // Return ReadWithinUncertaintyIntervalError to update // the transaction timestamp on retry. - txn := args.Hdr.Txn + txn := ba.Txn txn.ResetObservedTimestamps() now := s.Clock().NowAsClockTimestamp() txn.UpdateObservedTimestamp(s.(*server.TestServer).Gossip().NodeID.Get(), now) @@ -1197,9 +1183,22 @@ func TestReacquireLeaseOnRestart(t *testing.T) { } } } - return nil - }, false) - defer cleanupFilter() + } + return nil + } + storeTestingKnobs := &kvserver.StoreTestingKnobs{ + // We use a TestingResponseFilter to avoid server-side refreshes of the + // ReadWithinUncertaintyIntervalError that the filter returns. + TestingResponseFilter: testingResponseFilter, + DisableMaxOffsetCheck: true, + } + + params, _ := tests.CreateTestServerParams() + params.Knobs.Store = storeTestingKnobs + params.Knobs.KVClient = clientTestingKnobs + var sqlDB *gosql.DB + s, sqlDB, _ = serverutils.StartServer(t, params) + defer s.Stopper().Stop(context.Background()) sqlDB.SetMaxOpenConns(1) if _, err := sqlDB.Exec(`