Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: don't apply local results if cmd processing failed #32166

Merged
merged 2 commits into from
Nov 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions pkg/storage/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package result

import (
"context"
"fmt"

"github.com/kr/pretty"
"github.com/pkg/errors"
Expand Down Expand Up @@ -70,6 +71,28 @@ type LocalResult struct {
UpdatedTxns *[]*roachpb.Transaction
}

// String implements fmt.Stringer. It renders a single-line summary of the
// local result intended for logs and traces: the reply, the number of
// intents, EndTxns and updated transactions it carries, and its side-effect
// flags. A nil receiver is rendered as "LocalResult: nil", so callers may
// invoke it on an absent result without a nil check.
func (lResult *LocalResult) String() string {
	if lResult == nil {
		return "LocalResult: nil"
	}
	// The slice fields are optional pointers; an unset one is reported as zero.
	var numIntents, numEndTxns, numUpdatedTxns int
	if lResult.Intents != nil {
		numIntents = len(*lResult.Intents)
	}
	if lResult.EndTxns != nil {
		numEndTxns = len(*lResult.EndTxns)
	}
	if lResult.UpdatedTxns != nil {
		numUpdatedTxns = len(*lResult.UpdatedTxns)
	}
	// The counters are comma-separated and the summary is wrapped in a balanced
	// pair of parentheses.
	return fmt.Sprintf("LocalResult (reply: %v, #intents: %d, #endTxns: %d, #updated txns: %d, "+
		"GossipFirstRange:%t MaybeGossipSystemConfig:%t MaybeAddToSplitQueue:%t "+
		"MaybeGossipNodeLiveness:%s MaybeWatchForMerge:%t)",
		lResult.Reply, numIntents, numEndTxns, numUpdatedTxns, lResult.GossipFirstRange,
		lResult.MaybeGossipSystemConfig, lResult.MaybeAddToSplitQueue,
		lResult.MaybeGossipNodeLiveness, lResult.MaybeWatchForMerge)
}

// DetachMaybeWatchForMerge returns and falsifies the MaybeWatchForMerge flag
// from the local result.
func (lResult *LocalResult) DetachMaybeWatchForMerge() bool {
Expand Down
12 changes: 11 additions & 1 deletion pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -3199,8 +3199,10 @@ func (r *Replica) executeWriteBatch(
br, pErr, retry := r.tryExecuteWriteBatch(ctx, ba)
switch retry {
case proposalIllegalLeaseIndex:
log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex")
continue // retry
case proposalAmbiguousShouldBeReevaluated:
log.VEventf(ctx, 2, "retry: proposalAmbiguousShouldBeReevaluated")
ambiguousResult = true
continue // retry
case proposalRangeNoLongerExists, proposalErrorReproposing:
Expand Down Expand Up @@ -5619,7 +5621,15 @@ func (r *Replica) processRaftCommand(
}
response.Intents = proposal.Local.DetachIntents()
response.EndTxns = proposal.Local.DetachEndTxns(response.Err != nil)
lResult = proposal.Local
if pErr == nil {
lResult = proposal.Local
}
}
if pErr != nil && lResult != nil {
log.Fatalf(ctx, "shouldn't have a local result if command processing failed. pErr: %s", pErr)
}
if log.ExpensiveLogEnabled(ctx, 2) {
log.VEvent(ctx, 2, lResult.String())
}

// Handle the Result, executing any side effects of the last
Expand Down
7 changes: 4 additions & 3 deletions pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@ type ProposalData struct {
// Always use ProposalData.finishApplication().
doneCh chan proposalResult

// Local contains the results of evaluating the request
// tying the upstream evaluation of the request to the
// downstream application of the command.
// Local contains the results of evaluating the request tying the upstream
// evaluation of the request to the downstream application of the command.
// Nil when the proposal came from another node (i.e. the evaluation wasn't
// done here).
Local *result.LocalResult

// Request is the client's original BatchRequest.
Expand Down
72 changes: 72 additions & 0 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/shuffle"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

Expand Down Expand Up @@ -8734,6 +8735,77 @@ func TestGCWithoutThreshold(t *testing.T) {
}
}

// TestFailureToProcessCommandClearsLocalResult checks that when the Raft
// command produced for an EndTransaction request fails to be processed, the
// LocalResult associated with that command is cleared. The failure is forced
// by lowering the command's MaxLeaseIndex on its first proposal, and the
// outcome is verified through the recorded trace.
func TestFailureToProcessCommandClearsLocalResult(t *testing.T) {
	defer leaktest.AfterTest(t)()
	ctx := context.Background()
	var tc testContext
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)
	tc.StartWithStoreConfig(t, stopper, TestStoreConfig(nil))

	// Begin the transaction and write a value under it.
	key := roachpb.Key("a")
	txn := newTransaction("test", key, 1, enginepb.SERIALIZABLE, tc.Clock())
	beginReq, beginHeader := beginTxnArgs(key, txn)
	assignSeqNumsForReqs(txn, &beginReq)
	putReq := putArgs(key, []byte("value"))
	assignSeqNumsForReqs(txn, &putReq)

	var writeBatch roachpb.BatchRequest
	writeBatch.Header = beginHeader
	writeBatch.Add(&beginReq, &putReq)
	if _, pErr := tc.Sender().Send(ctx, writeBatch); pErr != nil {
		t.Fatal(pErr)
	}

	var tampered int64 // accessed atomically

	repl := tc.repl
	// tamperOnce spots the first proposal of the EndTransaction command (the
	// one whose local result updates exactly our transaction) and decrements
	// its MaxLeaseIndex so that processing rejects it later on. Every proposal
	// is then handed to the default submission path.
	tamperOnce := func(pd *ProposalData) error {
		updated := pd.Local.UpdatedTxns
		if atomic.LoadInt64(&tampered) == 0 {
			if updated != nil && len(*updated) == 1 && (*updated)[0].ID.Equal(txn.ID) {
				pd.command.MaxLeaseIndex--
				atomic.StoreInt64(&tampered, 1)
			}
		}
		return defaultSubmitProposalLocked(repl, pd)
	}
	repl.mu.Lock()
	repl.mu.submitProposalFn = tamperOnce
	repl.mu.Unlock()

	// Record a trace of the commit so the retry sequence can be inspected.
	opCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "test-recording")
	defer cancel()

	endReq, endHeader := endTxnArgs(txn, true /* commit */)
	endReq.IntentSpans = []roachpb.Span{{Key: key}}
	assignSeqNumsForReqs(txn, &endReq)

	var commitBatch roachpb.BatchRequest
	commitBatch.Header = endHeader
	commitBatch.Add(&endReq)
	if _, pErr := tc.Sender().Send(opCtx, commitBatch); pErr != nil {
		t.Fatal(pErr)
	}

	trace := tracing.FormatRecordedSpans(collect())
	err := testutils.MatchInOrder(trace,
		// First, the tampered proposal gets rejected.
		"retry proposal.*applied at lease index.*but required",
		// Its LocalResult must be nil; this is the point of the test.
		"LocalResult: nil",
		// The request is then evaluated again.
		"retry: proposalIllegalLeaseIndex",
		// The second evaluation succeeds, with one txn to update.
		"LocalResult \\(reply.*#updated txns: 1",
	)
	if err != nil {
		t.Fatal(err)
	}
}

// TestCommandTimeThreshold verifies that commands outside the replica GC
// threshold fail.
func TestCommandTimeThreshold(t *testing.T) {
Expand Down