diff --git a/pkg/storage/cmd_app_batch.go b/pkg/storage/cmd_app_batch.go
new file mode 100644
index 000000000000..72c423fca530
--- /dev/null
+++ b/pkg/storage/cmd_app_batch.go
@@ -0,0 +1,131 @@
+// Copyright 2019 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package storage
+
+import (
+	"context"
+	"sync"
+
+	"github.com/cockroachdb/cockroach/pkg/storage/engine"
+	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
+	"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
+	"go.etcd.io/etcd/raft/raftpb"
+)
+
+// cmdAppBatch accumulates state due to the application of raft
+// commands. Committed raft commands are applied to the batch in a multi-stage
+// process whereby individual commands are decoded, prepared for application
+// relative to the current view of replicaState, committed to the storage
+// engine, applied to the Replica's in-memory state and then finished by
+// releasing their latches and notifying clients.
+type cmdAppBatch struct {
+	// decodeBuf is used to decode an entry before adding it to the batch.
+	// See decode().
+	decodeBuf decodedRaftEntry
+
+	// batch accumulates writes implied by the raft entries in this batch.
+	batch engine.Batch
+
+	// replicaState is this batch's view of the replica's state.
+	// It is copied from under the Replica.mu when the batch is initialized and
+	// is updated in stageTrivialReplicatedEvalResult.
+	replicaState storagepb.ReplicaState
+
+	// stats is stored on the application batch to avoid an allocation in tracking
+	// the batch's view of replicaState. All pointer fields in replicaState other
+	// than Stats are overwritten completely rather than updated in-place.
+	stats enginepb.MVCCStats
+
+	// updatedTruncatedState tracks whether any command in the batch updated the
+	// replica's truncated state. Truncated state updates represent something of
+	// a special case, but given their relative frequency and the fact that
+	// multiple updates can be trivially coalesced, we treat them as trivial. If
+	// the batch updated the truncated state, then after it has been committed
+	// the side-loaded data and raftentry.Cache should be truncated to the index
+	// indicated.
+	// TODO(ajwerner): consider whether this logic should imply that commands
+	// which update truncated state are non-trivial.
+	updatedTruncatedState bool
+
+	cmdBuf cmdAppCtxBuf
+}
+
+// cmdAppBatch structs are needed to apply raft commands, which is to
+// say, frequently, so it is best to pool them rather than allocate them under
+// the raftMu.
+var cmdAppBatchSyncPool = sync.Pool{
+	New: func() interface{} {
+		return new(cmdAppBatch)
+	},
+}
+
+func getCmdAppBatch() *cmdAppBatch {
+	return cmdAppBatchSyncPool.Get().(*cmdAppBatch)
+}
+
+func releaseCmdAppBatch(b *cmdAppBatch) {
+	b.cmdBuf.clear()
+	*b = cmdAppBatch{}
+	cmdAppBatchSyncPool.Put(b)
+}
+
+// add adds the entry and its decoded state to the end of the batch.
+func (b *cmdAppBatch) add(e *raftpb.Entry, d decodedRaftEntry) {
+	s := b.cmdBuf.allocate()
+	s.decodedRaftEntry = d
+	s.e = e
+}
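The get/release pair above follows the standard `sync.Pool` discipline: zero the struct on release so stale pointers don't pin memory across uses. For readers unfamiliar with the pattern, here is a minimal self-contained sketch of the same idea (the `batch` type and names are hypothetical, not part of this patch):

```go
package main

import (
	"fmt"
	"sync"
)

// batch stands in for a pooled, reusable aggregate like cmdAppBatch.
type batch struct {
	buf []int
}

var batchPool = sync.Pool{
	New: func() interface{} { return new(batch) },
}

func getBatch() *batch { return batchPool.Get().(*batch) }

// releaseBatch zeroes the struct before returning it to the pool,
// mirroring `*b = cmdAppBatch{}` in releaseCmdAppBatch above.
func releaseBatch(b *batch) {
	*b = batch{}
	batchPool.Put(b)
}

func main() {
	b := getBatch()
	b.buf = append(b.buf, 1, 2, 3)
	fmt.Println(len(b.buf)) // 3
	releaseBatch(b)
}
```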
+
+// decode decodes commands from toProcess until either all of the commands have
+// been added to the batch or a non-trivial command is decoded. Non-trivial
+// commands must always be in their own batch. If a non-trivial command is
+// encountered the batch is returned immediately without adding the newly
+// decoded command to the batch or removing it from remaining.
+// It is the client's responsibility to deal with non-trivial commands.
+//
+// numEmptyEntries indicates how many entries in the consumed portion of
+// toProcess contained a zero-byte payload.
+func (b *cmdAppBatch) decode(
+	ctx context.Context, toProcess []raftpb.Entry, decodeBuf *decodedRaftEntry,
+) (
+	foundNonTrivialEntry bool,
+	numEmptyEntries int,
+	remaining []raftpb.Entry,
+	errExpl string,
+	err error,
+) {
+	for len(toProcess) > 0 {
+		e := &toProcess[0]
+		if len(e.Data) == 0 {
+			numEmptyEntries++
+		}
+		if errExpl, err = decodeBuf.decode(ctx, e); err != nil {
+			return false, numEmptyEntries, nil, errExpl, err
+		}
+		// Check whether this is a non-trivial entry, which must be processed alone.
+		foundNonTrivialEntry = !isTrivial(decodeBuf.replicatedResult(),
+			b.replicaState.UsingAppliedStateKey)
+		if foundNonTrivialEntry {
+			break
+		}
+		// We're going to process this entry in this batch so pop it from toProcess
+		// and add it to the batch.
+		toProcess = toProcess[1:]
+		b.add(e, *decodeBuf)
+	}
+	return foundNonTrivialEntry, numEmptyEntries, toProcess, "", nil
+}
+
+func (b *cmdAppBatch) reset() {
+	b.cmdBuf.clear()
+	*b = cmdAppBatch{
+		decodeBuf: b.decodeBuf, // preserve the previously decoded entry
+	}
+}
diff --git a/pkg/storage/cmd_app_ctx.go b/pkg/storage/cmd_app_ctx.go
new file mode 100644
index 000000000000..3b80f2697a14
--- /dev/null
+++ b/pkg/storage/cmd_app_ctx.go
@@ -0,0 +1,134 @@
+// Copyright 2019 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package storage
+
+import (
+	"context"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
+	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
+	"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/pkg/errors"
+	"go.etcd.io/etcd/raft/raftpb"
+)
+
+// cmdAppCtx stores the state required to apply a single raft
+// entry to a replica. The state is accumulated in stages which occur in
+// Replica.handleCommittedEntriesRaftMuLocked. From a high level, a command is
+// decoded into a cmdAppBatch, then if it was proposed locally the
+// proposal is populated from the replica's proposals map, then the command
+// is staged into the batch by writing its update to the batch's engine.Batch
+// and applying its "trivial" side-effects to the batch's view of ReplicaState.
+// Then the batch is committed, the side-effects are applied and the local
+// result is processed.
+type cmdAppCtx struct {
+	// e is the Entry being applied.
+	e *raftpb.Entry
+	decodedRaftEntry // decoded from e.
+
+	// proposal is populated on the proposing Replica only and comes from the
+	// Replica's proposal map.
+	proposal *ProposalData
+	// ctx will be the proposal's context if proposed locally, otherwise it will
+	// be populated with the handleCommittedEntries ctx.
+ ctx context.Context + + // The below fields are set during stageRaftCommand when we validate that + // a command applies given the current lease in checkForcedErr. + leaseIndex uint64 + forcedErr *roachpb.Error + proposalRetry proposalReevaluationReason + mutationCount int // number of mutations in the WriteBatch, for writeStats + // splitMergeUnlock is acquired for splits and merges. + splitMergeUnlock func(*storagepb.ReplicatedEvalResult) + + // The below fields are set after the data has been written to the storage + // engine in prepareLocalResult. + localResult *result.LocalResult + response proposalResult +} + +func (cmd *cmdAppCtx) proposedLocally() bool { + return cmd.proposal != nil +} + +// decodedRaftEntry represents the deserialized content of a raftpb.Entry. +type decodedRaftEntry struct { + idKey storagebase.CmdIDKey + raftCmd storagepb.RaftCommand + *decodedConfChange // only non-nil for config changes +} + +// decodedConfChange represents the fields of a config change raft command. +type decodedConfChange struct { + cc raftpb.ConfChange + ccCtx ConfChangeContext +} + +func (d *decodedRaftEntry) replicatedResult() *storagepb.ReplicatedEvalResult { + return &d.raftCmd.ReplicatedEvalResult +} + +// decode decodes the entry e into the decodedRaftEntry. +func (d *decodedRaftEntry) decode( + ctx context.Context, e *raftpb.Entry, +) (errExpl string, err error) { + *d = decodedRaftEntry{} + // etcd raft sometimes inserts nil commands, ours are never nil. + // This case is handled upstream of this call. + if len(e.Data) == 0 { + return "", nil + } + switch e.Type { + case raftpb.EntryNormal: + return d.decodeNormalEntry(e) + case raftpb.EntryConfChange: + return d.decodeConfChangeEntry(e) + default: + log.Fatalf(ctx, "unexpected Raft entry: %v", e) + return "", nil // unreachable + } +} + +func (d *decodedRaftEntry) decodeNormalEntry(e *raftpb.Entry) (errExpl string, err error) { + var encodedCommand []byte + d.idKey, encodedCommand = DecodeRaftCommand(e.Data) + // An empty command is used to unquiesce a range and wake the + // leader. Clear commandID so it's ignored for processing. + if len(encodedCommand) == 0 { + d.idKey = "" + } else if err := protoutil.Unmarshal(encodedCommand, &d.raftCmd); err != nil { + const errExpl = "while unmarshalling entry" + return errExpl, errors.Wrap(err, errExpl) + } + return "", nil +} + +func (d *decodedRaftEntry) decodeConfChangeEntry(e *raftpb.Entry) (errExpl string, err error) { + d.decodedConfChange = &decodedConfChange{} + if err := protoutil.Unmarshal(e.Data, &d.cc); err != nil { + const errExpl = "while unmarshaling ConfChange" + return errExpl, errors.Wrap(err, errExpl) + } + if err := protoutil.Unmarshal(d.cc.Context, &d.ccCtx); err != nil { + const errExpl = "while unmarshaling ConfChangeContext" + return errExpl, errors.Wrap(err, errExpl) + } + if err := protoutil.Unmarshal(d.ccCtx.Payload, &d.raftCmd); err != nil { + const errExpl = "while unmarshaling RaftCommand" + return errExpl, errors.Wrap(err, errExpl) + } + d.idKey = storagebase.CmdIDKey(d.ccCtx.CommandID) + return "", nil +} diff --git a/pkg/storage/cmd_app_ctx_buf.go b/pkg/storage/cmd_app_ctx_buf.go new file mode 100644 index 000000000000..c16640da4f5e --- /dev/null +++ b/pkg/storage/cmd_app_ctx_buf.go @@ -0,0 +1,116 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package storage
+
+import "sync"
+
+// cmdAppCtxBufNodeSize is the size of the arrays in a
+// cmdAppCtxBufNode.
+// TODO(ajwerner): justify this number.
+const cmdAppCtxBufNodeSize = 8
+
+// cmdAppCtxBuf is an allocation-efficient buffer used during the
+// application of raft entries. Initialization occurs lazily upon the first
+// call to allocate but used cmdAppCtxBuf objects should be released
+// explicitly with the clear() method to return the allocated buffers to
+// the pool.
+type cmdAppCtxBuf struct {
+	len        int32
+	head, tail *cmdAppCtxBufNode
+}
+
+// cmdAppCtxBufNode is a linked-list element in a
+// cmdAppCtxBuf.
+type cmdAppCtxBufNode struct {
+	len  int32
+	buf  [cmdAppCtxBufNodeSize]cmdAppCtx
+	next *cmdAppCtxBufNode
+}
+
+var cmdAppStateBufNodeSyncPool = sync.Pool{
+	New: func() interface{} { return new(cmdAppCtxBufNode) },
+}
+
+// allocate extends the length of buf by one and returns the newly
+// added element. If this is the first call to allocate it will initialize buf.
+// After a buf is initialized it should be explicitly cleared.
+func (buf *cmdAppCtxBuf) allocate() *cmdAppCtx {
+	if buf.tail == nil { // lazy initialization
+		n := cmdAppStateBufNodeSyncPool.Get().(*cmdAppCtxBufNode)
+		buf.head, buf.tail = n, n
+	}
+	if buf.tail.len == cmdAppCtxBufNodeSize {
+		newTail := cmdAppStateBufNodeSyncPool.Get().(*cmdAppCtxBufNode)
+		buf.tail.next = newTail
+		buf.tail = newTail
+	}
+	ret := &buf.tail.buf[buf.tail.len]
+	buf.tail.len++
+	buf.len++
+	return ret
+}
+
+// clear removes all of the entries currently in the buffer and returns any
+// allocated buffers to the pool.
+func (buf *cmdAppCtxBuf) clear() {
+	for buf.head != nil {
+		buf.len -= buf.head.len
+		oldHead := buf.head
+		newHead := oldHead.next
+		buf.head = newHead
+		*oldHead = cmdAppCtxBufNode{}
+		cmdAppStateBufNodeSyncPool.Put(oldHead)
+	}
+	*buf = cmdAppCtxBuf{}
+}
+
+// last returns a pointer to the last element in the buffer.
+func (buf *cmdAppCtxBuf) last() *cmdAppCtx {
+	return &buf.tail.buf[buf.tail.len-1]
+}
+
+// cmdAppCtxBufIterator iterates through the entries in a
+// cmdAppCtxBuf.
+type cmdAppCtxBufIterator struct {
+	idx  int32
+	buf  *cmdAppCtxBuf
+	node *cmdAppCtxBufNode
+}
+
+// init seeks the iterator to the front of buf. It returns true if buf is not
+// empty and false if it is.
+func (it *cmdAppCtxBufIterator) init(buf *cmdAppCtxBuf) (ok bool) {
+	*it = cmdAppCtxBufIterator{buf: buf, node: buf.head}
+	return it.buf.len > 0
+}
+
+// cur returns the cmdAppCtx currently pointed to by it.
+func (it *cmdAppCtxBufIterator) cur() *cmdAppCtx {
+	return &it.node.buf[it.idx%cmdAppCtxBufNodeSize]
+}
+
+// isLast returns true if it currently points to the last element in the buffer.
+func (it *cmdAppCtxBufIterator) isLast() bool {
+	return it.idx+1 == it.buf.len
+}
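The iterator navigates the chunked list purely with index arithmetic: element idx lives at slot idx%cmdAppCtxBufNodeSize of the (idx/cmdAppCtxBufNodeSize)-th node, so a node pointer is only followed when the index crosses a multiple of the node size. A standalone sketch of that mapping (the nodeSize constant is a stand-in for cmdAppCtxBufNodeSize):

```go
package main

import "fmt"

const nodeSize = 8 // mirrors cmdAppCtxBufNodeSize

func main() {
	// Element idx maps to (node, slot) = (idx/nodeSize, idx%nodeSize);
	// the next pointer is only chased when slot wraps back to 0.
	for _, idx := range []int32{0, 7, 8, 17} {
		fmt.Printf("idx=%d -> node %d, slot %d\n", idx, idx/nodeSize, idx%nodeSize)
	}
}
```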
+
+// next moves it to point to the next element. It returns false if it.isLast()
+// is true, indicating that there are no more elements in the buffer.
+func (it *cmdAppCtxBufIterator) next() bool {
+	if it.isLast() {
+		return false
+	}
+	it.idx++
+	if it.idx%cmdAppCtxBufNodeSize == 0 {
+		it.node = it.node.next
+	}
+	return true
+}
diff --git a/pkg/storage/cmd_app_ctx_buf_test.go b/pkg/storage/cmd_app_ctx_buf_test.go
new file mode 100644
index 000000000000..99979f35271c
--- /dev/null
+++ b/pkg/storage/cmd_app_ctx_buf_test.go
@@ -0,0 +1,54 @@
+// Copyright 2019 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package storage
+
+import (
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/stretchr/testify/assert"
+)
+
+// TestApplicationStateBuf verifies the cmdAppCtxBuf behavior.
+func TestApplicationStateBuf(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	var buf cmdAppCtxBuf
+	// numStates is chosen arbitrarily.
+	const numStates = 5*cmdAppCtxBufNodeSize + 1
+	// Test that the len field is properly updated.
+	var states []*cmdAppCtx
+	for i := 0; i < numStates; i++ {
+		assert.Equal(t, i, int(buf.len))
+		states = append(states, buf.allocate())
+		assert.Equal(t, i+1, int(buf.len))
+	}
+	// Test that last returns the correct value.
+	last := states[len(states)-1]
+	assert.Equal(t, last, buf.last())
+	// Test the iterator.
+	var it cmdAppCtxBufIterator
+	i := 0
+	for ok := it.init(&buf); ok; ok = it.next() {
+		assert.Equal(t, states[i], it.cur())
+		i++
+	}
+	assert.Equal(t, i, numStates) // make sure we saw them all
+	assert.True(t, it.isLast())
+	// Test clear.
+	buf.clear()
+	assert.EqualValues(t, buf, cmdAppCtxBuf{})
+	assert.Equal(t, 0, int(buf.len))
+	assert.Panics(t, func() { buf.last() })
+	assert.False(t, it.init(&buf))
+	// Test clear on an empty buffer.
+	buf.clear()
+	assert.EqualValues(t, buf, cmdAppCtxBuf{})
+}
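The test above walks five full nodes plus one element; a smaller, more targeted test could pin down the node-boundary hop specifically. A sketch of such a test in the same package (the TestCmdAppCtxBufNodeBoundary name is hypothetical, not part of the patch):

```go
func TestCmdAppCtxBufNodeBoundary(t *testing.T) {
	defer leaktest.AfterTest(t)()
	var buf cmdAppCtxBuf
	// Fill exactly two nodes so the iterator must follow one next pointer.
	for i := 0; i < 2*cmdAppCtxBufNodeSize; i++ {
		buf.allocate()
	}
	var it cmdAppCtxBufIterator
	n := 0
	for ok := it.init(&buf); ok; ok = it.next() {
		n++
	}
	assert.Equal(t, 2*cmdAppCtxBufNodeSize, n)
	buf.clear()
}
```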
diff --git a/pkg/storage/replica_application.go b/pkg/storage/replica_application.go
new file mode 100644
index 000000000000..a02b104c1100
--- /dev/null
+++ b/pkg/storage/replica_application.go
@@ -0,0 +1,848 @@
+// Copyright 2019 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package storage
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/storage/engine"
+	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
+	"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
+	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
+	"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
+	"github.com/cockroachdb/cockroach/pkg/util"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/kr/pretty"
+	"github.com/pkg/errors"
+	"go.etcd.io/etcd/raft/raftpb"
+)
+
+// handleCommittedEntriesStats returns stats about what happened during the
+// application of a set of raft entries.
+//
+// TODO(ajwerner): add metrics to go with these stats.
+type handleCommittedEntriesStats struct {
+	batchesProcessed int
+	entriesProcessed int
+	stateAssertions  int
+	numEmptyEntries  int
+}
+
+// handleCommittedEntriesRaftMuLocked deals with the complexities involved in
+// moving the Replica's replicated state machine forward given committed raft
+// entries. All changes to r.mu.state occur downstream of this call.
+//
+// The stats return value reflects the number of entries which are processed,
+// the number of batches used to process them, the number of entries which
+// contained no data (added by etcd raft), and the number of entries which
+// required asserting the on-disk state.
+//
+// Errors returned from this method are fatal! The errExpl is provided as a
+// non-sensitive cue of where the error occurred.
+// TODO(ajwerner): replace errExpl with proper error composition using the new
+// errors library.
+//
+// At a high level, this method receives committed entries, each of which
+// contains the evaluation of a batch (at its heart a WriteBatch, to be applied
+// to the underlying storage engine), and decodes them into batches of entries
+// which are safe to apply atomically together.
+//
+// The pseudocode looks like:
+//
+//   while entries:
+//     for each entry in entries:
+//       decode front into temp struct
+//       if entry is non-trivial and batch is not empty:
+//         break
+//       add decoded command to batch
+//       pop front from entries
+//       if entry is non-trivial:
+//         break
+//     prepare local commands
+//     for each entry in batch:
+//       check if failed
+//       if not:
+//         stage in batch
+//     commit batch to storage engine
+//     update Replica.state
+//     for each entry in batch:
+//       apply side-effects, ack client, release latches
+//
+// The processing of committed entries proceeds in 4 stages: decoding,
+// local preparation, staging, and application. Commands may be applied together
+// so long as their implied state change is "trivial" (see isTrivial). Once
+// decoding has discovered a batch boundary, the commands are prepared by
+// reading the current replica state from underneath the Replica.mu and
+// determining whether any of the commands were proposed locally. Next each
+// command is written to the engine.Batch and has the "trivial" component of
+// its ReplicatedEvalResult applied to the batch's view of the ReplicaState.
+// Finally the batch is written to the storage engine, its side effects on
+// the Replica's state are applied, and the clients are acked with the
+// respective results.
+func (r *Replica) handleCommittedEntriesRaftMuLocked(
+	ctx context.Context, committedEntries []raftpb.Entry,
+) (stats handleCommittedEntriesStats, errExpl string, err error) {
+	var (
+		haveNonTrivialEntry bool
+		toProcess           = committedEntries
+		b                   = getCmdAppBatch()
+	)
+	defer releaseCmdAppBatch(b)
+	for len(toProcess) > 0 {
+		if haveNonTrivialEntry {
+			// If the previous call to b.decode() informed us that it left a
+			// non-trivial entry in decodeBuf, use it.
+			haveNonTrivialEntry = false
+			b.add(&toProcess[0], b.decodeBuf)
+			toProcess = toProcess[1:]
+		} else {
+			// Decode zero or more trivial entries into b.
+			var numEmptyEntries int
+			haveNonTrivialEntry, numEmptyEntries, toProcess, errExpl, err =
+				b.decode(ctx, toProcess, &b.decodeBuf)
+			if err != nil {
+				return stats, errExpl, err
+			}
+			// If no trivial entries were decoded go back around and process the
+			// non-trivial entry.
+			if b.cmdBuf.len == 0 {
+				continue
+			}
+			stats.numEmptyEntries += numEmptyEntries
+		}
+		r.retrieveLocalProposals(ctx, b)
+		b.batch = r.store.engine.NewBatch()
+		// Stage each of the commands, writing them into the newly created
+		// engine.Batch and updating b's view of the replicaState.
+		var it cmdAppCtxBufIterator
+		for ok := it.init(&b.cmdBuf); ok; ok = it.next() {
+			cmd := it.cur()
+			r.stageRaftCommand(cmd.ctx, cmd, b.batch, &b.replicaState,
+				it.isLast() /* writeAppliedState */)
+			// We permit trivial commands to update the truncated state but we need to
+			// track whether that happens so that the side-effects of truncation occur
+			// after the batch is committed to storage.
+			updatedTruncatedState := stageTrivialReplicatedEvalResult(cmd.ctx,
+				cmd.replicatedResult(), cmd.e.Index, cmd.leaseIndex, &b.replicaState)
+			b.updatedTruncatedState = b.updatedTruncatedState || updatedTruncatedState
+		}
+		if errExpl, err = r.applyCmdAppBatch(ctx, b, &stats); err != nil {
+			return stats, errExpl, err
+		}
+	}
+	return stats, "", nil
+}
+
+// retrieveLocalProposals populates the proposal and ctx fields for each of the
+// commands in b.
+func (r *Replica) retrieveLocalProposals(ctx context.Context, b *cmdAppBatch) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	b.replicaState = r.mu.state
+	// Copy stats as it gets updated in-place in applyRaftCommandToBatch.
+	b.replicaState.Stats = &b.stats
+	*b.replicaState.Stats = *r.mu.state.Stats
+	var it cmdAppCtxBufIterator
+	haveProposalQuota := r.mu.proposalQuota != nil
+	for ok := it.init(&b.cmdBuf); ok; ok = it.next() {
+		cmd := it.cur()
+		cmd.proposal = r.mu.proposals[cmd.idKey]
+		if cmd.proposedLocally() {
+			// We initiated this command, so use the caller-supplied context.
+			cmd.ctx = cmd.proposal.ctx
+			delete(r.mu.proposals, cmd.idKey)
+			// At this point we're not guaranteed to have proposalQuota initialized;
+			// the same is true for the quotaReleaseQueue. Only queue the proposal's
+			// quota for release if the proposalQuota is initialized.
+			if haveProposalQuota {
+				r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmd.proposal.quotaSize)
+			}
+		} else {
+			cmd.ctx = ctx
+		}
+	}
+}
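The essential move in retrieveLocalProposals is the map lookup that reunites a committed entry with its local proposal (and hence the proposer's context), deleting the entry so it cannot be matched twice. Stripped of locking and quota accounting, the shape is roughly the following (a hedged sketch with simplified, hypothetical types):

```go
package main

import (
	"context"
	"fmt"
)

// attachLocalProposal returns the proposer's context for locally proposed
// commands and the fallback context otherwise. Deleting the map entry
// ensures a command ID is matched at most once, even across reproposals.
func attachLocalProposal(
	proposals map[string]context.Context, idKey string, fallback context.Context,
) (context.Context, bool) {
	if ctx, ok := proposals[idKey]; ok {
		delete(proposals, idKey)
		return ctx, true
	}
	return fallback, false
}

func main() {
	proposals := map[string]context.Context{"cmd-1": context.Background()}
	_, ok := attachLocalProposal(proposals, "cmd-1", context.TODO())
	fmt.Println(ok) // true: the first match consumes the entry
	_, ok = attachLocalProposal(proposals, "cmd-1", context.TODO())
	fmt.Println(ok) // false: a second lookup falls back
}
```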
+
+// stageRaftCommand handles the first phase of applying a command to the
+// replica state machine.
+//
+// The proposal also contains auxiliary data which needs to be verified in order
+// to decide whether the proposal should be applied: the command's MaxLeaseIndex
+// must move the state machine's LeaseAppliedIndex forward, and the proposer's
+// lease (or rather its sequence number) must match that of the state machine,
+// and lastly the GCThreshold is validated. If any of the checks fail, the
+// proposal's content is wiped and we apply an empty log entry instead. If an
+// error occurs and the command was proposed locally, the error will be
+// communicated to the waiting proposer. The two typical cases in which errors
+// occur are lease mismatch (in which case the caller tries to send the command
+// to the actual leaseholder) and violation of the LeaseAppliedIndex (in which
+// case the proposal is retried if it was proposed locally).
+//
+// Assuming all checks were passed, the command is applied to the batch,
+// which is done by the aptly named applyRaftCommandToBatch.
+//
+// For trivial proposals this is the whole story, but some commands trigger
+// additional code in this method via a side effect (in the
+// proposal's ReplicatedEvalResult or, for local proposals,
+// LocalEvalResult). These might, for example, trigger an update of the
+// Replica's in-memory state to match updates to the on-disk state, or pass
+// intents to the intent resolver. Some commands don't fit this simple schema
+// and need to hook deeper into the code. Notably splits and merges need to
+// acquire locks on their right-hand side Replicas and may need to add data to
+// the WriteBatch before it is applied; similarly, changes to the disk layout of
+// internal state typically require a migration which shows up here. Any of this
+// logic however is deferred until after the batch has been written to the
+// storage engine.
+func (r *Replica) stageRaftCommand(
+	ctx context.Context,
+	cmd *cmdAppCtx,
+	batch engine.Batch,
+	replicaState *storagepb.ReplicaState,
+	writeAppliedState bool,
+) {
+	if cmd.e.Index == 0 {
+		log.Fatalf(ctx, "stageRaftCommand requires a non-zero index")
+	}
+	if log.V(4) {
+		log.Infof(ctx, "processing command %x: maxLeaseIndex=%d",
+			cmd.idKey, cmd.raftCmd.MaxLeaseIndex)
+	}
+
+	var ts hlc.Timestamp
+	if cmd.idKey != "" {
+		ts = cmd.replicatedResult().Timestamp
+	}
+
+	cmd.leaseIndex, cmd.proposalRetry, cmd.forcedErr = checkForcedErr(ctx,
+		cmd.idKey, cmd.raftCmd, cmd.proposal, cmd.proposedLocally(), replicaState)
+	if cmd.forcedErr == nil {
+		// Verify that the batch timestamp is after the GC threshold. This is
+		// necessary because not all commands declare read access on the GC
+		// threshold key, even though they implicitly depend on it. This means
+		// that access to this state will not be serialized by latching,
+		// so we must perform this check upstream and downstream of raft.
+		// See #14833.
+		//
+		// We provide an empty key span because we already know that the Raft
+		// command is allowed to apply within its key range. This is guaranteed
+		// by checks upstream of Raft, which perform the same validation, and by
+		// span latches, which assure that any modifications to the range's
+		// boundaries will be serialized with this command. Finally, the
+		// leaseAppliedIndex check in checkForcedErr ensures that replays
+		// outside of the spanlatch manager's control which break this
+		// serialization ordering will already be caught and an error will be
+		// thrown.
+		cmd.forcedErr = roachpb.NewError(r.requestCanProceed(roachpb.RSpan{}, ts))
+	}
+
+	// applyRaftCommandToBatch will return "expected" errors, but may also indicate
+	// replica corruption (as of now, signaled by a replicaCorruptionError).
+	// We feed its return through maybeSetCorrupt to act when that happens.
+	if cmd.forcedErr != nil {
+		log.VEventf(ctx, 1, "applying command with forced error: %s", cmd.forcedErr)
+	} else {
+		log.Event(ctx, "applying command")
+
+		if splitMergeUnlock, err := r.maybeAcquireSplitMergeLock(ctx, cmd.raftCmd); err != nil {
+			log.Eventf(ctx, "unable to acquire split lock: %s", err)
+			// Send a crash report because a former bug in the error handling might have
+			// been the root cause of #19172.
+			_ = r.store.stopper.RunAsyncTask(ctx, "crash report", func(ctx context.Context) {
+				log.SendCrashReport(
+					ctx,
+					&r.store.cfg.Settings.SV,
+					0, // depth
+					"while acquiring split lock: %s",
+					[]interface{}{err},
+					log.ReportTypeError,
+				)
+			})
+
+			cmd.forcedErr = roachpb.NewError(err)
+		} else if splitMergeUnlock != nil {
+			// Set the splitMergeUnlock on the cmdAppCtx to be called after the batch
+			// has been applied (see applyCmdAppBatch).
+			cmd.splitMergeUnlock = splitMergeUnlock
+		}
+	}
+
+	if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; cmd.forcedErr == nil && filter != nil {
+		var newPropRetry int
+		newPropRetry, cmd.forcedErr = filter(storagebase.ApplyFilterArgs{
+			CmdID:                cmd.idKey,
+			ReplicatedEvalResult: *cmd.replicatedResult(),
+			StoreID:              r.store.StoreID(),
+			RangeID:              r.RangeID,
+		})
+		if cmd.proposalRetry == 0 {
+			cmd.proposalRetry = proposalReevaluationReason(newPropRetry)
+		}
+	}
+
+	if cmd.forcedErr != nil {
+		// Apply an empty entry.
+		*cmd.replicatedResult() = storagepb.ReplicatedEvalResult{}
+		cmd.raftCmd.WriteBatch = nil
+		cmd.raftCmd.LogicalOpLog = nil
+	}
+
+	// Update the node clock with the serviced request. This maintains
+	// a high water mark for all ops serviced, so that received ops without
+	// a timestamp specified are guaranteed one higher than any op already
+	// executed for overlapping keys.
+	// TODO(ajwerner): coalesce the clock update per batch.
+	r.store.Clock().Update(ts)
+
+	{
+		err := r.applyRaftCommandToBatch(cmd.ctx, cmd, replicaState, batch, writeAppliedState)
+
+		// applyRaftCommandToBatch returned an error, which usually indicates
+		// either a serious logic bug in CockroachDB or a disk
+		// corruption/out-of-space issue. Make sure that these fail with a
+		// descriptive message so that we can differentiate the root causes.
+		if err != nil {
+			log.Errorf(ctx, "unable to update the state machine: %+v", err)
+			// Report the fatal error separately and only with the error, as that
+			// triggers an optimization for which we directly report the error to
+			// sentry (which in turn allows sentry to distinguish different error
+			// types).
+			log.Fatal(ctx, err)
+		}
+	}
+
+	if deprecatedDelta := cmd.replicatedResult().DeprecatedDelta; deprecatedDelta != nil {
+		cmd.replicatedResult().Delta = deprecatedDelta.ToStatsDelta()
+		cmd.replicatedResult().DeprecatedDelta = nil
+	}
+
+	// AddSSTable ingestions run before the actual batch gets written to the
+	// storage engine. This makes sure that when the Raft command is applied,
+	// the ingestion has definitely succeeded. Note that we have taken
+	// precautions during command evaluation to avoid having mutations in the
+	// WriteBatch that affect the SSTable. Not doing so could result in order
+	// reversal (and missing values) here.
+	//
+	// NB: any command which has an AddSSTable is non-trivial and will be
+	// applied in its own batch so it's not possible that any other commands
+	// which precede this command can shadow writes from this SSTable.
+	if cmd.replicatedResult().AddSSTable != nil {
+		copied := addSSTablePreApply(
+			ctx,
+			r.store.cfg.Settings,
+			r.store.engine,
+			r.raftMu.sideloaded,
+			cmd.e.Term,
+			cmd.e.Index,
+			*cmd.replicatedResult().AddSSTable,
+			r.store.limiters.BulkIOWriteRate,
+		)
+		r.store.metrics.AddSSTableApplications.Inc(1)
+		if copied {
+			r.store.metrics.AddSSTableApplicationCopies.Inc(1)
+		}
+		cmd.replicatedResult().AddSSTable = nil
+	}
+
+	if cmd.replicatedResult().Split != nil {
+		// Splits require a new HardState to be written to the new RHS
+		// range (and this needs to be atomic with the main batch). This
+		// cannot be constructed at evaluation time because it differs
+		// on each replica (votes may have already been cast on the
+		// uninitialized replica). Write this new hardstate to the batch too.
+		// See https://github.com/cockroachdb/cockroach/issues/20629
+		splitPreApply(ctx, batch, cmd.replicatedResult().Split.SplitTrigger)
+	}
+
+	if merge := cmd.replicatedResult().Merge; merge != nil {
+		// Merges require the subsumed range to be atomically deleted when the
+		// merge transaction commits.
+		rhsRepl, err := r.store.GetReplica(merge.RightDesc.RangeID)
+		if err != nil {
+			log.Fatal(ctx, err)
+		}
+		const destroyData = false
+		err = rhsRepl.preDestroyRaftMuLocked(ctx, batch, batch, merge.RightDesc.NextReplicaID, destroyData)
+		if err != nil {
+			log.Fatal(ctx, err)
+		}
+	}
+}
+
+func checkForcedErr(
+	ctx context.Context,
+	idKey storagebase.CmdIDKey,
+	raftCmd storagepb.RaftCommand,
+	proposal *ProposalData,
+	proposedLocally bool,
+	replicaState *storagepb.ReplicaState,
+) (uint64, proposalReevaluationReason, *roachpb.Error) {
+	leaseIndex := replicaState.LeaseAppliedIndex
+	isLeaseRequest := raftCmd.ReplicatedEvalResult.IsLeaseRequest
+	var requestedLease roachpb.Lease
+	if isLeaseRequest {
+		requestedLease = *raftCmd.ReplicatedEvalResult.State.Lease
+	}
+	if idKey == "" {
+		// This is an empty Raft command (which is sent by Raft after elections
+		// to trigger reproposals or during concurrent configuration changes).
+		// Nothing to do here except making sure that the corresponding batch
+		// (which is bogus) doesn't get executed (for it is empty and so
+		// properties like key range are undefined).
+		return leaseIndex, proposalNoReevaluation, roachpb.NewErrorf("no-op on empty Raft entry")
+	}
+
+	// Verify the lease matches the proposer's expectation. We rely on
+	// the proposer's determination of whether the existing lease is
+	// held, and can be used, or is expired, and can be replaced.
+	// This verification checks that the lease has not been modified since
+	// proposal due to Raft delays / reorderings.
+	// To understand why this lease verification is necessary, see comments on the
+	// proposer_lease field in the proto.
+	leaseMismatch := false
+	if raftCmd.DeprecatedProposerLease != nil {
+		// VersionLeaseSequence must not have been active when this was proposed.
+		//
+		// This does not prevent the lease race condition described below. The
+		// reason we don't fix this here as well is because fixing the race
+		// requires a new cluster version which implies that we'll already be
+		// using lease sequence numbers and will fall into the case below.
+		leaseMismatch = !raftCmd.DeprecatedProposerLease.Equivalent(*replicaState.Lease)
+	} else {
+		leaseMismatch = raftCmd.ProposerLeaseSequence != replicaState.Lease.Sequence
+		if !leaseMismatch && isLeaseRequest {
+			// Lease sequence numbers are a reflection of lease equivalency
+			// between subsequent leases. However, Lease.Equivalent is not fully
+			// symmetric, meaning that two leases may be Equivalent to a third
+			// lease but not Equivalent to each other. If these leases are
+			// proposed under that same third lease, neither will be able to
+			// detect whether the other has applied just by looking at the
+			// current lease sequence number because neither will increment
+			// the sequence number.
+			//
+			// This can lead to inversions in lease expiration timestamps if
+			// we're not careful. To avoid this, if a lease request's proposer
+			// lease sequence matches the current lease sequence and the current
+			// lease sequence also matches the requested lease sequence, we make
+			// sure the requested lease is Equivalent to the current lease.
+ if replicaState.Lease.Sequence == requestedLease.Sequence { + // It is only possible for this to fail when expiration-based + // lease extensions are proposed concurrently. + leaseMismatch = !replicaState.Lease.Equivalent(requestedLease) + } + + // This is a check to see if the lease we proposed this lease request against is the same + // lease that we're trying to update. We need to check proposal timestamps because + // extensions don't increment sequence numbers. Without this check a lease could + // be extended and then another lease proposed against the original lease would + // be applied over the extension. + if raftCmd.ReplicatedEvalResult.PrevLeaseProposal != nil && + (*raftCmd.ReplicatedEvalResult.PrevLeaseProposal != *replicaState.Lease.ProposedTS) { + leaseMismatch = true + } + } + } + if leaseMismatch { + log.VEventf( + ctx, 1, + "command proposed from replica %+v with lease #%d incompatible to %v", + raftCmd.ProposerReplica, raftCmd.ProposerLeaseSequence, *replicaState.Lease, + ) + if isLeaseRequest { + // For lease requests we return a special error that + // redirectOnOrAcquireLease() understands. Note that these + // requests don't go through the DistSender. + return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.LeaseRejectedError{ + Existing: *replicaState.Lease, + Requested: requestedLease, + Message: "proposed under invalid lease", + }) + } + // We return a NotLeaseHolderError so that the DistSender retries. + nlhe := newNotLeaseHolderError( + replicaState.Lease, raftCmd.ProposerReplica.StoreID, replicaState.Desc) + nlhe.CustomMsg = fmt.Sprintf( + "stale proposal: command was proposed under lease #%d but is being applied "+ + "under lease: %s", raftCmd.ProposerLeaseSequence, replicaState.Lease) + return leaseIndex, proposalNoReevaluation, roachpb.NewError(nlhe) + } + + if isLeaseRequest { + // Lease commands are ignored by the counter (and their MaxLeaseIndex is ignored). This + // makes sense since lease commands are proposed by anyone, so we can't expect a coherent + // MaxLeaseIndex. Also, lease proposals are often replayed, so not making them update the + // counter makes sense from a testing perspective. + // + // However, leases get special vetting to make sure we don't give one to a replica that was + // since removed (see #15385 and a comment in redirectOnOrAcquireLease). + if _, ok := replicaState.Desc.GetReplicaDescriptor(requestedLease.Replica.StoreID); !ok { + return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.LeaseRejectedError{ + Existing: *replicaState.Lease, + Requested: requestedLease, + Message: "replica not part of range", + }) + } + } else if replicaState.LeaseAppliedIndex < raftCmd.MaxLeaseIndex { + // The happy case: the command is applying at or ahead of the minimal + // permissible index. It's ok if it skips a few slots (as can happen + // during rearrangement); this command will apply, but later ones which + // were proposed at lower indexes may not. Overall though, this is more + // stable and simpler than requiring commands to apply at their exact + // lease index: Handling the case in which MaxLeaseIndex > oldIndex+1 + // is otherwise tricky since we can't tell the client to try again + // (reproposals could exist and may apply at the right index, leading + // to a replay), and assigning the required index would be tedious + // seeing that it would have to rewind sometimes. + leaseIndex = raftCmd.MaxLeaseIndex + } else { + // The command is trying to apply at a past log position. 
That's + // unfortunate and hopefully rare; the client on the proposer will try + // again. Note that in this situation, the leaseIndex does not advance. + retry := proposalNoReevaluation + if proposedLocally { + log.VEventf( + ctx, 1, + "retry proposal %x: applied at lease index %d, required < %d", + proposal.idKey, leaseIndex, raftCmd.MaxLeaseIndex, + ) + retry = proposalIllegalLeaseIndex + } + return leaseIndex, retry, roachpb.NewErrorf( + "command observed at lease index %d, but required < %d", leaseIndex, raftCmd.MaxLeaseIndex, + ) + } + return leaseIndex, proposalNoReevaluation, nil +} + +// applyRaftCommandToBatch applies a raft command from the replicated log to the +// current batch's view of the underlying state machine. When the state machine +// cannot be updated, an error (which is fatal!) is returned and must be treated +// that way by the caller. +func (r *Replica) applyRaftCommandToBatch( + ctx context.Context, + cmd *cmdAppCtx, + replicaState *storagepb.ReplicaState, + batch engine.Batch, + writeAppliedState bool, +) error { + writeBatch := cmd.raftCmd.WriteBatch + if writeBatch != nil && len(writeBatch.Data) > 0 { + mutationCount, err := engine.RocksDBBatchCount(writeBatch.Data) + if err != nil { + log.Errorf(ctx, "unable to read header of committed WriteBatch: %+v", err) + } else { + cmd.mutationCount = mutationCount + } + } + + // Exploit the fact that a split will result in a full stats + // recomputation to reset the ContainsEstimates flag. + // + // TODO(tschottdorf): We want to let the usual MVCCStats-delta + // machinery update our stats for the left-hand side. But there is no + // way to pass up an MVCCStats object that will clear out the + // ContainsEstimates flag. We should introduce one, but the migration + // makes this worth a separate effort (ContainsEstimates would need to + // have three possible values, 'UNCHANGED', 'NO', and 'YES'). + // Until then, we're left with this rather crude hack. + if cmd.replicatedResult().Split != nil { + replicaState.Stats.ContainsEstimates = false + } + ms := replicaState.Stats + + if cmd.e.Index != replicaState.RaftAppliedIndex+1 { + // If we have an out of order index, there's corruption. No sense in + // trying to update anything or running the command. Simply return + // a corruption error. + return errors.Errorf("applied index jumped from %d to %d", + replicaState.RaftAppliedIndex, cmd.e.Index) + } + + if writeBatch != nil { + if err := batch.ApplyBatchRepr(writeBatch.Data, false); err != nil { + return errors.Wrap(err, "unable to apply WriteBatch") + } + } + + // The only remaining use of the batch is for range-local keys which we know + // have not been previously written within this batch. + // + // TODO(ajwerner): explore the costs and benefits of this use of Distinct(). + // This call requires flushing the batch's writes but should make subsequent + // writes somewhat cheaper. Early exploration showed no win but found that if + // a Distinct() batch could be used for all of command application it could + // be a win. At the time of writing that approach was deemed not safe. + writer := batch.Distinct() + + // Special-cased MVCC stats handling to exploit commutativity of stats delta + // upgrades. Thanks to commutativity, the spanlatch manager does not have to + // serialize on the stats key. 
+	deltaStats := cmd.replicatedResult().Delta.ToStats()
+	usingAppliedStateKey := replicaState.UsingAppliedStateKey
+	needAppliedStateMigration := !usingAppliedStateKey &&
+		cmd.replicatedResult().State != nil &&
+		cmd.replicatedResult().State.UsingAppliedStateKey
+	if needAppliedStateMigration {
+		// The Raft command wants us to begin using the RangeAppliedState key
+		// and we haven't performed the migration yet. Delete the old keys
+		// that this new key is replacing.
+		err := r.raftMu.stateLoader.MigrateToRangeAppliedStateKey(ctx, writer, &deltaStats)
+		if err != nil {
+			return errors.Wrap(err, "unable to migrate to range applied state")
+		}
+		usingAppliedStateKey = true
+	}
+
+	if !writeAppliedState {
+		// Don't write any applied state, regardless of the technique we'd be using.
+	} else if usingAppliedStateKey {
+		// Note that calling ms.Add will never result in ms.LastUpdateNanos
+		// decreasing (and thus LastUpdateNanos tracks the maximum LastUpdateNanos
+		// across all deltaStats).
+		ms.Add(deltaStats)
+
+		// Set the range applied state, which includes the last applied raft and
+		// lease index along with the mvcc stats, all in one key.
+		if err := r.raftMu.stateLoader.SetRangeAppliedState(ctx, writer,
+			cmd.e.Index, cmd.leaseIndex, ms); err != nil {
+			return errors.Wrap(err, "unable to set range applied state")
+		}
+		ms.Subtract(deltaStats)
+	} else {
+		// Advance the last applied index. We use a blind write in order to avoid
+		// reading the previous applied index keys on every write operation. This
+		// requires a little additional work in order to maintain the MVCC stats.
+		var appliedIndexNewMS enginepb.MVCCStats
+		if err := r.raftMu.stateLoader.SetLegacyAppliedIndexBlind(ctx, writer, &appliedIndexNewMS,
+			cmd.e.Index, cmd.leaseIndex); err != nil {
+			return errors.Wrap(err, "unable to set applied index")
+		}
+		deltaStats.SysBytes += appliedIndexNewMS.SysBytes -
+			r.raftMu.stateLoader.CalcAppliedIndexSysBytes(replicaState.RaftAppliedIndex, replicaState.LeaseAppliedIndex)
+
+		// Note that calling ms.Add will never result in ms.LastUpdateNanos
+		// decreasing (and thus LastUpdateNanos tracks the maximum LastUpdateNanos
+		// across all deltaStats).
+		ms.Add(deltaStats)
+		if err := r.raftMu.stateLoader.SetMVCCStats(ctx, writer, ms); err != nil {
+			return errors.Wrap(err, "unable to update MVCCStats")
+		}
+		ms.Subtract(deltaStats)
+	}
+
+	// Close the Distinct() batch here now that we're done writing to it.
+	writer.Close()
+
+	// NB: it is not sane to use the distinct engine when updating truncated state
+	// as multiple commands in the same batch could modify that state and the
+	// below code reads before it writes.
+	haveTruncatedState := cmd.replicatedResult().State != nil &&
+		cmd.replicatedResult().State.TruncatedState != nil
+	if haveTruncatedState {
+		apply, err := handleTruncatedStateBelowRaft(ctx, replicaState.TruncatedState,
+			cmd.replicatedResult().State.TruncatedState, r.raftMu.stateLoader, batch)
+		if err != nil {
+			return err
+		}
+		if !apply {
+			// The truncated state was discarded, so make sure we don't apply
+			// it to our in-memory state.
+			cmd.replicatedResult().State.TruncatedState = nil
+			cmd.replicatedResult().RaftLogDelta = 0
+			// TODO(ajwerner): consider moving this code.
+			// We received a truncation that doesn't apply to us, so we know that
+			// there's a leaseholder out there with a log that has earlier entries
+			// than ours. That leader also guided our log size computations by
+			// giving us RaftLogDeltas for past truncations, and this was likely
+			// off. Mark our Raft log size as untrustworthy so that, assuming
+			// we step up as leader at some point in the future, we recompute
+			// our numbers.
+			r.mu.Lock()
+			r.mu.raftLogSizeTrusted = false
+			r.mu.Unlock()
+		}
+	}
+
+	start := timeutil.Now()
+
+	var assertHS *raftpb.HardState
+	if util.RaceEnabled && cmd.replicatedResult().Split != nil {
+		rsl := stateloader.Make(cmd.replicatedResult().Split.RightDesc.RangeID)
+		oldHS, err := rsl.LoadHardState(ctx, batch)
+		if err != nil {
+			return errors.Wrap(err, "unable to load HardState")
+		}
+		assertHS = &oldHS
+	}
+
+	if assertHS != nil {
+		// Load the HardState that was just committed (if any).
+		rsl := stateloader.Make(cmd.replicatedResult().Split.RightDesc.RangeID)
+		newHS, err := rsl.LoadHardState(ctx, batch)
+		if err != nil {
+			return errors.Wrap(err, "unable to load HardState")
+		}
+		// Assert that nothing moved "backwards".
+		if newHS.Term < assertHS.Term ||
+			(newHS.Term == assertHS.Term && newHS.Commit < assertHS.Commit) {
+			log.Fatalf(ctx, "clobbered HardState: %s\n\npreviously: %s\noverwritten with: %s",
+				pretty.Diff(newHS, *assertHS), pretty.Sprint(*assertHS), pretty.Sprint(newHS))
+		}
+	}
+
+	// TODO(ajwerner): This metric no longer has anything to do with the "Commit"
+	// of anything. Unfortunately it's exposed in the admin ui of 19.1. We should
+	// remove it from the admin ui for 19.2 and then in 20.1 we can rename it to
+	// something more appropriate.
+	elapsed := timeutil.Since(start)
+	r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds())
+	cmd.replicatedResult().Delta = deltaStats.ToStatsDelta()
+	return nil
+}
+
+// applyCmdAppBatch handles the logic of writing a batch to the storage engine and
+// applying it to the Replica state machine. This method clears b for reuse
+// before returning.
+func (r *Replica) applyCmdAppBatch(
+	ctx context.Context, b *cmdAppBatch, stats *handleCommittedEntriesStats,
+) (errExpl string, err error) {
+	defer b.reset()
+	if log.V(4) {
+		log.Infof(ctx, "flushing batch %v of %d entries", b.replicaState, b.cmdBuf.len)
+	}
+	// Entry application is now done without syncing to disk.
+	// The atomicity guarantees of the batch and the fact that the applied state
+	// is stored in this batch ensure that if the batch ends up not being durably
+	// committed then the entries in this batch will be applied again upon
+	// startup.
+	if err := b.batch.Commit(false); err != nil {
+		log.Fatalf(ctx, "failed to commit Raft entry batch: %v", err)
+	}
+	b.batch.Close()
+	b.batch = nil
+	// NB: we compute the triviality of the batch here again rather than storing
+	// it as it may have changed during processing. In particular, the upgrade of
+	// applied state will have already occurred.
+	var batchIsNonTrivial bool
+	// Non-trivial entries are always alone in a batch and thus are last.
+	if cmd := b.cmdBuf.last(); !isTrivial(cmd.replicatedResult(), b.replicaState.UsingAppliedStateKey) {
+		batchIsNonTrivial = true
+		// Deal with locking sometimes associated with complex commands.
+		if unlock := cmd.splitMergeUnlock; unlock != nil {
+			defer unlock(cmd.replicatedResult())
+			cmd.splitMergeUnlock = nil
+		}
+		if cmd.replicatedResult().BlockReads {
+			r.readOnlyCmdMu.Lock()
+			defer r.readOnlyCmdMu.Unlock()
+			cmd.replicatedResult().BlockReads = false
+		}
+	}
+	// Now that the batch is committed we can go about applying the side effects
+	// of the update to the truncated state. Note that this is safe only if the
+	// new truncated state is durably on disk (i.e., synced). See #38566.
+ // + // NB: even if multiple updates to TruncatedState occurred in this batch we + // coalesce their side effects and only consult the latest + // TruncatedState.Index. + var sideloadTruncationDelta int64 + if b.updatedTruncatedState { + truncState := b.replicaState.TruncatedState + r.store.raftEntryCache.Clear(r.RangeID, truncState.Index+1) + log.VEventf(ctx, 1, "truncating sideloaded storage up to (and including) index %d", truncState.Index) + if sideloadTruncationDelta, _, err = r.raftMu.sideloaded.TruncateTo(ctx, truncState.Index+1); err != nil { + // We don't *have* to remove these entries for correctness. Log a + // loud error, but keep humming along. + log.Errorf(ctx, "while removing sideloaded files during log truncation: %+v", err) + } + } + + // Iterate through the cmds to compute the raft log delta and writeStats. + var mutationCount int + var it cmdAppCtxBufIterator + raftLogDelta := -sideloadTruncationDelta + for ok := it.init(&b.cmdBuf); ok; ok = it.next() { + cmd := it.cur() + raftLogDelta += cmd.replicatedResult().RaftLogDelta + cmd.replicatedResult().RaftLogDelta = 0 + mutationCount += cmd.mutationCount + } + // Record the write activity, passing a 0 nodeID because replica.writeStats + // intentionally doesn't track the origin of the writes. + r.writeStats.recordCount(float64(mutationCount), 0 /* nodeID */) + // deltaStats will store the delta from the current state to the new state + // which will be used to update the metrics. + r.mu.Lock() + r.mu.state.RaftAppliedIndex = b.replicaState.RaftAppliedIndex + r.mu.state.LeaseAppliedIndex = b.replicaState.LeaseAppliedIndex + prevStats := *r.mu.state.Stats + *r.mu.state.Stats = *b.replicaState.Stats + if raftLogDelta != 0 { + r.mu.raftLogSize += raftLogDelta + if r.mu.raftLogSize < 0 { + r.mu.raftLogSize = 0 + } + r.mu.raftLogLastCheckSize += raftLogDelta + if r.mu.raftLogLastCheckSize < 0 { + r.mu.raftLogLastCheckSize = 0 + } + } + if b.updatedTruncatedState { + r.mu.state.TruncatedState = b.replicaState.TruncatedState + } + // Check the queuing conditions. + checkRaftLog := r.mu.raftLogSize-r.mu.raftLogLastCheckSize >= RaftLogQueueStaleSize + needsSplitBySize := r.needsSplitBySizeRLocked() + needsMergeBySize := r.needsMergeBySizeRLocked() + r.mu.Unlock() + deltaStats := *b.replicaState.Stats + deltaStats.Subtract(prevStats) + r.store.metrics.addMVCCStats(deltaStats) + // NB: the bootstrap store has a nil split queue. + // TODO(tbg): the above is probably a lie now. + if r.store.splitQueue != nil && needsSplitBySize && r.splitQueueThrottle.ShouldProcess(timeutil.Now()) { + r.store.splitQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) + } + // The bootstrap store has a nil merge queue. + // TODO(tbg): the above is probably a lie now. + if r.store.mergeQueue != nil && needsMergeBySize && r.mergeQueueThrottle.ShouldProcess(timeutil.Now()) { + // TODO(tbg): for ranges which are small but protected from merges by + // other means (zone configs etc), this is called on every command, and + // fires off a goroutine each time. Make this trigger (and potentially + // the split one above, though it hasn't been observed to be as + // bothersome) less aggressive. 
+		r.store.mergeQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now())
+	}
+	if checkRaftLog {
+		r.store.raftLogQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now())
+	}
+	for ok := it.init(&b.cmdBuf); ok; ok = it.next() {
+		cmd := it.cur()
+		for _, sc := range cmd.replicatedResult().SuggestedCompactions {
+			r.store.compactor.Suggest(cmd.ctx, sc)
+		}
+		cmd.replicatedResult().SuggestedCompactions = nil
+		isNonTrivial := batchIsNonTrivial && it.isLast()
+		if errExpl, err = r.handleRaftCommandResult(ctx, cmd, isNonTrivial,
+			b.replicaState.UsingAppliedStateKey); err != nil {
+			return errExpl, err
+		}
+		if isNonTrivial {
+			stats.stateAssertions++
+		}
+		stats.entriesProcessed++
+	}
+	stats.batchesProcessed++
+	return "", nil
+}
diff --git a/pkg/storage/replica_application_result.go b/pkg/storage/replica_application_result.go
new file mode 100644
index 000000000000..869a8e04d8bc
--- /dev/null
+++ b/pkg/storage/replica_application_result.go
@@ -0,0 +1,430 @@
+// Copyright 2019 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package storage
+
+import (
+	"context"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
+	"github.com/cockroachdb/cockroach/pkg/storage/rditer"
+	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
+	"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/kr/pretty"
+	"github.com/pkg/errors"
+	"go.etcd.io/etcd/raft"
+	"go.etcd.io/etcd/raft/raftpb"
+)
+
+// isTrivial determines whether the side-effects of a ReplicatedEvalResult are
+// "trivial". A result is fundamentally considered "trivial" if it does not have
+// side effects which rely on the written state of the replica exactly matching
+// the in-memory state of the replica at the corresponding log position.
+// Non-trivial commands must be the last entry in a batch so that after the
+// batch is applied the replica's written and in-memory state correspond to that
+// log index.
+//
+// At the time of writing it is possible that the current conditions are too
+// strict but they are certainly sufficient.
+func isTrivial(r *storagepb.ReplicatedEvalResult, usingAppliedStateKey bool) (ret bool) {
+	// Check if there are any non-trivial State updates.
+	if r.State != nil {
+		stateWhitelist := *r.State
+		// An entry is non-trivial if an upgrade to UsingAppliedState is
+		// required. If we're already usingAppliedStateKey or this entry does
+		// not imply an upgrade then it is trivial.
+		if usingAppliedStateKey {
+			stateWhitelist.UsingAppliedStateKey = false
+		}
+		if stateWhitelist.TruncatedState != nil {
+			stateWhitelist.TruncatedState = nil
+		}
+		if stateWhitelist != (storagepb.ReplicaState{}) {
+			return false
+		}
+	}
+	// Set whitelist to the value of r and clear the whitelisted fields.
+	// If whitelist is zero-valued after clearing the whitelisted fields then
+	// it is trivial.
+	whitelist := *r
+	whitelist.Delta = enginepb.MVCCStatsDelta{}
+	whitelist.Timestamp = hlc.Timestamp{}
+	whitelist.DeprecatedDelta = nil
+	whitelist.PrevLeaseProposal = nil
+	whitelist.State = nil
+	whitelist.RaftLogDelta = 0
+	whitelist.SuggestedCompactions = nil
+	return whitelist.Equal(storagepb.ReplicatedEvalResult{})
+}
+
+// stageTrivialReplicatedEvalResult applies the trivial portions of replicatedResult to
+// the supplied replicaState and returns whether the change implied an update to
+// the replica's truncated state. This function modifies replicaState but does
+// not modify replicatedResult in order to give the TestingPostApplyFilter testing knob
+// an opportunity to inspect the command's ReplicatedEvalResult.
+func stageTrivialReplicatedEvalResult(
+	ctx context.Context,
+	replicatedResult *storagepb.ReplicatedEvalResult,
+	raftAppliedIndex, leaseAppliedIndex uint64,
+	replicaState *storagepb.ReplicaState,
+) (truncatedStateUpdated bool) {
+	deltaStats := replicatedResult.Delta.ToStats()
+	replicaState.Stats.Add(deltaStats)
+	if raftAppliedIndex != 0 {
+		replicaState.RaftAppliedIndex = raftAppliedIndex
+	}
+	if leaseAppliedIndex != 0 {
+		replicaState.LeaseAppliedIndex = leaseAppliedIndex
+	}
+	haveState := replicatedResult.State != nil
+	truncatedStateUpdated = haveState && replicatedResult.State.TruncatedState != nil
+	if truncatedStateUpdated {
+		replicaState.TruncatedState = replicatedResult.State.TruncatedState
+	}
+	return truncatedStateUpdated
+}
+
+// clearTrivialReplicatedEvalResultFields is used to zero out the fields of a
+// ReplicatedEvalResult that have already been consumed when staging the
+// corresponding command and applying it to the current batch's view of the
+// ReplicaState. This function is called after a batch has been written to the
+// storage engine. For trivial commands this function should result in a zero
+// value replicatedResult.
+func clearTrivialReplicatedEvalResultFields(
+	replicatedResult *storagepb.ReplicatedEvalResult, usingAppliedStateKey bool,
+) {
+	// Fields for which no action is taken in this method are zeroed so that
+	// they don't trigger an assertion at the end of the application process
+	// (which checks that all fields were handled).
+	replicatedResult.IsLeaseRequest = false
+	replicatedResult.Timestamp = hlc.Timestamp{}
+	replicatedResult.PrevLeaseProposal = nil
+	// The state fields cleared here were already applied to the in-memory view of
+	// replica state for this batch.
+	if haveState := replicatedResult.State != nil; haveState {
+		replicatedResult.State.Stats = nil
+		replicatedResult.State.TruncatedState = nil
+
+		// If we're already using the AppliedStateKey then there's nothing
+		// to do. This flag is idempotent so it's ok that we see this flag
+		// multiple times, but we want to make sure it doesn't cause us to
+		// perform repeated state assertions, so clear it before the
+		// shouldAssert determination.
+		// A reader might wonder if using the value of usingAppliedStateKey from
+		// after applying an entire batch is valid to determine whether this command
+		// implied a transition, but if it had implied a transition then the batch
+		// would not have been considered trivial and the current view of
+		// usingAppliedStateKey would still be false, as complex state transitions
+		// are handled after this call.
+		if usingAppliedStateKey {
+			replicatedResult.State.UsingAppliedStateKey = false
+		}
+		// ReplicaState.Stats was previously non-nullable which caused nodes to
+		// send a zero-value MVCCStats structure. If the proposal was generated by
+		// an old node, we'll have decoded that zero-value structure setting
+		// ReplicaState.Stats to a non-nil value which would trigger the "unhandled
+		// field in ReplicatedEvalResult" assertion to fire if we didn't clear it.
+		// TODO(ajwerner): eliminate this case that likely can no longer occur as of
+		// at least 19.1.
+		if replicatedResult.State.Stats != nil && (*replicatedResult.State.Stats == enginepb.MVCCStats{}) {
+			replicatedResult.State.Stats = nil
+		}
+		if *replicatedResult.State == (storagepb.ReplicaState{}) {
+			replicatedResult.State = nil
+		}
+	}
+	replicatedResult.Delta = enginepb.MVCCStatsDelta{}
+}
+
+// handleRaftCommandResult is called after the current cmd's batch has been
+// committed to the storage engine and the trivial side-effects have been
+// applied to the Replica's in-memory state. This method deals with applying
+// non-trivial side effects, notifying waiting clients, releasing latches,
+// informing raft about applied config changes, and asserting hard state as
+// required.
+func (r *Replica) handleRaftCommandResult(
+	ctx context.Context, cmd *cmdAppCtx, isNonTrivial, usingAppliedStateKey bool,
+) (errExpl string, err error) {
+	// Set up the local result prior to handling the ReplicatedEvalResult to
+	// give testing knobs an opportunity to inspect it.
+	r.prepareLocalResult(cmd.ctx, cmd)
+	// Handle the ReplicatedEvalResult, executing any side effects of the last
+	// state machine transition.
+	//
+	// Note that this must happen after committing (the engine.Batch), but
+	// before notifying a potentially waiting client.
+	clearTrivialReplicatedEvalResultFields(cmd.replicatedResult(), usingAppliedStateKey)
+	if isNonTrivial {
+		r.handleComplexReplicatedEvalResult(cmd.ctx, *cmd.replicatedResult())
+	} else if !cmd.replicatedResult().Equal(storagepb.ReplicatedEvalResult{}) {
+		log.Fatalf(ctx, "failed to handle all side-effects of ReplicatedEvalResult: %v",
+			cmd.replicatedResult())
+	}
+
+	// NB: Perform state assertion before acknowledging the client.
+	// Some tests (TestRangeStatsInit) assume that once the store has started
+	// and the first range has a lease, there will not be a later hard-state.
+	if isNonTrivial {
+		// Assert that the on-disk state doesn't diverge from the in-memory
+		// state as a result of the side effects.
+		r.mu.Lock()
+		r.assertStateLocked(ctx, r.store.Engine())
+		r.mu.Unlock()
+	}
+
+	if cmd.localResult != nil {
+		r.handleLocalEvalResult(cmd.ctx, *cmd.localResult)
+	}
+	r.finishRaftCommand(cmd.ctx, cmd)
+	switch cmd.e.Type {
+	case raftpb.EntryNormal:
+		if cmd.replicatedResult().ChangeReplicas != nil {
+			log.Fatalf(cmd.ctx, "unexpected replication change from command %s", &cmd.raftCmd)
+		}
+	case raftpb.EntryConfChange:
+		if cmd.replicatedResult().ChangeReplicas == nil {
+			// If we did not apply the config change, tell raft that the
+			// config change was aborted.
+			cmd.cc = raftpb.ConfChange{}
+		}
+		if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) {
+			raftGroup.ApplyConfChange(cmd.cc)
+			return true, nil
+		}); err != nil {
+			const errExpl = "during ApplyConfChange"
+			return errExpl, errors.Wrap(err, errExpl)
+		}
+	}
+	return "", nil
+}
+
+// handleComplexReplicatedEvalResult carries out the side-effects of non-trivial
+// commands. It is run with the raftMu locked. It is illegal to pass a
+// replicatedResult that does not imply any side-effects.
+func (r *Replica) handleComplexReplicatedEvalResult(
+	ctx context.Context, replicatedResult storagepb.ReplicatedEvalResult,
+) {
+
+	// Assert that this replicatedResult implies at least one side-effect.
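+	// (Assumed from the isTrivial contract above: if this assertion fires,
+	// the caller misclassified a trivial command as non-trivial.)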
+	if replicatedResult.Equal(storagepb.ReplicatedEvalResult{}) {
+		log.Fatalf(ctx, "zero-value ReplicatedEvalResult passed to handleComplexReplicatedEvalResult")
+	}
+
+	// Process Split or Merge. This needs to happen after stats update because
+	// of the ContainsEstimates hack.
+	if replicatedResult.Split != nil {
+		splitPostApply(
+			r.AnnotateCtx(ctx),
+			replicatedResult.Split.RHSDelta,
+			&replicatedResult.Split.SplitTrigger,
+			r,
+		)
+		replicatedResult.Split = nil
+	}
+
+	if replicatedResult.Merge != nil {
+		if err := r.store.MergeRange(
+			ctx, r, replicatedResult.Merge.LeftDesc, replicatedResult.Merge.RightDesc, replicatedResult.Merge.FreezeStart,
+		); err != nil {
+			// Our in-memory state has diverged from the on-disk state.
+			log.Fatalf(ctx, "failed to update store after merging range: %s", err)
+		}
+		replicatedResult.Merge = nil
+	}
+
+	// Update the remaining ReplicaState.
+	if replicatedResult.State != nil {
+		if newDesc := replicatedResult.State.Desc; newDesc != nil {
+			r.setDesc(ctx, newDesc)
+			replicatedResult.State.Desc = nil
+		}
+
+		if newLease := replicatedResult.State.Lease; newLease != nil {
+			r.leasePostApply(ctx, *newLease, false /* permitJump */)
+			replicatedResult.State.Lease = nil
+		}
+
+		if newThresh := replicatedResult.State.GCThreshold; newThresh != nil {
+			if (*newThresh != hlc.Timestamp{}) {
+				r.mu.Lock()
+				r.mu.state.GCThreshold = newThresh
+				r.mu.Unlock()
+			}
+			replicatedResult.State.GCThreshold = nil
+		}
+
+		if newThresh := replicatedResult.State.TxnSpanGCThreshold; newThresh != nil {
+			if (*newThresh != hlc.Timestamp{}) {
+				r.mu.Lock()
+				r.mu.state.TxnSpanGCThreshold = newThresh
+				r.mu.Unlock()
+			}
+			replicatedResult.State.TxnSpanGCThreshold = nil
+		}
+
+		if replicatedResult.State.UsingAppliedStateKey {
+			r.mu.Lock()
+			r.mu.state.UsingAppliedStateKey = true
+			r.mu.Unlock()
+			replicatedResult.State.UsingAppliedStateKey = false
+		}
+
+		if (*replicatedResult.State == storagepb.ReplicaState{}) {
+			replicatedResult.State = nil
+		}
+	}
+
+	if change := replicatedResult.ChangeReplicas; change != nil {
+		if change.ChangeType == roachpb.REMOVE_REPLICA &&
+			r.store.StoreID() == change.Replica.StoreID {
+			// This wants to run as late as possible, maximizing the chances
+			// that the other nodes have finished this command as well (since
+			// processing the removal from the queue looks up the Range at the
+			// lease holder, being too early here turns this into a no-op).
+			r.store.replicaGCQueue.AddAsync(ctx, r, replicaGCPriorityRemoved)
+		}
+		replicatedResult.ChangeReplicas = nil
+	}
+
+	if replicatedResult.ComputeChecksum != nil {
+		r.computeChecksumPostApply(ctx, *replicatedResult.ComputeChecksum)
+		replicatedResult.ComputeChecksum = nil
+	}
+
+	if !replicatedResult.Equal(storagepb.ReplicatedEvalResult{}) {
+		log.Fatalf(ctx, "unhandled field in ReplicatedEvalResult: %s", pretty.Diff(replicatedResult, storagepb.ReplicatedEvalResult{}))
+	}
+}
+
+// prepareLocalResult is performed after the command has been committed to the
+// engine but before its side-effects have been applied to the Replica's
+// in-memory state. This method gives the command an opportunity to interact
+// with testing knobs and to set up its local result if it was proposed
+// locally. This is performed prior to handling the command's
+// ReplicatedEvalResult because the process of handling the replicated eval
+// result will zero-out the struct to ensure that it has properly performed all
+// of the implied side-effects.
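+//
+// (Illustrative summary inferred from the body below, not normative: for a
+// locally proposed command this populates cmd.response and, on success,
+// cmd.localResult; for commands proposed elsewhere it is largely a no-op
+// apart from running the TestingPostApplyFilter.)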
+func (r *Replica) prepareLocalResult(ctx context.Context, cmd *cmdAppCtx) {
+	var pErr *roachpb.Error
+	if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; filter != nil {
+		var newPropRetry int
+		newPropRetry, pErr = filter(storagebase.ApplyFilterArgs{
+			CmdID:                cmd.idKey,
+			ReplicatedEvalResult: *cmd.replicatedResult(),
+			StoreID:              r.store.StoreID(),
+			RangeID:              r.RangeID,
+		})
+		if cmd.proposalRetry == 0 {
+			cmd.proposalRetry = proposalReevaluationReason(newPropRetry)
+		}
+		// Calling maybeSetCorrupt here is mostly for the benefit of tests. The
+		// interesting errors originate in applyRaftCommandToBatch, and they are
+		// already handled above.
+		pErr = r.maybeSetCorrupt(ctx, pErr)
+	}
+	if pErr == nil {
+		pErr = cmd.forcedErr
+	}
+
+	if cmd.proposedLocally() {
+		if cmd.proposalRetry != proposalNoReevaluation && pErr == nil {
+			log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", cmd.proposal)
+		}
+		if pErr != nil {
+			// A forced error was set (i.e. we did not apply the proposal,
+			// for instance due to its log position) or the Replica is now
+			// corrupted.
+			// If proposalRetry is set, we don't also return an error, as per the
+			// proposalResult contract.
+			if cmd.proposalRetry == proposalNoReevaluation {
+				cmd.response.Err = pErr
+			}
+		} else if cmd.proposal.Local.Reply != nil {
+			cmd.response.Reply = cmd.proposal.Local.Reply
+		} else {
+			log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", cmd.proposal)
+		}
+		cmd.response.Intents = cmd.proposal.Local.DetachIntents()
+		cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil)
+		if pErr == nil {
+			cmd.localResult = cmd.proposal.Local
+		}
+	}
+	if pErr != nil && cmd.localResult != nil {
+		log.Fatalf(ctx, "shouldn't have a local result if command processing failed. pErr: %s", pErr)
+	}
+	if log.ExpensiveLogEnabled(ctx, 2) {
+		log.VEvent(ctx, 2, cmd.localResult.String())
+	}
+}
+
+// finishRaftCommand is called after a command's side effects have been applied
+// in order to acknowledge clients and release latches.
+func (r *Replica) finishRaftCommand(ctx context.Context, cmd *cmdAppCtx) {
+
+	// Provide the command's corresponding logical operations to the
+	// Replica's rangefeed. Only do so if the WriteBatch is non-nil,
+	// otherwise it's valid for the logical op log to be nil, which
+	// would shut down all rangefeeds. If no rangefeed is running,
+	// this call will be a no-op.
+	if cmd.raftCmd.WriteBatch != nil {
+		r.handleLogicalOpLogRaftMuLocked(ctx, cmd.raftCmd.LogicalOpLog)
+	} else if cmd.raftCmd.LogicalOpLog != nil {
+		log.Fatalf(ctx, "non-nil logical op log with nil write batch: %v", cmd.raftCmd)
+	}
+
+	// When set to true, recomputes the stats for the LHS and RHS of splits and
+	// makes sure that they agree with the state's range stats.
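+	// (Assumption based on the code below: the assertion recomputes stats by
+	// iterating both sides of the split via ComputeStatsForRange, so it stays
+	// compiled out and should only be flipped to true when debugging stats
+	// divergence.)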
+	const expensiveSplitAssertion = false
+
+	if expensiveSplitAssertion && cmd.replicatedResult().Split != nil {
+		split := cmd.replicatedResult().Split
+		lhsStatsMS := r.GetMVCCStats()
+		lhsComputedMS, err := rditer.ComputeStatsForRange(&split.LeftDesc, r.store.Engine(), lhsStatsMS.LastUpdateNanos)
+		if err != nil {
+			log.Fatal(ctx, err)
+		}
+
+		rightReplica, err := r.store.GetReplica(split.RightDesc.RangeID)
+		if err != nil {
+			log.Fatal(ctx, err)
+		}
+
+		rhsStatsMS := rightReplica.GetMVCCStats()
+		rhsComputedMS, err := rditer.ComputeStatsForRange(&split.RightDesc, r.store.Engine(), rhsStatsMS.LastUpdateNanos)
+		if err != nil {
+			log.Fatal(ctx, err)
+		}
+
+		if diff := pretty.Diff(lhsStatsMS, lhsComputedMS); len(diff) > 0 {
+			log.Fatalf(ctx, "LHS split stats divergence: diff(claimed, computed) = %s", pretty.Diff(lhsStatsMS, lhsComputedMS))
+		}
+		if diff := pretty.Diff(rhsStatsMS, rhsComputedMS); len(diff) > 0 {
+			log.Fatalf(ctx, "RHS split stats divergence: diff(claimed, computed) = %s", pretty.Diff(rhsStatsMS, rhsComputedMS))
+		}
+	}
+
+	if cmd.proposedLocally() {
+		// If we failed to apply at the right lease index, try again with
+		// a new one. This is important for pipelined writes, since they
+		// don't have a client watching to retry, so a failure to
+		// eventually apply the proposal would be a user-visible error.
+		// TODO(nvanbenschoten): This reproposal is not tracked by the
+		// quota pool. We should fix that.
+		if cmd.proposalRetry == proposalIllegalLeaseIndex &&
+			r.tryReproposeWithNewLeaseIndex(cmd.proposal) {
+			return
+		}
+		// Otherwise, signal the command's status to the client.
+		cmd.proposal.finishApplication(cmd.response)
+	} else if cmd.response.Err != nil {
+		log.VEventf(ctx, 1, "applying raft command resulted in error: %s", cmd.response.Err)
+	}
+}
diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go
index bc6ac6416e86..f1f236e41be9 100644
--- a/pkg/storage/replica_proposal.go
+++ b/pkg/storage/replica_proposal.go
@@ -531,254 +531,6 @@ func addSSTablePreApply(
 	return copied
 }
 
-func (r *Replica) handleReplicatedEvalResult(
-	ctx context.Context,
-	rResult storagepb.ReplicatedEvalResult,
-	raftAppliedIndex, leaseAppliedIndex uint64,
-) (shouldAssert bool) {
-	// Fields for which no action is taken in this method are zeroed so that
-	// they don't trigger an assertion at the end of the method (which checks
-	// that all fields were handled).
-	{
-		rResult.IsLeaseRequest = false
-		rResult.Timestamp = hlc.Timestamp{}
-		rResult.PrevLeaseProposal = nil
-	}
-
-	if rResult.BlockReads {
-		r.readOnlyCmdMu.Lock()
-		defer r.readOnlyCmdMu.Unlock()
-		rResult.BlockReads = false
-	}
-
-	// Update MVCC stats and Raft portion of ReplicaState.
-	deltaStats := rResult.Delta.ToStats()
-	r.mu.Lock()
-	r.mu.state.Stats.Add(deltaStats)
-	if raftAppliedIndex != 0 {
-		r.mu.state.RaftAppliedIndex = raftAppliedIndex
-	}
-	if leaseAppliedIndex != 0 {
-		r.mu.state.LeaseAppliedIndex = leaseAppliedIndex
-	}
-	needsSplitBySize := r.needsSplitBySizeRLocked()
-	needsMergeBySize := r.needsMergeBySizeRLocked()
-	r.mu.Unlock()
-
-	r.store.metrics.addMVCCStats(deltaStats)
-	rResult.Delta = enginepb.MVCCStatsDelta{}
-
-	// NB: the bootstrap store has a nil split queue.
-	// TODO(tbg): the above is probably a lie now.
-	if r.store.splitQueue != nil && needsSplitBySize && r.splitQueueThrottle.ShouldProcess(timeutil.Now()) {
-		r.store.splitQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now())
-	}
-
-	// The bootstrap store has a nil merge queue.
- // TODO(tbg): the above is probably a lie now. - if r.store.mergeQueue != nil && needsMergeBySize && r.mergeQueueThrottle.ShouldProcess(timeutil.Now()) { - // TODO(tbg): for ranges which are small but protected from merges by - // other means (zone configs etc), this is called on every command, and - // fires off a goroutine each time. Make this trigger (and potentially - // the split one above, though it hasn't been observed to be as - // bothersome) less aggressive. - r.store.mergeQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) - } - - // The above are always present. The following are not always present but - // should not trigger a ReplicaState assertion because they are either too - // frequent to do so or because they do not change the ReplicaState. - - if rResult.State != nil { - // Raft log truncation is too frequent to justify a replica state - // assertion. - if newTruncState := rResult.State.TruncatedState; newTruncState != nil { - rResult.State.TruncatedState = nil // for assertion - - r.mu.Lock() - r.mu.state.TruncatedState = newTruncState - r.mu.Unlock() - - // Clear any entries in the Raft log entry cache for this range up - // to and including the most recently truncated index. - r.store.raftEntryCache.Clear(r.RangeID, newTruncState.Index+1) - - // Truncate the sideloaded storage. Note that this is safe only if the new truncated state - // is durably on disk (i.e.) synced. This is true at the time of writing but unfortunately - // could rot. - { - log.Eventf(ctx, "truncating sideloaded storage up to (and including) index %d", newTruncState.Index) - if size, _, err := r.raftMu.sideloaded.TruncateTo(ctx, newTruncState.Index+1); err != nil { - // We don't *have* to remove these entries for correctness. Log a - // loud error, but keep humming along. - log.Errorf(ctx, "while removing sideloaded files during log truncation: %+v", err) - } else { - rResult.RaftLogDelta -= size - } - } - } - - // ReplicaState.Stats was previously non-nullable which caused nodes to - // send a zero-value MVCCStats structure. If the proposal was generated by - // an old node, we'll have decoded that zero-value structure setting - // ReplicaState.Stats to a non-nil value which would trigger the "unhandled - // field in ReplicatedEvalResult" assertion to fire if we didn't clear it. - if rResult.State.Stats != nil && (*rResult.State.Stats == enginepb.MVCCStats{}) { - rResult.State.Stats = nil - } - - if rResult.State.UsingAppliedStateKey { - r.mu.Lock() - // If we're already using the AppliedStateKey then there's nothing - // to do. This flag is idempotent so it's ok that we see this flag - // multiple times, but we want to make sure it doesn't cause us to - // perform repeated state assertions, so clear it before the - // shouldAssert determination. - if r.mu.state.UsingAppliedStateKey { - rResult.State.UsingAppliedStateKey = false - } - r.mu.Unlock() - } - - if (*rResult.State == storagepb.ReplicaState{}) { - rResult.State = nil - } - } - - if rResult.RaftLogDelta != 0 { - r.mu.Lock() - r.mu.raftLogSize += rResult.RaftLogDelta - r.mu.raftLogLastCheckSize += rResult.RaftLogDelta - // Ensure raftLog{,LastCheck}Size is not negative since it isn't persisted - // between server restarts. - if r.mu.raftLogSize < 0 { - r.mu.raftLogSize = 0 - } - if r.mu.raftLogLastCheckSize < 0 { - r.mu.raftLogLastCheckSize = 0 - } - r.mu.Unlock() - rResult.RaftLogDelta = 0 - } else { - // Check for whether to queue the range for Raft log truncation if this is - // not a Raft log truncation command itself. 
We don't want to check the - // Raft log for truncation on every write operation or even every operation - // which occurs after the Raft log exceeds RaftLogQueueStaleSize. The logic - // below queues the replica for possible Raft log truncation whenever an - // additional RaftLogQueueStaleSize bytes have been written to the Raft - // log. - r.mu.Lock() - checkRaftLog := r.mu.raftLogSize-r.mu.raftLogLastCheckSize >= RaftLogQueueStaleSize - if checkRaftLog { - r.mu.raftLogLastCheckSize = r.mu.raftLogSize - } - r.mu.Unlock() - if checkRaftLog { - r.store.raftLogQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) - } - } - - for _, sc := range rResult.SuggestedCompactions { - r.store.compactor.Suggest(ctx, sc) - } - rResult.SuggestedCompactions = nil - - // The rest of the actions are "nontrivial" and may have large effects on the - // in-memory and on-disk ReplicaStates. If any of these actions are present, - // we want to assert that these two states do not diverge. - shouldAssert = !rResult.Equal(storagepb.ReplicatedEvalResult{}) - - // Process Split or Merge. This needs to happen after stats update because - // of the ContainsEstimates hack. - - if rResult.Split != nil { - splitPostApply( - r.AnnotateCtx(ctx), - rResult.Split.RHSDelta, - &rResult.Split.SplitTrigger, - r, - ) - rResult.Split = nil - } - - if rResult.Merge != nil { - if err := r.store.MergeRange( - ctx, r, rResult.Merge.LeftDesc, rResult.Merge.RightDesc, rResult.Merge.FreezeStart, - ); err != nil { - // Our in-memory state has diverged from the on-disk state. - log.Fatalf(ctx, "failed to update store after merging range: %+v", err) - } - rResult.Merge = nil - } - - // Update the remaining ReplicaState. - - if rResult.State != nil { - if newDesc := rResult.State.Desc; newDesc != nil { - r.setDesc(ctx, newDesc) - rResult.State.Desc = nil - } - - if newLease := rResult.State.Lease; newLease != nil { - r.leasePostApply(ctx, *newLease, false /* permitJump */) - rResult.State.Lease = nil - } - - if newThresh := rResult.State.GCThreshold; newThresh != nil { - if (*newThresh != hlc.Timestamp{}) { - r.mu.Lock() - r.mu.state.GCThreshold = newThresh - r.mu.Unlock() - } - rResult.State.GCThreshold = nil - } - - if newThresh := rResult.State.TxnSpanGCThreshold; newThresh != nil { - if (*newThresh != hlc.Timestamp{}) { - r.mu.Lock() - r.mu.state.TxnSpanGCThreshold = newThresh - r.mu.Unlock() - } - rResult.State.TxnSpanGCThreshold = nil - } - - if rResult.State.UsingAppliedStateKey { - r.mu.Lock() - r.mu.state.UsingAppliedStateKey = true - r.mu.Unlock() - rResult.State.UsingAppliedStateKey = false - } - - if (*rResult.State == storagepb.ReplicaState{}) { - rResult.State = nil - } - } - - if change := rResult.ChangeReplicas; change != nil { - if change.ChangeType == roachpb.REMOVE_REPLICA && - r.store.StoreID() == change.Replica.StoreID { - // This wants to run as late as possible, maximizing the chances - // that the other nodes have finished this command as well (since - // processing the removal from the queue looks up the Range at the - // lease holder, being too early here turns this into a no-op). - // Lock ordering dictates that we don't hold any mutexes when adding, - // so we fire it off in a task. 
- r.store.replicaGCQueue.AddAsync(ctx, r, replicaGCPriorityRemoved) - } - rResult.ChangeReplicas = nil - } - - if rResult.ComputeChecksum != nil { - r.computeChecksumPostApply(ctx, *rResult.ComputeChecksum) - rResult.ComputeChecksum = nil - } - - if !rResult.Equal(storagepb.ReplicatedEvalResult{}) { - log.Fatalf(ctx, "unhandled field in ReplicatedEvalResult: %s", pretty.Diff(rResult, storagepb.ReplicatedEvalResult{})) - } - return shouldAssert -} - func (r *Replica) handleLocalEvalResult(ctx context.Context, lResult result.LocalResult) { // Fields for which no action is taken in this method are zeroed so that // they don't trigger an assertion at the end of the method (which checks @@ -861,25 +613,6 @@ func (r *Replica) handleLocalEvalResult(ctx context.Context, lResult result.Loca } } -func (r *Replica) handleEvalResultRaftMuLocked( - ctx context.Context, - lResult *result.LocalResult, - rResult storagepb.ReplicatedEvalResult, - raftAppliedIndex, leaseAppliedIndex uint64, -) { - shouldAssert := r.handleReplicatedEvalResult(ctx, rResult, raftAppliedIndex, leaseAppliedIndex) - if lResult != nil { - r.handleLocalEvalResult(ctx, *lResult) - } - if shouldAssert { - // Assert that the on-disk state doesn't diverge from the in-memory - // state as a result of the side effects. - r.mu.Lock() - r.assertStateLocked(ctx, r.store.Engine()) - r.mu.Unlock() - } -} - // proposalResult indicates the result of a proposal. Exactly one of // Reply and Err is set, and it represents the result of the proposal. type proposalResult struct { diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index c1515e125994..a563c0acc0f1 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -20,10 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" - "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" - "github.com/cockroachdb/cockroach/pkg/storage/rditer" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" @@ -37,7 +34,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/kr/pretty" "github.com/pkg/errors" "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" @@ -375,7 +371,7 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error { } type handleRaftReadyStats struct { - processed int + handleCommittedEntriesStats } // noSnap can be passed to handleRaftReady when no snapshot should be processed. @@ -699,92 +695,35 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.store.raftEntryCache.Add(r.RangeID, rd.Entries, true /* truncate */) r.sendRaftMessages(ctx, otherMsgs) r.traceEntries(rd.CommittedEntries, "committed, before applying any entries") - applicationStart := timeutil.Now() - for _, e := range rd.CommittedEntries { - switch e.Type { - case raftpb.EntryNormal: - // NB: Committed entries are handed to us by Raft. Raft does not - // know about sideloading. Consequently the entries here are all - // already inlined. - - var commandID storagebase.CmdIDKey - var command storagepb.RaftCommand - - // Process committed entries. 
etcd raft occasionally adds a nil entry - // (our own commands are never empty). This happens in two situations: - // When a new leader is elected, and when a config change is dropped due - // to the "one at a time" rule. In both cases we may need to resubmit our - // pending proposals (In the former case we resubmit everything because - // we proposed them to a former leader that is no longer able to commit - // them. In the latter case we only need to resubmit pending config - // changes, but it's hard to distinguish so we resubmit everything - // anyway). We delay resubmission until after we have processed the - // entire batch of entries. - if len(e.Data) == 0 { - // Overwrite unconditionally since this is the most aggressive - // reproposal mode. - if !r.store.TestingKnobs().DisableRefreshReasonNewLeaderOrConfigChange { - refreshReason = reasonNewLeaderOrConfigChange - } - commandID = "" // special-cased value, command isn't used - } else { - var encodedCommand []byte - commandID, encodedCommand = DecodeRaftCommand(e.Data) - // An empty command is used to unquiesce a range and wake the - // leader. Clear commandID so it's ignored for processing. - if len(encodedCommand) == 0 { - commandID = "" - } else if err := protoutil.Unmarshal(encodedCommand, &command); err != nil { - const expl = "while unmarshalling entry" - return stats, expl, errors.Wrap(err, expl) - } - } - - if changedRepl := r.processRaftCommand(ctx, commandID, e.Term, e.Index, command); changedRepl { - log.Fatalf(ctx, "unexpected replication change from command %s", &command) - } - r.store.metrics.RaftCommandsApplied.Inc(1) - stats.processed++ - - case raftpb.EntryConfChange: - var cc raftpb.ConfChange - if err := protoutil.Unmarshal(e.Data, &cc); err != nil { - const expl = "while unmarshaling ConfChange" - return stats, expl, errors.Wrap(err, expl) - } - var ccCtx ConfChangeContext - if err := protoutil.Unmarshal(cc.Context, &ccCtx); err != nil { - const expl = "while unmarshaling ConfChangeContext" - return stats, expl, errors.Wrap(err, expl) + applicationStart := timeutil.Now() + if len(rd.CommittedEntries) > 0 { + var expl string + stats.handleCommittedEntriesStats, expl, err = + r.handleCommittedEntriesRaftMuLocked(ctx, rd.CommittedEntries) + if err != nil { + return stats, expl, err + } + // etcd raft occasionally adds a nil entry (our own commands are never + // empty). This happens in two situations: When a new leader is elected, and + // when a config change is dropped due to the "one at a time" rule. In both + // cases we may need to resubmit our pending proposals (In the former case + // we resubmit everything because we proposed them to a former leader that + // is no longer able to commit them. In the latter case we only need to + // resubmit pending config changes, but it's hard to distinguish so we + // resubmit everything anyway). We delay resubmission until after we have + // processed the entire batch of entries. + if stats.numEmptyEntries > 0 { + // Overwrite unconditionally since this is the most aggressive + // reproposal mode. 
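+			// (numEmptyEntries is reported by handleCommittedEntriesRaftMuLocked
+			// above; it counts the nil entries described in the preceding comment.)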
+ if !r.store.TestingKnobs().DisableRefreshReasonNewLeaderOrConfigChange { + refreshReason = reasonNewLeaderOrConfigChange } - var command storagepb.RaftCommand - if err := protoutil.Unmarshal(ccCtx.Payload, &command); err != nil { - const expl = "while unmarshaling RaftCommand" - return stats, expl, errors.Wrap(err, expl) - } - commandID := storagebase.CmdIDKey(ccCtx.CommandID) - if changedRepl := r.processRaftCommand( - ctx, commandID, e.Term, e.Index, command, - ); !changedRepl { - // If we did not apply the config change, tell raft that the config change was aborted. - cc = raftpb.ConfChange{} - } - stats.processed++ - - if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { - raftGroup.ApplyConfChange(cc) - return true, nil - }); err != nil { - const expl = "during ApplyConfChange" - return stats, expl, errors.Wrap(err, expl) - } - default: - log.Fatalf(ctx, "unexpected Raft entry: %v", e) } } applicationElapsed := timeutil.Since(applicationStart).Nanoseconds() r.store.metrics.RaftApplyCommittedLatency.RecordValue(applicationElapsed) + if refreshReason != noReason { r.mu.Lock() r.refreshProposalsLocked(0, refreshReason) @@ -1196,151 +1135,6 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID } } -func (r *Replica) checkForcedErrLocked( - ctx context.Context, - idKey storagebase.CmdIDKey, - raftCmd storagepb.RaftCommand, - proposal *ProposalData, - proposedLocally bool, -) (uint64, proposalReevaluationReason, *roachpb.Error) { - leaseIndex := r.mu.state.LeaseAppliedIndex - - isLeaseRequest := raftCmd.ReplicatedEvalResult.IsLeaseRequest - var requestedLease roachpb.Lease - if isLeaseRequest { - requestedLease = *raftCmd.ReplicatedEvalResult.State.Lease - } - if idKey == "" { - // This is an empty Raft command (which is sent by Raft after elections - // to trigger reproposals or during concurrent configuration changes). - // Nothing to do here except making sure that the corresponding batch - // (which is bogus) doesn't get executed (for it is empty and so - // properties like key range are undefined). - return leaseIndex, proposalNoReevaluation, roachpb.NewErrorf("no-op on empty Raft entry") - } - - // Verify the lease matches the proposer's expectation. We rely on - // the proposer's determination of whether the existing lease is - // held, and can be used, or is expired, and can be replaced. - // Verify checks that the lease has not been modified since proposal - // due to Raft delays / reorderings. - // To understand why this lease verification is necessary, see comments on the - // proposer_lease field in the proto. - leaseMismatch := false - if raftCmd.DeprecatedProposerLease != nil { - // VersionLeaseSequence must not have been active when this was proposed. - // - // This does not prevent the lease race condition described below. The - // reason we don't fix this here as well is because fixing the race - // requires a new cluster version which implies that we'll already be - // using lease sequence numbers and will fall into the case below. - leaseMismatch = !raftCmd.DeprecatedProposerLease.Equivalent(*r.mu.state.Lease) - } else { - leaseMismatch = raftCmd.ProposerLeaseSequence != r.mu.state.Lease.Sequence - if !leaseMismatch && isLeaseRequest { - // Lease sequence numbers are a reflection of lease equivalency - // between subsequent leases. However, Lease.Equivalent is not fully - // symmetric, meaning that two leases may be Equivalent to a third - // lease but not Equivalent to each other. 
If these leases are - // proposed under that same third lease, neither will be able to - // detect whether the other has applied just by looking at the - // current lease sequence number because neither will will increment - // the sequence number. - // - // This can lead to inversions in lease expiration timestamps if - // we're not careful. To avoid this, if a lease request's proposer - // lease sequence matches the current lease sequence and the current - // lease sequence also matches the requested lease sequence, we make - // sure the requested lease is Equivalent to current lease. - if r.mu.state.Lease.Sequence == requestedLease.Sequence { - // It is only possible for this to fail when expiration-based - // lease extensions are proposed concurrently. - leaseMismatch = !r.mu.state.Lease.Equivalent(requestedLease) - } - - // This is a check to see if the lease we proposed this lease request against is the same - // lease that we're trying to update. We need to check proposal timestamps because - // extensions don't increment sequence numbers. Without this check a lease could - // be extended and then another lease proposed against the original lease would - // be applied over the extension. - if raftCmd.ReplicatedEvalResult.PrevLeaseProposal != nil && - (*raftCmd.ReplicatedEvalResult.PrevLeaseProposal != *r.mu.state.Lease.ProposedTS) { - leaseMismatch = true - } - } - } - if leaseMismatch { - log.VEventf( - ctx, 1, - "command proposed from replica %+v with lease #%d incompatible to %v", - raftCmd.ProposerReplica, raftCmd.ProposerLeaseSequence, *r.mu.state.Lease, - ) - if isLeaseRequest { - // For lease requests we return a special error that - // redirectOnOrAcquireLease() understands. Note that these - // requests don't go through the DistSender. - return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.LeaseRejectedError{ - Existing: *r.mu.state.Lease, - Requested: requestedLease, - Message: "proposed under invalid lease", - }) - } - // We return a NotLeaseHolderError so that the DistSender retries. - nlhe := newNotLeaseHolderError( - r.mu.state.Lease, raftCmd.ProposerReplica.StoreID, r.mu.state.Desc) - nlhe.CustomMsg = fmt.Sprintf( - "stale proposal: command was proposed under lease #%d but is being applied "+ - "under lease: %s", raftCmd.ProposerLeaseSequence, r.mu.state.Lease) - return leaseIndex, proposalNoReevaluation, roachpb.NewError(nlhe) - } - - if isLeaseRequest { - // Lease commands are ignored by the counter (and their MaxLeaseIndex is ignored). This - // makes sense since lease commands are proposed by anyone, so we can't expect a coherent - // MaxLeaseIndex. Also, lease proposals are often replayed, so not making them update the - // counter makes sense from a testing perspective. - // - // However, leases get special vetting to make sure we don't give one to a replica that was - // since removed (see #15385 and a comment in redirectOnOrAcquireLease). - if _, ok := r.mu.state.Desc.GetReplicaDescriptor(requestedLease.Replica.StoreID); !ok { - return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.LeaseRejectedError{ - Existing: *r.mu.state.Lease, - Requested: requestedLease, - Message: "replica not part of range", - }) - } - } else if r.mu.state.LeaseAppliedIndex < raftCmd.MaxLeaseIndex { - // The happy case: the command is applying at or ahead of the minimal - // permissible index. 
It's ok if it skips a few slots (as can happen - // during rearrangement); this command will apply, but later ones which - // were proposed at lower indexes may not. Overall though, this is more - // stable and simpler than requiring commands to apply at their exact - // lease index: Handling the case in which MaxLeaseIndex > oldIndex+1 - // is otherwise tricky since we can't tell the client to try again - // (reproposals could exist and may apply at the right index, leading - // to a replay), and assigning the required index would be tedious - // seeing that it would have to rewind sometimes. - leaseIndex = raftCmd.MaxLeaseIndex - } else { - // The command is trying to apply at a past log position. That's - // unfortunate and hopefully rare; the client on the proposer will try - // again. Note that in this situation, the leaseIndex does not advance. - retry := proposalNoReevaluation - if proposedLocally { - log.VEventf( - ctx, 1, - "retry proposal %x: applied at lease index %d, required < %d", - proposal.idKey, leaseIndex, raftCmd.MaxLeaseIndex, - ) - retry = proposalIllegalLeaseIndex - } - return leaseIndex, retry, roachpb.NewErrorf( - "command observed at lease index %d, but required < %d", leaseIndex, raftCmd.MaxLeaseIndex, - ) - } - return leaseIndex, proposalNoReevaluation, nil -} - type snapTruncationInfo struct { index uint64 deadline time.Time @@ -1591,383 +1385,6 @@ func (m lastUpdateTimesMap) isFollowerActive( return now.Sub(lastUpdateTime) <= MaxQuotaReplicaLivenessDuration } -// processRaftCommand handles the complexities involved in moving the Raft -// state of a Replica forward. At a high level, it receives a proposal, which -// contains the evaluation of a batch (at its heart a WriteBatch, to be applied -// to the underlying storage engine), which it applies and for which it signals -// the client waiting for it (if it's waiting on this Replica). -// -// The proposal also contains auxiliary data which needs to be verified in order -// to decide whether the proposal should be applied: the command's MaxLeaseIndex -// must move the state machine's LeaseAppliedIndex forward, and the proposer's -// lease (or rather its sequence number) must match that of the state machine. -// Furthermore, the GCThreshold is validated and it is checked whether the -// request's key span is contained in the Replica's (it is unclear whether all -// of these checks are necessary). If any of the checks fail, the proposal's -// content is wiped and we apply an empty log entry instead, returning an error -// to the caller to handle. The two typical cases are the lease mismatch (in -// which case the caller tries to send the command to the actual leaseholder) -// and violations of the LeaseAppliedIndex (in which the caller tries again). -// -// Assuming all checks were passed, the command should be applied to the engine, -// which is done by the aptly named applyRaftCommand. -// -// For simple proposals this is the whole story, but some commands trigger -// additional code in this method. The standard way in which this is triggered -// is via a side effect communicated in the proposal's ReplicatedEvalResult and, -// for local proposals, the LocalEvalResult. These might, for example, trigger -// an update of the Replica's in-memory state to match updates to the on-disk -// state, or pass intents to the intent resolver. Some commands don't fit this -// simple schema and need to hook deeper into the code. 
Notably splits and merges -// need to acquire locks on their right-hand side Replicas and may need to add -// data to the WriteBatch before it is applied; similarly, changes to the disk -// layout of internal state typically require a migration which shows up here. -// -// This method returns true if the command successfully applied a replica -// change. -func (r *Replica) processRaftCommand( - ctx context.Context, - idKey storagebase.CmdIDKey, - term, raftIndex uint64, - raftCmd storagepb.RaftCommand, -) (changedRepl bool) { - if raftIndex == 0 { - log.Fatalf(ctx, "processRaftCommand requires a non-zero index") - } - - if log.V(4) { - log.Infof(ctx, "processing command %x: maxLeaseIndex=%d", idKey, raftCmd.MaxLeaseIndex) - } - - var ts hlc.Timestamp - if idKey != "" { - ts = raftCmd.ReplicatedEvalResult.Timestamp - } - - r.mu.Lock() - proposal, proposedLocally := r.mu.proposals[idKey] - - // TODO(tschottdorf): consider the Trace situation here. - if proposedLocally { - // We initiated this command, so use the caller-supplied context. - ctx = proposal.ctx - delete(r.mu.proposals, idKey) - - // At this point we're not guaranteed to have proposalQuota initialized, - // the same is true for quotaReleaseQueues. Only queue the proposal's - // quota for release if the proposalQuota is initialized. - if r.mu.proposalQuota != nil { - r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, proposal.quotaSize) - } - } - - leaseIndex, proposalRetry, forcedErr := r.checkForcedErrLocked(ctx, idKey, raftCmd, proposal, proposedLocally) - - r.mu.Unlock() - - if forcedErr == nil { - // Verify that the batch timestamp is after the GC threshold. This is - // necessary because not all commands declare read access on the GC - // threshold key, even though they implicitly depend on it. This means - // that access to this state will not be serialized by latching, - // so we must perform this check upstream and downstream of raft. - // See #14833. - // - // We provide an empty key span because we already know that the Raft - // command is allowed to apply within its key range. This is guaranteed - // by checks upstream of Raft, which perform the same validation, and by - // span latches, which assure that any modifications to the range's - // boundaries will be serialized with this command. Finally, the - // leaseAppliedIndex check in checkForcedErrLocked ensures that replays - // outside of the spanlatch manager's control which break this - // serialization ordering will already by caught and an error will be - // thrown. - forcedErr = roachpb.NewError(r.requestCanProceed(roachpb.RSpan{}, ts)) - } - - // applyRaftCommand will return "expected" errors, but may also indicate - // replica corruption (as of now, signaled by a replicaCorruptionError). - // We feed its return through maybeSetCorrupt to act when that happens. - if forcedErr != nil { - log.VEventf(ctx, 1, "applying command with forced error: %s", forcedErr) - } else { - log.Event(ctx, "applying command") - - if splitMergeUnlock, err := r.maybeAcquireSplitMergeLock(ctx, raftCmd); err != nil { - log.Eventf(ctx, "unable to acquire split lock: %s", err) - // Send a crash report because a former bug in the error handling might have - // been the root cause of #19172. 
- _ = r.store.stopper.RunAsyncTask(ctx, "crash report", func(ctx context.Context) { - log.SendCrashReport( - ctx, - &r.store.cfg.Settings.SV, - 0, // depth - "while acquiring split lock: %s", - []interface{}{err}, - log.ReportTypeError, - ) - }) - - forcedErr = roachpb.NewError(err) - } else if splitMergeUnlock != nil { - // Close over raftCmd to capture its value at execution time; we clear - // ReplicatedEvalResult on certain errors. - defer func() { - splitMergeUnlock(raftCmd.ReplicatedEvalResult) - }() - } - } - - var response proposalResult - var writeBatch *storagepb.WriteBatch - { - if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; forcedErr == nil && filter != nil { - var newPropRetry int - newPropRetry, forcedErr = filter(storagebase.ApplyFilterArgs{ - CmdID: idKey, - ReplicatedEvalResult: raftCmd.ReplicatedEvalResult, - StoreID: r.store.StoreID(), - RangeID: r.RangeID, - }) - if proposalRetry == 0 { - proposalRetry = proposalReevaluationReason(newPropRetry) - } - } - - if forcedErr != nil { - // Apply an empty entry. - raftCmd.ReplicatedEvalResult = storagepb.ReplicatedEvalResult{} - raftCmd.WriteBatch = nil - raftCmd.LogicalOpLog = nil - } - - // Update the node clock with the serviced request. This maintains - // a high water mark for all ops serviced, so that received ops without - // a timestamp specified are guaranteed one higher than any op already - // executed for overlapping keys. - r.store.Clock().Update(ts) - - var pErr *roachpb.Error - if raftCmd.WriteBatch != nil { - writeBatch = raftCmd.WriteBatch - } - - if deprecatedDelta := raftCmd.ReplicatedEvalResult.DeprecatedDelta; deprecatedDelta != nil { - raftCmd.ReplicatedEvalResult.Delta = deprecatedDelta.ToStatsDelta() - raftCmd.ReplicatedEvalResult.DeprecatedDelta = nil - } - - // AddSSTable ingestions run before the actual batch. This makes sure - // that when the Raft command is applied, the ingestion has definitely - // succeeded. Note that we have taken precautions during command - // evaluation to avoid having mutations in the WriteBatch that affect - // the SSTable. Not doing so could result in order reversal (and missing - // values) here. If the key range we are ingesting into isn't empty, - // we're not using AddSSTable but a plain WriteBatch. - if raftCmd.ReplicatedEvalResult.AddSSTable != nil { - copied := addSSTablePreApply( - ctx, - r.store.cfg.Settings, - r.store.engine, - r.raftMu.sideloaded, - term, - raftIndex, - *raftCmd.ReplicatedEvalResult.AddSSTable, - r.store.limiters.BulkIOWriteRate, - ) - r.store.metrics.AddSSTableApplications.Inc(1) - if copied { - r.store.metrics.AddSSTableApplicationCopies.Inc(1) - } - raftCmd.ReplicatedEvalResult.AddSSTable = nil - } - - if raftCmd.ReplicatedEvalResult.Split != nil { - // Splits require a new HardState to be written to the new RHS - // range (and this needs to be atomic with the main batch). This - // cannot be constructed at evaluation time because it differs - // on each replica (votes may have already been cast on the - // uninitialized replica). Transform the write batch to add the - // updated HardState. - // See https://github.com/cockroachdb/cockroach/issues/20629 - // - // This is not the most efficient, but it only happens on splits, - // which are relatively infrequent and don't write much data. 
- tmpBatch := r.store.engine.NewBatch() - if err := tmpBatch.ApplyBatchRepr(writeBatch.Data, false); err != nil { - log.Fatal(ctx, err) - } - splitPreApply(ctx, tmpBatch, raftCmd.ReplicatedEvalResult.Split.SplitTrigger) - writeBatch.Data = tmpBatch.Repr() - tmpBatch.Close() - } - - if merge := raftCmd.ReplicatedEvalResult.Merge; merge != nil { - // Merges require the subsumed range to be atomically deleted when the - // merge transaction commits. - // - // This is not the most efficient, but it only happens on merges, - // which are relatively infrequent and don't write much data. - tmpBatch := r.store.engine.NewBatch() - if err := tmpBatch.ApplyBatchRepr(writeBatch.Data, false); err != nil { - log.Fatal(ctx, err) - } - rhsRepl, err := r.store.GetReplica(merge.RightDesc.RangeID) - if err != nil { - log.Fatal(ctx, err) - } - const destroyData = false - err = rhsRepl.preDestroyRaftMuLocked(ctx, tmpBatch, tmpBatch, merge.RightDesc.NextReplicaID, destroyData) - if err != nil { - log.Fatal(ctx, err) - } - writeBatch.Data = tmpBatch.Repr() - tmpBatch.Close() - } - - { - var err error - raftCmd.ReplicatedEvalResult, err = r.applyRaftCommand( - ctx, idKey, raftCmd.ReplicatedEvalResult, raftIndex, leaseIndex, writeBatch) - - // applyRaftCommand returned an error, which usually indicates - // either a serious logic bug in CockroachDB or a disk - // corruption/out-of-space issue. Make sure that these fail with - // descriptive message so that we can differentiate the root causes. - if err != nil { - log.Errorf(ctx, "unable to update the state machine: %+v", err) - // Report the fatal error separately and only with the error, as that - // triggers an optimization for which we directly report the error to - // sentry (which in turn allows sentry to distinguish different error - // types). - log.Fatal(ctx, err) - } - } - - if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; pErr == nil && filter != nil { - var newPropRetry int - newPropRetry, pErr = filter(storagebase.ApplyFilterArgs{ - CmdID: idKey, - ReplicatedEvalResult: raftCmd.ReplicatedEvalResult, - StoreID: r.store.StoreID(), - RangeID: r.RangeID, - }) - if proposalRetry == 0 { - proposalRetry = proposalReevaluationReason(newPropRetry) - } - - } - - // calling maybeSetCorrupt here is mostly for tests and looks. The - // interesting errors originate in applyRaftCommand, and they are - // already handled above. - pErr = r.maybeSetCorrupt(ctx, pErr) - if pErr == nil { - pErr = forcedErr - } - - var lResult *result.LocalResult - if proposedLocally { - if proposalRetry != proposalNoReevaluation && pErr == nil { - log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", proposal) - } - if pErr != nil { - // A forced error was set (i.e. we did not apply the proposal, - // for instance due to its log position) or the Replica is now - // corrupted. - // If proposalRetry is set, we don't also return an error, as per the - // proposalResult contract. - if proposalRetry == proposalNoReevaluation { - response.Err = pErr - } - } else if proposal.Local.Reply != nil { - response.Reply = proposal.Local.Reply - } else { - log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", proposal) - } - response.Intents = proposal.Local.DetachIntents() - response.EndTxns = proposal.Local.DetachEndTxns(pErr != nil) - if pErr == nil { - lResult = proposal.Local - } - } - if pErr != nil && lResult != nil { - log.Fatalf(ctx, "shouldn't have a local result if command processing failed. 
pErr: %s", pErr) - } - if log.ExpensiveLogEnabled(ctx, 2) { - log.VEvent(ctx, 2, lResult.String()) - } - - // Handle the Result, executing any side effects of the last - // state machine transition. - // - // Note that this must happen after committing (the engine.Batch), but - // before notifying a potentially waiting client. - r.handleEvalResultRaftMuLocked(ctx, lResult, - raftCmd.ReplicatedEvalResult, raftIndex, leaseIndex) - - // Provide the command's corresponding logical operations to the - // Replica's rangefeed. Only do so if the WriteBatch is non-nil, - // otherwise it's valid for the logical op log to be nil, which - // would shut down all rangefeeds. If no rangefeed is running, - // this call will be a no-op. - if raftCmd.WriteBatch != nil { - r.handleLogicalOpLogRaftMuLocked(ctx, raftCmd.LogicalOpLog) - } else if raftCmd.LogicalOpLog != nil { - log.Fatalf(ctx, "non-nil logical op log with nil write batch: %v", raftCmd) - } - } - - // When set to true, recomputes the stats for the LHS and RHS of splits and - // makes sure that they agree with the state's range stats. - const expensiveSplitAssertion = false - - if expensiveSplitAssertion && raftCmd.ReplicatedEvalResult.Split != nil { - split := raftCmd.ReplicatedEvalResult.Split - lhsStatsMS := r.GetMVCCStats() - lhsComputedMS, err := rditer.ComputeStatsForRange(&split.LeftDesc, r.store.Engine(), lhsStatsMS.LastUpdateNanos) - if err != nil { - log.Fatal(ctx, err) - } - - rightReplica, err := r.store.GetReplica(split.RightDesc.RangeID) - if err != nil { - log.Fatal(ctx, err) - } - - rhsStatsMS := rightReplica.GetMVCCStats() - rhsComputedMS, err := rditer.ComputeStatsForRange(&split.RightDesc, r.store.Engine(), rhsStatsMS.LastUpdateNanos) - if err != nil { - log.Fatal(ctx, err) - } - - if diff := pretty.Diff(lhsStatsMS, lhsComputedMS); len(diff) > 0 { - log.Fatalf(ctx, "LHS split stats divergence: diff(claimed, computed) = %s", pretty.Diff(lhsStatsMS, lhsComputedMS)) - } - if diff := pretty.Diff(rhsStatsMS, rhsComputedMS); len(diff) > 0 { - log.Fatalf(ctx, "RHS split stats divergence diff(claimed, computed) = %s", pretty.Diff(rhsStatsMS, rhsComputedMS)) - } - } - - if proposedLocally { - // If we failed to apply at the right lease index, try again with - // a new one. This is important for pipelined writes, since they - // don't have a client watching to retry, so a failure to - // eventually apply the proposal would be a user-visible error. - // TODO(nvanbenschoten): This reproposal is not tracked by the - // quota pool. We should fix that. - if proposalRetry == proposalIllegalLeaseIndex && r.tryReproposeWithNewLeaseIndex(proposal) { - return false - } - // Otherwise, signal the command's status to the client. - proposal.finishApplication(response) - } else if response.Err != nil { - log.VEventf(ctx, 1, "applying raft command resulted in error: %s", response.Err) - } - - return raftCmd.ReplicatedEvalResult.ChangeReplicas != nil -} - // tryReproposeWithNewLeaseIndex is used by processRaftCommand to // repropose commands that have gotten an illegal lease index error, // and that we know could not have applied while their lease index was @@ -2073,7 +1490,7 @@ func (r *Replica) maybeAcquireSnapshotMergeLock( // applying the command to perform any necessary cleanup. 
func (r *Replica) maybeAcquireSplitMergeLock( ctx context.Context, raftCmd storagepb.RaftCommand, -) (func(storagepb.ReplicatedEvalResult), error) { +) (func(*storagepb.ReplicatedEvalResult), error) { if split := raftCmd.ReplicatedEvalResult.Split; split != nil { return r.acquireSplitLock(ctx, &split.SplitTrigger) } else if merge := raftCmd.ReplicatedEvalResult.Merge; merge != nil { @@ -2084,7 +1501,7 @@ func (r *Replica) maybeAcquireSplitMergeLock( func (r *Replica) acquireSplitLock( ctx context.Context, split *roachpb.SplitTrigger, -) (func(storagepb.ReplicatedEvalResult), error) { +) (func(*storagepb.ReplicatedEvalResult), error) { rightRng, created, err := r.store.getOrCreateReplica(ctx, split.RightDesc.RangeID, 0, nil) if err != nil { return nil, err @@ -2101,7 +1518,7 @@ func (r *Replica) acquireSplitLock( // commands that have reproposals interacting with retries (i.e. we don't // treat splits differently). - return func(rResult storagepb.ReplicatedEvalResult) { + return func(rResult *storagepb.ReplicatedEvalResult) { if rResult.Split == nil && created && !rightRng.IsInitialized() { // An error occurred during processing of the split and the RHS is still // uninitialized. Mark the RHS destroyed and remove it from the replica's @@ -2127,7 +1544,7 @@ func (r *Replica) acquireSplitLock( func (r *Replica) acquireMergeLock( ctx context.Context, merge *roachpb.MergeTrigger, -) (func(storagepb.ReplicatedEvalResult), error) { +) (func(*storagepb.ReplicatedEvalResult), error) { // The merge lock is the right-hand replica's raftMu. The right-hand replica // is required to exist on this store. Otherwise, an incoming snapshot could // create the right-hand replica before the merge trigger has a chance to @@ -2145,199 +1562,11 @@ func (r *Replica) acquireMergeLock( log.Fatalf(ctx, "RHS of merge %s <- %s not present on store; found %s in place of the RHS", merge.LeftDesc, merge.RightDesc, rightDesc) } - return func(storagepb.ReplicatedEvalResult) { + return func(*storagepb.ReplicatedEvalResult) { rightRepl.raftMu.Unlock() }, nil } -// applyRaftCommand applies a raft command from the replicated log to the -// underlying state machine (i.e. the engine). When the state machine can not be -// updated, an error (which is likely fatal!) is returned and must be handled by -// the caller. -// The returned ReplicatedEvalResult replaces the caller's. -func (r *Replica) applyRaftCommand( - ctx context.Context, - idKey storagebase.CmdIDKey, - rResult storagepb.ReplicatedEvalResult, - raftAppliedIndex, leaseAppliedIndex uint64, - writeBatch *storagepb.WriteBatch, -) (storagepb.ReplicatedEvalResult, error) { - if writeBatch != nil && len(writeBatch.Data) > 0 { - // Record the write activity, passing a 0 nodeID because replica.writeStats - // intentionally doesn't track the origin of the writes. - mutationCount, err := engine.RocksDBBatchCount(writeBatch.Data) - if err != nil { - log.Errorf(ctx, "unable to read header of committed WriteBatch: %+v", err) - } else { - r.writeStats.recordCount(float64(mutationCount), 0 /* nodeID */) - } - } - - r.mu.Lock() - usingAppliedStateKey := r.mu.state.UsingAppliedStateKey - oldRaftAppliedIndex := r.mu.state.RaftAppliedIndex - oldLeaseAppliedIndex := r.mu.state.LeaseAppliedIndex - oldTruncatedState := r.mu.state.TruncatedState - - // Exploit the fact that a split will result in a full stats - // recomputation to reset the ContainsEstimates flag. - // - // TODO(tschottdorf): We want to let the usual MVCCStats-delta - // machinery update our stats for the left-hand side. 
But there is no - // way to pass up an MVCCStats object that will clear out the - // ContainsEstimates flag. We should introduce one, but the migration - // makes this worth a separate effort (ContainsEstimates would need to - // have three possible values, 'UNCHANGED', 'NO', and 'YES'). - // Until then, we're left with this rather crude hack. - if rResult.Split != nil { - r.mu.state.Stats.ContainsEstimates = false - } - ms := *r.mu.state.Stats - r.mu.Unlock() - - if raftAppliedIndex != oldRaftAppliedIndex+1 { - // If we have an out of order index, there's corruption. No sense in - // trying to update anything or running the command. Simply return - // a corruption error. - return storagepb.ReplicatedEvalResult{}, errors.Errorf("applied index jumped from %d to %d", - oldRaftAppliedIndex, raftAppliedIndex) - } - - haveTruncatedState := rResult.State != nil && rResult.State.TruncatedState != nil - var batch engine.Batch - if !haveTruncatedState { - batch = r.store.Engine().NewWriteOnlyBatch() - } else { - // When we update the truncated state, we may need to read the batch - // and can't use a WriteOnlyBatch. This is fine since log truncations - // are tiny batches. - batch = r.store.Engine().NewBatch() - } - defer batch.Close() - - if writeBatch != nil { - if err := batch.ApplyBatchRepr(writeBatch.Data, false); err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to apply WriteBatch") - } - } - - // The only remaining use of the batch is for range-local keys which we know - // have not been previously written within this batch. - writer := batch.Distinct() - - // Special-cased MVCC stats handling to exploit commutativity of stats delta - // upgrades. Thanks to commutativity, the spanlatch manager does not have to - // serialize on the stats key. - deltaStats := rResult.Delta.ToStats() - - if !usingAppliedStateKey && rResult.State != nil && rResult.State.UsingAppliedStateKey { - // The Raft command wants us to begin using the RangeAppliedState key - // and we haven't performed the migration yet. Delete the old keys - // that this new key is replacing. - err := r.raftMu.stateLoader.MigrateToRangeAppliedStateKey(ctx, writer, &deltaStats) - if err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to migrate to range applied state") - } - usingAppliedStateKey = true - } - - if usingAppliedStateKey { - // Note that calling ms.Add will never result in ms.LastUpdateNanos - // decreasing (and thus LastUpdateNanos tracks the maximum LastUpdateNanos - // across all deltaStats). - ms.Add(deltaStats) - - // Set the range applied state, which includes the last applied raft and - // lease index along with the mvcc stats, all in one key. - if err := r.raftMu.stateLoader.SetRangeAppliedState(ctx, writer, - raftAppliedIndex, leaseAppliedIndex, &ms); err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to set range applied state") - } - } else { - // Advance the last applied index. We use a blind write in order to avoid - // reading the previous applied index keys on every write operation. This - // requires a little additional work in order maintain the MVCC stats. 
-		var appliedIndexNewMS enginepb.MVCCStats
-		if err := r.raftMu.stateLoader.SetLegacyAppliedIndexBlind(ctx, writer, &appliedIndexNewMS,
-			raftAppliedIndex, leaseAppliedIndex); err != nil {
-			return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to set applied index")
-		}
-		deltaStats.SysBytes += appliedIndexNewMS.SysBytes -
-			r.raftMu.stateLoader.CalcAppliedIndexSysBytes(oldRaftAppliedIndex, oldLeaseAppliedIndex)
-
-		// Note that calling ms.Add will never result in ms.LastUpdateNanos
-		// decreasing (and thus LastUpdateNanos tracks the maximum LastUpdateNanos
-		// across all deltaStats).
-		ms.Add(deltaStats)
-		if err := r.raftMu.stateLoader.SetMVCCStats(ctx, writer, &ms); err != nil {
-			return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to update MVCCStats")
-		}
-	}
-
-	if haveTruncatedState {
-		apply, err := handleTruncatedStateBelowRaft(ctx, oldTruncatedState, rResult.State.TruncatedState, r.raftMu.stateLoader, writer)
-		if err != nil {
-			return storagepb.ReplicatedEvalResult{}, err
-		}
-		if !apply {
-			// The truncated state was discarded, so make sure we don't apply
-			// it to our in-memory state.
-			rResult.State.TruncatedState = nil
-			rResult.RaftLogDelta = 0
-			// We received a truncation that doesn't apply to us, so we know that
-			// there's a leaseholder out there with a log that has earlier entries
-			// than ours. That leader also guided our log size computations by
-			// giving us RaftLogDeltas for past truncations, and this was likely
-			// off. Mark our Raft log size as not trustworthy so that, assuming
-			// we step up as leader at some point in the future, we recompute
-			// our numbers.
-			r.mu.Lock()
-			r.mu.raftLogSizeTrusted = false
-			r.mu.Unlock()
-		}
-	}
-
-	// TODO(peter): We did not close the writer in an earlier version of
-	// the code, which went undetected even though we used the batch afterwards
-	// (though only to commit it). We should add an assertion to prevent that in
-	// the future.
-	writer.Close()
-
-	start := timeutil.Now()
-
-	var assertHS *raftpb.HardState
-	if util.RaceEnabled && rResult.Split != nil {
-		rsl := stateloader.Make(rResult.Split.RightDesc.RangeID)
-		oldHS, err := rsl.LoadHardState(ctx, r.store.Engine())
-		if err != nil {
-			return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to load HardState")
-		}
-		assertHS = &oldHS
-	}
-	if err := batch.Commit(false); err != nil {
-		return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "could not commit batch")
-	}
-
-	if assertHS != nil {
-		// Load the HardState that was just committed (if any).
-		rsl := stateloader.Make(rResult.Split.RightDesc.RangeID)
-		newHS, err := rsl.LoadHardState(ctx, r.store.Engine())
-		if err != nil {
-			return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to load HardState")
-		}
-		// Assert that nothing moved "backwards".
-		if newHS.Term < assertHS.Term || (newHS.Term == assertHS.Term && newHS.Commit < assertHS.Commit) {
-			log.Fatalf(ctx, "clobbered HardState: %s\n\npreviously: %s\noverwritten with: %s",
-				pretty.Diff(newHS, *assertHS), pretty.Sprint(*assertHS), pretty.Sprint(newHS))
-		}
-	}
-
-	elapsed := timeutil.Since(start)
-	r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds())
-	rResult.Delta = deltaStats.ToStatsDelta()
-	return rResult, nil
-}
-
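Before it was folded into the new batched application path, applyRaftCommand enforced two invariants worth keeping in mind when reading the replacement code: the Raft applied index must advance by exactly one per command (anything else indicates corruption), and each command's MVCC stats delta is accumulated into the range's running totals before both are persisted in the same batch. A toy model of just those two checks — all names here are invented for illustration, not the real implementation:

```go
package main

import "fmt"

// mvccStats is a stand-in for enginepb.MVCCStats.
type mvccStats struct{ LiveBytes, SysBytes int64 }

func (ms *mvccStats) add(delta mvccStats) {
	ms.LiveBytes += delta.LiveBytes
	ms.SysBytes += delta.SysBytes
}

// rangeState models the slice of replica state the function mutated.
type rangeState struct {
	appliedIndex uint64
	stats        mvccStats
}

// apply mirrors the out-of-order check: a jumped index means corruption,
// so we return an error before touching any state.
func (s *rangeState) apply(index uint64, delta mvccStats) error {
	if index != s.appliedIndex+1 {
		return fmt.Errorf("applied index jumped from %d to %d", s.appliedIndex, index)
	}
	s.appliedIndex = index
	s.stats.add(delta)
	return nil
}

func main() {
	s := rangeState{appliedIndex: 10}
	fmt.Println(s.apply(11, mvccStats{LiveBytes: 64})) // <nil>
	fmt.Println(s.apply(13, mvccStats{}))              // applied index jumped from 11 to 13
}
```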
 // handleTruncatedStateBelowRaft is called when a Raft command updates the truncated
 // state. This isn't 100% trivial for two reasons:
 // - in 19.1 we're making the TruncatedState key unreplicated, so there's a migration
@@ -2358,7 +1587,7 @@ func handleTruncatedStateBelowRaft(
 	ctx context.Context,
 	oldTruncatedState, newTruncatedState *roachpb.RaftTruncatedState,
 	loader stateloader.StateLoader,
-	distinctEng engine.ReadWriter,
+	batch engine.ReadWriter,
 ) (_apply bool, _ error) {
 	// If this is a log truncation, load the resulting unreplicated or legacy
 	// replicated truncated state (in that order). If the migration is happening
@@ -2368,7 +1597,7 @@ func handleTruncatedStateBelowRaft(
 	// Either way, we'll update it below.
 	//
 	// See VersionUnreplicatedRaftTruncatedState for details.
-	truncStatePostApply, truncStateIsLegacy, err := loader.LoadRaftTruncatedState(ctx, distinctEng)
+	truncStatePostApply, truncStateIsLegacy, err := loader.LoadRaftTruncatedState(ctx, batch)
 	if err != nil {
 		return false, errors.Wrap(err, "loading truncated state")
 	}
@@ -2390,7 +1619,7 @@ func handleTruncatedStateBelowRaft(
 			// NB: RangeIDPrefixBufs have sufficient capacity (32 bytes) to
 			// avoid allocating when constructing Raft log keys (16 bytes).
 			unsafeKey := prefixBuf.RaftLogKey(idx)
-			if err := distinctEng.Clear(engine.MakeMVCCMetadataKey(unsafeKey)); err != nil {
+			if err := batch.Clear(engine.MakeMVCCMetadataKey(unsafeKey)); err != nil {
 				return false, errors.Wrapf(err, "unable to clear truncated Raft entries for %+v", newTruncatedState)
 			}
 		}
@@ -2413,7 +1642,7 @@ func handleTruncatedStateBelowRaft(
 	_ = cluster.VersionUnreplicatedRaftTruncatedState
 	if err := engine.MVCCPutProto(
-		ctx, distinctEng, nil /* ms */, prefixBuf.RaftTruncatedStateKey(),
+		ctx, batch, nil /* ms */, prefixBuf.RaftTruncatedStateKey(),
 		hlc.Timestamp{}, nil /* txn */, newTruncatedState,
 	); err != nil {
 		return false, errors.Wrap(err, "unable to migrate RaftTruncatedState")
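The rename from distinctEng to batch reflects that the function now writes through the application batch directly rather than a Distinct() wrapper; its logic is unchanged. The core rule it implements is that a truncation is applied only if it moves the truncated index forward relative to what is already on disk, clearing the newly truncated Raft log keys as part of the same batch. A simplified model of that "only move forward" rule — types and helpers are invented here, not the real engine API:

```go
package main

import "fmt"

// truncatedState is a stand-in for roachpb.RaftTruncatedState.
type truncatedState struct{ Index uint64 }

// fakeBatch records which log indexes were cleared.
type fakeBatch struct{ cleared []uint64 }

func (b *fakeBatch) clearLogEntry(idx uint64) { b.cleared = append(b.cleared, idx) }

// maybeTruncate reports whether the proposed truncation should be
// applied, clearing the Raft log keys in (onDisk.Index, proposed.Index]
// when it is.
func maybeTruncate(b *fakeBatch, onDisk, proposed truncatedState) bool {
	if proposed.Index <= onDisk.Index {
		// A stale or duplicate truncation: the log has already been
		// truncated at least this far, so applying it would move backwards.
		return false
	}
	for idx := onDisk.Index + 1; idx <= proposed.Index; idx++ {
		b.clearLogEntry(idx)
	}
	return true
}

func main() {
	var b fakeBatch
	fmt.Println(maybeTruncate(&b, truncatedState{Index: 5}, truncatedState{Index: 8})) // true
	fmt.Println(b.cleared)                                                             // [6 7 8]
	fmt.Println(maybeTruncate(&b, truncatedState{Index: 8}, truncatedState{Index: 8})) // false
}
```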
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 97e421f405c2..f59e7e956f10 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -900,8 +900,9 @@ func TestLeaseReplicaNotInDesc(t *testing.T) {
 		},
 	}
 	tc.repl.mu.Lock()
-	_, _, pErr := tc.repl.checkForcedErrLocked(
+	_, _, pErr := checkForcedErr(
 		context.Background(), makeIDKey(), raftCmd, nil /* proposal */, false, /* proposedLocally */
+		&tc.repl.mu.state,
 	)
 	tc.repl.mu.Unlock()
 	if _, isErr := pErr.GetDetail().(*roachpb.LeaseRejectedError); !isErr {
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index c9b0af947974..bfea43aff073 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -3610,8 +3610,8 @@ func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) {
 	// processing time means we'll have starved local replicas of ticks and
 	// remote replicas will likely start campaigning.
 	if elapsed >= defaultReplicaRaftMuWarnThreshold {
-		log.Warningf(ctx, "handle raft ready: %.1fs [processed=%d]",
-			elapsed.Seconds(), stats.processed)
+		log.Warningf(ctx, "handle raft ready: %.1fs [applied=%d, batches=%d, state_assertions=%d]",
+			elapsed.Seconds(), stats.entriesProcessed, stats.batchesProcessed, stats.stateAssertions)
 	}
 	if !r.IsInitialized() {
 		// Only an uninitialized replica can have a placeholder since, by
diff --git a/pkg/storage/track_raft_protos.go b/pkg/storage/track_raft_protos.go
index 313ab5cac8d8..23c1028bd867 100644
--- a/pkg/storage/track_raft_protos.go
+++ b/pkg/storage/track_raft_protos.go
@@ -34,7 +34,7 @@ func funcName(f interface{}) string {
 // instrumentation and returns the list of downstream-of-raft protos.
 func TrackRaftProtos() func() []reflect.Type {
 	// Grab the name of the function that roots all raft operations.
-	processRaftFunc := funcName((*Replica).processRaftCommand)
+	stageRaftFunc := funcName((*Replica).stageRaftCommand)
 	// We only need to track protos that could cause replica divergence
 	// by being written to disk downstream of raft.
 	whitelist := []string{
@@ -104,7 +104,7 @@ func TrackRaftProtos() func() []reflect.Type {
 			break
 		}
-		if strings.Contains(f.Function, processRaftFunc) {
+		if strings.Contains(f.Function, stageRaftFunc) {
 			belowRaftProtos.Lock()
 			belowRaftProtos.inner[t] = struct{}{}
 			belowRaftProtos.Unlock()
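The last hunk only has to rename a string: TrackRaftProtos identifies "below raft" work by looking for the root function's qualified name in the call stack, so when command application moved from processRaftCommand to stageRaftCommand the anchor had to follow. A standalone sketch of that stack-walking trick, assuming nothing beyond the standard library — funcName mirrors the helper in the hunk above, while calledBelow, marshalProto, and stageRaftCommand are invented for illustration:

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
	"strings"
)

// funcName returns the fully qualified name of f,
// e.g. "main.stageRaftCommand".
func funcName(f interface{}) string {
	return runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
}

var anchor = funcName(stageRaftCommand)

// calledBelow reports whether any caller frame belongs to a function
// whose qualified name contains anchor.
func calledBelow(anchor string) bool {
	pcs := make([]uintptr, 64)
	n := runtime.Callers(2, pcs) // skip runtime.Callers and calledBelow
	frames := runtime.CallersFrames(pcs[:n])
	for {
		f, more := frames.Next()
		if strings.Contains(f.Function, anchor) {
			return true
		}
		if !more {
			return false
		}
	}
}

// marshalProto stands in for the proto-marshal hook that decides whether
// a type should be recorded as written below raft.
func marshalProto() {
	fmt.Println("below anchor:", calledBelow(anchor))
}

func stageRaftCommand() { marshalProto() }

func main() {
	stageRaftCommand() // below anchor: true
	marshalProto()     // below anchor: false
}
```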