diff --git a/pkg/storage/batcheval/result/result.go b/pkg/storage/batcheval/result/result.go index cc2cd7311f93..de185a39fda5 100644 --- a/pkg/storage/batcheval/result/result.go +++ b/pkg/storage/batcheval/result/result.go @@ -16,6 +16,7 @@ package result import ( "context" + "fmt" "github.com/kr/pretty" "github.com/pkg/errors" @@ -70,6 +71,28 @@ type LocalResult struct { UpdatedTxns *[]*roachpb.Transaction } +func (lResult *LocalResult) String() string { + if lResult == nil { + return "LocalResult: nil" + } + var numIntents, numEndTxns, numUpdatedTxns int + if lResult.Intents != nil { + numIntents = len(*lResult.Intents) + } + if lResult.EndTxns != nil { + numEndTxns = len(*lResult.EndTxns) + } + if lResult.UpdatedTxns != nil { + numUpdatedTxns = len(*lResult.UpdatedTxns) + } + return fmt.Sprintf("LocalResult (reply: %v, #intents: %d, #endTxns: %d #updated txns: %d, "+ + "GossipFirstRange:%t MaybeGossipSystemConfig:%t MaybeAddToSplitQueue:%t "+ + "MaybeGossipNodeLiveness:%s MaybeWatchForMerge:%t", + lResult.Reply, numIntents, numEndTxns, numUpdatedTxns, lResult.GossipFirstRange, + lResult.MaybeGossipSystemConfig, lResult.MaybeAddToSplitQueue, + lResult.MaybeGossipNodeLiveness, lResult.MaybeWatchForMerge) +} + // DetachMaybeWatchForMerge returns and falsifies the MaybeWatchForMerge flag // from the local result. func (lResult *LocalResult) DetachMaybeWatchForMerge() bool { diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index b5e6ac92b488..0403c3eb9138 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -3199,8 +3199,10 @@ func (r *Replica) executeWriteBatch( br, pErr, retry := r.tryExecuteWriteBatch(ctx, ba) switch retry { case proposalIllegalLeaseIndex: + log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex") continue // retry case proposalAmbiguousShouldBeReevaluated: + log.VEventf(ctx, 2, "retry: proposalAmbiguousShouldBeReevaluated") ambiguousResult = true continue // retry case proposalRangeNoLongerExists, proposalErrorReproposing: @@ -5619,7 +5621,15 @@ func (r *Replica) processRaftCommand( } response.Intents = proposal.Local.DetachIntents() response.EndTxns = proposal.Local.DetachEndTxns(response.Err != nil) - lResult = proposal.Local + if pErr == nil { + lResult = proposal.Local + } + } + if pErr != nil && lResult != nil { + log.Fatalf(ctx, "shouldn't have a local result if command processing failed. pErr: %s", pErr) + } + if log.ExpensiveLogEnabled(ctx, 2) { + log.VEvent(ctx, 2, lResult.String()) } // Handle the Result, executing any side effects of the last diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index 75e18cbd4759..588152597f1d 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -81,9 +81,10 @@ type ProposalData struct { // Always use ProposalData.finishApplication(). doneCh chan proposalResult - // Local contains the results of evaluating the request - // tying the upstream evaluation of the request to the - // downstream application of the command. + // Local contains the results of evaluating the request tying the upstream + // evaluation of the request to the downstream application of the command. + // Nil when the proposal came from another node (i.e. the evaluation wasn't + // done here). Local *result.LocalResult // Request is the client's original BatchRequest. diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 471ea7cffdc8..19ec158f8fb2 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -65,6 +65,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/shuffle" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -8734,6 +8735,77 @@ func TestGCWithoutThreshold(t *testing.T) { } } +// Test that, if the Raft command resulting from EndTransaction request fails to +// be processed/apply, then the LocalResult associated with that command is +// cleared. +func TestFailureToProcessCommandClearsLocalResult(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + var tc testContext + cfg := TestStoreConfig(nil) + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc.StartWithStoreConfig(t, stopper, cfg) + + key := roachpb.Key("a") + txn := newTransaction("test", key, 1, enginepb.SERIALIZABLE, tc.Clock()) + bt, btH := beginTxnArgs(key, txn) + assignSeqNumsForReqs(txn, &bt) + put := putArgs(key, []byte("value")) + assignSeqNumsForReqs(txn, &put) + + var ba roachpb.BatchRequest + ba.Header = btH + ba.Add(&bt, &put) + if _, err := tc.Sender().Send(ctx, ba); err != nil { + t.Fatal(err) + } + + var proposalRecognized int64 // accessed atomically + + r := tc.repl + r.mu.Lock() + r.mu.submitProposalFn = func(pd *ProposalData) error { + // We're going to recognize the first time the commnand for the + // EndTransaction is proposed and we're going to hackily decrease its + // MaxLeaseIndex, so that the processing gets rejected further on. + ut := pd.Local.UpdatedTxns + if atomic.LoadInt64(&proposalRecognized) == 0 && + ut != nil && len(*ut) == 1 && (*ut)[0].ID.Equal(txn.ID) { + pd.command.MaxLeaseIndex-- + atomic.StoreInt64(&proposalRecognized, 1) + } + return defaultSubmitProposalLocked(r, pd) + } + r.mu.Unlock() + + opCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "test-recording") + defer cancel() + + ba = roachpb.BatchRequest{} + et, etH := endTxnArgs(txn, true /* commit */) + et.IntentSpans = []roachpb.Span{{Key: key}} + assignSeqNumsForReqs(txn, &et) + ba.Header = etH + ba.Add(&et) + if _, err := tc.Sender().Send(opCtx, ba); err != nil { + t.Fatal(err) + } + formatted := tracing.FormatRecordedSpans(collect()) + if err := testutils.MatchInOrder(formatted, + // The first proposal is rejected. + "retry proposal.*applied at lease index.*but required", + // The LocalResult is nil. This is the important part for this test. + "LocalResult: nil", + // The request will be re-evaluated. + "retry: proposalIllegalLeaseIndex", + // Re-evaluation succeeds and one txn is to be updated. + "LocalResult \\(reply.*#updated txns: 1", + ); err != nil { + t.Fatal(err) + } +} + // TestCommandTimeThreshold verifies that commands outside the replica GC // threshold fail. func TestCommandTimeThreshold(t *testing.T) {