Skip to content

Commit

Permalink
Merge #32166
Browse files Browse the repository at this point in the history
32166: storage: don't apply local results if cmd processing failed r=andreimatei a=andreimatei

This patch fixes a bug with the application of LocalResults: they were
applied even in cases when the processing of the respective command
failed. So, we were evaluating a batch, proposing a command, failing to
apply the command (in particular because of the lease index check), but
then still applying the LocalResult even though the replicated state was
not updated.
This bug had catastrophic consequences: part of the LocalResult is a
list of UpdatedTxn - this is used to ping the txn wait queue and unblock
waiters, communicating a final txn disposition to them. Now imagine an
EndTxn batch that evaluates to a command that fails the lease check. The
respective LocalResult is populate with out txn, updated to the
COMMITTED disposition. The command fails to process, but we still ping
the wait queue telling all the waiters that the transaction committed.
The waiters, if any, now go and send ResolveIntent requests, which
actually commit intents (thinking that the intent's txn committed). And
so we end up with "partially committed transactions" - some of the
writes from a txn are committed (the ones that happened to have waiters
on their intents), others don't.

In order for this bug to manifest itself, we need:
- read/write contention on some keys
- a command to fail processing. Generally this happens either because of
the lease index check or because of the lease check; so either a
Raft leadership change or a lease change can potentially cause the
badness. In the case of a Raft leadership change, proposals can get
dropped, which leads to reproposals, which seem to frequently(?) end up
processing at the wrong lease index and thus be rejected (and it's the
reproposal processing that triggers the bug). This kind of situation is
triggered by the Jepsen register test, with various nemeses. The lease
check failure can probably be triggered simply with lease transfers
(e.g. lease rebalancing).

Interestingly, the rate of occurence of this bug probably changed
between 2.0 and 2.1 because of the introduction, in 2.1, of the
QueryIntent request (and also, separately, because of increased usage of
lease transfers; this claim was not verified though). Speaking of the
Raft leadership change scenario, once the reproposal fail to be applied
(but the wait queue is erroneously signalled), we also re-evaluate the
EndTransaction batch. Unless something else goes wrong, in 2.0 this
re-evaluation was likely succeeding. In 2.1, however, it tends to fail
if there was a waiter on our txn because the EndTxn requests are usually
bundled with QueryIntent requests - and these QueryIntents fail during
re-evaluation because they don't find the intent - it was, by then,
errneously  committed by a waiter through a ResolveIntent. Except for
transaction that wrote to multiple ranges: I think for those the
QueryIntent requests are split off from the EndTransaction request by
the DistSender, and so we're back to the situation in 2.0.

Fixes #30792

Release note (bug fix): Fix a bug causing transactions to appear to be
partially committed. CRDB was sometimes claiming to have failed to
commit a transaction but some (or all) of its writes were actually
persisted.

Co-authored-by: Andrei Matei <andrei@cockroachlabs.com>
  • Loading branch information
craig[bot] and andreimatei committed Nov 12, 2018
2 parents 190333a + 0a405c4 commit 3bfc2b1
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 4 deletions.
23 changes: 23 additions & 0 deletions pkg/storage/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package result

import (
"context"
"fmt"

"github.com/kr/pretty"
"github.com/pkg/errors"
Expand Down Expand Up @@ -70,6 +71,28 @@ type LocalResult struct {
UpdatedTxns *[]*roachpb.Transaction
}

func (lResult *LocalResult) String() string {
if lResult == nil {
return "LocalResult: nil"
}
var numIntents, numEndTxns, numUpdatedTxns int
if lResult.Intents != nil {
numIntents = len(*lResult.Intents)
}
if lResult.EndTxns != nil {
numEndTxns = len(*lResult.EndTxns)
}
if lResult.UpdatedTxns != nil {
numUpdatedTxns = len(*lResult.UpdatedTxns)
}
return fmt.Sprintf("LocalResult (reply: %v, #intents: %d, #endTxns: %d #updated txns: %d, "+
"GossipFirstRange:%t MaybeGossipSystemConfig:%t MaybeAddToSplitQueue:%t "+
"MaybeGossipNodeLiveness:%s MaybeWatchForMerge:%t",
lResult.Reply, numIntents, numEndTxns, numUpdatedTxns, lResult.GossipFirstRange,
lResult.MaybeGossipSystemConfig, lResult.MaybeAddToSplitQueue,
lResult.MaybeGossipNodeLiveness, lResult.MaybeWatchForMerge)
}

// DetachMaybeWatchForMerge returns and falsifies the MaybeWatchForMerge flag
// from the local result.
func (lResult *LocalResult) DetachMaybeWatchForMerge() bool {
Expand Down
12 changes: 11 additions & 1 deletion pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -3199,8 +3199,10 @@ func (r *Replica) executeWriteBatch(
br, pErr, retry := r.tryExecuteWriteBatch(ctx, ba)
switch retry {
case proposalIllegalLeaseIndex:
log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex")
continue // retry
case proposalAmbiguousShouldBeReevaluated:
log.VEventf(ctx, 2, "retry: proposalAmbiguousShouldBeReevaluated")
ambiguousResult = true
continue // retry
case proposalRangeNoLongerExists, proposalErrorReproposing:
Expand Down Expand Up @@ -5619,7 +5621,15 @@ func (r *Replica) processRaftCommand(
}
response.Intents = proposal.Local.DetachIntents()
response.EndTxns = proposal.Local.DetachEndTxns(response.Err != nil)
lResult = proposal.Local
if pErr == nil {
lResult = proposal.Local
}
}
if pErr != nil && lResult != nil {
log.Fatalf(ctx, "shouldn't have a local result if command processing failed. pErr: %s", pErr)
}
if log.ExpensiveLogEnabled(ctx, 2) {
log.VEvent(ctx, 2, lResult.String())
}

// Handle the Result, executing any side effects of the last
Expand Down
7 changes: 4 additions & 3 deletions pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@ type ProposalData struct {
// Always use ProposalData.finishApplication().
doneCh chan proposalResult

// Local contains the results of evaluating the request
// tying the upstream evaluation of the request to the
// downstream application of the command.
// Local contains the results of evaluating the request tying the upstream
// evaluation of the request to the downstream application of the command.
// Nil when the proposal came from another node (i.e. the evaluation wasn't
// done here).
Local *result.LocalResult

// Request is the client's original BatchRequest.
Expand Down
72 changes: 72 additions & 0 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/shuffle"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

Expand Down Expand Up @@ -8734,6 +8735,77 @@ func TestGCWithoutThreshold(t *testing.T) {
}
}

// Test that, if the Raft command resulting from EndTransaction request fails to
// be processed/apply, then the LocalResult associated with that command is
// cleared.
func TestFailureToProcessCommandClearsLocalResult(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
var tc testContext
cfg := TestStoreConfig(nil)
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
tc.StartWithStoreConfig(t, stopper, cfg)

key := roachpb.Key("a")
txn := newTransaction("test", key, 1, enginepb.SERIALIZABLE, tc.Clock())
bt, btH := beginTxnArgs(key, txn)
assignSeqNumsForReqs(txn, &bt)
put := putArgs(key, []byte("value"))
assignSeqNumsForReqs(txn, &put)

var ba roachpb.BatchRequest
ba.Header = btH
ba.Add(&bt, &put)
if _, err := tc.Sender().Send(ctx, ba); err != nil {
t.Fatal(err)
}

var proposalRecognized int64 // accessed atomically

r := tc.repl
r.mu.Lock()
r.mu.submitProposalFn = func(pd *ProposalData) error {
// We're going to recognize the first time the commnand for the
// EndTransaction is proposed and we're going to hackily decrease its
// MaxLeaseIndex, so that the processing gets rejected further on.
ut := pd.Local.UpdatedTxns
if atomic.LoadInt64(&proposalRecognized) == 0 &&
ut != nil && len(*ut) == 1 && (*ut)[0].ID.Equal(txn.ID) {
pd.command.MaxLeaseIndex--
atomic.StoreInt64(&proposalRecognized, 1)
}
return defaultSubmitProposalLocked(r, pd)
}
r.mu.Unlock()

opCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "test-recording")
defer cancel()

ba = roachpb.BatchRequest{}
et, etH := endTxnArgs(txn, true /* commit */)
et.IntentSpans = []roachpb.Span{{Key: key}}
assignSeqNumsForReqs(txn, &et)
ba.Header = etH
ba.Add(&et)
if _, err := tc.Sender().Send(opCtx, ba); err != nil {
t.Fatal(err)
}
formatted := tracing.FormatRecordedSpans(collect())
if err := testutils.MatchInOrder(formatted,
// The first proposal is rejected.
"retry proposal.*applied at lease index.*but required",
// The LocalResult is nil. This is the important part for this test.
"LocalResult: nil",
// The request will be re-evaluated.
"retry: proposalIllegalLeaseIndex",
// Re-evaluation succeeds and one txn is to be updated.
"LocalResult \\(reply.*#updated txns: 1",
); err != nil {
t.Fatal(err)
}
}

// TestCommandTimeThreshold verifies that commands outside the replica GC
// threshold fail.
func TestCommandTimeThreshold(t *testing.T) {
Expand Down

0 comments on commit 3bfc2b1

Please sign in to comment.