Skip to content

Commit

Permalink
storage: add a test for the clearing of local results
Browse files Browse the repository at this point in the history
... in case of rejected command.

Release note: None
  • Loading branch information
andreimatei committed Nov 6, 2018
1 parent c523fbd commit defc5d7
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 0 deletions.
23 changes: 23 additions & 0 deletions pkg/storage/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package result

import (
"context"
"fmt"

"github.com/kr/pretty"
"github.com/pkg/errors"
Expand Down Expand Up @@ -70,6 +71,28 @@ type LocalResult struct {
UpdatedTxns *[]*roachpb.Transaction
}

func (lResult *LocalResult) String() string {
if lResult == nil {
return "LocalResult: nil"
}
var numIntents, numEndTxns, numUpdatedTxns int
if lResult.Intents != nil {
numIntents = len(*lResult.Intents)
}
if lResult.EndTxns != nil {
numEndTxns = len(*lResult.EndTxns)
}
if lResult.UpdatedTxns != nil {
numUpdatedTxns = len(*lResult.UpdatedTxns)
}
return fmt.Sprintf("LocalResult (reply: %v, #intents: %d, #endTxns: %d #updated txns: %d, "+
"GossipFirstRange:%t MaybeGossipSystemConfig:%t MaybeAddToSplitQueue:%t "+
"MaybeGossipNodeLiveness:%s MaybeWatchForMerge:%t",
lResult.Reply, numIntents, numEndTxns, numUpdatedTxns, lResult.GossipFirstRange,
lResult.MaybeGossipSystemConfig, lResult.MaybeAddToSplitQueue,
lResult.MaybeGossipNodeLiveness, lResult.MaybeWatchForMerge)
}

// DetachMaybeWatchForMerge returns and falsifies the MaybeWatchForMerge flag
// from the local result.
func (lResult *LocalResult) DetachMaybeWatchForMerge() bool {
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -3189,8 +3189,10 @@ func (r *Replica) executeWriteBatch(
br, pErr, retry := r.tryExecuteWriteBatch(ctx, ba)
switch retry {
case proposalIllegalLeaseIndex:
log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex")
continue // retry
case proposalAmbiguousShouldBeReevaluated:
log.VEventf(ctx, 2, "retry: proposalAmbiguousShouldBeReevaluated")
ambiguousResult = true
continue // retry
case proposalRangeNoLongerExists, proposalErrorReproposing:
Expand Down Expand Up @@ -5598,6 +5600,9 @@ func (r *Replica) processRaftCommand(
if pErr != nil && lResult != nil {
log.Fatalf(ctx, "shouldn't have a local result if command processing failed. pErr: %s", pErr)
}
if log.ExpensiveLogEnabled(ctx, 2) {
log.VEvent(ctx, 2, lResult.String())
}

// Handle the Result, executing any side effects of the last
// state machine transition.
Expand Down
72 changes: 72 additions & 0 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/shuffle"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

Expand Down Expand Up @@ -8725,6 +8726,77 @@ func TestGCWithoutThreshold(t *testing.T) {
}
}

// Test that, if the Raft command resulting from EndTransaction request fails to
// be processed/apply, then the LocalResult associated with that command is
// cleared.
func TestFailureToProcessCommandClearsLocalResult(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
var tc testContext
cfg := TestStoreConfig(nil)
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
tc.StartWithStoreConfig(t, stopper, cfg)

key := roachpb.Key("a")
txn := newTransaction("test", key, 1, enginepb.SERIALIZABLE, tc.Clock())
bt, btH := beginTxnArgs(key, txn)
assignSeqNumsForReqs(txn, &bt)
put := putArgs(key, []byte("value"))
assignSeqNumsForReqs(txn, &put)

var ba roachpb.BatchRequest
ba.Header = btH
ba.Add(&bt, &put)
if _, err := tc.Sender().Send(ctx, ba); err != nil {
t.Fatal(err)
}

var proposalRecognized int64 // accessed atomically

r := tc.repl
r.mu.Lock()
r.mu.submitProposalFn = func(pd *ProposalData) error {
// We're going to recognize the first time the commnand for the
// EndTransaction is proposed and we're going to hackily decrease its
// MaxLeaseIndex, so that the processing gets rejected further on.
ut := pd.Local.UpdatedTxns
if atomic.LoadInt64(&proposalRecognized) == 0 &&
ut != nil && len(*ut) == 1 && (*ut)[0].ID.Equal(txn.ID) {
pd.command.MaxLeaseIndex--
atomic.StoreInt64(&proposalRecognized, 1)
}
return defaultSubmitProposalLocked(r, pd)
}
r.mu.Unlock()

opCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "test-recording")
defer cancel()

ba = roachpb.BatchRequest{}
et, etH := endTxnArgs(txn, true /* commit */)
et.IntentSpans = []roachpb.Span{{Key: key}}
assignSeqNumsForReqs(txn, &et)
ba.Header = etH
ba.Add(&et)
if _, err := tc.Sender().Send(opCtx, ba); err != nil {
t.Fatal(err)
}
formatted := tracing.FormatRecordedSpans(collect())
if err := testutils.MatchInOrder(formatted,
// The first proposal is rejected.
"retry proposal.*applied at lease index.*but required",
// The LocalResult is nil. This is the important part for this test.
"LocalResult: nil",
// The request will be re-evaluated.
"retry: proposalIllegalLeaseIndex",
// Re-evaluation succeeds and one txn is to be updated.
"LocalResult \\(reply.*#updated txns: 1",
); err != nil {
t.Fatal(err)
}
}

// TestCommandTimeThreshold verifies that commands outside the replica GC
// threshold fail.
func TestCommandTimeThreshold(t *testing.T) {
Expand Down

0 comments on commit defc5d7

Please sign in to comment.