diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index 7bd1da8fe740..be9e0a0e9bde 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -2262,9 +2262,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { } return txn.Run(ctx, b) }, - filter: newUncertaintyFilter(roachpb.Key([]byte("a"))), - // Expect a transaction coord retry, which should succeed. - txnCoordRetry: true, + filter: newUncertaintyFilter(roachpb.Key("a")), + // We expect the request to succeed after a server-side retry. + txnCoordRetry: false, }, { // Even if accounting for the refresh spans would have exhausted the @@ -2952,8 +2952,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value")) }, - filter: newUncertaintyFilter(roachpb.Key([]byte("a"))), - txnCoordRetry: true, + filter: newUncertaintyFilter(roachpb.Key("a")), + // We expect the request to succeed after a server-side retry. + txnCoordRetry: false, }, { name: "cput within uncertainty interval after timestamp leaked", @@ -2963,7 +2964,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value")) }, - filter: newUncertaintyFilter(roachpb.Key([]byte("a"))), + filter: newUncertaintyFilter(roachpb.Key("a")), clientRetry: true, tsLeaked: true, }, @@ -2984,7 +2985,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { } return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value")) }, - filter: newUncertaintyFilter(roachpb.Key([]byte("ac"))), + filter: newUncertaintyFilter(roachpb.Key("ac")), txnCoordRetry: true, }, { @@ -3007,7 +3008,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { } return nil }, - filter: newUncertaintyFilter(roachpb.Key([]byte("ac"))), + filter: newUncertaintyFilter(roachpb.Key("ac")), clientRetry: true, // note this txn is read-only but still restarts }, { @@ -3023,7 +3024,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value")) return txn.CommitInBatch(ctx, b) }, - filter: newUncertaintyFilter(roachpb.Key([]byte("c"))), + filter: newUncertaintyFilter(roachpb.Key("c")), txnCoordRetry: true, }, { @@ -3045,7 +3046,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.CPut("a", "cput", kvclientutils.StrToCPutExistingValue("value")) return txn.CommitInBatch(ctx, b) }, - filter: newUncertaintyFilter(roachpb.Key([]byte("a"))), + filter: newUncertaintyFilter(roachpb.Key("a")), clientRetry: true, // will fail because of conflict on refresh span for the Get }, { @@ -3059,7 +3060,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value")) return txn.CommitInBatch(ctx, b) }, - filter: newUncertaintyFilter(roachpb.Key([]byte("c"))), + filter: newUncertaintyFilter(roachpb.Key("c")), // Expect a transaction coord retry, which should succeed. txnCoordRetry: true, }, @@ -3069,7 +3070,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { _, err := txn.Scan(ctx, "a", "d", 0) return err }, - filter: newUncertaintyFilter(roachpb.Key([]byte("c"))), + filter: newUncertaintyFilter(roachpb.Key("c")), // Expect a transaction coord retry, which should succeed. txnCoordRetry: true, }, @@ -3079,7 +3080,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { _, err := txn.DelRange(ctx, "a", "d", false /* returnKeys */) return err }, - filter: newUncertaintyFilter(roachpb.Key([]byte("c"))), + filter: newUncertaintyFilter(roachpb.Key("c")), // Expect a transaction coord retry, which should succeed. txnCoordRetry: true, }, diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go index 2089a348e7cd..5f63d0a05265 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go @@ -658,6 +658,26 @@ func (g *Guard) AssertNoLatches() { } } +// IsolatedAtLaterTimestamps returns whether the request holding the guard would +// continue to be isolated from other requests / transactions even if it were to +// increase its request timestamp while evaluating. If the method returns false, +// the concurrency guard must be dropped and re-acquired with the new timestamp +// before the request can evaluate at that later timestamp. +func (g *Guard) IsolatedAtLaterTimestamps() bool { + // If the request acquired any read latches with bounded (MVCC) timestamps + // then it can not trivially bump its timestamp without dropping and + // re-acquiring those latches. Doing so could allow the request to read at an + // unprotected timestamp. We only look at global latch spans because local + // latch spans always use unbounded (NonMVCC) timestamps. + return len(g.Req.LatchSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanGlobal)) == 0 && + // Similarly, if the request declared any global or local read lock spans + // then it can not trivially bump its timestamp without dropping its + // lockTableGuard and re-scanning the lockTable. Doing so could allow the + // request to conflict with locks that it previously did not conflict with. + len(g.Req.LockSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanGlobal)) == 0 && + len(g.Req.LockSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanLocal)) == 0 +} + // CheckOptimisticNoConflicts checks that the {latch,lock}SpansRead do not // have a conflicting latch, lock. func (g *Guard) CheckOptimisticNoConflicts( diff --git a/pkg/kv/kvserver/replica_batch_updates.go b/pkg/kv/kvserver/replica_batch_updates.go index 2c1bd69904e0..52ea4f37d9fb 100644 --- a/pkg/kv/kvserver/replica_batch_updates.go +++ b/pkg/kv/kvserver/replica_batch_updates.go @@ -14,7 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -183,7 +183,7 @@ func maybeStripInFlightWrites(ba *roachpb.BatchRequest) (*roachpb.BatchRequest, // works for batches that exclusively contain writes; reads cannot be bumped // like this because they've already acquired timestamp-aware latches. func maybeBumpReadTimestampToWriteTimestamp( - ctx context.Context, ba *roachpb.BatchRequest, latchSpans *spanset.SpanSet, + ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard, ) bool { if ba.Txn == nil { return false @@ -202,53 +202,43 @@ func maybeBumpReadTimestampToWriteTimestamp( if batcheval.IsEndTxnExceedingDeadline(ba.Txn.WriteTimestamp, et.Deadline) { return false } - return tryBumpBatchTimestamp(ctx, ba, ba.Txn.WriteTimestamp, latchSpans) + return tryBumpBatchTimestamp(ctx, ba, g, ba.Txn.WriteTimestamp) } // tryBumpBatchTimestamp attempts to bump ba's read and write timestamps to ts. // +// This function is called both below and above latching, which is indicated by +// the concurrency guard argument. The concurrency guard, if not nil, indicates +// that the caller is holding latches and cannot adjust its timestamp beyond the +// limits of what is protected by those latches. If the concurrency guard is +// nil, the caller indicates that it is not holding latches and can therefore +// more freely adjust its timestamp because it will re-acquire latches at +// whatever timestamp the batch is bumped to. +// // Returns true if the timestamp was bumped. Returns false if the timestamp could // not be bumped. func tryBumpBatchTimestamp( - ctx context.Context, ba *roachpb.BatchRequest, ts hlc.Timestamp, latchSpans *spanset.SpanSet, + ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard, ts hlc.Timestamp, ) bool { - if len(latchSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanGlobal)) > 0 { - // If the batch acquired any read latches with bounded (MVCC) timestamps - // then we can not trivially bump the batch's timestamp without dropping - // and re-acquiring those latches. Doing so could allow the request to - // read at an unprotected timestamp. We only look at global latch spans - // because local latch spans always use unbounded (NonMVCC) timestamps. - // - // NOTE: even if we hold read latches with high enough timestamps to - // fully cover ("protect") the batch at the new timestamp, we still - // don't want to allow the bump. This is because a batch with read spans - // and a higher timestamp may now conflict with locks that it previously - // did not. However, server-side retries don't re-scan the lock table. - // This can lead to requests missing unreplicated locks in the lock - // table that they should have seen or discovering replicated intents in - // MVCC that they should not have seen (from the perspective of the lock - // table's AddDiscoveredLock method). - // - // NOTE: we could consider adding a retry-loop above the latch - // acquisition to allow this to be retried, but given that we try not to - // mix read-only and read-write requests, doing so doesn't seem worth - // it. + if g != nil && !g.IsolatedAtLaterTimestamps() { return false } if ts.Less(ba.Timestamp) { log.Fatalf(ctx, "trying to bump to %s <= ba.Timestamp: %s", ts, ba.Timestamp) } - ba.Timestamp = ts if ba.Txn == nil { + log.VEventf(ctx, 2, "bumping batch timestamp to %s from %s", ts, ba.Timestamp) + ba.Timestamp = ts return true } if ts.Less(ba.Txn.ReadTimestamp) || ts.Less(ba.Txn.WriteTimestamp) { log.Fatalf(ctx, "trying to bump to %s inconsistent with ba.Txn.ReadTimestamp: %s, "+ "ba.Txn.WriteTimestamp: %s", ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp) } - log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s)", + log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s", ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp) ba.Txn = ba.Txn.Clone() ba.Txn.Refresh(ts) + ba.Timestamp = ba.Txn.ReadTimestamp // Refresh just updated ReadTimestamp return true } diff --git a/pkg/kv/kvserver/replica_evaluate.go b/pkg/kv/kvserver/replica_evaluate.go index a58bbbf91814..dfee7f2071e1 100644 --- a/pkg/kv/kvserver/replica_evaluate.go +++ b/pkg/kv/kvserver/replica_evaluate.go @@ -16,8 +16,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -518,8 +518,16 @@ func evaluateCommand( // for transactional requests, retrying is possible if the transaction had not // performed any prior reads that need refreshing. // +// This function is called both below and above latching, which is indicated by +// the concurrency guard argument. The concurrency guard, if not nil, indicates +// that the caller is holding latches and cannot adjust its timestamp beyond the +// limits of what is protected by those latches. If the concurrency guard is +// nil, the caller indicates that it is not holding latches and can therefore +// more freely adjust its timestamp because it will re-acquire latches at +// whatever timestamp the batch is bumped to. +// // deadline, if not nil, specifies the highest timestamp (exclusive) at which -// the request can be evaluated. If ba is a transactional request, then dealine +// the request can be evaluated. If ba is a transactional request, then deadline // cannot be specified; a transaction's deadline comes from it's EndTxn request. // // If true is returned, ba and ba.Txn will have been updated with the new @@ -529,7 +537,7 @@ func canDoServersideRetry( pErr *roachpb.Error, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, - latchSpans *spanset.SpanSet, + g *concurrency.Guard, deadline *hlc.Timestamp, ) bool { if ba.Txn != nil { @@ -548,17 +556,6 @@ func canDoServersideRetry( var newTimestamp hlc.Timestamp if ba.Txn != nil { if pErr != nil { - // TODO(nvanbenschoten): This is intentionally not allowing server-side - // refreshes of ReadWithinUncertaintyIntervalErrors for now, even though - // that is the eventual goal here. Lifting that limitation will likely - // need to be accompanied by an above-latching retry loop, because read - // latches will usually prevent below-latch retries of - // ReadWithinUncertaintyIntervalErrors. See the comment in - // tryBumpBatchTimestamp. - if _, ok := pErr.GetDetail().(*roachpb.ReadWithinUncertaintyIntervalError); ok { - return false - } - var ok bool ok, newTimestamp = roachpb.TransactionRefreshTimestamp(pErr) if !ok { @@ -576,7 +573,12 @@ func canDoServersideRetry( } switch tErr := pErr.GetDetail().(type) { case *roachpb.WriteTooOldError: - newTimestamp = tErr.ActualTimestamp + newTimestamp = tErr.RetryTimestamp() + + // TODO(nvanbenschoten): give non-txn requests uncertainty intervals. #73732. + //case *roachpb.ReadWithinUncertaintyIntervalError: + // newTimestamp = tErr.RetryTimestamp() + default: return false } @@ -585,5 +587,5 @@ func canDoServersideRetry( if batcheval.IsEndTxnExceedingDeadline(newTimestamp, deadline) { return false } - return tryBumpBatchTimestamp(ctx, ba, newTimestamp, latchSpans) + return tryBumpBatchTimestamp(ctx, ba, g, newTimestamp) } diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index effe9fcdb977..7a940b4440a6 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -21,10 +21,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -773,7 +773,7 @@ func (r *Replica) evaluateProposal( idKey kvserverbase.CmdIDKey, ba *roachpb.BatchRequest, ui uncertainty.Interval, - latchSpans *spanset.SpanSet, + g *concurrency.Guard, ) (*result.Result, bool, *roachpb.Error) { if ba.Timestamp.IsEmpty() { return nil, false, roachpb.NewErrorf("can't propose Raft command with zero timestamp") @@ -789,7 +789,7 @@ func (r *Replica) evaluateProposal( // // TODO(tschottdorf): absorb all returned values in `res` below this point // in the call stack as well. - batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, ui, latchSpans) + batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, ui, g) // Note: reusing the proposer's batch when applying the command on the // proposer was explored as an optimization but resulted in no performance @@ -875,18 +875,15 @@ func (r *Replica) evaluateProposal( // evaluating it. The returned ProposalData is partially valid even // on a non-nil *roachpb.Error and should be proposed through Raft // if ProposalData.command is non-nil. -// -// TODO(nvanbenschoten): combine idKey, ba, and latchSpans into a -// `serializedRequest` struct. func (r *Replica) requestToProposal( ctx context.Context, idKey kvserverbase.CmdIDKey, ba *roachpb.BatchRequest, st kvserverpb.LeaseStatus, ui uncertainty.Interval, - latchSpans *spanset.SpanSet, + g *concurrency.Guard, ) (*ProposalData, *roachpb.Error) { - res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, ui, latchSpans) + res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, ui, g) // Fill out the results even if pErr != nil; we'll return the error below. proposal := &ProposalData{ diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index a3b824a26604..9084a612a453 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -107,7 +107,7 @@ func (r *Replica) evalAndPropose( ) (chan proposalResult, func(), kvserverbase.CmdIDKey, *roachpb.Error) { defer tok.DoneIfNotMoved(ctx) idKey := makeIDKey() - proposal, pErr := r.requestToProposal(ctx, idKey, ba, st, ui, g.LatchSpans()) + proposal, pErr := r.requestToProposal(ctx, idKey, ba, st, ui, g) log.Event(proposal.ctx, "evaluated request") // If the request hit a server-side concurrency retry error, immediately diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index c7050b1a66e0..21cf12b33c61 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -49,8 +49,7 @@ func (r *Replica) executeReadOnlyBatch( ui := uncertainty.ComputeInterval(ba.Txn, st) // Evaluate read-only batch command. - spans := g.LatchSpans() - rec := NewReplicaEvalContext(r, spans) + rec := NewReplicaEvalContext(r, g.LatchSpans()) // TODO(irfansharif): It's unfortunate that in this read-only code path, // we're stuck with a ReadWriter because of the way evaluateBatch is @@ -62,7 +61,7 @@ func (r *Replica) executeReadOnlyBatch( panic("expected consistent iterators") } if util.RaceEnabled { - rw = spanset.NewReadWriterAt(rw, spans, ba.Timestamp) + rw = spanset.NewReadWriterAt(rw, g.LatchSpans(), ba.Timestamp) } defer rw.Close() @@ -81,9 +80,7 @@ func (r *Replica) executeReadOnlyBatch( // the latches are released. var result result.Result - br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes( - ctx, rw, rec, ba, ui, spans, - ) + br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, ui, g) // If the request hit a server-side concurrency retry error, immediately // propagate the error. Don't assume ownership of the concurrency guard. @@ -237,7 +234,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes( rec batcheval.EvalContext, ba *roachpb.BatchRequest, ui uncertainty.Interval, - latchSpans *spanset.SpanSet, + g *concurrency.Guard, ) (br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) { log.Event(ctx, "executing read-only batch") @@ -290,7 +287,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes( br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, ui, true /* readOnly */) // If we can retry, set a higher batch timestamp and continue. // Allow one retry only. - if pErr == nil || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, latchSpans, nil /* deadline */) { + if pErr == nil || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, g, nil /* deadline */) { break } } diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 1a92fda9cf5f..31c6b736a76b 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -400,17 +400,9 @@ var _ batchExecutionFn = (*Replica).executeReadOnlyBatch func (r *Replica) executeBatchWithConcurrencyRetries( ctx context.Context, ba *roachpb.BatchRequest, fn batchExecutionFn, ) (br *roachpb.BatchResponse, pErr *roachpb.Error) { - // Determine the maximal set of key spans that the batch will operate on. - // This is used below to sequence the request in the concurrency manager. - latchSpans, lockSpans, requestEvalKind, err := r.collectSpans(ba) - if err != nil { - return nil, roachpb.NewError(err) - } - - // Handle load-based splitting. - r.recordBatchForLoadBasedSplitting(ctx, ba, latchSpans) - // Try to execute command; exit retry loop on success. + var latchSpans, lockSpans *spanset.SpanSet + var requestEvalKind concurrency.RequestEvalKind var g *concurrency.Guard defer func() { // NB: wrapped to delay g evaluation to its value when returning. @@ -418,12 +410,31 @@ func (r *Replica) executeBatchWithConcurrencyRetries( r.concMgr.FinishReq(g) } }() - for { + for first := true; ; first = false { // Exit loop if context has been canceled or timed out. if err := ctx.Err(); err != nil { return nil, roachpb.NewError(errors.Wrap(err, "aborted during Replica.Send")) } + // Determine the maximal set of key spans that the batch will operate on. + // This is used below to sequence the request in the concurrency manager. + // + // Only do so if the latchSpans and lockSpans are not being preserved from a + // prior iteration, either directly or in a concurrency guard that we intend + // to re-use during sequencing. + if latchSpans == nil && g == nil { + var err error + latchSpans, lockSpans, requestEvalKind, err = r.collectSpans(ba) + if err != nil { + return nil, roachpb.NewError(err) + } + } + + // Handle load-based splitting, if necessary. + if first { + r.recordBatchForLoadBasedSplitting(ctx, ba, latchSpans) + } + // Acquire latches to prevent overlapping requests from executing until // this request completes. After latching, wait on any conflicting locks // to ensure that the request has full isolation during evaluation. This @@ -495,6 +506,25 @@ func (r *Replica) executeBatchWithConcurrencyRetries( if pErr = r.handleIndeterminateCommitError(ctx, ba, pErr, t); pErr != nil { return nil, pErr } + case *roachpb.ReadWithinUncertaintyIntervalError: + // Drop latches and lock wait-queues. + r.concMgr.FinishReq(g) + g = nil + // If the batch is able to perform a server-side retry in order to avoid + // the uncertainty error, it will have a new timestamp. Force a refresh of + // the latch and lock spans. + latchSpans, lockSpans = nil, nil + // Attempt to adjust the batch's timestamp to avoid the uncertainty error + // and allow for a server-side retry. For transactional requests, there + // are strict conditions that must be met for this to be permitted. For + // non-transactional requests, this is always allowed. If successful, an + // updated BatchRequest will be returned. If unsuccessful, the provided + // read within uncertainty interval error will be returned so that we can + // propagate it. + ba, pErr = r.handleReadWithinUncertaintyIntervalError(ctx, ba, pErr, t) + if pErr != nil { + return nil, pErr + } case *roachpb.InvalidLeaseError: // Drop latches and lock wait-queues. latchSpans, lockSpans = g.TakeSpanSets() @@ -555,6 +585,34 @@ func isConcurrencyRetryError(pErr *roachpb.Error) bool { // the pushee is aborted or committed, so the request must kick off the // "transaction recovery procedure" to resolve this ambiguity before // retrying. + case *roachpb.ReadWithinUncertaintyIntervalError: + // If a request hits a ReadWithinUncertaintyIntervalError, it was performing + // a non-locking read [1] and encountered a (committed or provisional) write + // within the uncertainty interval of the reader. Depending on the state of + // the request (see conditions in canDoServersideRetry), it may be able to + // adjust its timestamp and retry on the server. + // + // This is similar to other server-side retries that we allow below + // latching, like for WriteTooOld errors. However, because uncertainty + // errors are specific to non-locking reads, they can not [2] be retried + // without first dropping and re-acquiring their read latches at a higher + // timestamp. This is unfortunate for uncertainty errors, as it leads to + // some extra work. + // + // On the other hand, it is more important for other forms of retry errors + // to be handled without dropping latches because they could be starved by + // repeated conflicts. For instance, if WriteTooOld errors caused a write + // request to drop and re-acquire latches, it is possible that the request + // could return after each retry to find a new WriteTooOld conflict, never + // managing to complete. This is not the case for uncertainty errors, which + // can not occur indefinitely. A request (transactional or otherwise) has a + // fixed uncertainty window and, once exhausted, will never hit an + // uncertainty error again. + // + // [1] if a locking read observes a write at a later timestamp, it returns a + // WriteTooOld error. It's uncertainty interval does not matter. + // [2] in practice, this is enforced by tryBumpBatchTimestamp's call to + // (*concurrency.Guard).IsolatedAtLaterTimestamps. case *roachpb.InvalidLeaseError: // If a request hits an InvalidLeaseError, the replica it is being // evaluated against does not have a valid lease under which it can @@ -699,6 +757,36 @@ func (r *Replica) handleIndeterminateCommitError( return nil } +func (r *Replica) handleReadWithinUncertaintyIntervalError( + ctx context.Context, + ba *roachpb.BatchRequest, + pErr *roachpb.Error, + t *roachpb.ReadWithinUncertaintyIntervalError, +) (*roachpb.BatchRequest, *roachpb.Error) { + // Attempt a server-side retry of the request. Note that we pass nil for + // latchSpans, because we have already released our latches and plan to + // re-acquire them if the retry is allowed. + if !canDoServersideRetry(ctx, pErr, ba, nil /* br */, nil /* g */, nil /* deadline */) { + return nil, pErr + } + // TODO(nvanbenschoten): give non-txn requests uncertainty intervals. #73732. + //if ba.Txn == nil && ba.Timestamp.Synthetic { + // // If the request is non-transactional and it was refreshed into the future + // // after observing a value with a timestamp in the future, immediately sleep + // // until its new read timestamp becomes present. We don't need to do this + // // for transactional requests because they will do this during their + // // commit-wait sleep after committing. + // // + // // See TxnCoordSender.maybeCommitWait for a discussion about why doing this + // // is necessary to preserve real-time ordering for transactions that write + // // into the future. + // if err := r.Clock().SleepUntil(ctx, ba.Timestamp); err != nil { + // return nil, roachpb.NewError(err) + // } + //} + return ba, nil +} + func (r *Replica) handleInvalidLeaseError( ctx context.Context, ba *roachpb.BatchRequest, ) *roachpb.Error { diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 42553b6965d0..36cb73be835e 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -475,7 +475,7 @@ func TestIsOnePhaseCommit(t *testing.T) { // Emulate what a server actually does and bump the write timestamp when // possible. This makes some batches with diverged read and write // timestamps pass isOnePhaseCommit(). - maybeBumpReadTimestampToWriteTimestamp(ctx, &ba, &spanset.SpanSet{}) + maybeBumpReadTimestampToWriteTimestamp(ctx, &ba, allSpansGuard()) if is1PC := isOnePhaseCommit(&ba); is1PC != c.exp1PC { t.Errorf("expected 1pc=%t; got %t", c.exp1PC, is1PC) @@ -8079,7 +8079,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { ba.Timestamp = tc.Clock().Now() ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(id)}}) st := r.CurrentLeaseStatus(ctx) - cmd, pErr := r.requestToProposal(ctx, kvserverbase.CmdIDKey(id), &ba, st, uncertainty.Interval{}, allSpans()) + cmd, pErr := r.requestToProposal(ctx, kvserverbase.CmdIDKey(id), &ba, st, uncertainty.Interval{}, allSpansGuard()) if pErr != nil { t.Fatal(pErr) } @@ -8201,7 +8201,7 @@ func TestReplicaRefreshMultiple(t *testing.T) { incCmdID = makeIDKey() atomic.StoreInt32(&filterActive, 1) - proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, repl.CurrentLeaseStatus(ctx), uncertainty.Interval{}, allSpans()) + proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, repl.CurrentLeaseStatus(ctx), uncertainty.Interval{}, allSpansGuard()) if pErr != nil { t.Fatal(pErr) } @@ -8734,7 +8734,7 @@ func TestReplicaEvaluationNotTxnMutation(t *testing.T) { assignSeqNumsForReqs(txn, &txnPut, &txnPut2) origTxn := txn.Clone() - batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, makeIDKey(), &ba, uncertainty.Interval{}, allSpans()) + batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, makeIDKey(), &ba, uncertainty.Interval{}, allSpansGuard()) defer batch.Close() if pErr != nil { t.Fatal(pErr) @@ -10695,6 +10695,72 @@ func TestReplicaServersideRefreshes(t *testing.T) { // EndTransaction. This is hard to do at the moment, though, because we // never defer the handling of the write too old conditions to the end of // the transaction (but we might in the future). + { + name: "serverside-refresh of read within uncertainty interval error on get in non-1PC txn", + setupFn: func() (hlc.Timestamp, error) { + return put("a", "put") + }, + batchFn: func(ts hlc.Timestamp) (ba roachpb.BatchRequest, expTS hlc.Timestamp) { + expTS = ts.Next() + ts = ts.Prev() + ba.Txn = newTxn("a", ts) + ba.Txn.GlobalUncertaintyLimit = expTS + ba.CanForwardReadTimestamp = true // necessary to indicate serverside-refresh is possible + get := getArgs(roachpb.Key("a")) + ba.Add(&get) + return + }, + }, + { + name: "serverside-refresh of read within uncertainty interval error on get in non-1PC txn with prior reads", + setupFn: func() (hlc.Timestamp, error) { + return put("a", "put") + }, + batchFn: func(ts hlc.Timestamp) (ba roachpb.BatchRequest, expTS hlc.Timestamp) { + ts = ts.Prev() + ba.Txn = newTxn("a", ts) + ba.Txn.GlobalUncertaintyLimit = ts.Next() + get := getArgs(roachpb.Key("a")) + ba.Add(&get) + return + }, + expErr: "ReadWithinUncertaintyIntervalError", + }, + { + name: "serverside-refresh of read within uncertainty interval error on get in 1PC txn", + setupFn: func() (hlc.Timestamp, error) { + return put("a", "put") + }, + batchFn: func(ts hlc.Timestamp) (ba roachpb.BatchRequest, expTS hlc.Timestamp) { + expTS = ts.Next() + ts = ts.Prev() + ba.Txn = newTxn("a", ts) + ba.Txn.GlobalUncertaintyLimit = expTS + ba.CanForwardReadTimestamp = true // necessary to indicate serverside-refresh is possible + get := getArgs(roachpb.Key("a")) + et, _ := endTxnArgs(ba.Txn, true /* commit */) + ba.Add(&get, &et) + assignSeqNumsForReqs(ba.Txn, &get, &et) + return + }, + }, + { + name: "serverside-refresh of read within uncertainty interval error on get in 1PC txn with prior reads", + setupFn: func() (hlc.Timestamp, error) { + return put("a", "put") + }, + batchFn: func(ts hlc.Timestamp) (ba roachpb.BatchRequest, expTS hlc.Timestamp) { + ts = ts.Prev() + ba.Txn = newTxn("a", ts) + ba.Txn.GlobalUncertaintyLimit = ts.Next() + get := getArgs(roachpb.Key("a")) + et, _ := endTxnArgs(ba.Txn, true /* commit */) + ba.Add(&get, &et) + assignSeqNumsForReqs(ba.Txn, &get, &et) + return + }, + expErr: "ReadWithinUncertaintyIntervalError", + }, } for _, test := range testCases { @@ -12939,7 +13005,7 @@ func TestContainsEstimatesClampProposal(t *testing.T) { ba.Timestamp = tc.Clock().Now() req := putArgs(roachpb.Key("some-key"), []byte("some-value")) ba.Add(&req) - proposal, err := tc.repl.requestToProposal(ctx, cmdIDKey, &ba, tc.repl.CurrentLeaseStatus(ctx), uncertainty.Interval{}, allSpans()) + proposal, err := tc.repl.requestToProposal(ctx, cmdIDKey, &ba, tc.repl.CurrentLeaseStatus(ctx), uncertainty.Interval{}, allSpansGuard()) if err != nil { t.Error(err) } diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 110b6124324c..276a07aef347 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -316,7 +316,7 @@ func (r *Replica) executeWriteBatch( // canAttempt1PCEvaluation looks at the batch and decides whether it can be // executed as 1PC. func (r *Replica) canAttempt1PCEvaluation( - ctx context.Context, ba *roachpb.BatchRequest, latchSpans *spanset.SpanSet, + ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard, ) bool { if !isOnePhaseCommit(ba) { return false @@ -343,7 +343,7 @@ func (r *Replica) canAttempt1PCEvaluation( ba.Txn.WriteTimestamp = minCommitTS // We can only evaluate at the new timestamp if we manage to bump the read // timestamp. - return maybeBumpReadTimestampToWriteTimestamp(ctx, ba, latchSpans) + return maybeBumpReadTimestampToWriteTimestamp(ctx, ba, g) } return true } @@ -362,19 +362,19 @@ func (r *Replica) evaluateWriteBatch( idKey kvserverbase.CmdIDKey, ba *roachpb.BatchRequest, ui uncertainty.Interval, - latchSpans *spanset.SpanSet, + g *concurrency.Guard, ) (storage.Batch, enginepb.MVCCStats, *roachpb.BatchResponse, result.Result, *roachpb.Error) { log.Event(ctx, "executing read-write batch") // If the transaction has been pushed but it can commit at the higher // timestamp, let's evaluate the batch at the bumped timestamp. This will // allow it commit, and also it'll allow us to attempt the 1PC code path. - maybeBumpReadTimestampToWriteTimestamp(ctx, ba, latchSpans) + maybeBumpReadTimestampToWriteTimestamp(ctx, ba, g) // Attempt 1PC execution, if applicable. If not transactional or there are // indications that the batch's txn will require retry, execute as normal. - if r.canAttempt1PCEvaluation(ctx, ba, latchSpans) { - res := r.evaluate1PC(ctx, idKey, ba, latchSpans) + if r.canAttempt1PCEvaluation(ctx, ba, g) { + res := r.evaluate1PC(ctx, idKey, ba, g) switch res.success { case onePCSucceeded: return res.batch, res.stats, res.br, res.res, nil @@ -403,9 +403,9 @@ func (r *Replica) evaluateWriteBatch( } ms := new(enginepb.MVCCStats) - rec := NewReplicaEvalContext(r, latchSpans) + rec := NewReplicaEvalContext(r, g.LatchSpans()) batch, br, res, pErr := r.evaluateWriteBatchWithServersideRefreshes( - ctx, idKey, rec, ms, ba, ui, latchSpans, nil /* deadline */) + ctx, idKey, rec, ms, ba, ui, g, nil /* deadline */) return batch, *ms, br, res, pErr } @@ -445,10 +445,7 @@ type onePCResult struct { // efficient - we're avoiding writing the transaction record and writing and the // immediately deleting intents. func (r *Replica) evaluate1PC( - ctx context.Context, - idKey kvserverbase.CmdIDKey, - ba *roachpb.BatchRequest, - latchSpans *spanset.SpanSet, + ctx context.Context, idKey kvserverbase.CmdIDKey, ba *roachpb.BatchRequest, g *concurrency.Guard, ) (onePCRes onePCResult) { log.VEventf(ctx, 2, "attempting 1PC execution") @@ -471,7 +468,7 @@ func (r *Replica) evaluate1PC( // Is this relying on the batch being write-only? ui := uncertainty.Interval{} - rec := NewReplicaEvalContext(r, latchSpans) + rec := NewReplicaEvalContext(r, g.LatchSpans()) var br *roachpb.BatchResponse var res result.Result var pErr *roachpb.Error @@ -483,10 +480,10 @@ func (r *Replica) evaluate1PC( ms := new(enginepb.MVCCStats) if ba.CanForwardReadTimestamp { batch, br, res, pErr = r.evaluateWriteBatchWithServersideRefreshes( - ctx, idKey, rec, ms, &strippedBa, ui, latchSpans, etArg.Deadline) + ctx, idKey, rec, ms, &strippedBa, ui, g, etArg.Deadline) } else { batch, br, res, pErr = r.evaluateWriteBatchWrapper( - ctx, idKey, rec, ms, &strippedBa, ui, latchSpans) + ctx, idKey, rec, ms, &strippedBa, ui, g) } if pErr != nil || (!ba.CanForwardReadTimestamp && ba.Timestamp != br.Timestamp) { @@ -577,7 +574,7 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes( ms *enginepb.MVCCStats, ba *roachpb.BatchRequest, ui uncertainty.Interval, - latchSpans *spanset.SpanSet, + g *concurrency.Guard, deadline *hlc.Timestamp, ) (batch storage.Batch, br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) { goldenMS := *ms @@ -591,7 +588,7 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes( batch.Close() } - batch, br, res, pErr = r.evaluateWriteBatchWrapper(ctx, idKey, rec, ms, ba, ui, latchSpans) + batch, br, res, pErr = r.evaluateWriteBatchWrapper(ctx, idKey, rec, ms, ba, ui, g) var success bool if pErr == nil { @@ -604,7 +601,7 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes( // If we can retry, set a higher batch timestamp and continue. // Allow one retry only; a non-txn batch containing overlapping // spans will always experience WriteTooOldError. - if success || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, latchSpans, deadline) { + if success || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, g, deadline) { break } } @@ -620,9 +617,9 @@ func (r *Replica) evaluateWriteBatchWrapper( ms *enginepb.MVCCStats, ba *roachpb.BatchRequest, ui uncertainty.Interval, - latchSpans *spanset.SpanSet, + g *concurrency.Guard, ) (storage.Batch, *roachpb.BatchResponse, result.Result, *roachpb.Error) { - batch, opLogger := r.newBatchedEngine(ba, latchSpans) + batch, opLogger := r.newBatchedEngine(ba, g) br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, ms, ba, ui, false /* readOnly */) if pErr == nil { if opLogger != nil { @@ -639,7 +636,7 @@ func (r *Replica) evaluateWriteBatchWrapper( // OpLogger is attached to the returned engine.Batch, recording all operations. // Its recording should be attached to the Result of request evaluation. func (r *Replica) newBatchedEngine( - ba *roachpb.BatchRequest, latchSpans *spanset.SpanSet, + ba *roachpb.BatchRequest, g *concurrency.Guard, ) (storage.Batch, *storage.OpLoggerBatch) { batch := r.store.Engine().NewBatch() if !batch.ConsistentIterators() { @@ -690,7 +687,7 @@ func (r *Replica) newBatchedEngine( // safe as we're only ever writing at timestamps higher than the timestamp // any write latch would be declared at. But because of this, we don't // assert on access timestamps using spanset.NewBatchAt. - batch = spanset.NewBatch(batch, latchSpans) + batch = spanset.NewBatch(batch, g.LatchSpans()) } return batch, opLogger } diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go index 4064b0b608e3..1d8fdacd8317 100644 --- a/pkg/roachpb/data.go +++ b/pkg/roachpb/data.go @@ -1472,7 +1472,7 @@ func PrepareTransactionForRetry( // Use the priority communicated back by the server. txn.Priority = errTxnPri case *ReadWithinUncertaintyIntervalError: - txn.WriteTimestamp.Forward(readWithinUncertaintyIntervalRetryTimestamp(tErr)) + txn.WriteTimestamp.Forward(tErr.RetryTimestamp()) case *TransactionPushError: // Increase timestamp if applicable, ensuring that we're just ahead of // the pushee. @@ -1503,7 +1503,7 @@ func PrepareTransactionForRetry( } case *WriteTooOldError: // Increase the timestamp to the ts at which we've actually written. - txn.WriteTimestamp.Forward(writeTooOldRetryTimestamp(tErr)) + txn.WriteTimestamp.Forward(tErr.RetryTimestamp()) default: log.Fatalf(ctx, "invalid retryable err (%T): %s", pErr.GetDetail(), pErr) } @@ -1537,52 +1537,15 @@ func TransactionRefreshTimestamp(pErr *Error) (bool, hlc.Timestamp) { // error, obviously the refresh will fail. It might be worth trying to // detect these cases and save the futile attempt; we'd need to have access // to the key that generated the error. - timestamp.Forward(writeTooOldRetryTimestamp(err)) + timestamp.Forward(err.RetryTimestamp()) case *ReadWithinUncertaintyIntervalError: - timestamp.Forward(readWithinUncertaintyIntervalRetryTimestamp(err)) + timestamp.Forward(err.RetryTimestamp()) default: return false, hlc.Timestamp{} } return true, timestamp } -func readWithinUncertaintyIntervalRetryTimestamp( - err *ReadWithinUncertaintyIntervalError, -) hlc.Timestamp { - // If the reader encountered a newer write within the uncertainty interval, - // we advance the txn's timestamp just past the uncertain value's timestamp. - // This ensures that we read above the uncertain value on a retry. - ts := err.ExistingTimestamp.Next() - // In addition to advancing past the uncertainty value's timestamp, we also - // advance the txn's timestamp up to the local uncertainty limit on the node - // which hit the error. This ensures that no future read after the retry on - // this node (ignoring lease complications in ComputeLocalUncertaintyLimit - // and values with synthetic timestamps) will throw an uncertainty error, - // even when reading other keys. - // - // Note that if the request was not able to establish a local uncertainty - // limit due to a missing observed timestamp (for instance, if the request - // was evaluated on a follower replica and the txn had never visited the - // leaseholder), then LocalUncertaintyLimit will be empty and the Forward - // will be a no-op. In this case, we could advance all the way past the - // global uncertainty limit, but this time would likely be in the future, so - // this would necessitate a commit-wait period after committing. - // - // In general, we expect the local uncertainty limit, if set, to be above - // the uncertainty value's timestamp. So we expect this Forward to advance - // ts. However, this is not always the case. The one exception is if the - // uncertain value had a synthetic timestamp, so it was compared against the - // global uncertainty limit to determine uncertainty (see IsUncertain). In - // such cases, we're ok advancing just past the value's timestamp. Either - // way, we won't see the same value in our uncertainty interval on a retry. - ts.Forward(err.LocalUncertaintyLimit) - return ts -} - -func writeTooOldRetryTimestamp(err *WriteTooOldError) hlc.Timestamp { - return err.ActualTimestamp -} - // Replicas returns all of the replicas present in the descriptor after this // trigger applies. func (crt ChangeReplicasTrigger) Replicas() []ReplicaDescriptor { diff --git a/pkg/roachpb/errors.go b/pkg/roachpb/errors.go index 78cb10e18733..8ce4c5fe509a 100644 --- a/pkg/roachpb/errors.go +++ b/pkg/roachpb/errors.go @@ -949,6 +949,12 @@ func (e *WriteTooOldError) Type() ErrorDetailType { return WriteTooOldErrType } +// RetryTimestamp returns the timestamp that should be used to retry an +// operation after encountering a WriteTooOldError. +func (e *WriteTooOldError) RetryTimestamp() hlc.Timestamp { + return e.ActualTimestamp +} + var _ ErrorDetailInterface = &WriteTooOldError{} var _ transactionRestartError = &WriteTooOldError{} @@ -1009,6 +1015,39 @@ func (*ReadWithinUncertaintyIntervalError) canRestartTransaction() TransactionRe return TransactionRestart_IMMEDIATE } +// RetryTimestamp returns the timestamp that should be used to retry an +// operation after encountering a ReadWithinUncertaintyIntervalError. +func (e *ReadWithinUncertaintyIntervalError) RetryTimestamp() hlc.Timestamp { + // If the reader encountered a newer write within the uncertainty interval, + // we advance the txn's timestamp just past the uncertain value's timestamp. + // This ensures that we read above the uncertain value on a retry. + ts := e.ExistingTimestamp.Next() + // In addition to advancing past the uncertainty value's timestamp, we also + // advance the txn's timestamp up to the local uncertainty limit on the node + // which hit the error. This ensures that no future read after the retry on + // this node (ignoring lease complications in ComputeLocalUncertaintyLimit + // and values with synthetic timestamps) will throw an uncertainty error, + // even when reading other keys. + // + // Note that if the request was not able to establish a local uncertainty + // limit due to a missing observed timestamp (for instance, if the request + // was evaluated on a follower replica and the txn had never visited the + // leaseholder), then LocalUncertaintyLimit will be empty and the Forward + // will be a no-op. In this case, we could advance all the way past the + // global uncertainty limit, but this time would likely be in the future, so + // this would necessitate a commit-wait period after committing. + // + // In general, we expect the local uncertainty limit, if set, to be above + // the uncertainty value's timestamp. So we expect this Forward to advance + // ts. However, this is not always the case. The one exception is if the + // uncertain value had a synthetic timestamp, so it was compared against the + // global uncertainty limit to determine uncertainty (see IsUncertain). In + // such cases, we're ok advancing just past the value's timestamp. Either + // way, we won't see the same value in our uncertainty interval on a retry. + ts.Forward(e.LocalUncertaintyLimit) + return ts +} + var _ ErrorDetailInterface = &ReadWithinUncertaintyIntervalError{} var _ transactionRestartError = &ReadWithinUncertaintyIntervalError{} diff --git a/pkg/sql/txn_restart_test.go b/pkg/sql/txn_restart_test.go index a8b229b97e97..f4edd908ed54 100644 --- a/pkg/sql/txn_restart_test.go +++ b/pkg/sql/txn_restart_test.go @@ -1145,39 +1145,25 @@ func TestReacquireLeaseOnRestart(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - advancement := 2 * base.DefaultDescriptorLeaseDuration - - var cmdFilters tests.CommandFilters - cmdFilters.AppendFilter(tests.CheckEndTxnTrigger, true) - - testKey := []byte("test_key") - storeTestingKnobs := &kvserver.StoreTestingKnobs{ - EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ - TestingEvalFilter: cmdFilters.RunFilters, - }, - DisableMaxOffsetCheck: true, - } - const refreshAttempts = 3 clientTestingKnobs := &kvcoord.ClientTestingKnobs{ MaxTxnRefreshAttempts: refreshAttempts, } - params, _ := tests.CreateTestServerParams() - params.Knobs.Store = storeTestingKnobs - params.Knobs.KVClient = clientTestingKnobs - s, sqlDB, _ := serverutils.StartServer(t, params) - defer s.Stopper().Stop(context.Background()) - + testKey := []byte("test_key") + var s serverutils.TestServerInterface var clockUpdate, restartDone int32 - cleanupFilter := cmdFilters.AppendFilter( - func(args kvserverbase.FilterArgs) *roachpb.Error { - if req, ok := args.Req.(*roachpb.GetRequest); ok { + testingResponseFilter := func( + ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse, + ) *roachpb.Error { + for _, ru := range ba.Requests { + if req := ru.GetGet(); req != nil { if bytes.Contains(req.Key, testKey) && !kv.TestingIsRangeLookupRequest(req) { if atomic.LoadInt32(&clockUpdate) == 0 { atomic.AddInt32(&clockUpdate, 1) // Hack to advance the transaction timestamp on a // transaction restart. + const advancement = 2 * base.DefaultDescriptorLeaseDuration now := s.Clock().NowAsClockTimestamp() now.WallTime += advancement.Nanoseconds() s.Clock().Update(now) @@ -1189,7 +1175,7 @@ func TestReacquireLeaseOnRestart(t *testing.T) { atomic.AddInt32(&restartDone, 1) // Return ReadWithinUncertaintyIntervalError to update // the transaction timestamp on retry. - txn := args.Hdr.Txn + txn := ba.Txn txn.ResetObservedTimestamps() now := s.Clock().NowAsClockTimestamp() txn.UpdateObservedTimestamp(s.(*server.TestServer).Gossip().NodeID.Get(), now) @@ -1197,9 +1183,22 @@ func TestReacquireLeaseOnRestart(t *testing.T) { } } } - return nil - }, false) - defer cleanupFilter() + } + return nil + } + storeTestingKnobs := &kvserver.StoreTestingKnobs{ + // We use a TestingResponseFilter to avoid server-side refreshes of the + // ReadWithinUncertaintyIntervalError that the filter returns. + TestingResponseFilter: testingResponseFilter, + DisableMaxOffsetCheck: true, + } + + params, _ := tests.CreateTestServerParams() + params.Knobs.Store = storeTestingKnobs + params.Knobs.KVClient = clientTestingKnobs + var sqlDB *gosql.DB + s, sqlDB, _ = serverutils.StartServer(t, params) + defer s.Stopper().Stop(context.Background()) sqlDB.SetMaxOpenConns(1) if _, err := sqlDB.Exec(`