diff --git a/pkg/storage/replica_application.go b/pkg/storage/replica_application.go
index f4144b1cb5fc..fc86a6bffc0e 100644
--- a/pkg/storage/replica_application.go
+++ b/pkg/storage/replica_application.go
@@ -154,25 +154,65 @@ func (r *Replica) retrieveLocalProposals(ctx context.Context, b *cmdAppBatch) {
 	// Copy stats as it gets updated in-place in applyRaftCommandToBatch.
 	b.replicaState.Stats = &b.stats
 	*b.replicaState.Stats = *r.mu.state.Stats
+	// Assign all the local proposals first then delete all of them from the map
+	// in a second pass. This ensures that we retrieve all proposals correctly
+	// even if the batch has multiple entries for the same proposal, in which
+	// case the proposal was reproposed (either under its original or a new
+	// MaxLeaseIndex) which we handle in a second pass below.
+	var anyLocal bool
 	var it cmdAppCtxBufIterator
-	haveProposalQuota := r.mu.proposalQuota != nil
 	for ok := it.init(&b.cmdBuf); ok; ok = it.next() {
 		cmd := it.cur()
 		cmd.proposal = r.mu.proposals[cmd.idKey]
 		if cmd.proposedLocally() {
 			// We initiated this command, so use the caller-supplied context.
 			cmd.ctx = cmd.proposal.ctx
-			delete(r.mu.proposals, cmd.idKey)
-			// At this point we're not guaranteed to have proposalQuota initialized,
-			// the same is true for quotaReleaseQueues. Only queue the proposal's
-			// quota for release if the proposalQuota is initialized.
-			if haveProposalQuota {
-				r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmd.proposal.quotaSize)
-			}
+			anyLocal = true
 		} else {
 			cmd.ctx = ctx
 		}
 	}
+	if !anyLocal {
+		// Fast-path.
+		return
+	}
+	for ok := it.init(&b.cmdBuf); ok; ok = it.next() {
+		cmd := it.cur()
+		if !cmd.proposedLocally() {
+			continue
+		}
+		if cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex {
+			// If this entry does not have the most up-to-date view of the
+			// corresponding proposal's maximum lease index then the proposal
+			// must have been reproposed with a higher lease index. (see
+			// tryReproposeWithNewLeaseIndex). In that case, there's a newer
+			// version of the proposal in the pipeline, so don't remove the
+			// proposal from the map. We expect this entry to be rejected by
+			// checkForcedErr.
+			continue
+		}
+		// Delete the proposal from the proposals map. There may be reproposals
+		// of the proposal in the pipeline, but those will all have the same max
+		// lease index, meaning that they will all be rejected after this entry
+		// applies (successfully or otherwise). If tryReproposeWithNewLeaseIndex
+		// picks up the proposal on failure, it will re-add the proposal to the
+		// proposal map, but this won't affect anything in this cmdAppBatch.
+		//
+		// While here, add the proposal's quota size to the quota release queue.
+		// We check the proposal map again first to avoid double free-ing quota
+		// when reproposals from the same proposal end up in the same entry
+		// application batch.
+		if _, ok := r.mu.proposals[cmd.idKey]; !ok {
+			continue
+		}
+		delete(r.mu.proposals, cmd.idKey)
+		// At this point we're not guaranteed to have proposalQuota initialized,
+		// the same is true for quotaReleaseQueues. Only queue the proposal's
+		// quota for release if the proposalQuota is initialized.
+		if r.mu.proposalQuota != nil {
+			r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmd.proposal.quotaSize)
+		}
+	}
 }
 
 // stageRaftCommand handles the first phase of applying a command to the
@@ -856,7 +896,7 @@ func (r *Replica) applyCmdAppBatch(
 		}
 		cmd.replicatedResult().SuggestedCompactions = nil
 		isNonTrivial := batchIsNonTrivial && it.isLast()
-		if errExpl, err = r.handleRaftCommandResult(ctx, cmd, isNonTrivial,
+		if errExpl, err = r.handleRaftCommandResult(cmd.ctx, cmd, isNonTrivial,
 			b.replicaState.UsingAppliedStateKey); err != nil {
 			return errExpl, err
 		}
diff --git a/pkg/storage/replica_application_result.go b/pkg/storage/replica_application_result.go
index 8257243626e5..24ca050bde12 100644
--- a/pkg/storage/replica_application_result.go
+++ b/pkg/storage/replica_application_result.go
@@ -160,7 +160,11 @@ func (r *Replica) handleRaftCommandResult(
 ) (errExpl string, err error) {
 	// Set up the local result prior to handling the ReplicatedEvalResult to
 	// give testing knobs an opportunity to inspect it.
-	r.prepareLocalResult(cmd.ctx, cmd)
+	r.prepareLocalResult(ctx, cmd)
+	if log.ExpensiveLogEnabled(ctx, 2) {
+		log.VEvent(ctx, 2, cmd.localResult.String())
+	}
+
 	// Handle the ReplicatedEvalResult, executing any side effects of the last
 	// state machine transition.
 	//
@@ -168,7 +172,7 @@ func (r *Replica) handleRaftCommandResult(
 	// before notifying a potentially waiting client.
 	clearTrivialReplicatedEvalResultFields(cmd.replicatedResult(), usingAppliedStateKey)
 	if isNonTrivial {
-		r.handleComplexReplicatedEvalResult(cmd.ctx, *cmd.replicatedResult())
+		r.handleComplexReplicatedEvalResult(ctx, *cmd.replicatedResult())
 	} else if !cmd.replicatedResult().Equal(storagepb.ReplicatedEvalResult{}) {
 		log.Fatalf(ctx, "failed to handle all side-effects of ReplicatedEvalResult: %v",
 			cmd.replicatedResult())
@@ -186,13 +190,13 @@ func (r *Replica) handleRaftCommandResult(
 	}
 
 	if cmd.localResult != nil {
-		r.handleLocalEvalResult(cmd.ctx, *cmd.localResult)
+		r.handleLocalEvalResult(ctx, *cmd.localResult)
 	}
-	r.finishRaftCommand(cmd.ctx, cmd)
+	r.finishRaftCommand(ctx, cmd)
 	switch cmd.e.Type {
 	case raftpb.EntryNormal:
 		if cmd.replicatedResult().ChangeReplicas != nil {
-			log.Fatalf(cmd.ctx, "unexpected replication change from command %s", &cmd.raftCmd)
+			log.Fatalf(ctx, "unexpected replication change from command %s", &cmd.raftCmd)
 		}
 	case raftpb.EntryConfChange:
 		if cmd.replicatedResult().ChangeReplicas == nil {
@@ -307,6 +311,10 @@ func (r *Replica) handleComplexReplicatedEvalResult(
 // result will zero-out the struct to ensure that is has properly performed all
 // of the implied side-effects.
 func (r *Replica) prepareLocalResult(ctx context.Context, cmd *cmdAppCtx) {
+	if !cmd.proposedLocally() {
+		return
+	}
+
 	var pErr *roachpb.Error
 	if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; filter != nil {
 		var newPropRetry int
@@ -328,36 +336,90 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *cmdAppCtx) {
 		pErr = cmd.forcedErr
 	}
 
-	if cmd.proposedLocally() {
-		if cmd.proposalRetry != proposalNoReevaluation && pErr == nil {
-			log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", cmd.proposal)
-		}
-		if pErr != nil {
-			// A forced error was set (i.e. we did not apply the proposal,
-			// for instance due to its log position) or the Replica is now
-			// corrupted.
-			// If proposalRetry is set, we don't also return an error, as per the
-			// proposalResult contract.
-			if cmd.proposalRetry == proposalNoReevaluation {
+	if cmd.proposalRetry != proposalNoReevaluation && pErr == nil {
+		log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", cmd.proposal)
+	}
+	if pErr != nil {
+		// A forced error was set (i.e. we did not apply the proposal,
+		// for instance due to its log position) or the Replica is now
+		// corrupted.
+		switch cmd.proposalRetry {
+		case proposalNoReevaluation:
+			cmd.response.Err = pErr
+		case proposalIllegalLeaseIndex:
+			// If we failed to apply at the right lease index, try again with a
+			// new one. This is important for pipelined writes, since they don't
+			// have a client watching to retry, so a failure to eventually apply
+			// the proposal would be a user-visible error.
+			pErr = r.tryReproposeWithNewLeaseIndex(ctx, cmd)
+			if pErr != nil {
 				cmd.response.Err = pErr
+			} else {
+				// Unbind the entry's local proposal because we just succeeded
+				// in reproposing it and we don't want to acknowledge the client
+				// yet.
+				cmd.proposal = nil
+				return
 			}
-		} else if cmd.proposal.Local.Reply != nil {
-			cmd.response.Reply = cmd.proposal.Local.Reply
-		} else {
-			log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", cmd.proposal)
-		}
-		cmd.response.Intents = cmd.proposal.Local.DetachIntents()
-		cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil)
-		if pErr == nil {
-			cmd.localResult = cmd.proposal.Local
+		default:
+			panic("unexpected")
 		}
+	} else if cmd.proposal.Local.Reply != nil {
+		cmd.response.Reply = cmd.proposal.Local.Reply
+	} else {
+		log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", cmd.proposal)
 	}
-	if pErr != nil && cmd.localResult != nil {
+	cmd.response.Intents = cmd.proposal.Local.DetachIntents()
+	cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil)
+	if pErr == nil {
+		cmd.localResult = cmd.proposal.Local
+	} else if cmd.localResult != nil {
 		log.Fatalf(ctx, "shouldn't have a local result if command processing failed. pErr: %s", pErr)
 	}
-	if log.ExpensiveLogEnabled(ctx, 2) {
-		log.VEvent(ctx, 2, cmd.localResult.String())
+}
+
+// tryReproposeWithNewLeaseIndex is used by prepareLocalResult to repropose
+// commands that have gotten an illegal lease index error, and that we know
+// could not have applied while their lease index was valid (that is, we
+// observed all applied entries between proposal and the lease index becoming
+// invalid, as opposed to skipping some of them by applying a snapshot).
+//
+// It is not intended for use elsewhere and is only a top-level function so that
+// it can avoid the below_raft_protos check. Returns a nil error if the command
+// has already been successfully applied or has been reproposed here or by a
+// different entry for the same proposal that hit an illegal lease index error.
+func (r *Replica) tryReproposeWithNewLeaseIndex(
+	ctx context.Context, cmd *cmdAppCtx,
+) *roachpb.Error {
+	// Note that we don't need to validate anything about the proposal's
+	// lease here - if we got this far, we know that everything but the
+	// index is valid at this point in the log.
+	p := cmd.proposal
+	if p.applied || cmd.raftCmd.MaxLeaseIndex != p.command.MaxLeaseIndex {
+		// If the command associated with this rejected raft entry already
+		// applied then we don't want to repropose it. Doing so could lead
+		// to duplicate application of the same proposal.
+		//
+		// Similarly, if the command associated with this rejected raft
+		// entry has a different (larger) MaxLeaseIndex than the one we
+		// decoded from the entry itself, the command must have already
+		// been reproposed (this can happen if there are multiple copies
+		// of the command in the logs; see TestReplicaRefreshMultiple).
+		// We must not create multiple copies with multiple lease indexes,
+		// so don't repropose it again. This ensures that at any time,
+		// there is only up to a single lease index that has a chance of
+		// succeeding in the Raft log for a given command.
+		return nil
 	}
+	// Some tests check for this log message in the trace.
+	log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex")
+	maxLeaseIndex, pErr := r.propose(ctx, p)
+	if pErr != nil {
+		log.Warningf(ctx, "failed to repropose with new lease index: %s", pErr)
+		return pErr
+	}
+	log.VEventf(ctx, 2, "reproposed command %x at maxLeaseIndex=%d", cmd.idKey, maxLeaseIndex)
+	return nil
 }
 
 // finishRaftCommand is called after a command's side effects have been applied
@@ -395,17 +457,6 @@ func (r *Replica) finishRaftCommand(ctx context.Context, cmd *cmdAppCtx) {
 	}
 
 	if cmd.proposedLocally() {
-		// If we failed to apply at the right lease index, try again with
-		// a new one. This is important for pipelined writes, since they
-		// don't have a client watching to retry, so a failure to
-		// eventually apply the proposal would be a uservisible error.
-		// TODO(nvanbenschoten): This reproposal is not tracked by the
-		// quota pool. We should fix that.
-		if cmd.proposalRetry == proposalIllegalLeaseIndex &&
-			r.tryReproposeWithNewLeaseIndex(cmd.proposal) {
-			return
-		}
-		// Otherwise, signal the command's status to the client.
 		cmd.proposal.finishApplication(cmd.response)
 	} else if cmd.response.Err != nil {
 		log.VEventf(ctx, 1, "applying raft command resulted in error: %s", cmd.response.Err)
diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go
index 3ff7e608700f..02a7269a1adc 100644
--- a/pkg/storage/replica_proposal.go
+++ b/pkg/storage/replica_proposal.go
@@ -87,6 +87,11 @@ type ProposalData struct {
 	// cache and release latches.
 	ec endCmds
 
+	// applied is set when a command finishes application. It is used to
+	// avoid reproposing a failed proposal if an earlier version of the same
+	// proposal succeeded in applying.
+	applied bool
+
 	// doneCh is used to signal the waiting RPC handler (the contents of
 	// proposalResult come from LocalEvalResult).
 	//
@@ -122,11 +127,25 @@ type ProposalData struct {
 // is canceled, it won't be listening to this done channel, and so it can't be
 // counted on to invoke endCmds itself.)
 func (proposal *ProposalData) finishApplication(pr proposalResult) {
+	if proposal.applied {
+		// If the command already applied then we shouldn't be "finishing" its
+		// application again because it should only be able to apply successfully
+		// once. We expect that when any reproposal for the same command attempts
+		// to apply it will be rejected by the below raft lease sequence or lease
+		// index check in checkForcedErr.
+		if pr.Err != nil {
+			return
+		}
+		log.Fatalf(proposal.ctx,
+			"command already applied: %+v; unexpected successful result: %+v", proposal, pr)
+	}
+	proposal.applied = true
 	proposal.ec.done(proposal.Request, pr.Reply, pr.Err)
+	proposal.signalProposalResult(pr)
 	if proposal.sp != nil {
 		tracing.FinishSpan(proposal.sp)
+		proposal.sp = nil
 	}
-	proposal.signalProposalResult(pr)
 }
 
 // returnProposalResult signals proposal.doneCh with the proposal result if it
diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go
index e11ae2529640..068d609ed7b6 100644
--- a/pkg/storage/replica_raft.go
+++ b/pkg/storage/replica_raft.go
@@ -1404,56 +1404,6 @@ func (m lastUpdateTimesMap) isFollowerActive(
 	return now.Sub(lastUpdateTime) <= MaxQuotaReplicaLivenessDuration
 }
 
-// tryReproposeWithNewLeaseIndex is used by processRaftCommand to
-// repropose commands that have gotten an illegal lease index error,
-// and that we know could not have applied while their lease index was
-// valid (that is, we observed all applied entries between proposal
-// and the lease index becoming invalid, as opposed to skipping some
-// of them by applying a snapshot).
-//
-// It is not intended for use elsewhere and is only a top-level
-// function so that it can avoid the below_raft_protos check. Returns
-// true if the command has been successfully reproposed (not
-// necessarily by this method! But if this method returns true, the
-// command will be in the local proposals map).
-func (r *Replica) tryReproposeWithNewLeaseIndex(proposal *ProposalData) bool {
-	// Note that we don't need to validate anything about the proposal's
-	// lease here - if we got this far, we know that everything but the
-	// index is valid at this point in the log.
-	r.mu.Lock()
-	if proposal.command.MaxLeaseIndex > r.mu.state.LeaseAppliedIndex {
-		// If the command's MaxLeaseIndex is greater than the
-		// LeaseAppliedIndex, it must have already been reproposed (this
-		// can happen if there are multiple copies of the command in the
-		// logs; see TestReplicaRefreshMultiple). We must not create
-		// multiple copies with multiple lease indexes, so don't repropose
-		// it again. This ensures that at any time, there is only up to a
-		// single lease index that has a chance of succeeding in the Raft
-		// log for a given command.
-		//
-		// Note that the caller has already removed the current version of
-		// the proposal from the pending proposals map. We must re-add it
-		// since it's still pending.
-		log.VEventf(proposal.ctx, 2, "skipping reproposal, already reproposed at index %d",
-			proposal.command.MaxLeaseIndex)
-		r.mu.proposals[proposal.idKey] = proposal
-		r.mu.Unlock()
-		return true
-	}
-	r.mu.Unlock()
-
-	// Some tests check for this log message in the trace.
-	log.VEventf(proposal.ctx, 2, "retry: proposalIllegalLeaseIndex")
-	if _, pErr := r.propose(proposal.ctx, proposal); pErr != nil {
-		// TODO(nvanbenschoten): Returning false here isn't ok. It will result
-		// in a proposal returning without a response or an error, which
-		// triggers a panic higher up in the stack. We need to fix this.
-		log.Warningf(proposal.ctx, "failed to repropose with new lease index: %s", pErr)
-		return false
-	}
-	return true
-}
-
 // maybeAcquireSnapshotMergeLock checks whether the incoming snapshot subsumes
 // any replicas and, if so, locks them for subsumption. See acquireMergeLock
 // for details about the lock itself.
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 4e2d28c0e225..a05d8b8593cf 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -7849,6 +7849,88 @@ func TestReplicaRefreshMultiple(t *testing.T) {
 	}
 }
 
+// TestReplicaReproposalWithNewLeaseIndexError tests an interaction where a
+// proposal is rejected beneath raft due to an illegal lease index error and
+// then hits an error when being reproposed. The expectation is that this error
+// manages to make its way back to the client.
+func TestReplicaReproposalWithNewLeaseIndexError(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+	var tc testContext
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+	tc.Start(t, stopper)
+
+	type magicKey struct{}
+	magicCtx := context.WithValue(ctx, magicKey{}, "foo")
+
+	var c int32 // updated atomically
+	tc.repl.mu.Lock()
+	tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
+		if v := p.ctx.Value(magicKey{}); v != nil {
+			curAttempt := atomic.AddInt32(&c, 1)
+			switch curAttempt {
+			case 1:
+				// This is the first time the command is being given a max lease
+				// applied index. Set the index to that of the recently applied
+				// write. Two requests can't have the same lease applied index,
+				// so this will cause it to be rejected beneath raft with an
+				// illegal lease index error.
+				wrongLeaseIndex := uint64(1)
+				return wrongLeaseIndex, nil
+			case 2:
+				// This is the second time the command is being given a max
+				// lease applied index, which should be after the command was
+				// rejected beneath raft. Return an error. We expect this error
+				// to propagate up through tryReproposeWithNewLeaseIndex and
+				// make it back to the client.
+				return 0, errors.New("boom")
+			default:
+				// Unexpected. Asserted against below.
+				return 0, nil
+			}
+		}
+		return 0, nil
+	}
+	tc.repl.mu.Unlock()
+
+	// Perform a few writes to advance the lease applied index.
+	const initCount = 3
+	key := roachpb.Key("a")
+	for i := 0; i < initCount; i++ {
+		iArg := incrementArgs(key, 1)
+		if _, pErr := tc.SendWrapped(&iArg); pErr != nil {
+			t.Fatal(pErr)
+		}
+	}
+
+	// Perform a write that will first hit an illegal lease index error and
+	// will then hit the injected error when we attempt to repropose it.
+	var ba roachpb.BatchRequest
+	iArg := incrementArgs(key, 10)
+	ba.Add(&iArg)
+	if _, pErr := tc.Sender().Send(magicCtx, ba); pErr == nil {
+		t.Fatal("expected a non-nil error")
+	} else if !testutils.IsPError(pErr, "boom") {
+		t.Fatalf("unexpected error: %v", pErr)
+	}
+	// The command should have picked a new max lease index exactly twice.
+	if exp, act := int32(2), atomic.LoadInt32(&c); exp != act {
+		t.Fatalf("expected %d proposals, got %d", exp, act)
+	}
+
+	// The command should not have applied.
+	gArgs := getArgs(key)
+	if reply, pErr := tc.SendWrapped(&gArgs); pErr != nil {
+		t.Fatal(pErr)
+	} else if v, err := reply.(*roachpb.GetResponse).Value.GetInt(); err != nil {
+		t.Fatal(err)
+	} else if v != initCount {
+		t.Fatalf("expected value of %d, found %d", initCount, v)
+	}
+}
+
 // TestGCWithoutThreshold validates that GCRequest only declares the threshold
 // key if it is subject to change, and that it does not access this key if it
 // does not declare them.
@@ -7958,10 +8040,10 @@ func TestFailureToProcessCommandClearsLocalResult(t *testing.T) {
 	if err := testutils.MatchInOrder(formatted,
 		// The first proposal is rejected.
"retry proposal.*applied at lease index.*but required", - // The LocalResult is nil. This is the important part for this test. - "LocalResult: nil", // The request will be re-evaluated. "retry: proposalIllegalLeaseIndex", + // The LocalResult is nil. This is the important part for this test. + "LocalResult: nil", // Re-evaluation succeeds and one txn is to be updated. "LocalResult \\(reply.*#updated txns: 1", ); err != nil { diff --git a/pkg/storage/testing_knobs.go b/pkg/storage/testing_knobs.go index 5a4945e977d6..6821f0268e69 100644 --- a/pkg/storage/testing_knobs.go +++ b/pkg/storage/testing_knobs.go @@ -56,6 +56,7 @@ type StoreTestingKnobs struct { // TestingPostApplyFilter is called after a command is applied to // rocksdb but before in-memory side effects have been processed. + // It is only called on the replica the proposed the command. TestingPostApplyFilter storagebase.ReplicaApplyFilter // TestingResponseFilter is called after the replica processes a