Skip to content

Commit

Permalink
Merge #39064
Browse files Browse the repository at this point in the history
39064: storage: prevent command reproposal with new lease index after application r=nvanbenschoten a=nvanbenschoten

Fixes #39018.
Fixes #37810.
May fix other tests.

This commit fixes a bug introduced in e4ce717 which allowed a single Raft proposal to be applied multiple times at multiple applied indexes. The bug was possible if a raft proposal was reproposed twice, once with the same max lease index and once with a new max lease index. Because there are two entries for the same proposal, one necessarily has to have an invalid max lease applied index (see the invariant in https://github.com/cockroachdb/cockroach/blob/5cbc4b50712546465de75dba69a1c0cdd1fe2f87/pkg/storage/replica_raft.go#L1430) If these two entries happened to land in the same application batch on the leaseholder then the first entry would be rejected and the second would apply. The replicas LeaseAppliedIndex would then be bumped all the way to the max lease index of the entry that applied. The bug occurred when the first entry (who must have hit a proposalIllegalLeaseIndex), called into tryReproposeWithNewLeaseIndex. The ProposalData's MaxLeaseIndex would be equal to the Replica's LeaseAppliedIndex because it would match the index in the successful entry. We would then repropose the proposal with a larger lease applied index. This new entry could then apply and result in duplicate entry application.

Luckily, rangefeed's intent reference counting was sensitive enough that it caught this duplicate entry application and panicked loudly. Other tests might also be failing because of it but might not have as obvious symptoms when they hit the bug.

In addition to this primary bug fix, this commit has a secondary effect of fixing an issue where two entries for the same command could be in the same batch and only one would be linked to its ProposalData struct and be considered locally proposed (see the change in retrieveLocalProposals). I believe that this would prevent the command from being properly acknowledged if the first entry was rejected due to its max lease index and the second entry had a larger max lease index and applied. I think the first entry would have eventually hit the check in tryReproposeWithNewLeaseIndex and observed that the linked ProposalData had a new MaxLeaseIndex so it would have added it back to the proposal map, but then it would have had to wait for refreshProposalsLocked to refresh the proposal, at which point this refresh would have hit a lease index error and would be reproposed at a higher index. Not only could this cause duplicate versions of the same command to apply (described above), but I think this could even loop forever without acknowledging the client. It seems like there should be a way for this to cause #39022, but it doesn't exactly line up.

Again, these kinds of cases will be easier to test once we properly mock out these interfaces in #38954. I'm working on that, but I don't think it should hold up the next alpha (#39036). However, this commit does address a TODO to properly handle errors during tryReproposeWithNewLeaseIndex reproposals and that is properly tested.

My debugging process to track this down was to repeatedly run a set of 10 `cdc/ledger/rangefeed=true` roachtests after reducing its duration down to 5m. Usually, at least one of these tests would hit the `negative refcount` assertion. I then incrementally added more and more logging around entry application until I painted a full picture of which logical ops were being consumed by the rangefeed processor and why the same raft command was being applied twice (once it became clear that one was). After a few more rounds of fine-tuning the logging, the interaction with reproposals with a new max lease index became clear.

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
  • Loading branch information
craig[bot] and nvanbenschoten committed Jul 24, 2019
2 parents 89b0a16 + aada4fc commit ed40e07
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 120 deletions.
71 changes: 62 additions & 9 deletions pkg/storage/replica_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,25 +154,65 @@ func (r *Replica) retrieveLocalProposals(ctx context.Context, b *cmdAppBatch) {
// Copy stats as it gets updated in-place in applyRaftCommandToBatch.
b.replicaState.Stats = &b.stats
*b.replicaState.Stats = *r.mu.state.Stats
// Assign all the local proposals first then delete all of them from the map
// in a second pass. This ensures that we retrieve all proposals correctly
// even if the batch has multiple entries for the same proposal, in which
// case the proposal was reproposed (either under its original or a new
// MaxLeaseIndex) which we handle in a second pass below.
var anyLocal bool
var it cmdAppCtxBufIterator
haveProposalQuota := r.mu.proposalQuota != nil
for ok := it.init(&b.cmdBuf); ok; ok = it.next() {
cmd := it.cur()
cmd.proposal = r.mu.proposals[cmd.idKey]
if cmd.proposedLocally() {
// We initiated this command, so use the caller-supplied context.
cmd.ctx = cmd.proposal.ctx
delete(r.mu.proposals, cmd.idKey)
// At this point we're not guaranteed to have proposalQuota initialized,
// the same is true for quotaReleaseQueues. Only queue the proposal's
// quota for release if the proposalQuota is initialized.
if haveProposalQuota {
r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmd.proposal.quotaSize)
}
anyLocal = true
} else {
cmd.ctx = ctx
}
}
if !anyLocal {
// Fast-path.
return
}
for ok := it.init(&b.cmdBuf); ok; ok = it.next() {
cmd := it.cur()
if !cmd.proposedLocally() {
continue
}
if cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex {
// If this entry does not have the most up-to-date view of the
// corresponding proposal's maximum lease index then the proposal
// must have been reproposed with a higher lease index. (see
// tryReproposeWithNewLeaseIndex). In that case, there's a newer
// version of the proposal in the pipeline, so don't remove the
// proposal from the map. We expect this entry to be rejected by
// checkForcedErr.
continue
}
// Delete the proposal from the proposals map. There may be reproposals
// of the proposal in the pipeline, but those will all have the same max
// lease index, meaning that they will all be rejected after this entry
// applies (successfully or otherwise). If tryReproposeWithNewLeaseIndex
// picks up the proposal on failure, it will re-add the proposal to the
// proposal map, but this won't affect anything in this cmdAppBatch.
//
// While here, add the proposal's quota size to the quota release queue.
// We check the proposal map again first to avoid double free-ing quota
// when reproposals from the same proposal end up in the same entry
// application batch.
if _, ok := r.mu.proposals[cmd.idKey]; !ok {
continue
}
delete(r.mu.proposals, cmd.idKey)
// At this point we're not guaranteed to have proposalQuota initialized,
// the same is true for quotaReleaseQueues. Only queue the proposal's
// quota for release if the proposalQuota is initialized.
if r.mu.proposalQuota != nil {
r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmd.proposal.quotaSize)
}
}
}

// stageRaftCommand handles the first phase of applying a command to the
Expand Down Expand Up @@ -382,6 +422,19 @@ func (r *Replica) stageRaftCommand(
log.Fatal(ctx, err)
}
}

// Provide the command's corresponding logical operations to the Replica's
// rangefeed. Only do so if the WriteBatch is non-nil, in which case the
// rangefeed requires there to be a corresponding logical operation log or
// it will shut down with an error. If the WriteBatch is nil then we expect
// the logical operation log to also be nil. We don't want to trigger a
// shutdown of the rangefeed in that situation, so we don't pass anything to
// the rangefed. If no rangefeed is running at all, this call will be a noop.
if cmd.raftCmd.WriteBatch != nil {
r.handleLogicalOpLogRaftMuLocked(ctx, cmd.raftCmd.LogicalOpLog, batch)
} else if cmd.raftCmd.LogicalOpLog != nil {
log.Fatalf(ctx, "non-nil logical op log with nil write batch: %v", cmd.raftCmd)
}
}

func checkForcedErr(
Expand Down Expand Up @@ -843,7 +896,7 @@ func (r *Replica) applyCmdAppBatch(
}
cmd.replicatedResult().SuggestedCompactions = nil
isNonTrivial := batchIsNonTrivial && it.isLast()
if errExpl, err = r.handleRaftCommandResult(ctx, cmd, isNonTrivial,
if errExpl, err = r.handleRaftCommandResult(cmd.ctx, cmd, isNonTrivial,
b.replicaState.UsingAppliedStateKey); err != nil {
return errExpl, err
}
Expand Down
141 changes: 90 additions & 51 deletions pkg/storage/replica_application_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,19 @@ func (r *Replica) handleRaftCommandResult(
) (errExpl string, err error) {
// Set up the local result prior to handling the ReplicatedEvalResult to
// give testing knobs an opportunity to inspect it.
r.prepareLocalResult(cmd.ctx, cmd)
r.prepareLocalResult(ctx, cmd)
if log.ExpensiveLogEnabled(ctx, 2) {
log.VEvent(ctx, 2, cmd.localResult.String())
}

// Handle the ReplicatedEvalResult, executing any side effects of the last
// state machine transition.
//
// Note that this must happen after committing (the engine.Batch), but
// before notifying a potentially waiting client.
clearTrivialReplicatedEvalResultFields(cmd.replicatedResult(), usingAppliedStateKey)
if isNonTrivial {
r.handleComplexReplicatedEvalResult(cmd.ctx, *cmd.replicatedResult())
r.handleComplexReplicatedEvalResult(ctx, *cmd.replicatedResult())
} else if !cmd.replicatedResult().Equal(storagepb.ReplicatedEvalResult{}) {
log.Fatalf(ctx, "failed to handle all side-effects of ReplicatedEvalResult: %v",
cmd.replicatedResult())
Expand All @@ -186,13 +190,13 @@ func (r *Replica) handleRaftCommandResult(
}

if cmd.localResult != nil {
r.handleLocalEvalResult(cmd.ctx, *cmd.localResult)
r.handleLocalEvalResult(ctx, *cmd.localResult)
}
r.finishRaftCommand(cmd.ctx, cmd)
r.finishRaftCommand(ctx, cmd)
switch cmd.e.Type {
case raftpb.EntryNormal:
if cmd.replicatedResult().ChangeReplicas != nil {
log.Fatalf(cmd.ctx, "unexpected replication change from command %s", &cmd.raftCmd)
log.Fatalf(ctx, "unexpected replication change from command %s", &cmd.raftCmd)
}
case raftpb.EntryConfChange:
if cmd.replicatedResult().ChangeReplicas == nil {
Expand Down Expand Up @@ -307,6 +311,10 @@ func (r *Replica) handleComplexReplicatedEvalResult(
// result will zero-out the struct to ensure that is has properly performed all
// of the implied side-effects.
func (r *Replica) prepareLocalResult(ctx context.Context, cmd *cmdAppCtx) {
if !cmd.proposedLocally() {
return
}

var pErr *roachpb.Error
if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; filter != nil {
var newPropRetry int
Expand All @@ -328,53 +336,95 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *cmdAppCtx) {
pErr = cmd.forcedErr
}

if cmd.proposedLocally() {
if cmd.proposalRetry != proposalNoReevaluation && pErr == nil {
log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", cmd.proposal)
}
if pErr != nil {
// A forced error was set (i.e. we did not apply the proposal,
// for instance due to its log position) or the Replica is now
// corrupted.
// If proposalRetry is set, we don't also return an error, as per the
// proposalResult contract.
if cmd.proposalRetry == proposalNoReevaluation {
if cmd.proposalRetry != proposalNoReevaluation && pErr == nil {
log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", cmd.proposal)
}
if pErr != nil {
// A forced error was set (i.e. we did not apply the proposal,
// for instance due to its log position) or the Replica is now
// corrupted.
switch cmd.proposalRetry {
case proposalNoReevaluation:
cmd.response.Err = pErr
case proposalIllegalLeaseIndex:
// If we failed to apply at the right lease index, try again with a
// new one. This is important for pipelined writes, since they don't
// have a client watching to retry, so a failure to eventually apply
// the proposal would be a user-visible error.
pErr = r.tryReproposeWithNewLeaseIndex(ctx, cmd)
if pErr != nil {
cmd.response.Err = pErr
} else {
// Unbind the entry's local proposal because we just succeeded
// in reproposing it and we don't want to acknowledge the client
// yet.
cmd.proposal = nil
return
}
} else if cmd.proposal.Local.Reply != nil {
cmd.response.Reply = cmd.proposal.Local.Reply
} else {
log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", cmd.proposal)
}
cmd.response.Intents = cmd.proposal.Local.DetachIntents()
cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil)
if pErr == nil {
cmd.localResult = cmd.proposal.Local
default:
panic("unexpected")
}
} else if cmd.proposal.Local.Reply != nil {
cmd.response.Reply = cmd.proposal.Local.Reply
} else {
log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", cmd.proposal)
}
if pErr != nil && cmd.localResult != nil {
cmd.response.Intents = cmd.proposal.Local.DetachIntents()
cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil)
if pErr == nil {
cmd.localResult = cmd.proposal.Local
} else if cmd.localResult != nil {
log.Fatalf(ctx, "shouldn't have a local result if command processing failed. pErr: %s", pErr)
}
if log.ExpensiveLogEnabled(ctx, 2) {
log.VEvent(ctx, 2, cmd.localResult.String())
}

// tryReproposeWithNewLeaseIndex is used by prepareLocalResult to repropose
// commands that have gotten an illegal lease index error, and that we know
// could not have applied while their lease index was valid (that is, we
// observed all applied entries between proposal and the lease index becoming
// invalid, as opposed to skipping some of them by applying a snapshot).
//
// It is not intended for use elsewhere and is only a top-level function so that
// it can avoid the below_raft_protos check. Returns a nil error if the command
// has already been successfully applied or has been reproposed here or by a
// different entry for the same proposal that hit an illegal lease index error.
func (r *Replica) tryReproposeWithNewLeaseIndex(
ctx context.Context, cmd *cmdAppCtx,
) *roachpb.Error {
// Note that we don't need to validate anything about the proposal's
// lease here - if we got this far, we know that everything but the
// index is valid at this point in the log.
p := cmd.proposal
if p.applied || cmd.raftCmd.MaxLeaseIndex != p.command.MaxLeaseIndex {
// If the command associated with this rejected raft entry already
// applied then we don't want to repropose it. Doing so could lead
// to duplicate application of the same proposal.
//
// Similarly, if the command associated with this rejected raft
// entry has a different (larger) MaxLeaseIndex than the one we
// decoded from the entry itself, the command must have already
// been reproposed (this can happen if there are multiple copies
// of the command in the logs; see TestReplicaRefreshMultiple).
// We must not create multiple copies with multiple lease indexes,
// so don't repropose it again. This ensures that at any time,
// there is only up to a single lease index that has a chance of
// succeeding in the Raft log for a given command.
return nil
}
// Some tests check for this log message in the trace.
log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex")
maxLeaseIndex, pErr := r.propose(ctx, p)
if pErr != nil {
log.Warningf(ctx, "failed to repropose with new lease index: %s", pErr)
return pErr
}
log.VEventf(ctx, 2, "reproposed command %x at maxLeaseIndex=%d", cmd.idKey, maxLeaseIndex)
return nil
}

// finishRaftCommand is called after a command's side effects have been applied
// in order to acknowledge clients and release latches.
func (r *Replica) finishRaftCommand(ctx context.Context, cmd *cmdAppCtx) {

// Provide the command's corresponding logical operations to the
// Replica's rangefeed. Only do so if the WriteBatch is nonnil,
// otherwise it's valid for the logical op log to be nil, which
// would shut down all rangefeeds. If no rangefeed is running,
// this call will be a noop.
if cmd.raftCmd.WriteBatch != nil {
r.handleLogicalOpLogRaftMuLocked(ctx, cmd.raftCmd.LogicalOpLog)
} else if cmd.raftCmd.LogicalOpLog != nil {
log.Fatalf(ctx, "nonnil logical op log with nil write batch: %v", cmd.raftCmd)
}

// When set to true, recomputes the stats for the LHS and RHS of splits and
// makes sure that they agree with the state's range stats.
const expensiveSplitAssertion = false
Expand Down Expand Up @@ -407,17 +457,6 @@ func (r *Replica) finishRaftCommand(ctx context.Context, cmd *cmdAppCtx) {
}

if cmd.proposedLocally() {
// If we failed to apply at the right lease index, try again with
// a new one. This is important for pipelined writes, since they
// don't have a client watching to retry, so a failure to
// eventually apply the proposal would be a uservisible error.
// TODO(nvanbenschoten): This reproposal is not tracked by the
// quota pool. We should fix that.
if cmd.proposalRetry == proposalIllegalLeaseIndex &&
r.tryReproposeWithNewLeaseIndex(cmd.proposal) {
return
}
// Otherwise, signal the command's status to the client.
cmd.proposal.finishApplication(cmd.response)
} else if cmd.response.Err != nil {
log.VEventf(ctx, 1, "applying raft command resulted in error: %s", cmd.response.Err)
Expand Down
21 changes: 20 additions & 1 deletion pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ type ProposalData struct {
// cache and release latches.
ec endCmds

// applied is set when the a command finishes application. It is used to
// avoid reproposing a failed proposal if an earlier version of the same
// proposal succeeded in applying.
applied bool

// doneCh is used to signal the waiting RPC handler (the contents of
// proposalResult come from LocalEvalResult).
//
Expand Down Expand Up @@ -122,11 +127,25 @@ type ProposalData struct {
// is canceled, it won't be listening to this done channel, and so it can't be
// counted on to invoke endCmds itself.)
func (proposal *ProposalData) finishApplication(pr proposalResult) {
if proposal.applied {
// If the command already applied then we shouldn't be "finishing" its
// application again because it should only be able to apply successfully
// once. We expect that when any reproposal for the same command attempts
// to apply it will be rejected by the below raft lease sequence or lease
// index check in checkForcedErr.
if pr.Err != nil {
return
}
log.Fatalf(proposal.ctx,
"command already applied: %+v; unexpected successful result: %+v", proposal, pr)
}
proposal.applied = true
proposal.ec.done(proposal.Request, pr.Reply, pr.Err)
proposal.signalProposalResult(pr)
if proposal.sp != nil {
tracing.FinishSpan(proposal.sp)
proposal.sp = nil
}
proposal.signalProposalResult(pr)
}

// returnProposalResult signals proposal.doneCh with the proposal result if it
Expand Down
50 changes: 0 additions & 50 deletions pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1404,56 +1404,6 @@ func (m lastUpdateTimesMap) isFollowerActive(
return now.Sub(lastUpdateTime) <= MaxQuotaReplicaLivenessDuration
}

// tryReproposeWithNewLeaseIndex is used by processRaftCommand to
// repropose commands that have gotten an illegal lease index error,
// and that we know could not have applied while their lease index was
// valid (that is, we observed all applied entries between proposal
// and the lease index becoming invalid, as opposed to skipping some
// of them by applying a snapshot).
//
// It is not intended for use elsewhere and is only a top-level
// function so that it can avoid the below_raft_protos check. Returns
// true if the command has been successfully reproposed (not
// necessarily by this method! But if this method returns true, the
// command will be in the local proposals map).
func (r *Replica) tryReproposeWithNewLeaseIndex(proposal *ProposalData) bool {
// Note that we don't need to validate anything about the proposal's
// lease here - if we got this far, we know that everything but the
// index is valid at this point in the log.
r.mu.Lock()
if proposal.command.MaxLeaseIndex > r.mu.state.LeaseAppliedIndex {
// If the command's MaxLeaseIndex is greater than the
// LeaseAppliedIndex, it must have already been reproposed (this
// can happen if there are multiple copies of the command in the
// logs; see TestReplicaRefreshMultiple). We must not create
// multiple copies with multiple lease indexes, so don't repropose
// it again. This ensures that at any time, there is only up to a
// single lease index that has a chance of succeeding in the Raft
// log for a given command.
//
// Note that the caller has already removed the current version of
// the proposal from the pending proposals map. We must re-add it
// since it's still pending.
log.VEventf(proposal.ctx, 2, "skipping reproposal, already reproposed at index %d",
proposal.command.MaxLeaseIndex)
r.mu.proposals[proposal.idKey] = proposal
r.mu.Unlock()
return true
}
r.mu.Unlock()

// Some tests check for this log message in the trace.
log.VEventf(proposal.ctx, 2, "retry: proposalIllegalLeaseIndex")
if _, pErr := r.propose(proposal.ctx, proposal); pErr != nil {
// TODO(nvanbenschoten): Returning false here isn't ok. It will result
// in a proposal returning without a response or an error, which
// triggers a panic higher up in the stack. We need to fix this.
log.Warningf(proposal.ctx, "failed to repropose with new lease index: %s", pErr)
return false
}
return true
}

// maybeAcquireSnapshotMergeLock checks whether the incoming snapshot subsumes
// any replicas and, if so, locks them for subsumption. See acquireMergeLock
// for details about the lock itself.
Expand Down
Loading

0 comments on commit ed40e07

Please sign in to comment.