From 2d9630ad084db0e023902f512b3ca875d1f28424 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 23 Jul 2019 13:31:50 -0400 Subject: [PATCH 1/2] storage/rangefeed: read rangefeed values from batch instead of engine There are no known issues caused by this, but it seems bad. 509baff introduced batching of Raft entry application. An entire batch of entries is applied and then each of their LogicalOpLogs are consumed if the range has a running rangefeed processor. During this consumption phase, values may be read from the Store's engine (so that they don't need to be duplicated in an entries WriteBatch and LogicalOpLog). Since we're no longer performing this apply-then-consumer cycle one entry at a time, it seems possible for a later entry a batch to overwrite the value that an earlier entry in the batch wants to read in handleLogicalOpLogRaftMuLocked. This would cause a rangefeed to produce incorrect results. This commit fixes this issue by consuming logical ops as entries are staged in the batch instead of after the batch is applied. To facilitate this, the lookup in handleLogicalOpLogRaftMuLocked now operates on the WriteBatch directly instead of on the Store's engine. This is likely more efficient when the read is satisfied from what's in the batch (on MVCCWriteValueOp but likely not on MVCCCommitIntentOp). Either way, it simplifies this logic. Release note: None --- pkg/storage/replica_application.go | 13 +++++++++++++ pkg/storage/replica_application_result.go | 12 ------------ pkg/storage/replica_rangefeed.go | 17 ++++++++++------- 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/pkg/storage/replica_application.go b/pkg/storage/replica_application.go index d0c910aa0e0c..f4144b1cb5fc 100644 --- a/pkg/storage/replica_application.go +++ b/pkg/storage/replica_application.go @@ -382,6 +382,19 @@ func (r *Replica) stageRaftCommand( log.Fatal(ctx, err) } } + + // Provide the command's corresponding logical operations to the Replica's + // rangefeed. Only do so if the WriteBatch is non-nil, in which case the + // rangefeed requires there to be a corresponding logical operation log or + // it will shut down with an error. If the WriteBatch is nil then we expect + // the logical operation log to also be nil. We don't want to trigger a + // shutdown of the rangefeed in that situation, so we don't pass anything to + // the rangefed. If no rangefeed is running at all, this call will be a noop. + if cmd.raftCmd.WriteBatch != nil { + r.handleLogicalOpLogRaftMuLocked(ctx, cmd.raftCmd.LogicalOpLog, batch) + } else if cmd.raftCmd.LogicalOpLog != nil { + log.Fatalf(ctx, "non-nil logical op log with nil write batch: %v", cmd.raftCmd) + } } func checkForcedErr( diff --git a/pkg/storage/replica_application_result.go b/pkg/storage/replica_application_result.go index 3f4d67c62815..8257243626e5 100644 --- a/pkg/storage/replica_application_result.go +++ b/pkg/storage/replica_application_result.go @@ -363,18 +363,6 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *cmdAppCtx) { // finishRaftCommand is called after a command's side effects have been applied // in order to acknowledge clients and release latches. func (r *Replica) finishRaftCommand(ctx context.Context, cmd *cmdAppCtx) { - - // Provide the command's corresponding logical operations to the - // Replica's rangefeed. Only do so if the WriteBatch is nonnil, - // otherwise it's valid for the logical op log to be nil, which - // would shut down all rangefeeds. If no rangefeed is running, - // this call will be a noop. - if cmd.raftCmd.WriteBatch != nil { - r.handleLogicalOpLogRaftMuLocked(ctx, cmd.raftCmd.LogicalOpLog) - } else if cmd.raftCmd.LogicalOpLog != nil { - log.Fatalf(ctx, "nonnil logical op log with nil write batch: %v", cmd.raftCmd) - } - // When set to true, recomputes the stats for the LHS and RHS of splits and // makes sure that they agree with the state's range stats. const expensiveSplitAssertion = false diff --git a/pkg/storage/replica_rangefeed.go b/pkg/storage/replica_rangefeed.go index 69cbda53d596..029bfffad266 100644 --- a/pkg/storage/replica_rangefeed.go +++ b/pkg/storage/replica_rangefeed.go @@ -405,9 +405,13 @@ func (r *Replica) numRangefeedRegistrations() int { } // handleLogicalOpLogRaftMuLocked passes the logical op log to the active -// rangefeed, if one is running. No-op if a rangefeed is not active. Requires +// rangefeed, if one is running. The method accepts a reader, which is used to +// look up the values associated with key-value writes in the log before handing +// them to the rangefeed processor. No-op if a rangefeed is not active. Requires // raftMu to be locked. -func (r *Replica) handleLogicalOpLogRaftMuLocked(ctx context.Context, ops *storagepb.LogicalOpLog) { +func (r *Replica) handleLogicalOpLogRaftMuLocked( + ctx context.Context, ops *storagepb.LogicalOpLog, reader engine.Reader, +) { p := r.getRangefeedProcessor() if p == nil { return @@ -429,8 +433,7 @@ func (r *Replica) handleLogicalOpLogRaftMuLocked(ctx context.Context, ops *stora } // When reading straight from the Raft log, some logical ops will not be - // fully populated. Read from the engine (under raftMu) to populate all - // fields. + // fully populated. Read from the Reader to populate all fields. for _, op := range ops.Ops { var key []byte var ts hlc.Timestamp @@ -450,12 +453,12 @@ func (r *Replica) handleLogicalOpLogRaftMuLocked(ctx context.Context, ops *stora panic(fmt.Sprintf("unknown logical op %T", t)) } - // Read the value directly from the Engine. This is performed in the + // Read the value directly from the Reader. This is performed in the // same raftMu critical section that the logical op's corresponding // WriteBatch is applied, so the value should exist. - val, _, err := engine.MVCCGet(ctx, r.Engine(), key, ts, engine.MVCCGetOptions{Tombstones: true}) + val, _, err := engine.MVCCGet(ctx, reader, key, ts, engine.MVCCGetOptions{Tombstones: true}) if val == nil && err == nil { - err = errors.New("value missing in engine") + err = errors.New("value missing in reader") } if err != nil { r.disconnectRangefeedWithErr(p, roachpb.NewErrorf( From aada4fcf3ff410d83a738cf16696e37357cb8d4e Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 23 Jul 2019 21:35:53 -0400 Subject: [PATCH 2/2] storage: prevent command reproposal with new lease index after application Fixes #39018. Fixes #37810. May fix other tests. This commit fixes a bug introduced in e4ce717 which allowed a single Raft proposal to be applied multiple times at multiple applied indexes. The bug was possible if a raft proposal was reproposed twice, once with the same max lease index and once with a new max lease index. Because there are two entries for the same proposal, one necessarily has to have an invalid max lease applied index (see the invariant in https://github.com/cockroachdb/cockroach/blob/5cbc4b50712546465de75dba69a1c0cdd1fe2f87/pkg/storage/replica_raft.go#L1430) If these two entries happened to land in the same application batch on the leaseholder then the first entry would be rejected and the second would apply. The replicas LeaseAppliedIndex would then be bumped all the way to the max lease index of the entry that applied. The bug occurred when the first entry (who must have hit a proposalIllegalLeaseIndex), called into tryReproposeWithNewLeaseIndex. The ProposalData's MaxLeaseIndex would be equal to the Replica's LeaseAppliedIndex because it would match the index in the successful entry. We would then repropose the proposal with a larger lease applied index. This new entry could then apply and result in duplicate entry application. Luckily, rangefeed's intent reference counting was sensitive enough that it caught this duplicate entry application and panicked loudly. Other tests might also be failing because of it but might not have as obvious of symptoms when they hit the bug. In addition to this primary bug fix, this commit has a secondary effect of fixing an issue where two entries for the same command could be in the same batch and only one would be linked to its ProposalData struct and be considered locally proposed (see the change in retrieveLocalProposals). I believe that this would prevent the command from being properly acknowledged if the first entry was rejected due to its max lease index and the second entry had a larger max lease index and applied. I think the first entry would have eventually hit the check in tryReproposeWithNewLeaseIndex and observed that the linked ProposalData had a new MaxLeaseIndex so it would have added it back to the proposal map, but then it would have had to wait for refreshProposalsLocked to refresh the proposal, at which point this refresh would have hit a lease index error and would be reproposed at a higher index. Not only could this cause duplicate versions of the same command to apply (described above), but I think this could even loop forever without acknowledging the client. It seems like there should be a way for this to cause #39022, but it doesn't exactly line up. Again, these kinds of cases will be easier to test once we properly mock out these interfaces in #38954. I'm working on that, but I don't think it should hold up the next alpha (#39036). However, this commit does address a TODO to properly handle errors during tryReproposeWithNewLeaseIndex reproposals and that is properly tested. My debugging process to track this down was to repeatedly run a set of 10 `cdc/ledger/rangefeed=true` roachtests after reducing its duration down to 5m. Usually, at least one of these tests would hit the `negative refcount` assertion. I then incrementally added more and more logging around entry application until I painted a full picture of which logical ops were being consumed by the rangefeed processor and why the same raft command was being applied twice (once it became clear that one was). After a few more rounds of fine-tuning the logging, the interaction with reproposals with a new max lease index became clear. Release note: None --- pkg/storage/replica_application.go | 58 ++++++++-- pkg/storage/replica_application_result.go | 129 +++++++++++++++------- pkg/storage/replica_proposal.go | 21 +++- pkg/storage/replica_raft.go | 50 --------- pkg/storage/replica_test.go | 86 ++++++++++++++- pkg/storage/testing_knobs.go | 1 + 6 files changed, 244 insertions(+), 101 deletions(-) diff --git a/pkg/storage/replica_application.go b/pkg/storage/replica_application.go index f4144b1cb5fc..fc86a6bffc0e 100644 --- a/pkg/storage/replica_application.go +++ b/pkg/storage/replica_application.go @@ -154,25 +154,65 @@ func (r *Replica) retrieveLocalProposals(ctx context.Context, b *cmdAppBatch) { // Copy stats as it gets updated in-place in applyRaftCommandToBatch. b.replicaState.Stats = &b.stats *b.replicaState.Stats = *r.mu.state.Stats + // Assign all the local proposals first then delete all of them from the map + // in a second pass. This ensures that we retrieve all proposals correctly + // even if the batch has multiple entries for the same proposal, in which + // case the proposal was reproposed (either under its original or a new + // MaxLeaseIndex) which we handle in a second pass below. + var anyLocal bool var it cmdAppCtxBufIterator - haveProposalQuota := r.mu.proposalQuota != nil for ok := it.init(&b.cmdBuf); ok; ok = it.next() { cmd := it.cur() cmd.proposal = r.mu.proposals[cmd.idKey] if cmd.proposedLocally() { // We initiated this command, so use the caller-supplied context. cmd.ctx = cmd.proposal.ctx - delete(r.mu.proposals, cmd.idKey) - // At this point we're not guaranteed to have proposalQuota initialized, - // the same is true for quotaReleaseQueues. Only queue the proposal's - // quota for release if the proposalQuota is initialized. - if haveProposalQuota { - r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmd.proposal.quotaSize) - } + anyLocal = true } else { cmd.ctx = ctx } } + if !anyLocal { + // Fast-path. + return + } + for ok := it.init(&b.cmdBuf); ok; ok = it.next() { + cmd := it.cur() + if !cmd.proposedLocally() { + continue + } + if cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex { + // If this entry does not have the most up-to-date view of the + // corresponding proposal's maximum lease index then the proposal + // must have been reproposed with a higher lease index. (see + // tryReproposeWithNewLeaseIndex). In that case, there's a newer + // version of the proposal in the pipeline, so don't remove the + // proposal from the map. We expect this entry to be rejected by + // checkForcedErr. + continue + } + // Delete the proposal from the proposals map. There may be reproposals + // of the proposal in the pipeline, but those will all have the same max + // lease index, meaning that they will all be rejected after this entry + // applies (successfully or otherwise). If tryReproposeWithNewLeaseIndex + // picks up the proposal on failure, it will re-add the proposal to the + // proposal map, but this won't affect anything in this cmdAppBatch. + // + // While here, add the proposal's quota size to the quota release queue. + // We check the proposal map again first to avoid double free-ing quota + // when reproposals from the same proposal end up in the same entry + // application batch. + if _, ok := r.mu.proposals[cmd.idKey]; !ok { + continue + } + delete(r.mu.proposals, cmd.idKey) + // At this point we're not guaranteed to have proposalQuota initialized, + // the same is true for quotaReleaseQueues. Only queue the proposal's + // quota for release if the proposalQuota is initialized. + if r.mu.proposalQuota != nil { + r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmd.proposal.quotaSize) + } + } } // stageRaftCommand handles the first phase of applying a command to the @@ -856,7 +896,7 @@ func (r *Replica) applyCmdAppBatch( } cmd.replicatedResult().SuggestedCompactions = nil isNonTrivial := batchIsNonTrivial && it.isLast() - if errExpl, err = r.handleRaftCommandResult(ctx, cmd, isNonTrivial, + if errExpl, err = r.handleRaftCommandResult(cmd.ctx, cmd, isNonTrivial, b.replicaState.UsingAppliedStateKey); err != nil { return errExpl, err } diff --git a/pkg/storage/replica_application_result.go b/pkg/storage/replica_application_result.go index 8257243626e5..24ca050bde12 100644 --- a/pkg/storage/replica_application_result.go +++ b/pkg/storage/replica_application_result.go @@ -160,7 +160,11 @@ func (r *Replica) handleRaftCommandResult( ) (errExpl string, err error) { // Set up the local result prior to handling the ReplicatedEvalResult to // give testing knobs an opportunity to inspect it. - r.prepareLocalResult(cmd.ctx, cmd) + r.prepareLocalResult(ctx, cmd) + if log.ExpensiveLogEnabled(ctx, 2) { + log.VEvent(ctx, 2, cmd.localResult.String()) + } + // Handle the ReplicatedEvalResult, executing any side effects of the last // state machine transition. // @@ -168,7 +172,7 @@ func (r *Replica) handleRaftCommandResult( // before notifying a potentially waiting client. clearTrivialReplicatedEvalResultFields(cmd.replicatedResult(), usingAppliedStateKey) if isNonTrivial { - r.handleComplexReplicatedEvalResult(cmd.ctx, *cmd.replicatedResult()) + r.handleComplexReplicatedEvalResult(ctx, *cmd.replicatedResult()) } else if !cmd.replicatedResult().Equal(storagepb.ReplicatedEvalResult{}) { log.Fatalf(ctx, "failed to handle all side-effects of ReplicatedEvalResult: %v", cmd.replicatedResult()) @@ -186,13 +190,13 @@ func (r *Replica) handleRaftCommandResult( } if cmd.localResult != nil { - r.handleLocalEvalResult(cmd.ctx, *cmd.localResult) + r.handleLocalEvalResult(ctx, *cmd.localResult) } - r.finishRaftCommand(cmd.ctx, cmd) + r.finishRaftCommand(ctx, cmd) switch cmd.e.Type { case raftpb.EntryNormal: if cmd.replicatedResult().ChangeReplicas != nil { - log.Fatalf(cmd.ctx, "unexpected replication change from command %s", &cmd.raftCmd) + log.Fatalf(ctx, "unexpected replication change from command %s", &cmd.raftCmd) } case raftpb.EntryConfChange: if cmd.replicatedResult().ChangeReplicas == nil { @@ -307,6 +311,10 @@ func (r *Replica) handleComplexReplicatedEvalResult( // result will zero-out the struct to ensure that is has properly performed all // of the implied side-effects. func (r *Replica) prepareLocalResult(ctx context.Context, cmd *cmdAppCtx) { + if !cmd.proposedLocally() { + return + } + var pErr *roachpb.Error if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; filter != nil { var newPropRetry int @@ -328,36 +336,90 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *cmdAppCtx) { pErr = cmd.forcedErr } - if cmd.proposedLocally() { - if cmd.proposalRetry != proposalNoReevaluation && pErr == nil { - log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", cmd.proposal) - } - if pErr != nil { - // A forced error was set (i.e. we did not apply the proposal, - // for instance due to its log position) or the Replica is now - // corrupted. - // If proposalRetry is set, we don't also return an error, as per the - // proposalResult contract. - if cmd.proposalRetry == proposalNoReevaluation { + if cmd.proposalRetry != proposalNoReevaluation && pErr == nil { + log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", cmd.proposal) + } + if pErr != nil { + // A forced error was set (i.e. we did not apply the proposal, + // for instance due to its log position) or the Replica is now + // corrupted. + switch cmd.proposalRetry { + case proposalNoReevaluation: + cmd.response.Err = pErr + case proposalIllegalLeaseIndex: + // If we failed to apply at the right lease index, try again with a + // new one. This is important for pipelined writes, since they don't + // have a client watching to retry, so a failure to eventually apply + // the proposal would be a user-visible error. + pErr = r.tryReproposeWithNewLeaseIndex(ctx, cmd) + if pErr != nil { cmd.response.Err = pErr + } else { + // Unbind the entry's local proposal because we just succeeded + // in reproposing it and we don't want to acknowledge the client + // yet. + cmd.proposal = nil + return } - } else if cmd.proposal.Local.Reply != nil { - cmd.response.Reply = cmd.proposal.Local.Reply - } else { - log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", cmd.proposal) - } - cmd.response.Intents = cmd.proposal.Local.DetachIntents() - cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil) - if pErr == nil { - cmd.localResult = cmd.proposal.Local + default: + panic("unexpected") } + } else if cmd.proposal.Local.Reply != nil { + cmd.response.Reply = cmd.proposal.Local.Reply + } else { + log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", cmd.proposal) } - if pErr != nil && cmd.localResult != nil { + cmd.response.Intents = cmd.proposal.Local.DetachIntents() + cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil) + if pErr == nil { + cmd.localResult = cmd.proposal.Local + } else if cmd.localResult != nil { log.Fatalf(ctx, "shouldn't have a local result if command processing failed. pErr: %s", pErr) } - if log.ExpensiveLogEnabled(ctx, 2) { - log.VEvent(ctx, 2, cmd.localResult.String()) +} + +// tryReproposeWithNewLeaseIndex is used by prepareLocalResult to repropose +// commands that have gotten an illegal lease index error, and that we know +// could not have applied while their lease index was valid (that is, we +// observed all applied entries between proposal and the lease index becoming +// invalid, as opposed to skipping some of them by applying a snapshot). +// +// It is not intended for use elsewhere and is only a top-level function so that +// it can avoid the below_raft_protos check. Returns a nil error if the command +// has already been successfully applied or has been reproposed here or by a +// different entry for the same proposal that hit an illegal lease index error. +func (r *Replica) tryReproposeWithNewLeaseIndex( + ctx context.Context, cmd *cmdAppCtx, +) *roachpb.Error { + // Note that we don't need to validate anything about the proposal's + // lease here - if we got this far, we know that everything but the + // index is valid at this point in the log. + p := cmd.proposal + if p.applied || cmd.raftCmd.MaxLeaseIndex != p.command.MaxLeaseIndex { + // If the command associated with this rejected raft entry already + // applied then we don't want to repropose it. Doing so could lead + // to duplicate application of the same proposal. + // + // Similarly, if the command associated with this rejected raft + // entry has a different (larger) MaxLeaseIndex than the one we + // decoded from the entry itself, the command must have already + // been reproposed (this can happen if there are multiple copies + // of the command in the logs; see TestReplicaRefreshMultiple). + // We must not create multiple copies with multiple lease indexes, + // so don't repropose it again. This ensures that at any time, + // there is only up to a single lease index that has a chance of + // succeeding in the Raft log for a given command. + return nil } + // Some tests check for this log message in the trace. + log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex") + maxLeaseIndex, pErr := r.propose(ctx, p) + if pErr != nil { + log.Warningf(ctx, "failed to repropose with new lease index: %s", pErr) + return pErr + } + log.VEventf(ctx, 2, "reproposed command %x at maxLeaseIndex=%d", cmd.idKey, maxLeaseIndex) + return nil } // finishRaftCommand is called after a command's side effects have been applied @@ -395,17 +457,6 @@ func (r *Replica) finishRaftCommand(ctx context.Context, cmd *cmdAppCtx) { } if cmd.proposedLocally() { - // If we failed to apply at the right lease index, try again with - // a new one. This is important for pipelined writes, since they - // don't have a client watching to retry, so a failure to - // eventually apply the proposal would be a uservisible error. - // TODO(nvanbenschoten): This reproposal is not tracked by the - // quota pool. We should fix that. - if cmd.proposalRetry == proposalIllegalLeaseIndex && - r.tryReproposeWithNewLeaseIndex(cmd.proposal) { - return - } - // Otherwise, signal the command's status to the client. cmd.proposal.finishApplication(cmd.response) } else if cmd.response.Err != nil { log.VEventf(ctx, 1, "applying raft command resulted in error: %s", cmd.response.Err) diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index 3ff7e608700f..02a7269a1adc 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -87,6 +87,11 @@ type ProposalData struct { // cache and release latches. ec endCmds + // applied is set when the a command finishes application. It is used to + // avoid reproposing a failed proposal if an earlier version of the same + // proposal succeeded in applying. + applied bool + // doneCh is used to signal the waiting RPC handler (the contents of // proposalResult come from LocalEvalResult). // @@ -122,11 +127,25 @@ type ProposalData struct { // is canceled, it won't be listening to this done channel, and so it can't be // counted on to invoke endCmds itself.) func (proposal *ProposalData) finishApplication(pr proposalResult) { + if proposal.applied { + // If the command already applied then we shouldn't be "finishing" its + // application again because it should only be able to apply successfully + // once. We expect that when any reproposal for the same command attempts + // to apply it will be rejected by the below raft lease sequence or lease + // index check in checkForcedErr. + if pr.Err != nil { + return + } + log.Fatalf(proposal.ctx, + "command already applied: %+v; unexpected successful result: %+v", proposal, pr) + } + proposal.applied = true proposal.ec.done(proposal.Request, pr.Reply, pr.Err) + proposal.signalProposalResult(pr) if proposal.sp != nil { tracing.FinishSpan(proposal.sp) + proposal.sp = nil } - proposal.signalProposalResult(pr) } // returnProposalResult signals proposal.doneCh with the proposal result if it diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index e11ae2529640..068d609ed7b6 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -1404,56 +1404,6 @@ func (m lastUpdateTimesMap) isFollowerActive( return now.Sub(lastUpdateTime) <= MaxQuotaReplicaLivenessDuration } -// tryReproposeWithNewLeaseIndex is used by processRaftCommand to -// repropose commands that have gotten an illegal lease index error, -// and that we know could not have applied while their lease index was -// valid (that is, we observed all applied entries between proposal -// and the lease index becoming invalid, as opposed to skipping some -// of them by applying a snapshot). -// -// It is not intended for use elsewhere and is only a top-level -// function so that it can avoid the below_raft_protos check. Returns -// true if the command has been successfully reproposed (not -// necessarily by this method! But if this method returns true, the -// command will be in the local proposals map). -func (r *Replica) tryReproposeWithNewLeaseIndex(proposal *ProposalData) bool { - // Note that we don't need to validate anything about the proposal's - // lease here - if we got this far, we know that everything but the - // index is valid at this point in the log. - r.mu.Lock() - if proposal.command.MaxLeaseIndex > r.mu.state.LeaseAppliedIndex { - // If the command's MaxLeaseIndex is greater than the - // LeaseAppliedIndex, it must have already been reproposed (this - // can happen if there are multiple copies of the command in the - // logs; see TestReplicaRefreshMultiple). We must not create - // multiple copies with multiple lease indexes, so don't repropose - // it again. This ensures that at any time, there is only up to a - // single lease index that has a chance of succeeding in the Raft - // log for a given command. - // - // Note that the caller has already removed the current version of - // the proposal from the pending proposals map. We must re-add it - // since it's still pending. - log.VEventf(proposal.ctx, 2, "skipping reproposal, already reproposed at index %d", - proposal.command.MaxLeaseIndex) - r.mu.proposals[proposal.idKey] = proposal - r.mu.Unlock() - return true - } - r.mu.Unlock() - - // Some tests check for this log message in the trace. - log.VEventf(proposal.ctx, 2, "retry: proposalIllegalLeaseIndex") - if _, pErr := r.propose(proposal.ctx, proposal); pErr != nil { - // TODO(nvanbenschoten): Returning false here isn't ok. It will result - // in a proposal returning without a response or an error, which - // triggers a panic higher up in the stack. We need to fix this. - log.Warningf(proposal.ctx, "failed to repropose with new lease index: %s", pErr) - return false - } - return true -} - // maybeAcquireSnapshotMergeLock checks whether the incoming snapshot subsumes // any replicas and, if so, locks them for subsumption. See acquireMergeLock // for details about the lock itself. diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 4e2d28c0e225..a05d8b8593cf 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -7849,6 +7849,88 @@ func TestReplicaRefreshMultiple(t *testing.T) { } } +// TestReplicaReproposalWithNewLeaseIndexError tests an interaction where a +// proposal is rejected beneath raft due an illegal lease index error and then +// hits an error when being reproposed. The expectation is that this error +// manages to make its way back to the client. +func TestReplicaReproposalWithNewLeaseIndexError(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + var tc testContext + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc.Start(t, stopper) + + type magicKey struct{} + magicCtx := context.WithValue(ctx, magicKey{}, "foo") + + var c int32 // updated atomically + tc.repl.mu.Lock() + tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) { + if v := p.ctx.Value(magicKey{}); v != nil { + curAttempt := atomic.AddInt32(&c, 1) + switch curAttempt { + case 1: + // This is the first time the command is being given a max lease + // applied index. Set the index to that of the recently applied + // write. Two requests can't have the same lease applied index, + // so this will cause it to be rejected beneath raft with an + // illegal lease index error. + wrongLeaseIndex := uint64(1) + return wrongLeaseIndex, nil + case 2: + // This is the second time the command is being given a max + // lease applied index, which should be after the command was + // rejected beneath raft. Return an error. We expect this error + // to propagate up through tryReproposeWithNewLeaseIndex and + // make it back to the client. + return 0, errors.New("boom") + default: + // Unexpected. Asserted against below. + return 0, nil + } + } + return 0, nil + } + tc.repl.mu.Unlock() + + // Perform a few writes to advance the lease applied index. + const initCount = 3 + key := roachpb.Key("a") + for i := 0; i < initCount; i++ { + iArg := incrementArgs(key, 1) + if _, pErr := tc.SendWrapped(&iArg); pErr != nil { + t.Fatal(pErr) + } + } + + // Perform a write that will first hit an illegal lease index error and + // will then hit the injected error when we attempt to repropose it. + var ba roachpb.BatchRequest + iArg := incrementArgs(key, 10) + ba.Add(&iArg) + if _, pErr := tc.Sender().Send(magicCtx, ba); pErr == nil { + t.Fatal("expected a non-nil error") + } else if !testutils.IsPError(pErr, "boom") { + t.Fatalf("unexpected error: %v", pErr) + } + // The command should have picked a new max lease index exactly twice. + if exp, act := int32(2), atomic.LoadInt32(&c); exp != act { + t.Fatalf("expected %d proposals, got %d", exp, act) + } + + // The command should not have applied. + gArgs := getArgs(key) + if reply, pErr := tc.SendWrapped(&gArgs); pErr != nil { + t.Fatal(pErr) + } else if v, err := reply.(*roachpb.GetResponse).Value.GetInt(); err != nil { + t.Fatal(err) + } else if v != initCount { + t.Fatalf("expected value of %d, found %d", initCount, v) + } +} + // TestGCWithoutThreshold validates that GCRequest only declares the threshold // key if it is subject to change, and that it does not access this key if it // does not declare them. @@ -7958,10 +8040,10 @@ func TestFailureToProcessCommandClearsLocalResult(t *testing.T) { if err := testutils.MatchInOrder(formatted, // The first proposal is rejected. "retry proposal.*applied at lease index.*but required", - // The LocalResult is nil. This is the important part for this test. - "LocalResult: nil", // The request will be re-evaluated. "retry: proposalIllegalLeaseIndex", + // The LocalResult is nil. This is the important part for this test. + "LocalResult: nil", // Re-evaluation succeeds and one txn is to be updated. "LocalResult \\(reply.*#updated txns: 1", ); err != nil { diff --git a/pkg/storage/testing_knobs.go b/pkg/storage/testing_knobs.go index 5a4945e977d6..6821f0268e69 100644 --- a/pkg/storage/testing_knobs.go +++ b/pkg/storage/testing_knobs.go @@ -56,6 +56,7 @@ type StoreTestingKnobs struct { // TestingPostApplyFilter is called after a command is applied to // rocksdb but before in-memory side effects have been processed. + // It is only called on the replica the proposed the command. TestingPostApplyFilter storagebase.ReplicaApplyFilter // TestingResponseFilter is called after the replica processes a