From 012e653816add46431404a81c1adc53c8c85f478 Mon Sep 17 00:00:00 2001
From: Andrei Matei
Date: Mon, 5 Nov 2018 14:27:10 -0500
Subject: [PATCH 1/2] storage: don't apply local results if cmd processing failed

This patch fixes a bug in the application of LocalResults: they were
applied even when the processing of the respective command failed. So,
we were evaluating a batch, proposing a command, failing to apply the
command (in particular because of the lease index check), but then
still applying the LocalResult even though the replicated state was not
updated.

This bug had catastrophic consequences: part of the LocalResult is a
list of UpdatedTxns - this is used to ping the txn wait queue and
unblock waiters, communicating a final txn disposition to them. Now
imagine an EndTxn batch that evaluates to a command that fails the
lease check. The respective LocalResult is populated with our txn,
updated to the COMMITTED disposition. The command fails to process, but
we still ping the wait queue, telling all the waiters that the
transaction committed. The waiters, if any, now go and send
ResolveIntent requests, which actually commit intents (thinking that
the intent's txn committed). And so we end up with "partially committed
transactions": some of a txn's writes are committed (the ones that
happened to have waiters on their intents), while others aren't.

In order for this bug to manifest itself, we need:
- read/write contention on some keys
- a command to fail processing. Generally this happens either because
  of the lease index check or because of the lease check; so either a
  Raft leadership change or a lease change can potentially cause the
  badness. In the case of a Raft leadership change, proposals can get
  dropped, which leads to reproposals, which seem to frequently(?) end
  up processing at the wrong lease index and thus be rejected (and it's
  the reproposal processing that triggers the bug). This kind of
  situation is triggered by the Jepsen register test, with various
  nemeses. The lease check failure can probably be triggered simply
  with lease transfers (e.g. lease rebalancing).

Interestingly, the rate of occurrence of this bug probably changed
between 2.0 and 2.1 because of the introduction, in 2.1, of the
QueryIntent request (and also, separately, because of increased usage
of lease transfers; this claim was not verified though). Speaking of
the Raft leadership change scenario, once the reproposal fails to be
applied (but the wait queue is erroneously signaled), we also
re-evaluate the EndTransaction batch. Unless something else goes wrong,
in 2.0 this re-evaluation was likely succeeding. In 2.1, however, it
tends to fail if there was a waiter on our txn, because the EndTxn
requests are usually bundled with QueryIntent requests - and these
QueryIntents fail during re-evaluation because they don't find the
intent - it was, by then, erroneously committed by a waiter through a
ResolveIntent. Except for transactions that wrote to multiple ranges:
I think for those the QueryIntent requests are split off from the
EndTransaction request by the DistSender, and so we're back to the
situation in 2.0.

Fixes #30792

Release note (bug fix): Fix a bug causing transactions to appear to be
partially committed. CRDB was sometimes claiming to have failed to
commit a transaction even though some (or all) of its writes were
actually persisted.
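In sketch form, the fix gates the local result on whether command
processing succeeded (a simplified rendering of the diff below, not the
literal code; intents and EndTxns are still detached so that the error
path can deal with them):

    response.Intents = proposal.Local.DetachIntents()
    response.EndTxns = proposal.Local.DetachEndTxns(response.Err != nil)
    if pErr == nil {
        // Only a successfully processed command gets to trigger its
        // local side effects (in particular, signaling the txn wait
        // queue via UpdatedTxns).
        lResult = proposal.Local
    }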
---
 pkg/storage/replica.go          | 7 ++++++-
 pkg/storage/replica_proposal.go | 7 ++++---
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index b5e6ac92b488..c7d1b7ced496 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -5619,7 +5619,12 @@ func (r *Replica) processRaftCommand(
 			}
 			response.Intents = proposal.Local.DetachIntents()
 			response.EndTxns = proposal.Local.DetachEndTxns(response.Err != nil)
-			lResult = proposal.Local
+			if pErr == nil {
+				lResult = proposal.Local
+			}
+		}
+		if pErr != nil && lResult != nil {
+			log.Fatalf(ctx, "shouldn't have a local result if command processing failed. pErr: %s", pErr)
 		}
 
 		// Handle the Result, executing any side effects of the last
diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go
index 75e18cbd4759..588152597f1d 100644
--- a/pkg/storage/replica_proposal.go
+++ b/pkg/storage/replica_proposal.go
@@ -81,9 +81,10 @@ type ProposalData struct {
 	// Always use ProposalData.finishApplication().
 	doneCh chan proposalResult
 
-	// Local contains the results of evaluating the request
-	// tying the upstream evaluation of the request to the
-	// downstream application of the command.
+	// Local contains the results of evaluating the request, tying the upstream
+	// evaluation of the request to the downstream application of the command.
+	// Nil when the proposal came from another node (i.e. the evaluation wasn't
+	// done here).
 	Local *result.LocalResult
 
 	// Request is the client's original BatchRequest.

From 0a405c4f76907336bb02ea9c4ad0aafa0675a901 Mon Sep 17 00:00:00 2001
From: Andrei Matei
Date: Tue, 6 Nov 2018 15:24:30 -0500
Subject: [PATCH 2/2] storage: add a test for the clearing of local results

... in the case of a rejected command.

Release note: None
---
 pkg/storage/batcheval/result/result.go | 23 ++++++++
 pkg/storage/replica.go                 |  5 ++
 pkg/storage/replica_test.go            | 72 ++++++++++++++++++++++++++
 3 files changed, 100 insertions(+)

diff --git a/pkg/storage/batcheval/result/result.go b/pkg/storage/batcheval/result/result.go
index cc2cd7311f93..de185a39fda5 100644
--- a/pkg/storage/batcheval/result/result.go
+++ b/pkg/storage/batcheval/result/result.go
@@ -16,6 +16,7 @@ package result
 
 import (
 	"context"
+	"fmt"
 
 	"github.com/kr/pretty"
 	"github.com/pkg/errors"
@@ -70,6 +71,28 @@ type LocalResult struct {
 	UpdatedTxns *[]*roachpb.Transaction
 }
 
+func (lResult *LocalResult) String() string {
+	if lResult == nil {
+		return "LocalResult: nil"
+	}
+	var numIntents, numEndTxns, numUpdatedTxns int
+	if lResult.Intents != nil {
+		numIntents = len(*lResult.Intents)
+	}
+	if lResult.EndTxns != nil {
+		numEndTxns = len(*lResult.EndTxns)
+	}
+	if lResult.UpdatedTxns != nil {
+		numUpdatedTxns = len(*lResult.UpdatedTxns)
+	}
+	return fmt.Sprintf("LocalResult (reply: %v, #intents: %d, #endTxns: %d, #updated txns: %d, "+
+		"GossipFirstRange:%t MaybeGossipSystemConfig:%t MaybeAddToSplitQueue:%t "+
+		"MaybeGossipNodeLiveness:%s MaybeWatchForMerge:%t)",
+		lResult.Reply, numIntents, numEndTxns, numUpdatedTxns, lResult.GossipFirstRange,
+		lResult.MaybeGossipSystemConfig, lResult.MaybeAddToSplitQueue,
+		lResult.MaybeGossipNodeLiveness, lResult.MaybeWatchForMerge)
+}
+
 // DetachMaybeWatchForMerge returns and falsifies the MaybeWatchForMerge flag
 // from the local result.
 func (lResult *LocalResult) DetachMaybeWatchForMerge() bool {
diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index c7d1b7ced496..0403c3eb9138 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -3199,8 +3199,10 @@ func (r *Replica) executeWriteBatch(
 		br, pErr, retry := r.tryExecuteWriteBatch(ctx, ba)
 		switch retry {
 		case proposalIllegalLeaseIndex:
+			log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex")
 			continue // retry
 		case proposalAmbiguousShouldBeReevaluated:
+			log.VEventf(ctx, 2, "retry: proposalAmbiguousShouldBeReevaluated")
 			ambiguousResult = true
 			continue // retry
 		case proposalRangeNoLongerExists, proposalErrorReproposing:
@@ -5626,6 +5628,9 @@ func (r *Replica) processRaftCommand(
 		if pErr != nil && lResult != nil {
 			log.Fatalf(ctx, "shouldn't have a local result if command processing failed. pErr: %s", pErr)
 		}
+		if log.ExpensiveLogEnabled(ctx, 2) {
+			log.VEvent(ctx, 2, lResult.String())
+		}
 
 		// Handle the Result, executing any side effects of the last
 		// state machine transition.
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 471ea7cffdc8..19ec158f8fb2 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -65,6 +65,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/shuffle"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 )
 
@@ -8734,6 +8735,77 @@ func TestGCWithoutThreshold(t *testing.T) {
 	}
 }
 
+// Test that, if the Raft command resulting from an EndTransaction request
+// fails to be processed/applied, then the LocalResult associated with that
+// command is cleared.
+func TestFailureToProcessCommandClearsLocalResult(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	ctx := context.Background()
+	var tc testContext
+	cfg := TestStoreConfig(nil)
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+	tc.StartWithStoreConfig(t, stopper, cfg)
+
+	key := roachpb.Key("a")
+	txn := newTransaction("test", key, 1, enginepb.SERIALIZABLE, tc.Clock())
+	bt, btH := beginTxnArgs(key, txn)
+	assignSeqNumsForReqs(txn, &bt)
+	put := putArgs(key, []byte("value"))
+	assignSeqNumsForReqs(txn, &put)
+
+	var ba roachpb.BatchRequest
+	ba.Header = btH
+	ba.Add(&bt, &put)
+	if _, err := tc.Sender().Send(ctx, ba); err != nil {
+		t.Fatal(err)
+	}
+
+	var proposalRecognized int64 // accessed atomically
+
+	r := tc.repl
+	r.mu.Lock()
+	r.mu.submitProposalFn = func(pd *ProposalData) error {
+		// We're going to recognize the first time the command for the
+		// EndTransaction is proposed and we're going to hackily decrease its
+		// MaxLeaseIndex, so that the processing gets rejected further on.
+		ut := pd.Local.UpdatedTxns
+		if atomic.LoadInt64(&proposalRecognized) == 0 &&
+			ut != nil && len(*ut) == 1 && (*ut)[0].ID.Equal(txn.ID) {
+			pd.command.MaxLeaseIndex--
+			atomic.StoreInt64(&proposalRecognized, 1)
+		}
+		return defaultSubmitProposalLocked(r, pd)
+	}
+	r.mu.Unlock()
+
+	opCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "test-recording")
+	defer cancel()
+
+	ba = roachpb.BatchRequest{}
+	et, etH := endTxnArgs(txn, true /* commit */)
+	et.IntentSpans = []roachpb.Span{{Key: key}}
+	assignSeqNumsForReqs(txn, &et)
+	ba.Header = etH
+	ba.Add(&et)
+	if _, err := tc.Sender().Send(opCtx, ba); err != nil {
+		t.Fatal(err)
+	}
+	formatted := tracing.FormatRecordedSpans(collect())
+	if err := testutils.MatchInOrder(formatted,
+		// The first proposal is rejected.
+ "retry proposal.*applied at lease index.*but required", + // The LocalResult is nil. This is the important part for this test. + "LocalResult: nil", + // The request will be re-evaluated. + "retry: proposalIllegalLeaseIndex", + // Re-evaluation succeeds and one txn is to be updated. + "LocalResult \\(reply.*#updated txns: 1", + ); err != nil { + t.Fatal(err) + } +} + // TestCommandTimeThreshold verifies that commands outside the replica GC // threshold fail. func TestCommandTimeThreshold(t *testing.T) {